more new libthread

This commit is contained in:
rsc 2004-12-25 21:57:50 +00:00
parent 1544f90960
commit 619085f0b4
7 changed files with 800 additions and 0 deletions

412
src/libthread/channel.c Normal file
View file

@ -0,0 +1,412 @@
#include "u.h"
#include "libc.h"
#include "thread.h"
#include "threadimpl.h"
/*
* One can go through a lot of effort to avoid this global lock.
* You have to put locks in all the channels and all the Alt
* structures. At the beginning of an alt you have to lock all
* the channels, but then to try to actually exec an op you
* have to lock the other guy's alt structure, so that other
* people aren't trying to use him in some other op at the
* same time.
*
* For Plan 9 apps, it's just not worth the extra effort.
*/
static QLock chanlock;
Channel*
chancreate(int elemsize, int bufsize)
{
Channel *c;
c = malloc(sizeof *c+bufsize*elemsize);
memset(c, 0, sizeof *c);
c->elemsize = elemsize;
c->bufsize = bufsize;
c->nbuf = 0;
c->buf = (uchar*)(c+1);
return c;
}
void
chansetname(Channel *c, char *fmt, ...)
{
char *name;
va_list arg;
va_start(arg, fmt);
name = vsmprint(fmt, arg);
va_end(arg);
free(c->name);
c->name = name;
}
/* bug - work out races */
void
chanfree(Channel *c)
{
if(c == nil)
return;
free(c->name);
free(c->arecv.a);
free(c->asend.a);
free(c);
}
static void
addarray(_Altarray *a, Alt *alt)
{
if(a->n == a->m){
a->m += 16;
a->a = realloc(a->a, a->m*sizeof a->a[0]);
}
a->a[a->n++] = alt;
}
static void
delarray(_Altarray *a, int i)
{
--a->n;
a->a[i] = a->a[a->n];
}
/*
* doesn't really work for things other than CHANSND and CHANRCV
* but is only used as arg to chanarray, which can handle it
*/
#define otherop(op) (CHANSND+CHANRCV-(op))
static _Altarray*
chanarray(Channel *c, uint op)
{
switch(op){
default:
return nil;
case CHANSND:
return &c->asend;
case CHANRCV:
return &c->arecv;
}
}
static int
altcanexec(Alt *a)
{
_Altarray *ar;
Channel *c;
if(a->op == CHANNOP)
return 0;
c = a->c;
if(c->bufsize == 0){
ar = chanarray(c, otherop(a->op));
return ar && ar->n;
}else{
switch(a->op){
default:
return 0;
case CHANSND:
return c->nbuf < c->bufsize;
case CHANRCV:
return c->nbuf > 0;
}
}
}
static void
altqueue(Alt *a)
{
_Altarray *ar;
ar = chanarray(a->c, a->op);
addarray(ar, a);
}
static void
altdequeue(Alt *a)
{
int i;
_Altarray *ar;
ar = chanarray(a->c, a->op);
if(ar == nil){
fprint(2, "bad use of altdequeue op=%d\n", a->op);
abort();
}
for(i=0; i<ar->n; i++)
if(ar->a[i] == a){
delarray(ar, i);
return;
}
fprint(2, "cannot find self in altdq\n");
abort();
}
static void
altalldequeue(Alt *a)
{
int i;
for(i=0; a[i].op!=CHANEND && a[i].op!=CHANNOBLK; i++)
if(a[i].op != CHANNOP)
altdequeue(&a[i]);
}
static void
amove(void *dst, void *src, uint n)
{
if(dst){
if(src == nil)
memset(dst, 0, n);
else
memmove(dst, src, n);
}
}
/*
* Actually move the data around. There are up to three
* players: the sender, the receiver, and the channel itself.
* If the channel is unbuffered or the buffer is empty,
* data goes from sender to receiver. If the channel is full,
* the receiver removes some from the channel and the sender
* gets to put some in.
*/
static void
altcopy(Alt *s, Alt *r)
{
Alt *t;
Channel *c;
uchar *cp;
/*
* Work out who is sender and who is receiver
*/
if(s == nil && r == nil)
return;
assert(s != nil);
c = s->c;
if(s->op == CHANRCV){
t = s;
s = r;
r = t;
}
assert(s==nil || s->op == CHANSND);
assert(r==nil || r->op == CHANRCV);
/*
* Channel is empty (or unbuffered) - copy directly.
*/
if(s && r && c->nbuf == 0){
amove(r->v, s->v, c->elemsize);
return;
}
/*
* Otherwise it's always okay to receive and then send.
*/
if(r){
cp = c->buf + c->off*c->elemsize;
amove(r->v, cp, c->elemsize);
--c->nbuf;
if(++c->off == c->bufsize)
c->off = 0;
}
if(s){
cp = c->buf + (c->off+c->nbuf)%c->bufsize*c->elemsize;
amove(cp, s->v, c->elemsize);
++c->nbuf;
}
}
static void
altexec(Alt *a)
{
int i;
_Altarray *ar;
Alt *other;
Channel *c;
c = a->c;
ar = chanarray(c, otherop(a->op));
if(ar && ar->n){
i = rand()%ar->n;
other = ar->a[i];
altcopy(a, other);
altalldequeue(other->xalt);
other->xalt[0].xalt = other;
_threadready(other->thread);
}else
altcopy(a, nil);
}
#define dbgalt 0
int
chanalt(Alt *a)
{
int i, j, ncan, n, canblock;
Channel *c;
_Thread *t;
for(i=0; a[i].op != CHANEND && a[i].op != CHANNOBLK; i++)
;
n = i;
canblock = a[i].op == CHANEND;
t = proc()->thread;
for(i=0; i<n; i++){
a[i].thread = t;
a[i].xalt = a;
}
qlock(&chanlock);
if(dbgalt) print("alt ");
ncan = 0;
for(i=0; i<n; i++){
c = a[i].c;
if(dbgalt) print(" %c:", "esrnb"[a[i].op]);
if(dbgalt) if(c->name) print("%s", c->name); else print("%p", c);
if(altcanexec(&a[i])){
if(dbgalt) print("*");
ncan++;
}
}
if(ncan){
j = rand()%ncan;
for(i=0; i<n; i++){
if(altcanexec(&a[i])){
if(j-- == 0){
if(dbgalt){
c = a[i].c;
print(" => %c:", "esrnb"[a[i].op]);
if(c->name) print("%s", c->name); else print("%p", c);
print("\n");
}
altexec(&a[i]);
qunlock(&chanlock);
return i;
}
}
}
}
if(dbgalt)print("\n");
if(!canblock){
qunlock(&chanlock);
return -1;
}
for(i=0; i<n; i++){
if(a[i].op != CHANNOP)
altqueue(&a[i]);
}
qunlock(&chanlock);
_threadswitch();
/*
* the guy who ran the op took care of dequeueing us
* and then set a[0].alt to the one that was executed.
*/
return a[0].xalt - a;
}
static int
_chanop(Channel *c, int op, void *p, int canblock)
{
Alt a[2];
a[0].c = c;
a[0].op = op;
a[0].v = p;
a[1].op = canblock ? CHANEND : CHANNOBLK;
if(chanalt(a) < 0)
return -1;
return 1;
}
int
chansend(Channel *c, void *v)
{
return _chanop(c, CHANSND, v, 1);
}
int
channbsend(Channel *c, void *v)
{
return _chanop(c, CHANSND, v, 0);
}
int
chanrecv(Channel *c, void *v)
{
return _chanop(c, CHANRCV, v, 1);
}
int
channbrecv(Channel *c, void *v)
{
return _chanop(c, CHANRCV, v, 0);
}
int
chansendp(Channel *c, void *v)
{
return _chanop(c, CHANSND, (void*)&v, 1);
}
void*
chanrecvp(Channel *c)
{
void *v;
_chanop(c, CHANRCV, (void*)&v, 1);
return v;
}
int
channbsendp(Channel *c, void *v)
{
return _chanop(c, CHANSND, (void*)&v, 0);
}
void*
channbrecvp(Channel *c)
{
void *v;
_chanop(c, CHANRCV, (void*)&v, 0);
return v;
}
int
chansendul(Channel *c, ulong val)
{
return _chanop(c, CHANSND, &val, 1);
}
ulong
chanrecvul(Channel *c)
{
ulong val;
_chanop(c, CHANRCV, &val, 1);
return val;
}
int
channbsendul(Channel *c, ulong val)
{
return _chanop(c, CHANSND, &val, 0);
}
ulong
channbrecvul(Channel *c)
{
ulong val;
_chanop(c, CHANRCV, &val, 0);
return val;
}

