From 5c384cab27113e74c998811d85f65700c0827060 Mon Sep 17 00:00:00 2001 From: Peter Mikkelsen Date: Mon, 16 May 2022 07:05:17 +0000 Subject: Implement timeouts for receive --- apl9.h | 5 ++++- concurrency.c | 65 +++++++++++++++++++++++++++++++++++++++++++--------------- error.c | 1 + operators.c | 14 ++++++++++--- tests/demo.apl | 15 ++++++++++++-- 5 files changed, 78 insertions(+), 22 deletions(-) diff --git a/apl9.h b/apl9.h index e56014b..bf36f52 100644 --- a/apl9.h +++ b/apl9.h @@ -52,6 +52,7 @@ typedef enum ELength = 5, EValue = 6, EDomain = 11, + ETimeout = 12, /* not in dyalog */ ENotImplemented = 100, /* not in dyalog */ } errorCodes; @@ -236,10 +237,12 @@ struct ThreadData Mail *mail; Mail *lastmail; int lasterror; + int timeout; /* number of milli seconds to timeout */ + int timedout; /* true if the recv timed out */ Rune *lasterrormsg; ErrorGuard *globalerrorguard; QLock lock; - Rendez empty; + Rendez newmail; }; struct Mail diff --git a/concurrency.c b/concurrency.c index 1d06a9c..78ac18b 100644 --- a/concurrency.c +++ b/concurrency.c @@ -24,11 +24,23 @@ static int nthreads; static ThreadData **threads; int mainstacksize = STACKSIZE; +void +recvtimeout(void *raw) +{ + ThreadData *td = (ThreadData *)raw; + if(0 == sleep(td->timeout)){ + qlock(&td->lock); + td->timedout = 1; + rwakeup(&td->newmail); + qunlock(&td->lock); + } +} + void initthreads(void) { ThreadData *td = newthreaddata(); - void **tdptr = threaddata(); + void **tdptr = procdata(); *tdptr = td; } @@ -62,7 +74,7 @@ spawnthread(Function f, Array *left, Array *right) ThreadData * getthreaddata(void) { - void **tdptr = threaddata(); + void **tdptr = procdata(); return (ThreadData *)*tdptr; } @@ -85,7 +97,7 @@ messagesend(Array *a, int id) else td->mail = newmail; td->lastmail = newmail; - rwakeup(&td->empty); + rwakeup(&td->newmail); qunlock(&td->lock); }else{ throwerror(L"Invalid thread id", EDomain); @@ -95,20 +107,39 @@ messagesend(Array *a, int id) Array * messagerecv(Function match, int timeout) { - USED(timeout); USED(match); ThreadData *td = getthreaddata(); qlock(&td->lock); - while(td->mail == nil) - rsleep(&td->empty); - Mail *m = td->mail; - td->mail = m->next; - if(td->mail == nil) - td->lastmail = nil; - qunlock(&td->lock); - Array *a = m->contents; - free(m); - return a; + int timeoutid = 0; + if(timeout > 0){ + td->timeout = timeout; + qunlock(&td->lock); + timeoutid = proccreate(recvtimeout, td, STACKSIZE); + qlock(&td->lock); + } + td->timedout = 0; /* clear the timeout bit */ + while(td->mail == nil && !td->timedout){ + if(timeout == 0) + td->timedout = 1; + else + rsleep(&td->newmail); + } + if(timeout > 0) + threadkill(timeoutid); + if(td->timedout){ + qunlock(&td->lock); + throwerror(L"Message receive", ETimeout); + return nil; /* not reached */ + }else{ + Mail *m = td->mail; + td->mail = m->next; + if(td->mail == nil) + td->lastmail = nil; + qunlock(&td->lock); + Array *a = m->contents; + free(m); + return a; + } } @@ -117,7 +148,7 @@ newprocfn(void *data) { SpawnData *sp = (SpawnData *)data; ThreadData *td = newthreaddata(); - void **tdptr = threaddata(); + void **tdptr = procdata(); *tdptr = td; ErrorGuard *eg = newerrorguard(mkscalarint(0), nil); /* make a catch-all error guard */ if(setjmp(eg->jmp)){ @@ -149,7 +180,9 @@ newthreaddata(void) td->currentdfn = nil; td->mail = 0; td->lastmail = 0; - td->empty.l = &td->lock; + td->timeout = 0; + td->timedout = 0; + td->newmail.l = &td->lock; lock(&threadlock); nthreads++; diff --git a/error.c b/error.c index e1a0120..7ec8b68 100644 --- a/error.c +++ b/error.c @@ -66,6 +66,7 @@ errorstr(int code) case ELength: err = L"LENGTH ERROR"; break; case EValue: err = L"VALUE ERROR"; break; case EDomain: err = L"DOMAIN ERROR"; break; + case ETimeout: err = L"TIMEOUT ERROR"; break; case ENotImplemented: err = L"NOT IMPLEMENTED"; break; default: err = L""; break; } diff --git a/operators.c b/operators.c index 7e2520c..36dcd6a 100644 --- a/operators.c +++ b/operators.c @@ -157,12 +157,20 @@ opReceive(Datum *lefto, Array *left, Array *right) { if(lefto->tag != FunctionTag) throwerror(nil, ESyntax); - if(GetType(right) != AtypeInt) + if(GetType(right) != AtypeInt && GetType(right) != AtypeFloat) throwerror(nil, EDomain); - if(GetSize(right) != 1) + if(GetSize(right) != 1 && GetSize(right) != 0) throwerror(nil, ELength); USED(left); - return messagerecv(lefto->func, right->intdata[0]); + + int timeout = 0; + if(GetSize(right) == 0) + timeout = -1; + else if(GetType(right) == AtypeInt) + timeout = right->intdata[0]*1000; + else if(GetType(right) == AtypeFloat) + timeout = right->floatdata[0]*1000; + return messagerecv(lefto->func, timeout); } /* Dyadic operators */ diff --git a/tests/demo.apl b/tests/demo.apl index 7a6ddca..99ef783 100644 --- a/tests/demo.apl +++ b/tests/demo.apl @@ -8,6 +8,7 @@ iotaServer←{ msg←1⍨⍇0 msg≡'stop': ⎕←'Bye bye from indexer' (from num)←msg + num<0: ∇⍵⊣ ('some slow result'⊣⎕DL 2)⍈from _←(⍳num)⍈from ∇⍵ } @@ -23,9 +24,19 @@ stop←{ 'stop'⍈id } +flush←{ + ⍺←0 + 12::⍺ + msg←1⍨⍇0 + (⍺+1)∇⍵ +} + iota←{ - ⍝ Sending messages to a nonexisting thread throws a domain error (11) + ⍺←⍬ ⍝ No timeout by default 11::'Sorry, the iota server is not running' + 12::'Sorry, the iota server was too slow to respond' + ⍝ Sending messages to a nonexisting thread throws a domain error (11) + ⍝ And if a timeout happens, an error 12 will happen. _←(⎕self ⍵)⍈id - 1⍨⍇0 + 1⍨⍇⍺ } -- cgit v1.2.3