summaryrefslogtreecommitdiff
path: root/concurrency.c
diff options
context:
space:
mode:
authorPeter Mikkelsen <petermikkelsen10@gmail.com>2022-05-16 07:05:17 +0000
committerPeter Mikkelsen <petermikkelsen10@gmail.com>2022-05-16 07:05:17 +0000
commit5c384cab27113e74c998811d85f65700c0827060 (patch)
tree7d04516e7d09d4ffdbeb373874a08c5bf23619ba /concurrency.c
parentfd3f483368ec3c52e00260ae2941b6c0552473cf (diff)
Implement timeouts for receive
Diffstat (limited to 'concurrency.c')
-rw-r--r--concurrency.c65
1 files changed, 49 insertions, 16 deletions
diff --git a/concurrency.c b/concurrency.c
index 1d06a9c..78ac18b 100644
--- a/concurrency.c
+++ b/concurrency.c
@@ -25,10 +25,22 @@ static ThreadData **threads;
int mainstacksize = STACKSIZE;
void
+recvtimeout(void *raw)
+{
+ ThreadData *td = (ThreadData *)raw;
+ if(0 == sleep(td->timeout)){
+ qlock(&td->lock);
+ td->timedout = 1;
+ rwakeup(&td->newmail);
+ qunlock(&td->lock);
+ }
+}
+
+void
initthreads(void)
{
ThreadData *td = newthreaddata();
- void **tdptr = threaddata();
+ void **tdptr = procdata();
*tdptr = td;
}
@@ -62,7 +74,7 @@ spawnthread(Function f, Array *left, Array *right)
ThreadData *
getthreaddata(void)
{
- void **tdptr = threaddata();
+ void **tdptr = procdata();
return (ThreadData *)*tdptr;
}
@@ -85,7 +97,7 @@ messagesend(Array *a, int id)
else
td->mail = newmail;
td->lastmail = newmail;
- rwakeup(&td->empty);
+ rwakeup(&td->newmail);
qunlock(&td->lock);
}else{
throwerror(L"Invalid thread id", EDomain);
@@ -95,20 +107,39 @@ messagesend(Array *a, int id)
Array *
messagerecv(Function match, int timeout)
{
- USED(timeout);
USED(match);
ThreadData *td = getthreaddata();
qlock(&td->lock);
- while(td->mail == nil)
- rsleep(&td->empty);
- Mail *m = td->mail;
- td->mail = m->next;
- if(td->mail == nil)
- td->lastmail = nil;
- qunlock(&td->lock);
- Array *a = m->contents;
- free(m);
- return a;
+ int timeoutid = 0;
+ if(timeout > 0){
+ td->timeout = timeout;
+ qunlock(&td->lock);
+ timeoutid = proccreate(recvtimeout, td, STACKSIZE);
+ qlock(&td->lock);
+ }
+ td->timedout = 0; /* clear the timeout bit */
+ while(td->mail == nil && !td->timedout){
+ if(timeout == 0)
+ td->timedout = 1;
+ else
+ rsleep(&td->newmail);
+ }
+ if(timeout > 0)
+ threadkill(timeoutid);
+ if(td->timedout){
+ qunlock(&td->lock);
+ throwerror(L"Message receive", ETimeout);
+ return nil; /* not reached */
+ }else{
+ Mail *m = td->mail;
+ td->mail = m->next;
+ if(td->mail == nil)
+ td->lastmail = nil;
+ qunlock(&td->lock);
+ Array *a = m->contents;
+ free(m);
+ return a;
+ }
}
@@ -117,7 +148,7 @@ newprocfn(void *data)
{
SpawnData *sp = (SpawnData *)data;
ThreadData *td = newthreaddata();
- void **tdptr = threaddata();
+ void **tdptr = procdata();
*tdptr = td;
ErrorGuard *eg = newerrorguard(mkscalarint(0), nil); /* make a catch-all error guard */
if(setjmp(eg->jmp)){
@@ -149,7 +180,9 @@ newthreaddata(void)
td->currentdfn = nil;
td->mail = 0;
td->lastmail = 0;
- td->empty.l = &td->lock;
+ td->timeout = 0;
+ td->timedout = 0;
+ td->newmail.l = &td->lock;
lock(&threadlock);
nthreads++;