130
src/libthread/ioproc.c Normal file
View file

@ -0,0 +1,130 @@
#include <u.h>
#include <libc.h>
#include <thread.h>
#include "ioproc.h"
enum
{
STACK = 32768,
};
void
iointerrupt(Ioproc *io)
{
if(!io->inuse)
return;
fprint(2, "bug: cannot iointerrupt yet\n");
}
static void
xioproc(void *a)
{
Ioproc *io, *x;
io = a;
/*
* first recvp acquires the ioproc.
* second tells us that the data is ready.
*/
for(;;){
while(recv(io->c, &x) == -1)
;
if(x == 0) /* our cue to leave */
break;
assert(x == io);
/* caller is now committed -- even if interrupted he'll return */
while(recv(io->creply, &x) == -1)
;
if(x == 0) /* caller backed out */
continue;
assert(x == io);
io->ret = io->op(&io->arg);
if(io->ret < 0)
rerrstr(io->err, sizeof io->err);
while(send(io->creply, &io) == -1)
;
while(recv(io->creply, &x) == -1)
;
}
}
Ioproc*
ioproc(void)
{
Ioproc *io;
io = mallocz(sizeof(*io), 1);
if(io == nil)
sysfatal("ioproc malloc: %r");
io->c = chancreate(sizeof(void*), 0);
chansetname(io->c, "ioc%p", io->c);
io->creply = chancreate(sizeof(void*), 0);
chansetname(io->creply, "ior%p", io->c);
io->tid = proccreate(xioproc, io, STACK);
return io;
}
void
closeioproc(Ioproc *io)
{
if(io == nil)
return;
iointerrupt(io);
while(send(io->c, 0) == -1)
;
chanfree(io->c);
chanfree(io->creply);
free(io);
}
long
iocall(Ioproc *io, long (*op)(va_list*), ...)
{
char e[ERRMAX];
int ret, inted;
Ioproc *msg;
if(send(io->c, &io) == -1){
werrstr("interrupted");
return -1;
}
assert(!io->inuse);
io->inuse = 1;
io->op = op;
va_start(io->arg, op);
msg = io;
inted = 0;
while(send(io->creply, &msg) == -1){
msg = nil;
inted = 1;
}
if(inted){
werrstr("interrupted");
return -1;
}
/*
* If we get interrupted, we have stick around so that
* the IO proc has someone to talk to. Send it an interrupt
* and try again.
*/
inted = 0;
while(recv(io->creply, nil) == -1){
inted = 1;
iointerrupt(io);
}
USED(inted);
va_end(io->arg);
ret = io->ret;
if(ret < 0)
strecpy(e, e+sizeof e, io->err);
io->inuse = 0;
/* release resources */
while(send(io->creply, &io) == -1)
;
if(ret < 0)
errstr(e, sizeof e);
return ret;
}

