#include #include #include #include #include "apl9.h" #define STACKSIZE (8*1024*1024) /* 8 MB */ typedef struct SpawnData SpawnData; struct SpawnData { Function func; Array *left; Array *right; Channel *setupdone; }; static void newprocfn(void *); static ThreadData *newthreaddata(void); /* global data */ static Lock threadlock; static int nthreads; static ThreadData **threads; int mainstacksize = STACKSIZE; void recvtimeout(void *raw) { ThreadData *td = (ThreadData *)raw; if(0 == sleep(td->timeout)){ qlock(&td->lock); td->timedout = 1; rwakeup(&td->newmail); qunlock(&td->lock); } } void initthreads(void) { ThreadData *td = newthreaddata(); void **tdptr = procdata(); *tdptr = td; } int spawnthread(Function f, Array *left, Array *right) { /* lock the data structures */ /* Spawn a new proc */ /* Allocate a new mailbox */ /* Send a message back to spawnthread that the setup is ready */ /* In spawnthread: unlock datastructures, return thread id */ /* in new proc: run runfunc lock datastructures delete mailbox unlock datastructures exit proc */ Channel *setupdone = chancreate(sizeof(int), 0); SpawnData *sp = emalloc(sizeof(SpawnData)); sp->func = dupfunction(f); sp->left = left ? duparray(left) : nil; sp->right = duparray(right); sp->setupdone = setupdone; int id = proccreate(newprocfn, sp, STACKSIZE); recv(setupdone, nil); /* wait for new proc to signal that the setup is done */ chanfree(setupdone); return id; } ThreadData * getthreaddata(void) { void **tdptr = procdata(); return (ThreadData *)*tdptr; } void messagesend(Array *a, int id) { ThreadData *td = nil; lock(&threadlock); for(int i = 0; i < nthreads && td == nil; i++) if(threads[i]->id == id) td = threads[i]; unlock(&threadlock); if(td != nil){ qlock(&td->lock); Mail *newmail = emalloc(sizeof(Mail)); newmail->contents = fnSame(a); newmail->next = 0; if(td->lastmail != nil) td->lastmail->next = newmail; else td->mail = newmail; td->lastmail = newmail; rwakeup(&td->newmail); qunlock(&td->lock); }else{ throwerror(L"Invalid thread id", EDomain); } } Array * messagerecv(Function match, int timeout) { ThreadData *td = getthreaddata(); qlock(&td->lock); /* Start a "timeout thread" if needed */ int timeoutid = 0; td->timedout = 0; /* clear the timeout bit */ if(timeout > 0){ td->timeout = timeout; timeoutid = proccreate(recvtimeout, td, STACKSIZE); } /* Wait for a message, or timeout */ Mail *prev = nil; Mail *m; Retry: m = prev ? prev->next : td->mail; while((td->mail == nil || (prev != nil && prev->next == nil)) && !td->timedout){ if(timeout == 0) td->timedout = 1; else{ rsleep(&td->newmail); if(prev == nil) m = td->mail; else m = prev->next; } } if(td->timedout){ qunlock(&td->lock); throwerror(L"Message receive", ETimeout); return nil; /* not reached */ } /* Check if the new message is OK according to 'match' */ prev = m; Array *matchres = runfunc(match, nil, m->contents); if(GetRank(matchres) != 1 || GetSize(matchres) != 2){ freearray(matchres); goto Retry; } Array *ok = arrayitem(matchres, 0); if(GetType(ok) != AtypeInt || ok->intdata[0] != 1){ freearray(matchres); freearray(ok); goto Retry; } /* We found a match, remove the mail from the mailbox, and kill the timeout */ if(timeout > 0) threadkill(timeoutid); if(td->mail == m) td->mail = m->next; else{ Mail *tmp; for(tmp = td->mail; tmp->next != m; tmp = tmp->next); tmp->next = m->next; } if(td->mail == nil) td->lastmail = nil; qunlock(&td->lock); Array *res = arrayitem(matchres, 1); free(m); freearray(matchres); freearray(ok); return res; } static void newprocfn(void *data) { SpawnData *sp = (SpawnData *)data; ThreadData *td = newthreaddata(); void **tdptr = procdata(); *tdptr = td; ErrorGuard *eg = newerrorguard(mkscalarint(0), nil); /* make a catch-all error guard */ if(setjmp(eg->jmp)){ print("Thread %d: %S%S%S\n", threadid(), errorstr(td->lasterror), (td->lasterror && td->lasterrormsg) ? L": " : L"", td->lasterrormsg ? td->lasterrormsg : L""); }else{ int done = 1; send(sp->setupdone, &done); runfunc(sp->func, sp->left, sp->right); } lock(&threadlock); for(int i = 0; i < nthreads; i++){ if(threads[i] != td) continue; for(int j = i+1; j < nthreads; j++) threads[j-1] = threads[j]; nthreads--; break; } unlock(&threadlock); freearray(sp->left); freearray(sp->right); freefunction(sp->func); free(sp); free(td); } static ThreadData * newthreaddata(void) { ThreadData *td = emallocz(sizeof(ThreadData), 1); td->id = threadid(); td->currentdfn = nil; td->mail = 0; td->lastmail = 0; td->timeout = 0; td->timedout = 0; td->newmail.l = &td->lock; lock(&threadlock); nthreads++; threads = erealloc(threads, sizeof(ThreadData *) * nthreads); threads[nthreads-1] = td; unlock(&threadlock); return td; } Array * runningtasks(void) { lock(&threadlock); Array *tasks = allocarray(AtypeInt, 1, nthreads); tasks->shape[0] = nthreads; for(int i = 0; i < nthreads; i++) tasks->intdata[i] = threads[i]->id; unlock(&threadlock); return tasks; } Array * taskproperty(vlong t, vlong p) { ThreadData *td = nil; Array *res = nil; lock(&threadlock); for(int i = 0; i < nthreads && td == nil; i++) if(threads[i]->id == t) td = threads[i]; if(td == nil) res = mkscalarint(-1); else switch(p){ case 0: res = mkscalarint(t); /* thread id */ break; default: unlock(&threadlock); throwerror(L"Invalid task property", EDomain); break; } unlock(&threadlock); return res; }