Continue fighting pthreads.

Clean up thread library a bit too.
This commit is contained in:
rsc 2004-09-23 03:01:36 +00:00
parent c6687d4591
commit 7966faa931
19 changed files with 539 additions and 654 deletions

View file

@ -1,4 +1,6 @@
#include "threadimpl.h" #include <u.h>
#include <libc.h>
#include <thread.h>
int int
chanprint(Channel *c, char *fmt, ...) chanprint(Channel *c, char *fmt, ...)

View file

@ -1,45 +1,46 @@
#include "threadimpl.h" #include "threadimpl.h"
Pqueue _threadpq; Pqueue _threadpq; /* list of all procs */
int _threadprocs; int _threadnprocs; /* count of procs */
int __pthread_nonstandard_stacks;
static int nextID(void); static int newthreadid(void);
static int newprocid(void);
/* /*
* Create and initialize a new Thread structure attached to a given proc. * Create and initialize a new Thread structure attached to a given proc.
*/ */
void int
_stackfree(void *v) _newthread(Proc *p, void (*f)(void *arg), void *arg, uint stacksize,
{ char *name, int grp)
free(v);
}
static int
newthread(Proc *p, void (*f)(void *arg), void *arg, uint stacksize, char *name, int grp)
{ {
int id; int id;
Thread *t; Thread *t;
char *s;
__pthread_nonstandard_stacks = 1;
if(stacksize < 32)
sysfatal("bad stacksize %d", stacksize);
t = _threadmalloc(sizeof(Thread), 1); t = _threadmalloc(sizeof(Thread), 1);
t->lastfd = -1;
s = _threadmalloc(stacksize, 0);
t->stk = (uchar*)s;
t->stksize = stacksize;
_threaddebugmemset(s, 0xFE, stacksize);
_threadinitstack(t, f, arg);
t->proc = p; t->proc = p;
t->grp = grp; t->grp = grp;
t->id = id = newthreadid();
if(name) if(name)
t->cmdname = strdup(name); t->name = strdup(name);
t->id = nextID(); _threaddebug(DBGSCHED, "create thread %d.%d name %s", p->id, id, name);
id = t->id;
t->next = (Thread*)~0; /*
_threaddebug(DBGSCHED, "create thread %d.%d name %s", p->pid, t->id, name); * Allocate and clear stack.
*/
if(stacksize < 1024)
sysfatal("bad stacksize %d", stacksize);
t->stk = _threadmalloc(stacksize, 0);
t->stksize = stacksize;
_threaddebugmemset(t->stk, 0xFE, stacksize);
/*
* Set up t->context to call f(arg).
*/
_threadinitstack(t, f, arg);
/*
* Add thread to proc.
*/
lock(&p->lock); lock(&p->lock);
p->nthreads++; p->nthreads++;
if(p->threads.head == nil) if(p->threads.head == nil)
@ -49,14 +50,110 @@ newthread(Proc *p, void (*f)(void *arg), void *arg, uint stacksize, char *name,
t->prevt->nextt = t; t->prevt->nextt = t;
} }
p->threads.tail = t; p->threads.tail = t;
t->next = (Thread*)~0;
/*
* Mark thread as ready to run.
*/
t->state = Ready; t->state = Ready;
_threadready(t); _threadready(t);
unlock(&p->lock); unlock(&p->lock);
return id; return id;
} }
/*
* Free a Thread structure.
*/
void
_threadfree(Thread *t)
{
free(t->stk);
free(t->name);
free(t);
}
/*
* Create and initialize a new Proc structure with a single Thread
* running inside it. Add the Proc to the global process list.
*/
Proc*
_newproc(void)
{
Proc *p;
/*
* Allocate.
*/
p = _threadmalloc(sizeof *p, 1);
p->id = newprocid();
/*
* Add to list. Record if we're now multiprocess.
*/
lock(&_threadpq.lock);
if(_threadpq.head == nil)
_threadpq.head = p;
else
*_threadpq.tail = p;
_threadpq.tail = &p->next;
if(_threadnprocs == 1)
_threadmultiproc();
_threadnprocs++;
unlock(&_threadpq.lock);
return p;
}
/*
* Allocate a new thread running f(arg) on a stack of size stacksize.
* Return the thread id. The thread group inherits from the current thread.
*/
int
threadcreate(void (*f)(void*), void *arg, uint stacksize)
{
return _newthread(_threadgetproc(), f, arg, stacksize, nil, threadgetgrp());
}
/*
* Allocate a new idle thread. Only allowed in a single-proc program.
*/
int
threadcreateidle(void (*f)(void *arg), void *arg, uint stacksize)
{
int id;
assert(_threadnprocs == 1);
id = threadcreate(f, arg, stacksize);
_threaddebug(DBGSCHED, "idle is %d", id);
_threadsetidle(id);
return id;
}
/*
* Threadcreate, but do it inside a fresh proc.
*/
int
proccreate(void (*f)(void*), void *arg, uint stacksize)
{
int id;
Proc *p, *np;
p = _threadgetproc();
np = _newproc();
p->newproc = np;
p->schedfn = _threadstartproc;
id = _newthread(np, f, arg, stacksize, nil, p->thread->grp);
_sched(); /* call into scheduler to create proc XXX */
return id;
}
/*
* Allocate a new thread id.
*/
static int static int
nextID(void) newthreadid(void)
{ {
static Lock l; static Lock l;
static int id; static int id;
@ -68,101 +165,19 @@ nextID(void)
return i; return i;
} }
int
procrfork(void (*f)(void *), void *arg, uint stacksize, int rforkflag)
{
Proc *p;
int id;
p = _threadgetproc();
assert(p->newproc == nil);
p->newproc = _newproc(f, arg, stacksize, nil, p->thread->grp, rforkflag);
id = p->newproc->threads.head->id;
_sched();
return id;
}
int
proccreate(void (*f)(void*), void *arg, uint stacksize)
{
Proc *p;
p = _threadgetproc();
if(p->idle){
fprint(2, "cannot create procs once there is an idle thread\n");
werrstr("cannot create procs once there is an idle thread");
return -1;
}
return procrfork(f, arg, stacksize, 0);
}
void
_freeproc(Proc *p)
{
Thread *t, *nextt;
for(t = p->threads.head; t; t = nextt){
if(t->cmdname)
free(t->cmdname);
assert(t->stk != nil);
_stackfree(t->stk);
nextt = t->nextt;
free(t);
}
free(p);
}
/* /*
* Create a new thread and schedule it to run. * Allocate a new proc id.
* The thread grp is inherited from the currently running thread.
*/ */
int static int
threadcreate(void (*f)(void *arg), void *arg, uint stacksize) newprocid(void)
{ {
return newthread(_threadgetproc(), f, arg, stacksize, nil, threadgetgrp()); static Lock l;
} static int id;
int i;
int
threadcreateidle(void (*f)(void *arg), void *arg, uint stacksize) lock(&l);
{ i = ++id;
int id; unlock(&l);
return i;
if(_threadprocs!=1){
fprint(2, "cannot have idle thread in multi-proc program\n");
werrstr("cannot have idle thread in multi-proc program");
return -1;
}
id = newthread(_threadgetproc(), f, arg, stacksize, nil, threadgetgrp());
_threaddebug(DBGSCHED, "idle is %d", id);
_threadidle();
return id;
}
/*
* Create and initialize a new Proc structure with a single Thread
* running inside it. Add the Proc to the global process list.
*/
Proc*
_newproc(void (*f)(void *arg), void *arg, uint stacksize, char *name, int grp, int rforkflag)
{
Proc *p;
p = _threadmalloc(sizeof *p, 1);
p->pid = -1;
p->rforkflag = rforkflag;
newthread(p, f, arg, stacksize, name, grp);
lock(&_threadpq.lock);
if(_threadpq.head == nil)
_threadpq.head = p;
else
*_threadpq.tail = p;
_threadpq.tail = &p->next;
if(_threadprocs == 1)
_threadmultiproc();
_threadprocs++;
unlock(&_threadpq.lock);
return p;
} }

View file

@ -19,9 +19,9 @@ __threaddebug(ulong flag, char *fmt, ...)
if(p==nil) if(p==nil)
fmtprint(&f, "noproc "); fmtprint(&f, "noproc ");
else if(p->thread) else if(p->thread)
fmtprint(&f, "%d.%d ", p->pid, p->thread->id); fmtprint(&f, "%d.%d ", p->id, p->thread->id);
else else
fmtprint(&f, "%d._ ", p->pid); fmtprint(&f, "%d._ ", p->id);
va_start(arg, fmt); va_start(arg, fmt);
fmtvprint(&f, fmt, arg); fmtvprint(&f, fmt, arg);

View file

@ -10,9 +10,20 @@ _threadexec(Channel *pidc, int fd[3], char *prog, char *args[], int freeargs)
int pfd[2]; int pfd[2];
int n, pid; int n, pid;
char exitstr[ERRMAX]; char exitstr[ERRMAX];
static int firstexec = 1;
static Lock lk;
_threaddebug(DBGEXEC, "threadexec %s", prog); _threaddebug(DBGEXEC, "threadexec %s", prog);
if(firstexec){
lock(&lk);
if(firstexec){
firstexec = 0;
_threadfirstexec();
}
unlock(&lk);
}
/* /*
* We want threadexec to behave like exec; if exec succeeds, * We want threadexec to behave like exec; if exec succeeds,
* never return, and if it fails, return with errstr set. * never return, and if it fails, return with errstr set.
@ -41,6 +52,7 @@ _threadexec(Channel *pidc, int fd[3], char *prog, char *args[], int freeargs)
efork(fd, pfd, prog, args); efork(fd, pfd, prog, args);
_exit(0); _exit(0);
default: default:
_threadafterexec();
if(freeargs) if(freeargs)
free(args); free(args);
break; break;

View file

@ -1,82 +0,0 @@
#include "threadimpl.h"
#define PIPEMNT "/mnt/temp"
void
procexec(Channel *pidc, int fd[3], char *prog, char *args[])
{
int n;
Proc *p;
Thread *t;
_threaddebug(DBGEXEC, "procexec %s", prog);
/* must be only thread in proc */
p = _threadgetproc();
t = p->thread;
if(p->threads.head != t || p->threads.head->nextt != nil){
werrstr("not only thread in proc");
Bad:
if(pidc)
sendul(pidc, ~0);
return;
}
/*
* We want procexec to behave like exec; if exec succeeds,
* never return, and if it fails, return with errstr set.
* Unfortunately, the exec happens in another proc since
* we have to wait for the exec'ed process to finish.
* To provide the semantics, we open a pipe with the
* write end close-on-exec and hand it to the proc that
* is doing the exec. If the exec succeeds, the pipe will
* close so that our read below fails. If the exec fails,
* then the proc doing the exec sends the errstr down the
* pipe to us.
*/
if(bind("#|", PIPEMNT, MREPL) < 0)
goto Bad;
if((p->exec.fd[0] = open(PIPEMNT "/data", OREAD)) < 0){
unmount(nil, PIPEMNT);
goto Bad;
}
if((p->exec.fd[1] = open(PIPEMNT "/data1", OWRITE|OCEXEC)) < 0){
close(p->exec.fd[0]);
unmount(nil, PIPEMNT);
goto Bad;
}
unmount(nil, PIPEMNT);
/* exec in parallel via the scheduler */
assert(p->needexec==0);
p->exec.prog = prog;
p->exec.args = args;
p->exec.stdfd = fd;
p->needexec = 1;
_sched();
close(p->exec.fd[1]);
if((n = read(p->exec.fd[0], p->exitstr, ERRMAX-1)) > 0){ /* exec failed */
p->exitstr[n] = '\0';
errstr(p->exitstr, ERRMAX);
close(p->exec.fd[0]);
goto Bad;
}
close(p->exec.fd[0]);
close(fd[0]);
if(fd[1] != fd[0])
close(fd[1]);
if(fd[2] != fd[1] && fd[2] != fd[0])
close(fd[2]);
if(pidc)
sendul(pidc, t->ret);
/* wait for exec'ed program, then exit */
_schedexecwait();
}
void
procexecl(Channel *pidc, int fd[3], char *f, ...)
{
procexec(pidc, fd, f, &f+1);
}