14
src/libthread/ioproc.h Normal file
View file

@ -0,0 +1,14 @@
#define ioproc_arg(io, type) (va_arg((io)->arg, type))
struct Ioproc
{
int tid;
Channel *c, *creply;
int inuse;
long (*op)(va_list*);
va_list arg;
long ret;
char err[ERRMAX];
Ioproc *next;
};

39
src/libthread/mkfile Normal file
View file

@ -0,0 +1,39 @@
<$PLAN9/src/mkhdr
LIB=libthread.a
OFILES=\
channel.$O\
exec.$O\
ioproc.$O\
iorw.$O\
pthread.$O\
qlock.$O\
ref.$O\
thread.$O\
<$PLAN9/src/mksyslib
HFILES=thread.h threadimpl.h
tprimes: tprimes.$O
9l -o $target $target.$O $PLAN9/lib/$LIB -l9 -lpthread
tspawn: tspawn.$O
9l -o $target $target.$O $PLAN9/lib/$LIB -l9 -lpthread
tspawnloop: tspawnloop.$O
9l -o $target $target.$O $PLAN9/lib/$LIB -l9 -lpthread
%.$O: %.c
9c -I. $stem.c
test:V: tprimes tspawn
primes 1 10007 >p1.txt
$PLAN9/bin/time ./tprimes 10000 >tp1.txt
cmp p1.txt tp1.txt
primes 1 1009 >p2.txt
$PLAN9/bin/time ./tprimes 1000 >tp2.txt
cmp p2.txt tp2.txt
echo tspawn should take 3 seconds, not 6
$PLAN9/bin/time ./tspawn sleep 3 >/dev/null
CLEANFILES=p1.txt p2.txt tp1.txt tp2.txt

