diff options
author | Peter Mikkelsen <petermikkelsen10@gmail.com> | 2022-05-16 07:05:17 +0000 |
---|---|---|
committer | Peter Mikkelsen <petermikkelsen10@gmail.com> | 2022-05-16 07:05:17 +0000 |
commit | 5c384cab27113e74c998811d85f65700c0827060 (patch) | |
tree | 7d04516e7d09d4ffdbeb373874a08c5bf23619ba /concurrency.c | |
parent | fd3f483368ec3c52e00260ae2941b6c0552473cf (diff) |
Implement timeouts for receive
Diffstat (limited to 'concurrency.c')
-rw-r--r-- | concurrency.c | 65 |
1 files changed, 49 insertions, 16 deletions
diff --git a/concurrency.c b/concurrency.c index 1d06a9c..78ac18b 100644 --- a/concurrency.c +++ b/concurrency.c @@ -25,10 +25,22 @@ static ThreadData **threads; int mainstacksize = STACKSIZE; void +recvtimeout(void *raw) +{ + ThreadData *td = (ThreadData *)raw; + if(0 == sleep(td->timeout)){ + qlock(&td->lock); + td->timedout = 1; + rwakeup(&td->newmail); + qunlock(&td->lock); + } +} + +void initthreads(void) { ThreadData *td = newthreaddata(); - void **tdptr = threaddata(); + void **tdptr = procdata(); *tdptr = td; } @@ -62,7 +74,7 @@ spawnthread(Function f, Array *left, Array *right) ThreadData * getthreaddata(void) { - void **tdptr = threaddata(); + void **tdptr = procdata(); return (ThreadData *)*tdptr; } @@ -85,7 +97,7 @@ messagesend(Array *a, int id) else td->mail = newmail; td->lastmail = newmail; - rwakeup(&td->empty); + rwakeup(&td->newmail); qunlock(&td->lock); }else{ throwerror(L"Invalid thread id", EDomain); @@ -95,20 +107,39 @@ messagesend(Array *a, int id) Array * messagerecv(Function match, int timeout) { - USED(timeout); USED(match); ThreadData *td = getthreaddata(); qlock(&td->lock); - while(td->mail == nil) - rsleep(&td->empty); - Mail *m = td->mail; - td->mail = m->next; - if(td->mail == nil) - td->lastmail = nil; - qunlock(&td->lock); - Array *a = m->contents; - free(m); - return a; + int timeoutid = 0; + if(timeout > 0){ + td->timeout = timeout; + qunlock(&td->lock); + timeoutid = proccreate(recvtimeout, td, STACKSIZE); + qlock(&td->lock); + } + td->timedout = 0; /* clear the timeout bit */ + while(td->mail == nil && !td->timedout){ + if(timeout == 0) + td->timedout = 1; + else + rsleep(&td->newmail); + } + if(timeout > 0) + threadkill(timeoutid); + if(td->timedout){ + qunlock(&td->lock); + throwerror(L"Message receive", ETimeout); + return nil; /* not reached */ + }else{ + Mail *m = td->mail; + td->mail = m->next; + if(td->mail == nil) + td->lastmail = nil; + qunlock(&td->lock); + Array *a = m->contents; + free(m); + return a; + } } @@ -117,7 +148,7 @@ newprocfn(void *data) { SpawnData *sp = (SpawnData *)data; ThreadData *td = newthreaddata(); - void **tdptr = threaddata(); + void **tdptr = procdata(); *tdptr = td; ErrorGuard *eg = newerrorguard(mkscalarint(0), nil); /* make a catch-all error guard */ if(setjmp(eg->jmp)){ @@ -149,7 +180,9 @@ newthreaddata(void) td->currentdfn = nil; td->mail = 0; td->lastmail = 0; - td->empty.l = &td->lock; + td->timeout = 0; + td->timedout = 0; + td->newmail.l = &td->lock; lock(&threadlock); nthreads++; |