View file

@ -26,42 +26,13 @@ threadexits(char *exitstr)
void void
threadexitsall(char *exitstr) threadexitsall(char *exitstr)
{ {
Proc *p;
int *pid;
int i, npid, mypid;
_threaddebug(DBGSCHED, "threadexitsall %s", exitstr); _threaddebug(DBGSCHED, "threadexitsall %s", exitstr);
if(exitstr == nil) if(exitstr == nil)
exitstr = ""; exitstr = "";
_threadexitsallstatus = exitstr; _threadexitsallstatus = exitstr;
_threaddebug(DBGSCHED, "_threadexitsallstatus set to %p", _threadexitsallstatus); _threaddebug(DBGSCHED, "_threadexitsallstatus set to %p", _threadexitsallstatus);
mypid = _threadgetpid();
/*
* signal others.
* copying all the pids first avoids other thread's
* teardown procedures getting in the way.
*/
lock(&_threadpq.lock);
npid = 0;
for(p=_threadpq.head; p; p=p->next)
npid++;
pid = _threadmalloc(npid*sizeof(pid[0]), 0);
npid = 0;
for(p = _threadpq.head; p; p=p->next)
pid[npid++] = p->pid;
unlock(&_threadpq.lock);
for(i=0; i<npid; i++){
_threaddebug(DBGSCHED, "threadexitsall kill %d", pid[i]);
if(pid[i]==0 || pid[i]==-1)
fprint(2, "bad pid in threadexitsall: %d\n", pid[i]);
else if(pid[i] != mypid){
kill(pid[i], SIGTERM);
}
}
/* leave */ /* leave */
exits(0); _threadexitallproc(exitstr);
} }
Channel* Channel*

View file

@ -60,10 +60,10 @@ threadsetname(char *fmt, ...)
p = _threadgetproc(); p = _threadgetproc();
t = p->thread; t = p->thread;
if (t->cmdname) if(t->name)
free(t->cmdname); free(t->name);
va_start(arg, fmt); va_start(arg, fmt);
t->cmdname = vsmprint(fmt, arg); t->name = vsmprint(fmt, arg);
va_end(arg); va_end(arg);
/* Plan 9 only /* Plan 9 only
@ -85,7 +85,7 @@ threadsetname(char *fmt, ...)
char* char*
threadgetname(void) threadgetname(void)
{ {
return _threadgetproc()->thread->cmdname; return _threadgetproc()->thread->name;
} }
void** void**

View file

@ -1,4 +1,7 @@
#include "threadimpl.h" #include <u.h>
#include <libc.h>
#include <thread.h>
#include "ioproc.h"
long long
iocall(Ioproc *io, long (*op)(va_list*), ...) iocall(Ioproc *io, long (*op)(va_list*), ...)

View file

@ -1,4 +1,7 @@
#include "threadimpl.h" #include <u.h>
#include <libc.h>
#include <thread.h>
#include "ioproc.h"
enum enum
{ {

View file

@ -1,5 +1,7 @@
#include <u.h> /*
#include <signal.h> * Thread library.
*/
#include "threadimpl.h" #include "threadimpl.h"
typedef struct Mainarg Mainarg; typedef struct Mainarg Mainarg;
@ -10,126 +12,68 @@ struct Mainarg
}; };
int mainstacksize; int mainstacksize;
int _threadnotefd;
int _threadpasserpid;
static void mainlauncher(void*);
extern void (*_sysfatal)(char*, va_list); extern void (*_sysfatal)(char*, va_list);
void
_threadstatus(int x)
{
USED(x);
threadstatus();
}
void
_threaddie(int x)
{
extern char *_threadexitsallstatus;
USED(x);
if(_threadexitsallstatus)
_exits(_threadexitsallstatus);
}
int
main(int argc, char **argv)
{
Mainarg *a;
Proc *p;
//_threaddebuglevel = (DBGSCHED|DBGCHAN|DBGREND)^~0;
_systhreadinit();
_qlockinit(_threadsleep, _threadwakeup);
_sysfatal = _threadsysfatal;
notify(_threadnote);
if(mainstacksize == 0)
mainstacksize = 32*1024;
a = _threadmalloc(sizeof *a, 1);
a->argc = argc;
a->argv = argv;
p = _newproc(mainlauncher, a, mainstacksize, "threadmain", 0, 0);
_scheduler(p);
abort(); /* not reached */
return 0;
}
static void static void
mainlauncher(void *arg) mainlauncher(void *arg)
{ {
Mainarg *a; Mainarg *a;
a = arg; a = arg;
_threadmaininit();
threadmain(a->argc, a->argv); threadmain(a->argc, a->argv);
threadexits("threadmain"); threadexits("threadmain");
} }
void
_threadsignal(void)
{
}
void
_threadsignalpasser(void)
{
}
int int
_schedfork(Proc *p) main(int argc, char **argv)
{ {
int pid; Mainarg a;
lock(&p->lock); Proc *p;
pid = ffork(RFMEM|RFNOWAIT, _scheduler, p);
p->pid = pid;
unlock(&p->lock);
return pid;
/*
* XXX Do daemonize hack here.
*/
/*
* Instruct QLock et al. to use our scheduling functions
* so that they can operate at the thread level.
*/
_qlockinit(_threadsleep, _threadwakeup);
/*
* Install our own _threadsysfatal which takes down
* the whole conglomeration of procs.
*/
_sysfatal = _threadsysfatal;
/*
* XXX Install our own jump handler.
*/
/*
* Install our own signal handlers.
*/
notify(_threadnote);
/*
* Construct the initial proc running mainlauncher(&a).
*/
if(mainstacksize == 0)
mainstacksize = 32*1024;
a.argc = argc;
a.argv = argv;
p = _newproc();
_newthread(p, mainlauncher, &a, mainstacksize, "threadmain", 0);
_threadscheduler(p);
abort(); /* not reached */
return 0;
} }
/*
* No-op function here so that sched.o drags in main.o.
*/
void void
_schedexit(Proc *p) _threadlinkmain(void)
{
char ex[ERRMAX];
Proc **l;
lock(&_threadpq.lock);
for(l=&_threadpq.head; *l; l=&(*l)->next){
if(*l == p){
*l = p->next;
if(*l == nil)
_threadpq.tail = l;
break;
}
}
_threadprocs--;
unlock(&_threadpq.lock);
strncpy(ex, p->exitstr, sizeof ex);
ex[sizeof ex-1] = '\0';
free(p);
_exits(ex);
}
int
nrand(int n)
{
return random()%n;
}
void
_systhreadinit(void)
{ {
} }
void
threadstats(void)
{
extern int _threadnrendez, _threadhighnrendez,
_threadnalt, _threadhighnentry;
fprint(2, "*** THREAD LIBRARY STATS ***\n");
fprint(2, "nrendez %d high simultaneous %d\n",
_threadnrendez, _threadhighnrendez);
fprint(2, "nalt %d high simultaneous entry %d\n",
_threadnalt, _threadhighnentry);
}

