diff options
Diffstat (limited to 'concurrency.c')
-rw-r--r-- | concurrency.c | 148 |
1 files changed, 148 insertions, 0 deletions
diff --git a/concurrency.c b/concurrency.c new file mode 100644 index 0000000..4af563c --- /dev/null +++ b/concurrency.c @@ -0,0 +1,148 @@ +#include <u.h> +#include <libc.h> +#include <thread.h> +#include <bio.h> +#include "apl9.h" + +#define STACKSIZE (8*1024*1024) /* 8 MB */ + +typedef struct SpawnData SpawnData; +struct SpawnData +{ + Function func; + Array *left; + Array *right; + Channel *setupdone; +}; + +static void newprocfn(void *); +static ThreadData *newthreaddata(void); + +/* global data */ +static Lock threadlock; +static int nthreads; +static ThreadData **threads; +int mainstacksize = STACKSIZE; + +void +initthreads(void) +{ + ThreadData *td = newthreaddata(); + void **tdptr = threaddata(); + *tdptr = td; +} + +int +spawnthread(Function f, Array *left, Array *right) +{ + /* lock the data structures */ + /* Spawn a new proc */ + /* Allocate a new mailbox */ + /* Send a message back to spawnthread that the setup is ready */ + /* In spawnthread: unlock datastructures, return thread id */ + /* in new proc: + run runfunc + lock datastructures + delete mailbox + unlock datastructures + exit proc + */ + Channel *setupdone = chancreate(sizeof(int), 0); + SpawnData *sp = malloc(sizeof(SpawnData)); + sp->func = f; + sp->left = left ? duparray(left) : nil; + sp->right = duparray(right); + sp->setupdone = setupdone; + int id = proccreate(newprocfn, sp, STACKSIZE); + recv(setupdone, nil); /* wait for new proc to signal that the setup is done */ + chanfree(setupdone); + return id; +} + +ThreadData * +getthreaddata(void) +{ + void **tdptr = threaddata(); + return (ThreadData *)*tdptr; +} + +void +messagesend(Array *a, int id) +{ + ThreadData *td = nil; + lock(&threadlock); + for(int i = 0; i < nthreads && td == nil; i++) + if(threads[i]->id == id) + td = threads[i]; + unlock(&threadlock); + if(td != nil){ + qlock(&td->lock); + Mail *newmail = malloc(sizeof(Mail)); + newmail->contents = fnSame(a); + newmail->next = 0; + if(td->lastmail != nil) + td->lastmail->next = newmail; + else + td->mail = newmail; + td->lastmail = newmail; + rwakeup(&td->empty); + qunlock(&td->lock); + } +} + +Array * +messagerecv(Function match, int timeout) +{ + USED(timeout); + USED(match); + ThreadData *td = getthreaddata(); + qlock(&td->lock); + while(td->mail == nil) + rsleep(&td->empty); + Mail *m = td->mail; + td->mail = m->next; + if(td->mail == nil) + td->lastmail = nil; + qunlock(&td->lock); + Array *a = m->contents; + free(m); + return a; +} + + +static void +newprocfn(void *data) +{ + SpawnData *sp = (SpawnData *)data; + ThreadData *td = newthreaddata(); + void **tdptr = threaddata(); + *tdptr = td; + int done = 1; + send(sp->setupdone, &done); + runfunc(sp->func, sp->left, sp->right); + lock(&threadlock); + /* TODO remove thread */ + unlock(&threadlock); + freearray(sp->left); + freearray(sp->right); + free(sp); + free(td); +} + +static ThreadData * +newthreaddata(void) +{ + ThreadData *td = mallocz(sizeof(ThreadData), 1); + td->id = threadid(); + td->currentdfn = nil; + td->mail = 0; + td->lastmail = 0; + td->empty.l = &td->lock; + + lock(&threadlock); + nthreads++; + threads = realloc(threads, sizeof(ThreadData *) * nthreads); + threads[nthreads-1] = td; + unlock(&threadlock); + return td; +} |