diff options
-rw-r--r-- | apl9.h | 5 | ||||
-rw-r--r-- | concurrency.c | 65 | ||||
-rw-r--r-- | error.c | 1 | ||||
-rw-r--r-- | operators.c | 14 | ||||
-rw-r--r-- | tests/demo.apl | 15 |
5 files changed, 78 insertions, 22 deletions
@@ -52,6 +52,7 @@ typedef enum ELength = 5, EValue = 6, EDomain = 11, + ETimeout = 12, /* not in dyalog */ ENotImplemented = 100, /* not in dyalog */ } errorCodes; @@ -236,10 +237,12 @@ struct ThreadData Mail *mail; Mail *lastmail; int lasterror; + int timeout; /* number of milli seconds to timeout */ + int timedout; /* true if the recv timed out */ Rune *lasterrormsg; ErrorGuard *globalerrorguard; QLock lock; - Rendez empty; + Rendez newmail; }; struct Mail diff --git a/concurrency.c b/concurrency.c index 1d06a9c..78ac18b 100644 --- a/concurrency.c +++ b/concurrency.c @@ -25,10 +25,22 @@ static ThreadData **threads; int mainstacksize = STACKSIZE; void +recvtimeout(void *raw) +{ + ThreadData *td = (ThreadData *)raw; + if(0 == sleep(td->timeout)){ + qlock(&td->lock); + td->timedout = 1; + rwakeup(&td->newmail); + qunlock(&td->lock); + } +} + +void initthreads(void) { ThreadData *td = newthreaddata(); - void **tdptr = threaddata(); + void **tdptr = procdata(); *tdptr = td; } @@ -62,7 +74,7 @@ spawnthread(Function f, Array *left, Array *right) ThreadData * getthreaddata(void) { - void **tdptr = threaddata(); + void **tdptr = procdata(); return (ThreadData *)*tdptr; } @@ -85,7 +97,7 @@ messagesend(Array *a, int id) else td->mail = newmail; td->lastmail = newmail; - rwakeup(&td->empty); + rwakeup(&td->newmail); qunlock(&td->lock); }else{ throwerror(L"Invalid thread id", EDomain); @@ -95,20 +107,39 @@ messagesend(Array *a, int id) Array * messagerecv(Function match, int timeout) { - USED(timeout); USED(match); ThreadData *td = getthreaddata(); qlock(&td->lock); - while(td->mail == nil) - rsleep(&td->empty); - Mail *m = td->mail; - td->mail = m->next; - if(td->mail == nil) - td->lastmail = nil; - qunlock(&td->lock); - Array *a = m->contents; - free(m); - return a; + int timeoutid = 0; + if(timeout > 0){ + td->timeout = timeout; + qunlock(&td->lock); + timeoutid = proccreate(recvtimeout, td, STACKSIZE); + qlock(&td->lock); + } + td->timedout = 0; /* clear the timeout bit */ + while(td->mail == nil && !td->timedout){ + if(timeout == 0) + td->timedout = 1; + else + rsleep(&td->newmail); + } + if(timeout > 0) + threadkill(timeoutid); + if(td->timedout){ + qunlock(&td->lock); + throwerror(L"Message receive", ETimeout); + return nil; /* not reached */ + }else{ + Mail *m = td->mail; + td->mail = m->next; + if(td->mail == nil) + td->lastmail = nil; + qunlock(&td->lock); + Array *a = m->contents; + free(m); + return a; + } } @@ -117,7 +148,7 @@ newprocfn(void *data) { SpawnData *sp = (SpawnData *)data; ThreadData *td = newthreaddata(); - void **tdptr = threaddata(); + void **tdptr = procdata(); *tdptr = td; ErrorGuard *eg = newerrorguard(mkscalarint(0), nil); /* make a catch-all error guard */ if(setjmp(eg->jmp)){ @@ -149,7 +180,9 @@ newthreaddata(void) td->currentdfn = nil; td->mail = 0; td->lastmail = 0; - td->empty.l = &td->lock; + td->timeout = 0; + td->timedout = 0; + td->newmail.l = &td->lock; lock(&threadlock); nthreads++; @@ -66,6 +66,7 @@ errorstr(int code) case ELength: err = L"LENGTH ERROR"; break; case EValue: err = L"VALUE ERROR"; break; case EDomain: err = L"DOMAIN ERROR"; break; + case ETimeout: err = L"TIMEOUT ERROR"; break; case ENotImplemented: err = L"NOT IMPLEMENTED"; break; default: err = L""; break; } diff --git a/operators.c b/operators.c index 7e2520c..36dcd6a 100644 --- a/operators.c +++ b/operators.c @@ -157,12 +157,20 @@ opReceive(Datum *lefto, Array *left, Array *right) { if(lefto->tag != FunctionTag) throwerror(nil, ESyntax); - if(GetType(right) != AtypeInt) + if(GetType(right) != AtypeInt && GetType(right) != AtypeFloat) throwerror(nil, EDomain); - if(GetSize(right) != 1) + if(GetSize(right) != 1 && GetSize(right) != 0) throwerror(nil, ELength); USED(left); - return messagerecv(lefto->func, right->intdata[0]); + + int timeout = 0; + if(GetSize(right) == 0) + timeout = -1; + else if(GetType(right) == AtypeInt) + timeout = right->intdata[0]*1000; + else if(GetType(right) == AtypeFloat) + timeout = right->floatdata[0]*1000; + return messagerecv(lefto->func, timeout); } /* Dyadic operators */ diff --git a/tests/demo.apl b/tests/demo.apl index 7a6ddca..99ef783 100644 --- a/tests/demo.apl +++ b/tests/demo.apl @@ -8,6 +8,7 @@ iotaServer←{ msg←1⍨⍇0 msg≡'stop': ⎕←'Bye bye from indexer' (from num)←msg + num<0: ∇⍵⊣ ('some slow result'⊣⎕DL 2)⍈from _←(⍳num)⍈from ∇⍵ } @@ -23,9 +24,19 @@ stop←{ 'stop'⍈id } +flush←{ + ⍺←0 + 12::⍺ + msg←1⍨⍇0 + (⍺+1)∇⍵ +} + iota←{ - ⍝ Sending messages to a nonexisting thread throws a domain error (11) + ⍺←⍬ ⍝ No timeout by default 11::'Sorry, the iota server is not running' + 12::'Sorry, the iota server was too slow to respond' + ⍝ Sending messages to a nonexisting thread throws a domain error (11) + ⍝ And if a timeout happens, an error 12 will happen. _←(⎕self ⍵)⍈id - 1⍨⍇0 + 1⍨⍇⍺ } |