View file

@ -28,11 +28,11 @@ OFILES=\
memset.$O\ memset.$O\
memsetd.$O\ memsetd.$O\
note.$O\ note.$O\
proctab.$O\ pthread.$O\
read9pmsg.$O\ read9pmsg.$O\
ref.$O\ ref.$O\
rendez.$O\
sched.$O\ sched.$O\
sleep.$O\
HFILES=\ HFILES=\
$PLAN9/include/thread.h\ $PLAN9/include/thread.h\

View file

@ -65,6 +65,8 @@ delayednotes(Proc *p, void *v)
} }
if(i==NFN){ if(i==NFN){
_threaddebug(DBGNOTE, "Unhandled note %s, proc %p\n", n->s, p); _threaddebug(DBGNOTE, "Unhandled note %s, proc %p\n", n->s, p);
if(strcmp(n->s, "sys: child") == 0)
noted(NCONT);
fprint(2, "unhandled note %s, pid %d\n", n->s, p->pid); fprint(2, "unhandled note %s, pid %d\n", n->s, p->pid);
if(v != nil) if(v != nil)
noted(NDFLT); noted(NDFLT);
@ -85,7 +87,9 @@ _threadnote(void *v, char *s)
Note *n; Note *n;
_threaddebug(DBGNOTE, "Got note %s", s); _threaddebug(DBGNOTE, "Got note %s", s);
if(strncmp(s, "sys:", 4) == 0 && strcmp(s, "sys: write on closed pipe") != 0) if(strncmp(s, "sys:", 4) == 0
&& strcmp(s, "sys: write on closed pipe") != 0
&& strcmp(s, "sys: child") != 0)
noted(NDFLT); noted(NDFLT);
// if(_threadexitsallstatus){ // if(_threadexitsallstatus){

View file

@ -46,7 +46,7 @@ __threadgetproc(int rm)
if(!multi) if(!multi)
return theproc; return theproc;
pid = _threadgetpid(); pid = getpid();
lock(&ptablock); lock(&ptablock);
h = ((unsigned)pid)%PTABHASH; h = ((unsigned)pid)%PTABHASH;

View file

@ -1,3 +1,9 @@
/*
* Atomic reference counts - used by applications.
*
* We use locks to avoid the assembly of the Plan 9 versions.
*/
#include "threadimpl.h" #include "threadimpl.h"
void void

View file

@ -1,76 +1,59 @@
#include <u.h> /*
#include <signal.h> * Thread scheduler.
#include <errno.h> */
#include "threadimpl.h" #include "threadimpl.h"
static Thread *runthread(Proc*); static Thread *runthread(Proc*);
static void schedexit(Proc*);
static char *_psstate[] = { /*
"Dead", * Main scheduling loop.
"Running", */
"Ready",
"Rendezvous",
};
static char*
psstate(int s)
{
if(s < 0 || s >= nelem(_psstate))
return "unknown";
return _psstate[s];
}
void void
needstack(int howmuch) _threadscheduler(void *arg)
{
Proc *p;
Thread *t;
p = _threadgetproc();
if(p == nil || (t=p->thread) == nil)
return;
if((ulong)&howmuch < (ulong)t->stk+howmuch){ /* stack overflow waiting to happen */
fprint(2, "stack overflow: stack at 0x%lux, limit at 0x%lux, need 0x%lux\n", (ulong)&p, (ulong)t->stk, howmuch);
abort();
}
}
void
_scheduler(void *arg)
{ {
Proc *p; Proc *p;
Thread *t; Thread *t;
p = arg; p = arg;
lock(&p->lock);
p->pid = _threadgetpid(); _threadlinkmain();
_threadsetproc(p); _threadinitproc(p);
for(;;){ for(;;){
/*
* Clean up zombie children.
*/
_threadwaitkids(p);
/*
* Find next thread to run.
*/
_threaddebug(DBGSCHED, "runthread");
t = runthread(p); t = runthread(p);
if(t == nil){ if(t == nil)
_threaddebug(DBGSCHED, "all threads gone; exiting"); schedexit(p);
_threaddelproc();
_schedexit(p); /*
} * If it's ready, run it (might instead be marked to die).
_threaddebug(DBGSCHED, "running %d.%d", t->proc->pid, t->id); */
p->thread = t; lock(&p->lock);
if(t->moribund){ if(t->state == Ready){
_threaddebug(DBGSCHED, "%d.%d marked to die"); _threaddebug(DBGSCHED, "running %d.%d", p->id, t->id);
goto Moribund;
}
t->state = Running; t->state = Running;
t->nextstate = Ready; t->nextstate = Ready;
p->thread = t;
unlock(&p->lock); unlock(&p->lock);
_swaplabel(&p->context, &t->context);
_swaplabel(&p->sched, &t->sched);
lock(&p->lock); lock(&p->lock);
p->thread = nil; p->thread = nil;
}
/*
* If thread needs to die, kill it.
*/
if(t->moribund){ if(t->moribund){
Moribund: _threaddebug(DBGSCHED, "moribund %d.%d", p->id, t->id);
if(t->moribund != 1)
fprint(2, "moribund %d\n", t->moribund);
assert(t->moribund == 1); assert(t->moribund == 1);
t->state = Dead; t->state = Dead;
if(t->prevt) if(t->prevt)
@ -82,40 +65,37 @@ _scheduler(void *arg)
else else
p->threads.tail = t->prevt; p->threads.tail = t->prevt;
unlock(&p->lock); unlock(&p->lock);
if(t->inrendez){ _threadfree(t);
abort();
// _threadflagrendez(t);
// _threadbreakrendez();
}
_stackfree(t->stk);
free(t->cmdname);
free(t); /* XXX how do we know there are no references? */
p->nthreads--; p->nthreads--;
t = nil; t = nil;
lock(&p->lock);
continue; continue;
} }
/* unlock(&p->lock);
if(p->needexec){
t->ret = _schedexec(&p->exec); /*
p->needexec = 0; * If there is a request to run a function on the
} * scheduling stack, do so.
*/ */
if(p->newproc){ if(p->schedfn){
t->ret = _schedfork(p->newproc); _threaddebug(DBGSCHED, "schedfn");
if(t->ret < 0){ p->schedfn(p);
//fprint(2, "_schedfork: %r\n"); p->schedfn = nil;
abort(); _threaddebug(DBGSCHED, "schedfn ended");
}
p->newproc = nil;
} }
/*
* Move the thread along.
*/
t->state = t->nextstate; t->state = t->nextstate;
_threaddebug(DBGSCHED, "moveon %d.%d", p->id, t->id);
if(t->state == Ready) if(t->state == Ready)
_threadready(t); _threadready(t);
unlock(&p->lock);
} }
} }
/*
* Called by thread to give up control of processor to scheduler.
*/
int int
_sched(void) _sched(void)
{ {
@ -125,146 +105,14 @@ _sched(void)
p = _threadgetproc(); p = _threadgetproc();
t = p->thread; t = p->thread;
assert(t != nil); assert(t != nil);
_swaplabel(&t->sched, &p->sched); _swaplabel(&t->context, &p->context);
return p->nsched++; return p->nsched++;
} }
static Thread* /*
runthread(Proc *p) * Called by thread to yield the processor to other threads.
{ * Returns number of other threads run between call and return.
Channel *c; */
Thread *t;
Tqueue *q;
Waitmsg *w;
int e, sent;
if(p->nthreads==0 || (p->nthreads==1 && p->idle))
return nil;
q = &p->ready;
relock:
lock(&p->readylock);
if(p->nsched%128 == 0){
/* clean up children */
e = errno;
if((c = _threadwaitchan) != nil){
if(c->n <= c->s){
sent = 0;
for(;;){
if((w = p->waitmsg) != nil)
p->waitmsg = nil;
else
w = waitnohang();
if(w == nil)
break;
if(sent == 0){
unlock(&p->readylock);
sent = 1;
}
if(nbsendp(c, w) != 1)
break;
}
p->waitmsg = w;
if(sent)
goto relock;
}
}else{
while((w = waitnohang()) != nil)
free(w);
}
errno = e;
}
if(q->head == nil){
if(p->idle){
if(p->idle->state != Ready){
fprint(2, "everyone is asleep\n");
exits("everyone is asleep");
}
unlock(&p->readylock);
_threaddebug(DBGSCHED, "running idle thread", p->nthreads);
return p->idle;
}
_threaddebug(DBGSCHED, "sleeping for more work (%d threads)", p->nthreads);
q->asleep = 1;
p->rend.l = &p->readylock;
_procsleep(&p->rend);
if(_threadexitsallstatus)
_exits(_threadexitsallstatus);
}
t = q->head;
q->head = t->next;
unlock(&p->readylock);
return t;
}
long
threadstack(void)
{
Proc *p;
Thread *t;
p = _threadgetproc();
t = p->thread;
return (ulong)&p - (ulong)t->stk;
}
void
_threadready(Thread *t)
{
Tqueue *q;
if(t == t->proc->idle){
_threaddebug(DBGSCHED, "idle thread is ready");
return;
}
assert(t->state == Ready);
_threaddebug(DBGSCHED, "readying %d.%d", t->proc->pid, t->id);
q = &t->proc->ready;
lock(&t->proc->readylock);
t->next = nil;
if(q->head==nil)
q->head = t;
else
q->tail->next = t;
q->tail = t;
if(q->asleep){
assert(q->asleep == 1);
q->asleep = 0;
/* lock passes to runthread */
_procwakeup(&t->proc->rend);
}
unlock(&t->proc->readylock);
if(_threadexitsallstatus)
_exits(_threadexitsallstatus);
}
void
_threadidle(void)
{
Tqueue *q;
Thread *t, *idle;
Proc *p;
p = _threadgetproc();
q = &p->ready;
lock(&p->readylock);
assert(q->tail);
idle = q->tail;
if(q->head == idle){
q->head = nil;
q->tail = nil;
}else{
for(t=q->head; t->next!=q->tail; t=t->next)
;
t->next = nil;
q->tail = t;
}
p->idle = idle;
_threaddebug(DBGSCHED, "p->idle is %d\n", idle->id);
unlock(&p->readylock);
}
int int
yield(void) yield(void)
{ {
@ -276,15 +124,176 @@ yield(void)
return _sched() - nsched; return _sched() - nsched;
} }
void /*
threadstatus(void) * Choose the next thread to run.
*/
static Thread*
runthread(Proc *p)
{ {
Proc *p;
Thread *t; Thread *t;
Tqueue *q;
p = _threadgetproc(); /*
for(t=p->threads.head; t; t=t->nextt) * No threads left?
fprint(2, "[%3d] %s userpc=%lux\n", */
t->id, psstate(t->state), t->userpc); if(p->nthreads==0 || (p->nthreads==1 && p->idle))
return nil;
lock(&p->readylock);
q = &p->ready;
if(q->head == nil){
/*
* Is this a single-process program with an idle thread?
*/
if(p->idle){
/*
* The idle thread had better be ready!
*/
if(p->idle->state != Ready)
sysfatal("all threads are asleep");
/*
* Run the idle thread.
*/
unlock(&p->readylock);
_threaddebug(DBGSCHED, "running idle thread", p->nthreads);
return p->idle;
}
/*
* Wait until one of our threads is readied (by another proc!).
*/
q->asleep = 1;
p->rend.l = &p->readylock;
_procsleep(&p->rend);
/*
* Maybe we were awakened to exit?
*/
if(_threadexitsallstatus)
_exits(_threadexitsallstatus);
assert(q->head != nil);
}
t = q->head;
q->head = t->next;
unlock(&p->readylock);
return t;
}
/*
* Add a newly-ready thread to its proc's run queue.
*/
void
_threadready(Thread *t)
{
Tqueue *q;
/*
* The idle thread does not go on the run queue.
*/
if(t == t->proc->idle){
_threaddebug(DBGSCHED, "idle thread is ready");
return;
}
assert(t->state == Ready);
_threaddebug(DBGSCHED, "readying %d.%d", t->proc->id, t->id);
/*
* Add thread to run queue.
*/
q = &t->proc->ready;
lock(&t->proc->readylock);
t->next = nil;
if(q->head == nil)
q->head = t;
else
q->tail->next = t;
q->tail = t;
/*
* Wake proc scheduler if it is sleeping.
*/
if(q->asleep){
assert(q->asleep == 1);
q->asleep = 0;
_procwakeup(&t->proc->rend);
}
unlock(&t->proc->readylock);
}
/*
* Mark the given thread as the idle thread.
* Since the idle thread was just created, it is sitting
* somewhere on the ready queue.
*/
void
_threadsetidle(int id)
{
Tqueue *q;
Thread *t, **l, *last;
Proc *p;
p = _threadgetproc();
lock(&p->readylock);
/*
* Find thread on ready queue.
*/
q = &p->ready;
for(l=&q->head, last=nil; (t=*l) != nil; l=&t->next, last=t)
if(t->id == id)
break;
assert(t != nil);
/*
* Remove it from ready queue.
*/
*l = t->next;
if(t == q->head)
q->head = t->next;
if(t->next == nil)
q->tail = last;
/*
* Set as idle thread.
*/
p->idle = t;
_threaddebug(DBGSCHED, "p->idle is %d\n", t->id);
unlock(&p->readylock);
}
static void
schedexit(Proc *p)
{
char ex[ERRMAX];
int n;
Proc **l;
_threaddebug(DBGSCHED, "exiting proc %d", p->id);
lock(&_threadpq.lock);
for(l=&_threadpq.head; *l; l=&(*l)->next){
if(*l == p){
*l = p->next;
if(*l == nil)
_threadpq.tail = l;
break;
}
}
n = --_threadnprocs;
unlock(&_threadpq.lock);
strncpy(ex, p->exitstr, sizeof ex);
ex[sizeof ex-1] = '\0';
free(p);
if(n == 0)
_threadexitallproc(ex);
else
_threadexitproc(ex);
} }

View file

@ -11,10 +11,10 @@ _threadsleep(_Procrend *r)
t = _threadgetproc()->thread; t = _threadgetproc()->thread;
r->arg = t; r->arg = t;
t->nextstate = Rendezvous; t->nextstate = Rendezvous;
t->inrendez = 1; t->asleep = 1;
unlock(r->l); unlock(r->l);
_sched(); _sched();
t->inrendez = 0; t->asleep = 0;
lock(r->l); lock(r->l);
} }
@ -31,7 +31,7 @@ _threadwakeup(_Procrend *r)
unlock(&t->proc->lock); unlock(&t->proc->lock);
return; return;
} }
assert(t->state == Rendezvous && t->inrendez); assert(t->state == Rendezvous && t->asleep);
t->state = Ready; t->state = Ready;
_threadready(t); _threadready(t);
unlock(&t->proc->lock); unlock(&t->proc->lock);

