summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPeter Mikkelsen <petermikkelsen10@gmail.com>2022-05-16 07:05:17 +0000
committerPeter Mikkelsen <petermikkelsen10@gmail.com>2022-05-16 07:05:17 +0000
commit5c384cab27113e74c998811d85f65700c0827060 (patch)
tree7d04516e7d09d4ffdbeb373874a08c5bf23619ba
parentfd3f483368ec3c52e00260ae2941b6c0552473cf (diff)
Implement timeouts for receive
-rw-r--r--apl9.h5
-rw-r--r--concurrency.c65
-rw-r--r--error.c1
-rw-r--r--operators.c14
-rw-r--r--tests/demo.apl15
5 files changed, 78 insertions, 22 deletions
diff --git a/apl9.h b/apl9.h
index e56014b..bf36f52 100644
--- a/apl9.h
+++ b/apl9.h
@@ -52,6 +52,7 @@ typedef enum
ELength = 5,
EValue = 6,
EDomain = 11,
+ ETimeout = 12, /* not in dyalog */
ENotImplemented = 100, /* not in dyalog */
} errorCodes;
@@ -236,10 +237,12 @@ struct ThreadData
Mail *mail;
Mail *lastmail;
int lasterror;
+ int timeout; /* number of milli seconds to timeout */
+ int timedout; /* true if the recv timed out */
Rune *lasterrormsg;
ErrorGuard *globalerrorguard;
QLock lock;
- Rendez empty;
+ Rendez newmail;
};
struct Mail
diff --git a/concurrency.c b/concurrency.c
index 1d06a9c..78ac18b 100644
--- a/concurrency.c
+++ b/concurrency.c
@@ -25,10 +25,22 @@ static ThreadData **threads;
int mainstacksize = STACKSIZE;
void
+recvtimeout(void *raw)
+{
+ ThreadData *td = (ThreadData *)raw;
+ if(0 == sleep(td->timeout)){
+ qlock(&td->lock);
+ td->timedout = 1;
+ rwakeup(&td->newmail);
+ qunlock(&td->lock);
+ }
+}
+
+void
initthreads(void)
{
ThreadData *td = newthreaddata();
- void **tdptr = threaddata();
+ void **tdptr = procdata();
*tdptr = td;
}
@@ -62,7 +74,7 @@ spawnthread(Function f, Array *left, Array *right)
ThreadData *
getthreaddata(void)
{
- void **tdptr = threaddata();
+ void **tdptr = procdata();
return (ThreadData *)*tdptr;
}
@@ -85,7 +97,7 @@ messagesend(Array *a, int id)
else
td->mail = newmail;
td->lastmail = newmail;
- rwakeup(&td->empty);
+ rwakeup(&td->newmail);
qunlock(&td->lock);
}else{
throwerror(L"Invalid thread id", EDomain);
@@ -95,20 +107,39 @@ messagesend(Array *a, int id)
Array *
messagerecv(Function match, int timeout)
{
- USED(timeout);
USED(match);
ThreadData *td = getthreaddata();
qlock(&td->lock);
- while(td->mail == nil)
- rsleep(&td->empty);
- Mail *m = td->mail;
- td->mail = m->next;
- if(td->mail == nil)
- td->lastmail = nil;
- qunlock(&td->lock);
- Array *a = m->contents;
- free(m);
- return a;
+ int timeoutid = 0;
+ if(timeout > 0){
+ td->timeout = timeout;
+ qunlock(&td->lock);
+ timeoutid = proccreate(recvtimeout, td, STACKSIZE);
+ qlock(&td->lock);
+ }
+ td->timedout = 0; /* clear the timeout bit */
+ while(td->mail == nil && !td->timedout){
+ if(timeout == 0)
+ td->timedout = 1;
+ else
+ rsleep(&td->newmail);
+ }
+ if(timeout > 0)
+ threadkill(timeoutid);
+ if(td->timedout){
+ qunlock(&td->lock);
+ throwerror(L"Message receive", ETimeout);
+ return nil; /* not reached */
+ }else{
+ Mail *m = td->mail;
+ td->mail = m->next;
+ if(td->mail == nil)
+ td->lastmail = nil;
+ qunlock(&td->lock);
+ Array *a = m->contents;
+ free(m);
+ return a;
+ }
}
@@ -117,7 +148,7 @@ newprocfn(void *data)
{
SpawnData *sp = (SpawnData *)data;
ThreadData *td = newthreaddata();
- void **tdptr = threaddata();
+ void **tdptr = procdata();
*tdptr = td;
ErrorGuard *eg = newerrorguard(mkscalarint(0), nil); /* make a catch-all error guard */
if(setjmp(eg->jmp)){
@@ -149,7 +180,9 @@ newthreaddata(void)
td->currentdfn = nil;
td->mail = 0;
td->lastmail = 0;
- td->empty.l = &td->lock;
+ td->timeout = 0;
+ td->timedout = 0;
+ td->newmail.l = &td->lock;
lock(&threadlock);
nthreads++;
diff --git a/error.c b/error.c
index e1a0120..7ec8b68 100644
--- a/error.c
+++ b/error.c
@@ -66,6 +66,7 @@ errorstr(int code)
case ELength: err = L"LENGTH ERROR"; break;
case EValue: err = L"VALUE ERROR"; break;
case EDomain: err = L"DOMAIN ERROR"; break;
+ case ETimeout: err = L"TIMEOUT ERROR"; break;
case ENotImplemented: err = L"NOT IMPLEMENTED"; break;
default: err = L""; break;
}
diff --git a/operators.c b/operators.c
index 7e2520c..36dcd6a 100644
--- a/operators.c
+++ b/operators.c
@@ -157,12 +157,20 @@ opReceive(Datum *lefto, Array *left, Array *right)
{
if(lefto->tag != FunctionTag)
throwerror(nil, ESyntax);
- if(GetType(right) != AtypeInt)
+ if(GetType(right) != AtypeInt && GetType(right) != AtypeFloat)
throwerror(nil, EDomain);
- if(GetSize(right) != 1)
+ if(GetSize(right) != 1 && GetSize(right) != 0)
throwerror(nil, ELength);
USED(left);
- return messagerecv(lefto->func, right->intdata[0]);
+
+ int timeout = 0;
+ if(GetSize(right) == 0)
+ timeout = -1;
+ else if(GetType(right) == AtypeInt)
+ timeout = right->intdata[0]*1000;
+ else if(GetType(right) == AtypeFloat)
+ timeout = right->floatdata[0]*1000;
+ return messagerecv(lefto->func, timeout);
}
/* Dyadic operators */
diff --git a/tests/demo.apl b/tests/demo.apl
index 7a6ddca..99ef783 100644
--- a/tests/demo.apl
+++ b/tests/demo.apl
@@ -8,6 +8,7 @@ iotaServer←{
msg←1⍨⍇0
msg≡'stop': ⎕←'Bye bye from indexer'
(from num)←msg
+ num<0: ∇⍵⊣ ('some slow result'⊣⎕DL 2)⍈from
_←(⍳num)⍈from
∇⍵
}
@@ -23,9 +24,19 @@ stop←{
'stop'⍈id
}
+flush←{
+ ⍺←0
+ 12::⍺
+ msg←1⍨⍇0
+ (⍺+1)∇⍵
+}
+
iota←{
- ⍝ Sending messages to a nonexisting thread throws a domain error (11)
+ ⍺←⍬ ⍝ No timeout by default
11::'Sorry, the iota server is not running'
+ 12::'Sorry, the iota server was too slow to respond'
+ ⍝ Sending messages to a nonexisting thread throws a domain error (11)
+ ⍝ And if a timeout happens, an error 12 will happen.
_←(⎕self ⍵)⍈id
- 1⍨⍇0
+ 1⍨⍇⍺
}