108
src/libthread/pthread.c Normal file
View file

@ -0,0 +1,108 @@
#include "u.h"
#include <errno.h>
#include "libc.h"
#include "thread.h"
#include "threadimpl.h"
static pthread_mutex_t initmutex = PTHREAD_MUTEX_INITIALIZER;
static void
lockinit(Lock *lk)
{
pthread_mutexattr_t attr;
pthread_mutex_lock(&initmutex);
if(lk->init == 0){
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL);
pthread_mutex_init(&lk->mutex, &attr);
pthread_mutexattr_destroy(&attr);
lk->init = 1;
}
pthread_mutex_unlock(&initmutex);
}
int
_threadlock(Lock *lk, int block, ulong pc)
{
int r;
if(!lk->init)
lockinit(lk);
if(block){
if(pthread_mutex_lock(&lk->mutex) != 0)
abort();
return 1;
}else{
r = pthread_mutex_trylock(&lk->mutex);
if(r == 0)
return 1;
if(r == EBUSY)
return 0;
abort();
return 0;
}
}
void
_threadunlock(Lock *lk, ulong pc)
{
if(pthread_mutex_unlock(&lk->mutex) != 0)
abort();
}
void
_procsleep(_Procrendez *r)
{
/* r is protected by r->l, which we hold */
pthread_cond_init(&r->cond, 0);
r->asleep = 1;
pthread_cond_wait(&r->cond, &r->l->mutex);
pthread_cond_destroy(&r->cond);
r->asleep = 0;
}
void
_procwakeup(_Procrendez *r)
{
if(r->asleep){
r->asleep = 0;
pthread_cond_signal(&r->cond);
}
}
void
_procstart(Proc *p, void (*fn)(void*))
{
//print("pc\n");
if(pthread_create(&p->tid, nil, (void*(*)(void*))fn, p) < 0){
//print("pc1\n");
fprint(2, "pthread_create: %r\n");
abort();
}
//print("pc2\n");
}
static pthread_key_t prockey;
Proc*
_threadproc(void)
{
Proc *p;
p = pthread_getspecific(prockey);
return p;
}
void
_threadsetproc(Proc *p)
{
pthread_setspecific(prockey, p);
}
void
pthreadinit(void)
{
pthread_key_create(&prockey, 0);
}

27
src/libthread/ref.c Normal file
View file

@ -0,0 +1,27 @@
#include "u.h"
#include "libc.h"
#include "thread.h"
static long
refadd(Ref *r, long a)
{
long ref;
lock(&r->lock);
r->ref += a;
ref = r->ref;
unlock(&r->lock);
return ref;
}
long
incref(Ref *r)
{
return refadd(r, 1);
}
long
decref(Ref *r)
{
return refadd(r, -1);
}

View file

@ -0,0 +1,70 @@
#include <ucontext.h>
typedef struct Context Context;
typedef struct Proc Proc;
typedef struct _Procrendez _Procrendez;
enum
{
STACK = 8192
};
struct Context
{
ucontext_t uc;
};
struct _Thread
{
_Thread *next;
_Thread *prev;
_Thread *allnext;
_Thread *allprev;
Context context;
uint id;
uchar *stk;
uint stksize;
int exiting;
void (*startfn)(void*);
void *startarg;
Proc *proc;
char name[256];
char state[256];
};
struct _Procrendez
{
Lock *l;
int asleep;
pthread_cond_t cond;
};
extern void _procsleep(_Procrendez*);
extern void _procwakeup(_Procrendez*);
struct Proc
{
pthread_t tid;
Lock lock;
_Thread *thread;
_Threadlist runqueue;
_Threadlist allthreads;
uint nthread;
uint sysproc;
_Procrendez runrend;
Context schedcontext;
void *udata;
};
extern Proc *xxx;
#define proc() _threadproc()
#define setproc(p) _threadsetproc(p)
extern void _procstart(Proc*, void (*fn)(void*));
extern _Thread *_threadcreate(Proc*, void(*fn)(void*), void*, uint);
extern void _threadexit(void);
extern Proc *_threadproc(void);
extern void _threadsetproc(Proc*);
extern int _threadlock(Lock*, int, ulong);
extern void _threadunlock(Lock*, ulong);