#include #include #include #include #include "apl9.h" #define STACKSIZE (8*1024*1024) /* 8 MB */ typedef struct SpawnData SpawnData; struct SpawnData { Function func; Array *left; Array *right; Channel *setupdone; }; static void newprocfn(void *); static ThreadData *newthreaddata(void); /* global data */ static Lock threadlock; static int nthreads; static ThreadData **threads; int mainstacksize = STACKSIZE; void recvtimeout(void *) { ThreadData *td = getthreaddata(); if(0 == sleep(td->timeout)){ qlock(&td->lock); td->timedout = 1; rwakeup(&td->newmail); qunlock(&td->lock); } } void initthreads(void) { ThreadData *td = newthreaddata(); void **tdptr = procdata(); *tdptr = td; } int spawnthread(Function f, Array *left, Array *right) { /* lock the data structures */ /* Spawn a new proc */ /* Allocate a new mailbox */ /* Send a message back to spawnthread that the setup is ready */ /* In spawnthread: unlock datastructures, return thread id */ /* in new proc: run runfunc lock datastructures delete mailbox unlock datastructures exit proc */ Channel *setupdone = chancreate(sizeof(int), 0); SpawnData *sp = emalloc(sizeof(SpawnData)); sp->func = dupfunction(f); sp->left = left ? duparray(left) : nil; sp->right = duparray(right); sp->setupdone = setupdone; int id = proccreate(newprocfn, sp, STACKSIZE); recv(setupdone, nil); /* wait for new proc to signal that the setup is done */ chanfree(setupdone); return id; } ThreadData * getthreaddata(void) { void **tdptr = procdata(); return (ThreadData *)*tdptr; } void messagesend(Array *a, int id) { ThreadData *td = nil; lock(&threadlock); for(int i = 0; i < nthreads && td == nil; i++) if(threads[i]->id == id) td = threads[i]; unlock(&threadlock); if(td != nil){ qlock(&td->lock); Mail *newmail = emalloc(sizeof(Mail)); newmail->contents = fnSame(a); newmail->next = 0; if(td->lastmail != nil) td->lastmail->next = newmail; else td->mail = newmail; td->lastmail = newmail; rwakeup(&td->newmail); qunlock(&td->lock); }else{ throwerror(L"Invalid thread id", EDomain); } } Array * messagerecv(Function match, int timeout) { USED(match); ThreadData *td = getthreaddata(); qlock(&td->lock); int timeoutid = 0; if(timeout > 0){ td->timeout = timeout; timeoutid = threadcreate(recvtimeout, nil, STACKSIZE); } td->timedout = 0; /* clear the timeout bit */ while(td->mail == nil && !td->timedout){ if(timeout == 0) td->timedout = 1; else rsleep(&td->newmail); } if(timeout > 0) threadkill(timeoutid); if(td->timedout){ qunlock(&td->lock); throwerror(L"Message receive", ETimeout); return nil; /* not reached */ }else{ Mail *m = td->mail; td->mail = m->next; if(td->mail == nil) td->lastmail = nil; qunlock(&td->lock); Array *a = m->contents; free(m); return a; } } static void newprocfn(void *data) { SpawnData *sp = (SpawnData *)data; ThreadData *td = newthreaddata(); void **tdptr = procdata(); *tdptr = td; ErrorGuard *eg = newerrorguard(mkscalarint(0), nil); /* make a catch-all error guard */ if(setjmp(eg->jmp)){ print("Thread %d: %S%S%S\n", threadid(), errorstr(td->lasterror), (td->lasterror && td->lasterrormsg) ? L": " : L"", td->lasterrormsg ? td->lasterrormsg : L""); }else{ int done = 1; send(sp->setupdone, &done); runfunc(sp->func, sp->left, sp->right); } lock(&threadlock); /* TODO remove thread */ unlock(&threadlock); freearray(sp->left); freearray(sp->right); freefunction(sp->func); free(sp); free(td); } static ThreadData * newthreaddata(void) { ThreadData *td = emallocz(sizeof(ThreadData), 1); td->id = threadid(); td->currentdfn = nil; td->mail = 0; td->lastmail = 0; td->timeout = 0; td->timedout = 0; td->newmail.l = &td->lock; lock(&threadlock); nthreads++; threads = erealloc(threads, sizeof(ThreadData *) * nthreads); threads[nthreads-1] = td; unlock(&threadlock); return td; }