Rewrite to remove dependence on rendezvous and its bizarre
data structures. Makes it easier to use pthreads too. Still need to add code for non-pthreads systems. Just a checkpoint to switch work to another machine.
This commit is contained in:
parent
984e353160
commit
06bb4ed20d
15 changed files with 205 additions and 218 deletions
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
static Lock chanlock; /* central channel access lock */
|
||||
|
||||
static void enqueue(Alt*, Channel**);
|
||||
static void enqueue(Alt*, Thread*);
|
||||
static void dequeue(Alt*);
|
||||
static int altexec(Alt*, int);
|
||||
|
||||
|
|
@ -29,7 +29,7 @@ canexec(Alt *a)
|
|||
/* are there senders or receivers blocked? */
|
||||
otherop = (CHANSND+CHANRCV) - a->op;
|
||||
for(i=0; i<c->nentry; i++)
|
||||
if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){
|
||||
if(c->qentry[i] && c->qentry[i]->op==otherop && c->qentry[i]->thread==nil){
|
||||
_threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c);
|
||||
return 1;
|
||||
}
|
||||
|
|
@ -100,9 +100,8 @@ static int
|
|||
_alt(Alt *alts)
|
||||
{
|
||||
Alt *a, *xa;
|
||||
Channel *volatile c;
|
||||
Channel *c;
|
||||
int n, s;
|
||||
ulong r;
|
||||
Thread *t;
|
||||
|
||||
/*
|
||||
|
|
@ -131,7 +130,7 @@ _alt(Alt *alts)
|
|||
xa->entryno = -1;
|
||||
if(xa->op == CHANNOP)
|
||||
continue;
|
||||
|
||||
|
||||
c = xa->c;
|
||||
if(c==nil){
|
||||
unlock(&chanlock);
|
||||
|
|
@ -153,11 +152,11 @@ _threadnalt++;
|
|||
}
|
||||
|
||||
/* enqueue on all channels. */
|
||||
c = nil;
|
||||
t->altc = nil;
|
||||
for(xa=alts; xa->op!=CHANEND; xa++){
|
||||
if(xa->op==CHANNOP)
|
||||
continue;
|
||||
enqueue(xa, (Channel**)&c);
|
||||
enqueue(xa, t);
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
@ -166,25 +165,20 @@ _threadnalt++;
|
|||
* is interrupted -- someone else might come
|
||||
* along and try to rendezvous with us, so
|
||||
* we need to be here.
|
||||
*
|
||||
* actually, now we're assuming no interrupts.
|
||||
*/
|
||||
Again:
|
||||
/*Again:*/
|
||||
t->alt = alts;
|
||||
t->chan = Chanalt;
|
||||
|
||||
unlock(&chanlock);
|
||||
t->altrend.l = &chanlock;
|
||||
_procsplx(s);
|
||||
r = _threadrendezvous((ulong)&c, 0);
|
||||
_threadsleep(&t->altrend);
|
||||
s = _procsplhi();
|
||||
lock(&chanlock);
|
||||
|
||||
if(r==~0){ /* interrupted */
|
||||
if(c!=nil) /* someone will meet us; go back */
|
||||
goto Again;
|
||||
c = (Channel*)~0; /* so no one tries to meet us */
|
||||
}
|
||||
|
||||
/* dequeue from channels, find selected one */
|
||||
a = nil;
|
||||
c = t->altc;
|
||||
for(xa=alts; xa->op!=CHANEND; xa++){
|
||||
if(xa->op==CHANNOP)
|
||||
continue;
|
||||
|
|
@ -385,12 +379,12 @@ if(c->nentry > _threadhighnentry) _threadhighnentry = c->nentry;
|
|||
}
|
||||
|
||||
static void
|
||||
enqueue(Alt *a, Channel **c)
|
||||
enqueue(Alt *a, Thread *t)
|
||||
{
|
||||
int i;
|
||||
|
||||
_threaddebug(DBGCHAN, "Queueing alt %p on channel %p", a, a->c);
|
||||
a->tag = c;
|
||||
a->thread = t;
|
||||
i = emptyentry(a->c);
|
||||
a->c->qentry[i] = a;
|
||||
}
|
||||
|
|
@ -466,7 +460,7 @@ altexec(Alt *a, int spl)
|
|||
b = nil;
|
||||
me = a->v;
|
||||
for(i=0; i<c->nentry; i++)
|
||||
if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil)
|
||||
if(c->qentry[i] && c->qentry[i]->op==otherop && c->qentry[i]->thread==nil)
|
||||
if(nrand(++n) == 0)
|
||||
b = c->qentry[i];
|
||||
if(b != nil){
|
||||
|
|
@ -493,13 +487,12 @@ altexec(Alt *a, int spl)
|
|||
else
|
||||
altcopy(waiter, me, c->e);
|
||||
}
|
||||
*b->tag = c; /* commits us to rendezvous */
|
||||
b->thread->altc = c;
|
||||
_procwakeup(&b->thread->altrend);
|
||||
_threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)(void*)&chanlock);
|
||||
_threaddebug(DBGCHAN, "unlocking the chanlock");
|
||||
unlock(&chanlock);
|
||||
_procsplx(spl);
|
||||
_threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)(void*)&chanlock);
|
||||
while(_threadrendezvous((ulong)b->tag, 0) == ~0)
|
||||
;
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
Pqueue _threadpq;
|
||||
int _threadprocs;
|
||||
int __pthread_nonstandard_stacks;
|
||||
|
||||
static int nextID(void);
|
||||
|
||||
|
|
@ -21,6 +22,7 @@ newthread(Proc *p, void (*f)(void *arg), void *arg, uint stacksize, char *name,
|
|||
Thread *t;
|
||||
char *s;
|
||||
|
||||
__pthread_nonstandard_stacks = 1;
|
||||
if(stacksize < 32)
|
||||
sysfatal("bad stacksize %d", stacksize);
|
||||
t = _threadmalloc(sizeof(Thread), 1);
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ main(int argc, char **argv)
|
|||
|
||||
//_threaddebuglevel = (DBGSCHED|DBGCHAN|DBGREND)^~0;
|
||||
_systhreadinit();
|
||||
_qlockinit(_threadrendezvous);
|
||||
_qlockinit(_threadsleep, _threadwakeup);
|
||||
_sysfatal = _threadsysfatal;
|
||||
notify(_threadnote);
|
||||
if(mainstacksize == 0)
|
||||
|
|
@ -49,8 +49,9 @@ main(int argc, char **argv)
|
|||
a = _threadmalloc(sizeof *a, 1);
|
||||
a->argc = argc;
|
||||
a->argv = argv;
|
||||
|
||||
malloc(10);
|
||||
p = _newproc(mainlauncher, a, mainstacksize, "threadmain", 0, 0);
|
||||
malloc(10);
|
||||
_schedinit(p);
|
||||
abort(); /* not reached */
|
||||
return 0;
|
||||
|
|
@ -61,7 +62,9 @@ mainlauncher(void *arg)
|
|||
{
|
||||
Mainarg *a;
|
||||
|
||||
malloc(10);
|
||||
a = arg;
|
||||
malloc(10);
|
||||
threadmain(a->argc, a->argv);
|
||||
threadexits("threadmain");
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,7 +23,6 @@ OFILES=\
|
|||
ioreadn.$O\
|
||||
iosleep.$O\
|
||||
iowrite.$O\
|
||||
kill.$O\
|
||||
lib.$O\
|
||||
main.$O\
|
||||
memset.$O\
|
||||
|
|
@ -43,13 +42,13 @@ HFILES=\
|
|||
<$PLAN9/src/mksyslib
|
||||
|
||||
tprimes: tprimes.$O $PLAN9/lib/$LIB
|
||||
$LD -o tprimes tprimes.$O $LDFLAGS -lthread -l9 -lfmt -lutf
|
||||
$LD -o tprimes tprimes.$O $LDFLAGS -lthread -l9
|
||||
|
||||
texec: texec.$O $PLAN9/lib/$LIB
|
||||
$LD -o texec texec.$O $LDFLAGS -lthread -l9 -lfmt -lutf
|
||||
$LD -o texec texec.$O $LDFLAGS -lthread -l9
|
||||
|
||||
trend: trend.$O $PLAN9/lib/$LIB
|
||||
$LD -o trend trend.$O $LDFLAGS -lthread -l9 -lfmt -lutf
|
||||
$LD -o trend trend.$O $LDFLAGS -lthread -l9
|
||||
|
||||
CLEANFILES=$CLEANFILES tprimes texec
|
||||
|
||||
|
|
|
|||
|
|
@ -1,104 +1,38 @@
|
|||
#include "threadimpl.h"
|
||||
|
||||
Rgrp _threadrgrp;
|
||||
static int isdirty;
|
||||
int _threadhighnrendez;
|
||||
int _threadnrendez;
|
||||
static int nrendez;
|
||||
|
||||
static ulong
|
||||
finish(Thread *t, ulong val)
|
||||
void
|
||||
_threadsleep(_Procrend *r)
|
||||
{
|
||||
ulong ret;
|
||||
Thread *t;
|
||||
|
||||
ret = t->rendval;
|
||||
t->rendval = val;
|
||||
t = _threadgetproc()->thread;
|
||||
r->arg = t;
|
||||
t->nextstate = Rendezvous;
|
||||
t->inrendez = 1;
|
||||
unlock(r->l);
|
||||
_sched();
|
||||
t->inrendez = 0;
|
||||
lock(r->l);
|
||||
}
|
||||
|
||||
void
|
||||
_threadwakeup(_Procrend *r)
|
||||
{
|
||||
Thread *t;
|
||||
|
||||
t = r->arg;
|
||||
while(t->state == Running)
|
||||
sleep(0);
|
||||
lock(&t->proc->lock);
|
||||
if(t->state == Rendezvous){ /* not always true: might be Dead */
|
||||
t->state = Ready;
|
||||
_threadready(t);
|
||||
if(t->state == Dead){
|
||||
unlock(&t->proc->lock);
|
||||
return;
|
||||
}
|
||||
assert(t->state == Rendezvous && t->inrendez);
|
||||
t->state = Ready;
|
||||
_threadready(t);
|
||||
unlock(&t->proc->lock);
|
||||
return ret;
|
||||
}
|
||||
|
||||
ulong
|
||||
_threadrendezvous(ulong tag, ulong val)
|
||||
{
|
||||
ulong ret;
|
||||
Thread *t, **l;
|
||||
|
||||
lock(&_threadrgrp.lock);
|
||||
_threadnrendez++;
|
||||
l = &_threadrgrp.hash[tag%nelem(_threadrgrp.hash)];
|
||||
for(t=*l; t; l=&t->rendhash, t=*l){
|
||||
if(t->rendtag==tag){
|
||||
_threaddebug(DBGREND, "Rendezvous with thread %d.%d", t->proc->pid, t->id);
|
||||
*l = t->rendhash;
|
||||
ret = finish(t, val);
|
||||
--nrendez;
|
||||
unlock(&_threadrgrp.lock);
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
/* Going to sleep here. */
|
||||
t = _threadgetproc()->thread;
|
||||
t->rendbreak = 0;
|
||||
t->inrendez = 1;
|
||||
t->rendtag = tag;
|
||||
t->rendval = val;
|
||||
t->rendhash = *l;
|
||||
*l = t;
|
||||
++nrendez;
|
||||
if(nrendez > _threadhighnrendez)
|
||||
_threadhighnrendez = nrendez;
|
||||
_threaddebug(DBGREND, "Rendezvous for tag %lud (m=%d)", t->rendtag, t->moribund);
|
||||
unlock(&_threadrgrp.lock);
|
||||
t->nextstate = Rendezvous;
|
||||
_sched();
|
||||
t->inrendez = 0;
|
||||
_threaddebug(DBGREND, "Woke after rendezvous; val is %lud", t->rendval);
|
||||
return t->rendval;
|
||||
}
|
||||
|
||||
/*
|
||||
* This is called while holding _threadpq.lock and p->lock,
|
||||
* so we can't lock _threadrgrp.lock. Instead our caller has
|
||||
* to call _threadbreakrendez after dropping those locks.
|
||||
*/
|
||||
void
|
||||
_threadflagrendez(Thread *t)
|
||||
{
|
||||
t->rendbreak = 1;
|
||||
isdirty = 1;
|
||||
}
|
||||
|
||||
void
|
||||
_threadbreakrendez(void)
|
||||
{
|
||||
int i;
|
||||
Thread *t, **l;
|
||||
|
||||
if(isdirty == 0)
|
||||
return;
|
||||
lock(&_threadrgrp.lock);
|
||||
if(isdirty == 0){
|
||||
unlock(&_threadrgrp.lock);
|
||||
return;
|
||||
}
|
||||
isdirty = 0;
|
||||
for(i=0; i<nelem(_threadrgrp.hash); i++){
|
||||
l = &_threadrgrp.hash[i];
|
||||
for(t=*l; t; t=*l){
|
||||
if(t->rendbreak){
|
||||
*l = t->rendhash;
|
||||
finish(t, ~0);
|
||||
}else
|
||||
l=&t->rendhash;
|
||||
}
|
||||
}
|
||||
unlock(&_threadrgrp.lock);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ _schedinit(void *arg)
|
|||
unlock(&p->lock);
|
||||
while(_setlabel(&p->sched))
|
||||
;
|
||||
malloc(10);
|
||||
_threaddebug(DBGSCHED, "top of schedinit, _threadexitsallstatus=%p", _threadexitsallstatus);
|
||||
if(_threadexitsallstatus)
|
||||
_exits(_threadexitsallstatus);
|
||||
|
|
@ -57,8 +58,9 @@ _schedinit(void *arg)
|
|||
p->threads.tail = t->prevt;
|
||||
unlock(&p->lock);
|
||||
if(t->inrendez){
|
||||
_threadflagrendez(t);
|
||||
_threadbreakrendez();
|
||||
abort();
|
||||
// _threadflagrendez(t);
|
||||
// _threadbreakrendez();
|
||||
}
|
||||
_stackfree(t->stk);
|
||||
free(t->cmdname);
|
||||
|
|
@ -183,15 +185,18 @@ _sched(void)
|
|||
Resched:
|
||||
p = _threadgetproc();
|
||||
//fprint(2, "p %p\n", p);
|
||||
malloc(10);
|
||||
if((t = p->thread) != nil){
|
||||
needstack(512);
|
||||
// _threaddebug(DBGSCHED, "pausing, state=%s set %p goto %p",
|
||||
// psstate(t->state), &t->sched, &p->sched);
|
||||
print("swap\n");
|
||||
if(_setlabel(&t->sched)==0)
|
||||
_gotolabel(&p->sched);
|
||||
_threadstacklimit(t->stk, t->stk+t->stksize);
|
||||
return p->nsched++;
|
||||
}else{
|
||||
malloc(10);
|
||||
t = runthread(p);
|
||||
if(t == nil){
|
||||
_threaddebug(DBGSCHED, "all threads gone; exiting");
|
||||
|
|
@ -206,6 +211,8 @@ Resched:
|
|||
}
|
||||
t->state = Running;
|
||||
t->nextstate = Ready;
|
||||
malloc(10);
|
||||
print("gotolabel\n");
|
||||
_gotolabel(&t->sched);
|
||||
for(;;);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -88,11 +88,9 @@ struct Thread
|
|||
|
||||
char *cmdname; /* ptr to name of thread */
|
||||
|
||||
int inrendez;
|
||||
Thread *rendhash; /* Trgrp linked list */
|
||||
ulong rendtag; /* rendezvous tag */
|
||||
ulong rendval; /* rendezvous value */
|
||||
int rendbreak; /* rendezvous has been taken */
|
||||
int inrendez;
|
||||
Channel *altc;
|
||||
_Procrend altrend;
|
||||
|
||||
Chanstate chan; /* which channel operation is current */
|
||||
Alt *alt; /* pointer to current alt structure (debugging) */
|
||||
|
|
@ -179,11 +177,9 @@ int _schedfork(Proc*);
|
|||
void _schedinit(void*);
|
||||
void _systhreadinit(void);
|
||||
void _threadassert(char*);
|
||||
void _threadbreakrendez(void);
|
||||
void __threaddebug(ulong, char*, ...);
|
||||
#define _threaddebug if(!_threaddebuglevel){}else __threaddebug
|
||||
void _threadexitsall(char*);
|
||||
void _threadflagrendez(Thread*);
|
||||
Proc* _threadgetproc(void);
|
||||
extern void _threadmultiproc(void);
|
||||
Proc* _threaddelproc(void);
|
||||
|
|
@ -193,7 +189,8 @@ void* _threadmalloc(long, int);
|
|||
void _threadnote(void*, char*);
|
||||
void _threadready(Thread*);
|
||||
void _threadidle(void);
|
||||
ulong _threadrendezvous(ulong, ulong);
|
||||
void _threadsleep(_Procrend*);
|
||||
void _threadwakeup(_Procrend*);
|
||||
void _threadsignal(void);
|
||||
void _threadsysfatal(char*, va_list);
|
||||
long _xdec(long*);
|
||||
|
|
|
|||
|
|
@ -41,6 +41,7 @@ threadmain(int argc, char **argv)
|
|||
int i;
|
||||
Channel *c;
|
||||
|
||||
malloc(10);
|
||||
ARGBEGIN{
|
||||
case 'D':
|
||||
_threaddebuglevel = atoi(ARGF());
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue