#include #include #include #include #include "apl9.h" #define STACKSIZE (8*1024*1024) /* 8 MB */ typedef struct SpawnData SpawnData; struct SpawnData { Function func; Array *left; Array *right; Channel *setupdone; }; static void newprocfn(void *); static ThreadData *newthreaddata(void); /* global data */ static Lock threadlock; static int nthreads; static ThreadData **threads; int mainstacksize = STACKSIZE; void initthreads(void) { ThreadData *td = newthreaddata(); void **tdptr = threaddata(); *tdptr = td; } int spawnthread(Function f, Array *left, Array *right) { /* lock the data structures */ /* Spawn a new proc */ /* Allocate a new mailbox */ /* Send a message back to spawnthread that the setup is ready */ /* In spawnthread: unlock datastructures, return thread id */ /* in new proc: run runfunc lock datastructures delete mailbox unlock datastructures exit proc */ Channel *setupdone = chancreate(sizeof(int), 0); SpawnData *sp = emalloc(sizeof(SpawnData)); sp->func = f; sp->left = left ? duparray(left) : nil; sp->right = duparray(right); sp->setupdone = setupdone; int id = proccreate(newprocfn, sp, STACKSIZE); recv(setupdone, nil); /* wait for new proc to signal that the setup is done */ chanfree(setupdone); return id; } ThreadData * getthreaddata(void) { void **tdptr = threaddata(); return (ThreadData *)*tdptr; } void messagesend(Array *a, int id) { ThreadData *td = nil; lock(&threadlock); for(int i = 0; i < nthreads && td == nil; i++) if(threads[i]->id == id) td = threads[i]; unlock(&threadlock); if(td != nil){ qlock(&td->lock); Mail *newmail = emalloc(sizeof(Mail)); newmail->contents = fnSame(a); newmail->next = 0; if(td->lastmail != nil) td->lastmail->next = newmail; else td->mail = newmail; td->lastmail = newmail; rwakeup(&td->empty); qunlock(&td->lock); } } Array * messagerecv(Function match, int timeout) { USED(timeout); USED(match); ThreadData *td = getthreaddata(); qlock(&td->lock); while(td->mail == nil) rsleep(&td->empty); Mail *m = td->mail; td->mail = m->next; if(td->mail == nil) td->lastmail = nil; qunlock(&td->lock); Array *a = m->contents; free(m); return a; } static void newprocfn(void *data) { SpawnData *sp = (SpawnData *)data; ThreadData *td = newthreaddata(); void **tdptr = threaddata(); *tdptr = td; ErrorGuard *eg = newerrorguard(mkscalarint(0), nil); /* make a catch-all error guard */ if(setjmp(eg->jmp)){ print("Thread %d: %S%S%S\n", threadid(), errorstr(td->lasterror), (td->lasterror && td->lasterrormsg) ? L": " : L"", td->lasterrormsg ? td->lasterrormsg : L""); }else{ int done = 1; send(sp->setupdone, &done); runfunc(sp->func, sp->left, sp->right); } lock(&threadlock); /* TODO remove thread */ unlock(&threadlock); freearray(sp->left); freearray(sp->right); free(sp); free(td); } static ThreadData * newthreaddata(void) { ThreadData *td = emallocz(sizeof(ThreadData), 1); td->id = threadid(); td->currentdfn = nil; td->mail = 0; td->lastmail = 0; td->empty.l = &td->lock; lock(&threadlock); nthreads++; threads = erealloc(threads, sizeof(ThreadData *) * nthreads); threads[nthreads-1] = td; unlock(&threadlock); return td; }