From a63358d02ae15ff9a62961d46e58bff26dbab68c Mon Sep 17 00:00:00 2001 From: Peter Mikkelsen Date: Mon, 16 May 2022 16:34:47 +0000 Subject: Implement selective receive --- concurrency.c | 68 ++++++++++++++++++++++++++++++++++++++++++--------------- operators.c | 5 ++++- tests/chain.apl | 2 +- 3 files changed, 56 insertions(+), 19 deletions(-) diff --git a/concurrency.c b/concurrency.c index 139883c..be1d62c 100644 --- a/concurrency.c +++ b/concurrency.c @@ -25,9 +25,9 @@ static ThreadData **threads; int mainstacksize = STACKSIZE; void -recvtimeout(void *) +recvtimeout(void *raw) { - ThreadData *td = getthreaddata(); + ThreadData *td = (ThreadData *)raw; if(0 == sleep(td->timeout)){ qlock(&td->lock); td->timedout = 1; @@ -107,37 +107,71 @@ messagesend(Array *a, int id) Array * messagerecv(Function match, int timeout) { - USED(match); ThreadData *td = getthreaddata(); qlock(&td->lock); + + /* Start a "timeout thread" if needed */ int timeoutid = 0; + td->timedout = 0; /* clear the timeout bit */ if(timeout > 0){ td->timeout = timeout; - timeoutid = threadcreate(recvtimeout, nil, STACKSIZE); + timeoutid = proccreate(recvtimeout, td, STACKSIZE); } - td->timedout = 0; /* clear the timeout bit */ - while(td->mail == nil && !td->timedout){ + + /* Wait for a message, or timeout */ + Mail *prev = nil; + Mail *m; +Retry: + m = prev ? prev->next : td->mail; + while((td->mail == nil || (prev != nil && prev->next == nil)) && !td->timedout){ if(timeout == 0) td->timedout = 1; - else + else{ rsleep(&td->newmail); + if(prev == nil) + m = td->mail; + else + m = prev->next; + } } - if(timeout > 0) - threadkill(timeoutid); if(td->timedout){ qunlock(&td->lock); throwerror(L"Message receive", ETimeout); return nil; /* not reached */ - }else{ - Mail *m = td->mail; + } + + /* Check if the new message is OK according to 'match' */ + prev = m; + Array *matchres = runfunc(match, nil, m->contents); + if(GetRank(matchres) != 1 || GetSize(matchres) != 2){ + freearray(matchres); + goto Retry; + } + Array *ok = arrayitem(matchres, 0); + if(GetType(ok) != AtypeInt || ok->intdata[0] != 1){ + freearray(matchres); + freearray(ok); + goto Retry; + } + + /* We found a match, remove the mail from the mailbox, and kill the timeout */ + if(timeout > 0) + threadkill(timeoutid); + if(td->mail == m) td->mail = m->next; - if(td->mail == nil) - td->lastmail = nil; - qunlock(&td->lock); - Array *a = m->contents; - free(m); - return a; + else{ + Mail *tmp; + for(tmp = td->mail; tmp->next != m; tmp = tmp->next); + tmp->next = m->next; } + if(td->mail == nil) + td->lastmail = nil; + qunlock(&td->lock); + Array *res = arrayitem(matchres, 1); + free(m); + freearray(matchres); + freearray(ok); + return res; } diff --git a/operators.c b/operators.c index 36dcd6a..5359a54 100644 --- a/operators.c +++ b/operators.c @@ -155,13 +155,13 @@ opSelfReference1(Datum *lefto, Array *left, Array *right) Array * opReceive(Datum *lefto, Array *left, Array *right) { + USED(left); if(lefto->tag != FunctionTag) throwerror(nil, ESyntax); if(GetType(right) != AtypeInt && GetType(right) != AtypeFloat) throwerror(nil, EDomain); if(GetSize(right) != 1 && GetSize(right) != 0) throwerror(nil, ELength); - USED(left); int timeout = 0; if(GetSize(right) == 0) @@ -170,6 +170,9 @@ opReceive(Datum *lefto, Array *left, Array *right) timeout = right->intdata[0]*1000; else if(GetType(right) == AtypeFloat) timeout = right->floatdata[0]*1000; + if(GetSize(right) == 1 && timeout < 0) + throwerror(L"Timeout must be non-negative", EDomain); + return messagerecv(lefto->func, timeout); } diff --git a/tests/chain.apl b/tests/chain.apl index aeeab52..b825bbb 100644 --- a/tests/chain.apl +++ b/tests/chain.apl @@ -1,5 +1,5 @@ worker←{ - msg←1⍨⍇0 + msg←{1 ⍵}⍇⍬ ⍵≡⍬: ⎕←'DONE' ⎕←'Worker id ',(⍕⎕self),' got message: ',(⍕msg) ⎕←'Forwarding from ',(⍕⎕self), ' to ',⍕⍵ -- cgit v1.2.3