#include #include #include #include #include #include "apl9.h" typedef struct SpawnData SpawnData; struct SpawnData { Function func; Array *name; Array *left; Array *right; Array *ns; QLock lock; Rendez done; }; static void newprocfn(SpawnData *); static ThreadData *newthreaddata(Array *, Array *); extern void **_privates; /* global data */ static Lock threadlock; static int nthreads; static ThreadData **threads; int recvtimeout(void *, char *note) { if(strcmp(note, "alarm") != 0) return 0; ThreadData *td = getthreaddata(); if(rfork(RFPROC|RFMEM) == 0){ qlock(&td->lock); td->timedout = 1; rwakeup(&td->newmail); qunlock(&td->lock); exits(nil); } return 1; } void initthreads(void) { Array *name = mkrunearray(L"main"); _privates[0] = newthreaddata(name, nil); freearray(name); } int spawnthread(Function f, Array *name, Array *left, Array *right) { /* lock the data structures */ /* Spawn a new proc */ /* Allocate a new mailbox */ /* Send a message back to spawnthread that the setup is ready */ /* In spawnthread: unlock datastructures, return thread id */ /* in new proc: run runfunc lock datastructures delete mailbox unlock datastructures exit proc */ ThreadData *td = getthreaddata(); SpawnData *sp = emallocz(sizeof(SpawnData), 1); sp->func = dupfunction(f); sp->func.scope = dupscope(f.scope); sp->name = duparray(name); sp->left = left ? duparray(left) : nil; sp->right = duparray(right); sp->ns = fnSame(td->ns); sp->done.l = &sp->lock; qlock(&sp->lock); int id = rfork(RFPROC|RFMEM); if(id == 0){ /* in new process*/ newprocfn(sp); exits(nil); } rsleep(&sp->done); qunlock(&sp->lock); return id; } ThreadData * getthreaddata(void) { return (ThreadData *)_privates[0]; } void messagesend(Array *a, int id) { ThreadData *td = nil; ThreadData *selftd = getthreaddata(); lock(&threadlock); for(int i = 0; i < nthreads && td == nil; i++) if(threads[i]->id == id) td = threads[i]; unlock(&threadlock); if(td != nil){ qlock(&td->lock); Mail *newmail = emalloc(sizeof(Mail)); newmail->contents = allocarray(AtypeArray, 1, 2); newmail->contents->shape[0] = 2; newmail->contents->arraydata[0] = mkscalarint(selftd->id); newmail->contents->arraydata[1] = fnSame(a); newmail->next = 0; if(td->lastmail != nil) td->lastmail->next = newmail; else td->mail = newmail; td->lastmail = newmail; rwakeup(&td->newmail); qunlock(&td->lock); }else{ throwerror(L"Invalid thread id", EDomain); } } Array * messagerecv(Function match, int timeout) { ThreadData *td = getthreaddata(); qlock(&td->lock); /* Start a "timeout thread" if needed */ td->timedout = 0; /* clear the timeout bit */ if(timeout > 0){ atnotify(recvtimeout, 1); alarm(timeout); } /* Wait for a message, or timeout */ Mail *prev = nil; Mail *m; Retry: m = prev ? prev->next : td->mail; while((td->mail == nil || (prev != nil && prev->next == nil)) && !td->timedout){ if(timeout == 0) td->timedout = 1; else{ rsleep(&td->newmail); if(prev == nil) m = td->mail; else m = prev->next; } } if(td->timedout){ qunlock(&td->lock); throwerror(L"Message receive", ETimeout); return nil; /* not reached */ } /* Check if the new message is OK according to 'match' */ prev = m; Array *matchres = runfunc(match, nil, m->contents); if(GetRank(matchres) != 1 || matchres->size != 2){ freearray(matchres); goto Retry; } Array *ok = arrayitem(matchres, 0); if(GetType(ok) != AtypeInt || ok->intdata[0] != 1){ freearray(matchres); freearray(ok); goto Retry; } /* We found a match, remove the mail from the mailbox, and kill the timeout */ if(timeout > 0){ atnotify(recvtimeout, 0); alarm(0); } if(td->mail == m) td->mail = m->next; else{ Mail *tmp; for(tmp = td->mail; tmp->next != m; tmp = tmp->next); tmp->next = m->next; } if(td->mail == nil) td->lastmail = nil; qunlock(&td->lock); Array *res = arrayitem(matchres, 1); free(m); freearray(matchres); freearray(ok); return res; } static void newprocfn(SpawnData *sp) { qlock(&sp->lock); ThreadData *td = newthreaddata(sp->name, sp->ns); _privates[0] = td; rwakeup(&sp->done); qunlock(&sp->lock); ErrorGuard *eg = newerrorguard(mkscalarint(0), nil); /* make a catch-all error guard */ if(setjmp(eg->jmp)) displayerror(); else runfunc(sp->func, sp->left, sp->right); lock(&threadlock); for(int i = 0; i < nthreads; i++){ if(threads[i] != td) continue; for(int j = i+1; j < nthreads; j++) threads[j-1] = threads[j]; nthreads--; break; } unlock(&threadlock); freearray(sp->name); freearray(sp->left); freearray(sp->right); freesymtab(sp->func.scope); freefunction(sp->func); free(sp); free(td); exits(nil); } static ThreadData * newthreaddata(Array *name, Array *ns) { ThreadData *td = emallocz(sizeof(ThreadData), 1); td->id = getpid(); td->stackused = 0; td->currentdfn = nil; td->mail = 0; td->lastmail = 0; td->timeout = 0; td->timedout = 0; td->newmail.l = &td->lock; td->name = fnSame(name); td->ns = ns; lock(&threadlock); nthreads++; threads = erealloc(threads, sizeof(ThreadData *) * nthreads); threads[nthreads-1] = td; unlock(&threadlock); return td; } Array * runningthreads(void) { lock(&threadlock); Array *t = allocarray(AtypeInt, 1, nthreads); t->shape[0] = nthreads; for(int i = 0; i < nthreads; i++) t->intdata[i] = threads[i]->id; unlock(&threadlock); return t; } Array * threadproperty(vlong t, vlong p) { ThreadData *td = nil; Array *res = nil; int mailcount = 0; int framecount = 0; lock(&threadlock); for(int i = 0; i < nthreads && td == nil; i++) if(threads[i]->id == t) td = threads[i]; if(td == nil) res = mkscalarint(-1); else switch(p){ case 0: /* thread id */ res = mkscalarint(td->id); break; case 1: /* thread name */ res = fnSame(td->name); break; case 2: /* messages in mailbox */ qlock(&td->lock); for(Mail *tmp = td->mail; tmp != nil; tmp = tmp->next) mailcount++; qunlock(&td->lock); res = mkscalarint(mailcount); break; case 3: /* used C stacksize */ res = mkscalarint(td->stackused); break; case 4: /* DfnFrame count */ qlock(&td->lock); for(DfnFrame *f = td->currentdfn; f != nil; f = f->prev) framecount++; qunlock(&td->lock); res = mkscalarint(framecount); break; default: unlock(&threadlock); throwerror(L"Invalid thread property", EDomain); break; } unlock(&threadlock); return res; } void stackusage(void) { ThreadData *td = getthreaddata(); td->stackused = ((uchar*)_tos - (uchar *)&td); }