summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--concurrency.c68
-rw-r--r--operators.c5
-rw-r--r--tests/chain.apl2
3 files changed, 56 insertions, 19 deletions
diff --git a/concurrency.c b/concurrency.c
index 139883c..be1d62c 100644
--- a/concurrency.c
+++ b/concurrency.c
@@ -25,9 +25,9 @@ static ThreadData **threads;
int mainstacksize = STACKSIZE;
void
-recvtimeout(void *)
+recvtimeout(void *raw)
{
- ThreadData *td = getthreaddata();
+ ThreadData *td = (ThreadData *)raw;
if(0 == sleep(td->timeout)){
qlock(&td->lock);
td->timedout = 1;
@@ -107,37 +107,71 @@ messagesend(Array *a, int id)
Array *
messagerecv(Function match, int timeout)
{
- USED(match);
ThreadData *td = getthreaddata();
qlock(&td->lock);
+
+ /* Start a "timeout thread" if needed */
int timeoutid = 0;
+ td->timedout = 0; /* clear the timeout bit */
if(timeout > 0){
td->timeout = timeout;
- timeoutid = threadcreate(recvtimeout, nil, STACKSIZE);
+ timeoutid = proccreate(recvtimeout, td, STACKSIZE);
}
- td->timedout = 0; /* clear the timeout bit */
- while(td->mail == nil && !td->timedout){
+
+ /* Wait for a message, or timeout */
+ Mail *prev = nil;
+ Mail *m;
+Retry:
+ m = prev ? prev->next : td->mail;
+ while((td->mail == nil || (prev != nil && prev->next == nil)) && !td->timedout){
if(timeout == 0)
td->timedout = 1;
- else
+ else{
rsleep(&td->newmail);
+ if(prev == nil)
+ m = td->mail;
+ else
+ m = prev->next;
+ }
}
- if(timeout > 0)
- threadkill(timeoutid);
if(td->timedout){
qunlock(&td->lock);
throwerror(L"Message receive", ETimeout);
return nil; /* not reached */
- }else{
- Mail *m = td->mail;
+ }
+
+ /* Check if the new message is OK according to 'match' */
+ prev = m;
+ Array *matchres = runfunc(match, nil, m->contents);
+ if(GetRank(matchres) != 1 || GetSize(matchres) != 2){
+ freearray(matchres);
+ goto Retry;
+ }
+ Array *ok = arrayitem(matchres, 0);
+ if(GetType(ok) != AtypeInt || ok->intdata[0] != 1){
+ freearray(matchres);
+ freearray(ok);
+ goto Retry;
+ }
+
+ /* We found a match, remove the mail from the mailbox, and kill the timeout */
+ if(timeout > 0)
+ threadkill(timeoutid);
+ if(td->mail == m)
td->mail = m->next;
- if(td->mail == nil)
- td->lastmail = nil;
- qunlock(&td->lock);
- Array *a = m->contents;
- free(m);
- return a;
+ else{
+ Mail *tmp;
+ for(tmp = td->mail; tmp->next != m; tmp = tmp->next);
+ tmp->next = m->next;
}
+ if(td->mail == nil)
+ td->lastmail = nil;
+ qunlock(&td->lock);
+ Array *res = arrayitem(matchres, 1);
+ free(m);
+ freearray(matchres);
+ freearray(ok);
+ return res;
}
diff --git a/operators.c b/operators.c
index 36dcd6a..5359a54 100644
--- a/operators.c
+++ b/operators.c
@@ -155,13 +155,13 @@ opSelfReference1(Datum *lefto, Array *left, Array *right)
Array *
opReceive(Datum *lefto, Array *left, Array *right)
{
+ USED(left);
if(lefto->tag != FunctionTag)
throwerror(nil, ESyntax);
if(GetType(right) != AtypeInt && GetType(right) != AtypeFloat)
throwerror(nil, EDomain);
if(GetSize(right) != 1 && GetSize(right) != 0)
throwerror(nil, ELength);
- USED(left);
int timeout = 0;
if(GetSize(right) == 0)
@@ -170,6 +170,9 @@ opReceive(Datum *lefto, Array *left, Array *right)
timeout = right->intdata[0]*1000;
else if(GetType(right) == AtypeFloat)
timeout = right->floatdata[0]*1000;
+ if(GetSize(right) == 1 && timeout < 0)
+ throwerror(L"Timeout must be non-negative", EDomain);
+
return messagerecv(lefto->func, timeout);
}
diff --git a/tests/chain.apl b/tests/chain.apl
index aeeab52..b825bbb 100644
--- a/tests/chain.apl
+++ b/tests/chain.apl
@@ -1,5 +1,5 @@
worker←{
- msg←1⍨⍇0
+ msg←{1 ⍵}⍇⍬
⍵≡⍬: ⎕←'DONE'
⎕←'Worker id ',(⍕⎕self),' got message: ',(⍕msg)
⎕←'Forwarding from ',(⍕⎕self), ' to ',⍕⍵