View file

@ -5,11 +5,16 @@ extern int _threaddebuglevel;
void void
doexec(void *v) doexec(void *v)
{ {
int fd[3];
char **argv = v; char **argv = v;
print("doexec\n"); fd[0] = dup(0, -1);
procexec(nil, argv[0], argv); fd[1] = dup(1, -1);
fd[2] = dup(2, -1);
threadexec(nil, fd, argv[0], argv);
print("exec failed: %r\n");
sendp(threadwaitchan(), nil); sendp(threadwaitchan(), nil);
threadexits(nil);
} }
void void
@ -28,7 +33,7 @@ threadmain(int argc, char **argv)
proccreate(doexec, argv, 8192); proccreate(doexec, argv, 8192);
w = recvp(c); w = recvp(c);
if(w == nil) if(w == nil)
print("exec failed: %r\n"); print("exec/recvp failed: %r\n");
else else
print("%d %lud %lud %lud %s\n", w->pid, w->time[0], w->time[1], w->time[2], w->msg); print("%d %lud %lud %lud %s\n", w->pid, w->time[0], w->time[1], w->time[2], w->msg);
threadexits(nil); threadexits(nil);

View file

@ -7,20 +7,19 @@
* _threadgetproc()->thread is always a live pointer. * _threadgetproc()->thread is always a live pointer.
* p->threads, p->ready, and _threadrgrp also contain * p->threads, p->ready, and _threadrgrp also contain
* live thread pointers. These may only be consulted * live thread pointers. These may only be consulted
* while holding p->lock or _threadrgrp.lock; in procs * while holding p->lock; in procs other than p, the
* other than p, the pointers are only guaranteed to be live * pointers are only guaranteed to be live while the lock
* while the lock is still being held. * is still being held.
* *
* Thread structures can only be freed by the proc * Thread structures can only be freed by the proc
* they belong to. Threads marked with t->inrendez * they belong to. Threads marked with t->inrendez
* need to be extracted from the _threadrgrp before * need to be extracted from the _threadrgrp before
* being freed. * being freed.
*
* _threadrgrp.lock cannot be acquired while holding p->lock.
*/ */
#include <u.h>
#include <assert.h> #include <assert.h>
#include <lib9.h> #include <libc.h>
#include <thread.h> #include <thread.h>
#include "label.h" #include "label.h"
@ -28,10 +27,8 @@ typedef struct Thread Thread;
typedef struct Proc Proc; typedef struct Proc Proc;
typedef struct Tqueue Tqueue; typedef struct Tqueue Tqueue;
typedef struct Pqueue Pqueue; typedef struct Pqueue Pqueue;
typedef struct Rgrp Rgrp;
typedef struct Execargs Execargs; typedef struct Execargs Execargs;
/* must match list in sched.c */
typedef enum typedef enum
{ {
Dead, Dead,
@ -50,17 +47,9 @@ typedef enum
enum enum
{ {
RENDHASH = 10009,
Printsize = 2048,
NPRIV = 8, NPRIV = 8,
}; };
struct Rgrp
{
Lock lock;
Thread *hash[RENDHASH];
};
struct Tqueue /* Thread queue */ struct Tqueue /* Thread queue */
{ {
int asleep; int asleep;
@ -68,27 +57,38 @@ struct Tqueue /* Thread queue */
Thread *tail; Thread *tail;
}; };
struct Pqueue { /* Proc queue */
Lock lock;
Proc *head;
Proc **tail;
};
struct Thread struct Thread
{ {
Lock lock; /* protects thread data structure */ Lock lock; /* protects thread data structure */
Label sched; /* for context switches */
int id; /* thread id */
int grp; /* thread group */
int moribund; /* thread needs to die */
State state; /* run state */
State nextstate; /* next run state */
uchar *stk; /* top of stack (lowest address of stack) */
uint stksize; /* stack size */
Thread *next; /* next on ready queue */
Proc *proc; /* proc of this thread */ int asleep; /* thread is in _threadsleep */
Label context; /* for context switches */
int grp; /* thread group */
int id; /* thread id */
int moribund; /* thread needs to die */
char *name; /* name of thread */
Thread *next; /* next on ready queue */
Thread *nextt; /* next on list of threads in this proc */ Thread *nextt; /* next on list of threads in this proc */
State nextstate; /* next run state */
Proc *proc; /* proc of this thread */
Thread *prevt; /* prev on list of threads in this proc */ Thread *prevt; /* prev on list of threads in this proc */
int ret; /* return value for Exec, Fork */ int ret; /* return value for Exec, Fork */
State state; /* run state */
uchar *stk; /* top of stack (lowest address of stack) */
uint stksize; /* stack size */
void* udata[NPRIV]; /* User per-thread data pointer */
char *cmdname; /* ptr to name of thread */ /*
* for debugging only
* (could go away without impacting correct behavior):
*/
int inrendez;
Channel *altc; Channel *altc;
_Procrend altrend; _Procrend altrend;
@ -96,10 +96,7 @@ struct Thread
Alt *alt; /* pointer to current alt structure (debugging) */ Alt *alt; /* pointer to current alt structure (debugging) */
ulong userpc; ulong userpc;
Channel *c; Channel *c;
pthread_cond_t cond;
void* udata[NPRIV]; /* User per-thread data pointer */
int lastfd;
}; };
struct Execargs struct Execargs
@ -113,12 +110,13 @@ struct Execargs
struct Proc struct Proc
{ {
Lock lock; Lock lock;
Label sched; /* for context switches */
Proc *link; /* in proctab */ Label context; /* for context switches */
int pid; /* process id */ Proc *link; /* in ptab */
int splhi; /* delay notes */ int splhi; /* delay notes */
Thread *thread; /* running thread */ Thread *thread; /* running thread */
Thread *idle; /* idle thread */ Thread *idle; /* idle thread */
int id;
int needexec; int needexec;
Execargs exec; /* exec argument */ Execargs exec; /* exec argument */
@ -131,13 +129,15 @@ struct Proc
Tqueue ready; /* Runnable threads */ Tqueue ready; /* Runnable threads */
Lock readylock; Lock readylock;
char printbuf[Printsize];
int blocked; /* In a rendezvous */ int blocked; /* In a rendezvous */
int pending; /* delayed note pending */ int pending; /* delayed note pending */
int nonotes; /* delay notes */ int nonotes; /* delay notes */
uint nextID; /* ID of most recently created thread */ uint nextID; /* ID of most recently created thread */
Proc *next; /* linked list of Procs */ Proc *next; /* linked list of Procs */
void (*schedfn)(Proc*); /* function to call in scheduler */
_Procrend rend; /* sleep here for more ready threads */ _Procrend rend; /* sleep here for more ready threads */
void *arg; /* passed between shared and unshared stk */ void *arg; /* passed between shared and unshared stk */
@ -147,29 +147,17 @@ struct Proc
void* udata; /* User per-proc data pointer */ void* udata; /* User per-proc data pointer */
int nsched; int nsched;
};
struct Pqueue { /* Proc queue */ /*
Lock lock; * for debugging only
Proc *head; */
Proc **tail; int pid; /* process id */
}; int pthreadid; /* pthread id */
struct Ioproc
{
int tid;
Channel *c, *creply;
int inuse;
long (*op)(va_list*);
va_list arg;
long ret;
char err[ERRMAX];
Ioproc *next;
}; };
void _swaplabel(Label*, Label*); void _swaplabel(Label*, Label*);
void _freeproc(Proc*); Proc* _newproc(void);
Proc* _newproc(void(*)(void*), void*, uint, char*, int, int); int _newthread(Proc*, void(*)(void*), void*, uint, char*, int);
int _procsplhi(void); int _procsplhi(void);
void _procsplx(int); void _procsplx(int);
int _sched(void); int _sched(void);
@ -177,7 +165,8 @@ int _schedexec(Execargs*);
void _schedexecwait(void); void _schedexecwait(void);
void _schedexit(Proc*); void _schedexit(Proc*);
int _schedfork(Proc*); int _schedfork(Proc*);
void _scheduler(void*); void _threadfree(Thread*);
void _threadscheduler(void*);
void _systhreadinit(void); void _systhreadinit(void);
void _threadassert(char*); void _threadassert(char*);
void __threaddebug(ulong, char*, ...); void __threaddebug(ulong, char*, ...);
@ -186,12 +175,15 @@ void _threadexitsall(char*);
Proc* _threadgetproc(void); Proc* _threadgetproc(void);
extern void _threadmultiproc(void); extern void _threadmultiproc(void);
Proc* _threaddelproc(void); Proc* _threaddelproc(void);
void _threadinitproc(Proc*);
void _threadwaitkids(Proc*);
void _threadsetproc(Proc*); void _threadsetproc(Proc*);
void _threadinitstack(Thread*, void(*)(void*), void*); void _threadinitstack(Thread*, void(*)(void*), void*);
void _threadlinkmain(void);
void* _threadmalloc(long, int); void* _threadmalloc(long, int);
void _threadnote(void*, char*); void _threadnote(void*, char*);
void _threadready(Thread*); void _threadready(Thread*);
void _threadidle(void); void _threadsetidle(int);
void _threadsleep(_Procrend*); void _threadsleep(_Procrend*);
void _threadwakeup(_Procrend*); void _threadwakeup(_Procrend*);
void _threadsignal(void); void _threadsignal(void);
@ -200,13 +192,15 @@ long _xdec(long*);
void _xinc(long*); void _xinc(long*);
void _threadremove(Proc*, Thread*); void _threadremove(Proc*, Thread*);
void threadstatus(void); void threadstatus(void);
void _threadstartproc(Proc*);
void _threadexitproc(char*);
void _threadexitallproc(char*);
extern int _threadnprocs;
extern int _threaddebuglevel; extern int _threaddebuglevel;
extern char* _threadexitsallstatus; extern char* _threadexitsallstatus;
extern Pqueue _threadpq; extern Pqueue _threadpq;
extern Channel* _threadwaitchan; extern Channel* _threadwaitchan;
extern Rgrp _threadrgrp;
extern void _stackfree(void*);
#define DBGAPPL (1 << 0) #define DBGAPPL (1 << 0)
#define DBGSCHED (1 << 16) #define DBGSCHED (1 << 16)
@ -216,8 +210,6 @@ extern void _stackfree(void*);
#define DBGNOTE (1 << 20) #define DBGNOTE (1 << 20)
#define DBGEXEC (1 << 21) #define DBGEXEC (1 << 21)
#define ioproc_arg(io, type) (va_arg((io)->arg, type))
extern int _threadgetpid(void);
extern void _threadmemset(void*, int, int); extern void _threadmemset(void*, int, int);
extern void _threaddebugmemset(void*, int, int); extern void _threaddebugmemset(void*, int, int);
extern int _threadprocs; extern int _threadprocs;

View file

@ -6,17 +6,18 @@ _threadinitstack(Thread *t, void (*f)(void*), void *arg)
sigset_t zero; sigset_t zero;
/* do a reasonable initialization */ /* do a reasonable initialization */
memset(&t->sched.uc, 0, sizeof t->sched.uc); memset(&t->context.uc, 0, sizeof t->context.uc);
sigemptyset(&zero); sigemptyset(&zero);
sigprocmask(SIG_BLOCK, &zero, &t->sched.uc.uc_sigmask); sigprocmask(SIG_BLOCK, &zero, &t->context.uc.uc_sigmask);
/* call getcontext, because on Linux makecontext neglects floating point */ /* call getcontext, because on Linux makecontext neglects floating point */
getcontext(&t->sched.uc); getcontext(&t->context.uc);
/* call makecontext to do the real work. */ /* call makecontext to do the real work. */
t->sched.uc.uc_stack.ss_sp = t->stk; /* leave a few words open on both ends */
t->sched.uc.uc_stack.ss_size = t->stksize; t->context.uc.uc_stack.ss_sp = t->stk+8;
makecontext(&t->sched.uc, (void(*)())f, 1, arg); t->context.uc.uc_stack.ss_size = t->stksize-16;
makecontext(&t->context.uc, (void(*)())f, 1, arg);
} }
void void