more thread work

This commit is contained in:
rsc 2004-11-08 16:03:20 +00:00
parent 77dcf88474
commit 1956455367
12 changed files with 294 additions and 340 deletions

View file

@ -4,7 +4,7 @@ static Lock chanlock; /* central channel access lock */
static void enqueue(Alt*, Thread*); static void enqueue(Alt*, Thread*);
static void dequeue(Alt*); static void dequeue(Alt*);
static int altexec(Alt*, int); static int altexec(Alt*);
int _threadhighnentry; int _threadhighnentry;
int _threadnalt; int _threadnalt;
@ -101,7 +101,7 @@ _alt(Alt *alts)
{ {
Alt *a, *xa; Alt *a, *xa;
Channel *c; Channel *c;
int n, s; int n;
Thread *t; Thread *t;
/* /*
@ -119,7 +119,6 @@ _alt(Alt *alts)
t = _threadgetproc()->thread; t = _threadgetproc()->thread;
if((t && t->moribund) || _threadexitsallstatus) if((t && t->moribund) || _threadexitsallstatus)
yield(); /* won't return */ yield(); /* won't return */
s = _procsplhi();
lock(&chanlock); lock(&chanlock);
/* test whether any channels can proceed */ /* test whether any channels can proceed */
@ -134,7 +133,6 @@ _alt(Alt *alts)
c = xa->c; c = xa->c;
if(c==nil){ if(c==nil){
unlock(&chanlock); unlock(&chanlock);
_procsplx(s);
return -1; return -1;
} }
if(canexec(xa)) if(canexec(xa))
@ -146,8 +144,7 @@ _alt(Alt *alts)
/* nothing can proceed */ /* nothing can proceed */
if(xa->op == CHANNOBLK){ if(xa->op == CHANNOBLK){
unlock(&chanlock); unlock(&chanlock);
_procsplx(s); _threadnalt++;
_threadnalt++;
return xa - alts; return xa - alts;
} }
@ -172,9 +169,7 @@ _threadnalt++;
t->alt = alts; t->alt = alts;
t->chan = Chanalt; t->chan = Chanalt;
t->altrend.l = &chanlock; t->altrend.l = &chanlock;
_procsplx(s);
_threadsleep(&t->altrend); _threadsleep(&t->altrend);
s = _procsplhi();
/* dequeue from channels, find selected one */ /* dequeue from channels, find selected one */
a = nil; a = nil;
@ -187,13 +182,12 @@ _threadnalt++;
dequeue(xa); dequeue(xa);
} }
unlock(&chanlock); unlock(&chanlock);
_procsplx(s);
if(a == nil){ /* we were interrupted */ if(a == nil){ /* we were interrupted */
assert(c==(Channel*)~0); assert(c==(Channel*)~0);
return -1; return -1;
} }
}else{ }else{
altexec(a, s); /* unlocks chanlock, does splx */ altexec(a); /* unlocks chanlock, does splx */
} }
if(t) if(t)
t->chan = Channone; t->chan = Channone;
@ -445,7 +439,7 @@ altcopy(void *dst, void *src, int sz)
} }
static int static int
altexec(Alt *a, int spl) altexec(Alt *a)
{ {
volatile Alt *b; volatile Alt *b;
int i, n, otherop; int i, n, otherop;
@ -492,7 +486,6 @@ altexec(Alt *a, int spl)
_threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)(void*)&chanlock); _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)(void*)&chanlock);
_threaddebug(DBGCHAN, "unlocking the chanlock"); _threaddebug(DBGCHAN, "unlocking the chanlock");
unlock(&chanlock); unlock(&chanlock);
_procsplx(spl);
return 1; return 1;
} }
@ -503,6 +496,5 @@ altexec(Alt *a, int spl)
altcopy(buf, me, c->e); altcopy(buf, me, c->e);
unlock(&chanlock); unlock(&chanlock);
_procsplx(spl);
return 1; return 1;
} }

View file

@ -138,7 +138,7 @@ proccreate(void (*f)(void*), void *arg, uint stacksize)
p = _threadgetproc(); p = _threadgetproc();
np = _newproc(); np = _newproc();
p->newproc = np; p->newproc = np;
p->schedfn = _threadstartproc; p->schedfn = _kthreadstartproc;
id = _newthread(np, f, arg, stacksize, nil, p->thread->grp); id = _newthread(np, f, arg, stacksize, nil, p->thread->grp);
_sched(); /* call into scheduler to create proc XXX */ _sched(); /* call into scheduler to create proc XXX */
return id; return id;

View file

@ -10,20 +10,9 @@ _threadexec(Channel *pidc, int fd[3], char *prog, char *args[], int freeargs)
int pfd[2]; int pfd[2];
int n, pid; int n, pid;
char exitstr[ERRMAX]; char exitstr[ERRMAX];
static int firstexec = 1;
static Lock lk;
_threaddebug(DBGEXEC, "threadexec %s", prog); _threaddebug(DBGEXEC, "threadexec %s", prog);
if(firstexec){
lock(&lk);
if(firstexec){
firstexec = 0;
_threadfirstexec();
}
unlock(&lk);
}
/* /*
* We want threadexec to behave like exec; if exec succeeds, * We want threadexec to behave like exec; if exec succeeds,
* never return, and if it fails, return with errstr set. * never return, and if it fails, return with errstr set.
@ -53,7 +42,6 @@ _threadexec(Channel *pidc, int fd[3], char *prog, char *args[], int freeargs)
_threaddebug(DBGSCHED, "exit after efork"); _threaddebug(DBGSCHED, "exit after efork");
_exit(0); _exit(0);
default: default:
_threadafterexec();
if(freeargs) if(freeargs)
free(args); free(args);
break; break;
@ -88,14 +76,14 @@ Bad:
void void
threadexec(Channel *pidc, int fd[3], char *prog, char *args[]) threadexec(Channel *pidc, int fd[3], char *prog, char *args[])
{ {
if(_callthreadexec(pidc, fd, prog, args, 0) >= 0) if(_kthreadexec(pidc, fd, prog, args, 0) >= 0)
threadexits(nil); threadexits(nil);
} }
int int
threadspawn(int fd[3], char *prog, char *args[]) threadspawn(int fd[3], char *prog, char *args[])
{ {
return _callthreadexec(nil, fd, prog, args, 0); return _kthreadexec(nil, fd, prog, args, 0);
} }
/* /*
@ -128,7 +116,7 @@ threadexecl(Channel *pidc, int fd[3], char *f, ...)
args[n] = 0; args[n] = 0;
va_end(arg); va_end(arg);
if(_callthreadexec(pidc, fd, f, args, 1) >= 0) if(_kthreadexec(pidc, fd, f, args, 1) >= 0)
threadexits(nil); threadexits(nil);
} }

View file

@ -32,7 +32,7 @@ threadexitsall(char *exitstr)
_threadexitsallstatus = exitstr; _threadexitsallstatus = exitstr;
_threaddebug(DBGSCHED, "_threadexitsallstatus set to %p", _threadexitsallstatus); _threaddebug(DBGSCHED, "_threadexitsallstatus set to %p", _threadexitsallstatus);
/* leave */ /* leave */
_threadexitallproc(exitstr); _kthreadexitallproc(exitstr);
} }
Channel* Channel*

View file

@ -7,164 +7,25 @@
#include <unistd.h> #include <unistd.h>
#include <fcntl.h> #include <fcntl.h>
#define debugpoll 0 void
fdwait()
#ifdef __APPLE__
#include <sys/time.h>
enum { POLLIN=1, POLLOUT=2, POLLERR=4 };
struct pollfd
{ {
int fd;
int events;
int revents;
};
int
poll(struct pollfd *p, int np, int ms)
{
int i, maxfd, n;
struct timeval tv, *tvp;
fd_set rfd, wfd, efd; fd_set rfd, wfd, efd;
maxfd = -1;
FD_ZERO(&rfd); FD_ZERO(&rfd);
FD_ZERO(&wfd); FD_ZERO(&wfd);
FD_ZERO(&efd); FD_ZERO(&efd);
for(i=0; i<np; i++){ if(mode=='w')
p[i].revents = 0; FD_SET(&wfd, fd);
if(p[i].fd == -1) else
continue; FD_SET(&rfd, fd);
if(p[i].fd > maxfd) FD_SET(&efd, fd);
maxfd = p[i].fd; select(fd+1, &rfd, &wfd, &efd, nil);
if(p[i].events & POLLIN)
FD_SET(p[i].fd, &rfd);
if(p[i].events & POLLOUT)
FD_SET(p[i].fd, &wfd);
FD_SET(p[i].fd, &efd);
}
if(ms != -1){
tv.tv_usec = (ms%1000)*1000;
tv.tv_sec = ms/1000;
tvp = &tv;
}else
tvp = nil;
if(debugpoll){
fprint(2, "select %d:", maxfd+1);
for(i=0; i<=maxfd; i++){
if(FD_ISSET(i, &rfd))
fprint(2, " r%d", i);
if(FD_ISSET(i, &wfd))
fprint(2, " w%d", i);
if(FD_ISSET(i, &efd))
fprint(2, " e%d", i);
}
fprint(2, "; tp=%p, t=%d.%d\n", tvp, tv.tv_sec, tv.tv_usec);
}
n = select(maxfd+1, &rfd, &wfd, &efd, tvp);
if(n <= 0)
return n;
for(i=0; i<np; i++){
if(p[i].fd == -1)
continue;
if(FD_ISSET(p[i].fd, &rfd))
p[i].revents |= POLLIN;
if(FD_ISSET(p[i].fd, &wfd))
p[i].revents |= POLLOUT;
if(FD_ISSET(p[i].fd, &efd))
p[i].revents |= POLLERR;
}
return n;
}
#else
#include <poll.h>
#endif
/*
* Poll file descriptors in an idle loop.
*/
typedef struct Poll Poll;
struct Poll
{
Channel *c; /* for sending back */
};
static Channel *sleepchan[64];
static int sleeptime[64];
static int nsleep;
static struct pollfd pfd[64];
static struct Poll polls[64];
static int npoll;
static void
pollidle(void *v)
{
int i, n, t;
uint now;
for(;; yield()){
if(debugpoll) fprint(2, "poll %d:", npoll);
for(i=0; i<npoll; i++){
if(debugpoll) fprint(2, " %d%c", pfd[i].fd, pfd[i].events==POLLIN ? 'r' : 'w');
pfd[i].revents = 0;
}
t = -1;
now = p9nsec()/1000000;
for(i=0; i<nsleep; i++){
n = sleeptime[i] - now;
if(debugpoll) fprint(2, " s%d", n);
if(n < 0)
n = 0;
if(t == -1 || n < t)
t = n;
}
if(debugpoll) fprint(2, "; t=%d\n", t);
n = poll(pfd, npoll, t);
//fprint(2, "poll ret %d:", n);
now = p9nsec()/1000000;
for(i=0; i<nsleep; i++){
if((int)(sleeptime[i] - now) < 0){
nbsendul(sleepchan[i], 0);
nsleep--;
sleepchan[i] = sleepchan[nsleep];
sleeptime[i] = sleeptime[nsleep];
i--;
}
}
if(n <= 0)
continue;
for(i=0; i<npoll; i++)
if(pfd[i].fd != -1 && pfd[i].revents){
//fprint(2, " %d", pfd[i].fd);
pfd[i].fd = -1;
pfd[i].events = 0;
pfd[i].revents = 0;
nbsendul(polls[i].c, 1);
//fprint(2, " x%d", pfd[i].fd);
}
//fprint(2, "\n");
}
} }
void void
threadfdwaitsetup(void) threadfdwaitsetup(void)
{ {
static int setup = 0;
if(!setup){
setup = 1;
proccreate(pollidle, nil, 16384);
}
} }
void void
@ -199,6 +60,8 @@ _threadfdwait(int fd, int rw, ulong pc)
polls[i].c = &s.c; polls[i].c = &s.c;
if(0) fprint(2, "%s [%3d] fdwait %d %c list *0x%lux\n", if(0) fprint(2, "%s [%3d] fdwait %d %c list *0x%lux\n",
argv0, threadid(), fd, rw, pc); argv0, threadid(), fd, rw, pc);
if(pollpid)
postnote(PNPROC, pollpid, "interrupt");
recvul(&s.c); recvul(&s.c);
} }

View file

@ -11,12 +11,10 @@ struct Mainarg
char **argv; char **argv;
}; };
int _threadmainpid;
int mainstacksize; int mainstacksize;
int _callsthreaddaemonize;
static int passtomainpid;
extern void (*_sysfatal)(char*, va_list); extern void (*_sysfatal)(char*, va_list);
extern Jmp *(*_notejmpbuf)(void);
static void static void
mainlauncher(void *arg) mainlauncher(void *arg)
@ -24,58 +22,24 @@ mainlauncher(void *arg)
Mainarg *a; Mainarg *a;
a = arg; a = arg;
_threadmaininit(); _kmaininit();
threadmain(a->argc, a->argv); threadmain(a->argc, a->argv);
threadexits("threadmain"); threadexits("threadmain");
} }
static void
passer(void *x, char *msg)
{
USED(x);
Waitmsg *w;
if(strcmp(msg, "sys: usr2") == 0)
_exit(0); /* daemonize */
else if(strcmp(msg, "sys: child") == 0){
w = wait();
if(w == nil)
_exit(1);
_exit(atoi(w->msg));
}else
postnote(PNPROC, passtomainpid, msg);
}
int int
main(int argc, char **argv) main(int argc, char **argv)
{ {
int pid;
Mainarg a; Mainarg a;
Proc *p; Proc *p;
sigset_t mask;
/* /*
* Do daemonize hack here. * In pthreads, threadmain is actually run in a subprocess,
* so that the main process can exit (if threaddaemonize is called).
* The main process relays notes to the subprocess.
* _Threadbackgroundsetup will return only in the subprocess.
*/ */
if(_callsthreaddaemonize){ _threadbackgroundinit();
passtomainpid = getpid();
switch(pid = fork()){
case -1:
sysfatal("fork: %r");
case 0:
/* continue executing */
_threadmainpid = getppid();
break;
default:
/* wait for signal USR2 */
notify(passer);
for(;;)
pause();
_exit(0);
}
}
/* /*
* Instruct QLock et al. to use our scheduling functions * Instruct QLock et al. to use our scheduling functions
@ -85,16 +49,17 @@ main(int argc, char **argv)
/* /*
* Install our own _threadsysfatal which takes down * Install our own _threadsysfatal which takes down
* the whole conglomeration of procs. * the whole confederation of procs.
*/ */
_sysfatal = _threadsysfatal; _sysfatal = _threadsysfatal;
/* /*
* XXX Install our own jump handler. * Install our own jump handler.
*/ */
_notejmpbuf = _threadgetjmp;
/* /*
* Install our own signal handlers. * Install our own signal handler.
*/ */
notify(_threadnote); notify(_threadnote);
@ -119,3 +84,15 @@ void
_threadlinkmain(void) _threadlinkmain(void)
{ {
} }
Jmp*
_threadgetjmp(void)
{
static Jmp j;
Proc *p;
p = _threadgetproc();
if(p == nil)
return &j;
return &p->sigjmp;
}

View file

@ -1,15 +1,12 @@
<$PLAN9/src/mkhdr <$PLAN9/src/mkhdr
LIB=libthread.a LIB=libthread.a
THREAD=`sh ./thread.sh` SYSOFILES=`sh ./sysofiles.sh`
OFILES=\ OFILES=\
$OBJTYPE.$O\ $SYSOFILES\
$THREAD.$O\
asm-$SYSNAME-$OBJTYPE.$O\
channel.$O\ channel.$O\
chanprint.$O\ chanprint.$O\
create.$O\ create.$O\
daemon.$O\
debug.$O\ debug.$O\
exec-unix.$O\ exec-unix.$O\
exit.$O\ exit.$O\
@ -29,10 +26,10 @@ OFILES=\
main.$O\ main.$O\
memset.$O\ memset.$O\
memsetd.$O\ memsetd.$O\
note.$O\
read9pmsg.$O\ read9pmsg.$O\
ref.$O\ ref.$O\
sched.$O\ sched.$O\
setproc.$O\
sleep.$O\ sleep.$O\
HFILES=\ HFILES=\
@ -57,6 +54,9 @@ tspawn: tspawn.$O $PLAN9/lib/$LIB
trend: trend.$O $PLAN9/lib/$LIB trend: trend.$O $PLAN9/lib/$LIB
$LD -o trend trend.$O $LDFLAGS -lthread -l9 $LD -o trend trend.$O $LDFLAGS -lthread -l9
tsignal: tsignal.$O $PLAN9/lib/$LIB
$LD -o tsignal tsignal.$O $LDFLAGS -lthread -l9
CLEANFILES=$CLEANFILES tprimes texec CLEANFILES=$CLEANFILES tprimes texec
asm-Linux-ppc.$O: asm-Linux-386.s asm-Linux-ppc.$O: asm-Linux-386.s

View file

@ -2,53 +2,36 @@
#include <errno.h> #include <errno.h>
#include "threadimpl.h" #include "threadimpl.h"
static int multi; /*
static Proc *theproc; * Basic kernel thread management.
*/
static pthread_key_t key; static pthread_key_t key;
/*
* Called before we go multiprocess.
*/
void void
_threadmultiproc(void) _kthreadinit(void)
{ {
if(multi == 0){
multi = 1;
pthread_key_create(&key, 0); pthread_key_create(&key, 0);
_threadsetproc(theproc);
}
} }
/*
* Set the proc for the current pthread.
*/
void void
_threadsetproc(Proc *p) _kthreadsetproc(Proc *p)
{ {
if(!multi){ sigset_t all;
theproc = p;
return; p->pthreadid = pthread_self();
} sigfillset(&all);
pthread_sigmask(SIG_SETMASK, &all, nil);
pthread_setspecific(key, p); pthread_setspecific(key, p);
} }
/*
* Get the proc for the current pthread.
*/
Proc* Proc*
_threadgetproc(void) _kthreadgetproc(void)
{ {
if(!multi)
return theproc;
return pthread_getspecific(key); return pthread_getspecific(key);
} }
/*
* Called to start a new proc.
*/
void void
_threadstartproc(Proc *p) _kthreadstartproc(Proc *p)
{ {
Proc *np; Proc *np;
pthread_t tid; pthread_t tid;
@ -63,69 +46,43 @@ _threadstartproc(Proc *p)
np->pthreadid = tid; np->pthreadid = tid;
} }
/*
* Called to associate p with the current pthread.
*/
void void
_threadinitproc(Proc *p) _kthreadexitproc(char *exitstr)
{
p->pthreadid = pthread_self();
_threadsetproc(p);
}
/*
* Called to exit the current pthread.
*/
void
_threadexitproc(char *exitstr)
{ {
_threaddebug(DBGSCHED, "_pthreadexit"); _threaddebug(DBGSCHED, "_pthreadexit");
pthread_exit(nil); pthread_exit(nil);
} }
/*
* Called to exit all pthreads.
*/
void void
_threadexitallproc(char *exitstr) _kthreadexitallproc(char *exitstr)
{ {
_threaddebug(DBGSCHED, "_threadexitallproc"); _threaddebug(DBGSCHED, "_threadexitallproc");
exits(exitstr); exits(exitstr);
} }
/* /*
* Called to poll for any kids of this pthread. * Exec. Pthreads does the hard work of making it possible
* Wait messages aren't restricted to a particular * for any thread to do the waiting, so this is pretty easy.
* pthread, so we have a separate proc responsible * We create a separate proc whose job is to wait for children
* for them. So this is a no-op. * and deliver wait messages.
*/ */
void static Channel *_threadexecwaitchan;
_threadwaitkids(Proc *p)
{
}
/*
* Separate process to wait for child messages.
* Also runs signal handlers.
*/
static Channel *_threadexecchan;
static void static void
_threadwaitproc(void *v) _threadwaitproc(void *v)
{ {
Channel *c; Channel *c;
Waitmsg *w; Waitmsg *w;
sigset_t none;
sigemptyset(&none); _threadinternalproc();
pthread_sigmask(SIG_SETMASK, &none, 0);
USED(v); USED(v);
for(;;){ for(;;){
w = wait(); w = wait();
if(w == nil){ if(w == nil){
if(errno == ECHILD) if(errno == ECHILD) /* wait for more */
recvul(_threadexecchan); recvul(_threadexecwaitchan);
continue; continue;
} }
if((c = _threadwaitchan) != nil) if((c = _threadwaitchan) != nil)
@ -133,43 +90,182 @@ _threadwaitproc(void *v)
else else
free(w); free(w);
} }
fprint(2, "_threadwaitproc exits\n"); fprint(2, "_threadwaitproc exits\n"); /* not reached */
} }
/*
* Called before the first exec.
*/
void
_threadfirstexec(void)
{
}
/* /*
* Called from mainlauncher before threadmain. * Call _threadexec in the right conditions.
*/ */
void int
_threadmaininit(void) _kthreadexec(Channel *c, int fd[3], char *prog, char *args[], int freeargs)
{ {
_threadexecchan = chancreate(sizeof(ulong), 1); static Lock lk;
int rv;
if(!_threadexecwaitchan){
lock(&lk);
if(!_threadexecwaitchan){
_threadexecwaitchan = chancreate(sizeof(ulong), 1);
proccreate(_threadwaitproc, nil, 32*1024); proccreate(_threadwaitproc, nil, 32*1024);
}
/* unlock(&lk);
* Sleazy: decrement threadnprocs so that }
* the existence of the _threadwaitproc proc rv = _threadexec(c, fd, prog, args, freeargs);
* doesn't keep us from exiting. nbsendul(_threadexecwaitchan, 1);
*/ return rv;
lock(&_threadpq.lock);
--_threadnprocs;
/* print("change %d -> %d\n", _threadnprocs+1, _threadnprocs); */
unlock(&_threadpq.lock);
} }
/* /*
* Called after forking the exec child. * Some threaded applications want to run in the background.
* Calling fork() and exiting in the parent will result in a child
* with a single pthread (if we are using pthreads), and will screw
* up our internal process info if we are using clone/rfork.
* Instead, apps should call threadbackground(), which takes
* care of this.
*
* _threadbackgroundinit is called from main.
*/ */
void
_threadafterexec(void) static int mainpid, passerpid;
static void
passer(void *x, char *msg)
{ {
nbsendul(_threadexecchan, 1); Waitmsg *w;
USED(x);
if(strcmp(msg, "sys: usr2") == 0)
_exit(0); /* daemonize */
else if(strcmp(msg, "sys: child") == 0){
/* child exited => so should we */
w = wait();
if(w == nil)
_exit(1);
_exit(atoi(w->msg));
}else
postnote(PNGROUP, mainpid, msg);
} }
void
_threadbackgroundinit(void)
{
int pid;
sigset_t mask;
sigfillset(&mask);
pthread_sigmask(SIG_BLOCK, &mask, 0);
return;
passerpid = getpid();
switch(pid = fork()){
case -1:
sysfatal("fork: %r");
case 0:
rfork(RFNOTEG);
return;
default:
break;
}
mainpid = pid;
notify(passer);
notifyon("sys: child");
notifyon("sys: usr2"); /* should already be on */
for(;;)
pause();
_exit(0);
}
void
threadbackground(void)
{
if(passerpid <= 1)
return;
postnote(PNPROC, passerpid, "sys: usr2");
}
/*
* Notes.
*/
Channel *_threadnotechan;
static ulong sigs;
static Lock _threadnotelk;
static void _threadnoteproc(void*);
extern int _p9strsig(char*);
extern char *_p9sigstr(int);
Channel*
threadnotechan(void)
{
if(_threadnotechan == nil){
lock(&_threadnotelk);
if(_threadnotechan == nil){
_threadnotechan = chancreate(sizeof(char*), 1);
proccreate(_threadnoteproc, nil, 32*1024);
}
unlock(&_threadnotelk);
}
return _threadnotechan;
}
void
_threadnote(void *x, char *msg)
{
USED(x);
if(_threadexitsallstatus)
_kthreadexitproc(_threadexitsallstatus);
if(strcmp(msg, "sys: usr2") == 0)
noted(NCONT);
if(_threadnotechan == nil)
noted(NDFLT);
sigs |= 1<<_p9strsig(msg);
noted(NCONT);
}
void
_threadnoteproc(void *x)
{
int i;
sigset_t none;
Channel *c;
_threadinternalproc();
sigemptyset(&none);
pthread_sigmask(SIG_SETMASK, &none, 0);
c = _threadnotechan;
for(;;){
if(sigs == 0)
pause();
for(i=0; i<32; i++){
if((sigs&(1<<i)) == 0)
continue;
sigs &= ~(1<<i);
if(i == 0)
continue;
sendp(c, _p9sigstr(i));
}
}
}
void
_threadschednote(void)
{
}
void
_kmaininit(void)
{
sigset_t all;
sigfillset(&all);
pthread_sigmask(SIG_SETMASK, &all, 0);
}

View file

@ -18,13 +18,12 @@ _threadscheduler(void *arg)
p = arg; p = arg;
_threadlinkmain(); _threadlinkmain();
_threadinitproc(p); _threadsetproc(p);
for(;;){ for(;;){
/* /*
* Clean up zombie children. * Clean up zombie children.
*/ */
_threadwaitkids(p);
/* /*
* Find next thread to run. * Find next thread to run.
@ -154,6 +153,7 @@ runthread(Proc *p)
if(p->nthreads==0 || (p->nthreads==1 && p->idle)) if(p->nthreads==0 || (p->nthreads==1 && p->idle))
return nil; return nil;
_threadschednote();
lock(&p->readylock); lock(&p->readylock);
q = &p->ready; q = &p->ready;
if(q->head == nil){ if(q->head == nil){
@ -180,7 +180,10 @@ runthread(Proc *p)
*/ */
q->asleep = 1; q->asleep = 1;
p->rend.l = &p->readylock; p->rend.l = &p->readylock;
while(q->asleep){
_procsleep(&p->rend); _procsleep(&p->rend);
_threadschednote();
}
/* /*
* Maybe we were awakened to exit? * Maybe we were awakened to exit?
@ -284,6 +287,25 @@ _threadsetidle(int id)
unlock(&p->readylock); unlock(&p->readylock);
} }
/*
* Mark proc as internal so that if all but internal procs exit, we exit.
*/
void
_threadinternalproc(void)
{
Proc *p;
p = _threadgetproc();
if(p->internal)
return;
lock(&_threadpq.lock);
if(p->internal == 0){
p->internal = 1;
--_threadnprocs;
}
unlock(&_threadpq.lock);
}
static void static void
schedexit(Proc *p) schedexit(Proc *p)
{ {
@ -301,6 +323,9 @@ schedexit(Proc *p)
break; break;
} }
} }
if(p->internal)
n = _threadnprocs;
else
n = --_threadnprocs; n = --_threadnprocs;
unlock(&_threadpq.lock); unlock(&_threadpq.lock);
@ -309,10 +334,10 @@ schedexit(Proc *p)
free(p); free(p);
if(n == 0){ if(n == 0){
_threaddebug(DBGSCHED, "procexit; no more procs"); _threaddebug(DBGSCHED, "procexit; no more procs");
_threadexitallproc(ex); _kthreadexitallproc(ex);
}else{ }else{
_threaddebug(DBGSCHED, "procexit"); _threaddebug(DBGSCHED, "procexit");
_threadexitproc(ex); _kthreadexitproc(ex);
} }
} }

View file

@ -36,3 +36,4 @@ _threadwakeup(_Procrend *r)
_threadready(t); _threadready(t);
unlock(&t->proc->lock); unlock(&t->proc->lock);
} }

View file

@ -2,13 +2,13 @@
if [ `uname` = Linux ] if [ `uname` = Linux ]
then then
case "`uname | awk '{print $3}'`" in case `uname -r` in
*)
echo Linux-clone
;;
2.[6789]*) 2.[6789]*)
echo pthread echo pthread
;; ;;
*)
echo Linux-clone
;;
esac esac
else else
echo pthread echo pthread

View file

@ -28,6 +28,14 @@ typedef struct Proc Proc;
typedef struct Tqueue Tqueue; typedef struct Tqueue Tqueue;
typedef struct Pqueue Pqueue; typedef struct Pqueue Pqueue;
typedef struct Execargs Execargs; typedef struct Execargs Execargs;
typedef struct Jmp Jmp;
/* sync with ../lib9/notify.c */
struct Jmp
{
p9jmp_buf b;
};
typedef enum typedef enum
{ {
@ -126,6 +134,7 @@ struct Proc
Proc *newproc; /* fork argument */ Proc *newproc; /* fork argument */
char exitstr[ERRMAX]; /* exit status */ char exitstr[ERRMAX]; /* exit status */
int internal;
int rforkflag; int rforkflag;
int nthreads; int nthreads;
Tqueue threads; /* All threads of this proc */ Tqueue threads; /* All threads of this proc */
@ -138,6 +147,7 @@ struct Proc
uint nextID; /* ID of most recently created thread */ uint nextID; /* ID of most recently created thread */
Proc *next; /* linked list of Procs */ Proc *next; /* linked list of Procs */
Jmp sigjmp; /* for notify implementation */
void (*schedfn)(Proc*); /* function to call in scheduler */ void (*schedfn)(Proc*); /* function to call in scheduler */
@ -161,8 +171,6 @@ struct Proc
void _swaplabel(Label*, Label*); void _swaplabel(Label*, Label*);
Proc* _newproc(void); Proc* _newproc(void);
int _newthread(Proc*, void(*)(void*), void*, uint, char*, int); int _newthread(Proc*, void(*)(void*), void*, uint, char*, int);
int _procsplhi(void);
void _procsplx(int);
int _sched(void); int _sched(void);
int _schedexec(Execargs*); int _schedexec(Execargs*);
void _schedexecwait(void); void _schedexecwait(void);
@ -178,14 +186,14 @@ void _threadexitsall(char*);
Proc* _threadgetproc(void); Proc* _threadgetproc(void);
extern void _threadmultiproc(void); extern void _threadmultiproc(void);
Proc* _threaddelproc(void); Proc* _threaddelproc(void);
void _threadinitproc(Proc*); void _kthreadinitproc(Proc*);
void _threadwaitkids(Proc*);
void _threadsetproc(Proc*); void _threadsetproc(Proc*);
void _threadinitstack(Thread*, void(*)(void*), void*); void _threadinitstack(Thread*, void(*)(void*), void*);
void _threadlinkmain(void); void _threadlinkmain(void);
void* _threadmalloc(long, int); void* _threadmalloc(long, int);
void _threadnote(void*, char*); void _threadnote(void*, char*);
void _threadready(Thread*); void _threadready(Thread*);
void _threadschednote(void);
void _threadsetidle(int); void _threadsetidle(int);
void _threadsleep(_Procrend*); void _threadsleep(_Procrend*);
void _threadwakeup(_Procrend*); void _threadwakeup(_Procrend*);
@ -195,12 +203,18 @@ long _xdec(long*);
void _xinc(long*); void _xinc(long*);
void _threadremove(Proc*, Thread*); void _threadremove(Proc*, Thread*);
void threadstatus(void); void threadstatus(void);
void _threadstartproc(Proc*);
void _threadexitproc(char*);
void _threadexitallproc(char*);
void _threadefork(int[3], int[2], char*, char**); void _threadefork(int[3], int[2], char*, char**);
Jmp* _threadgetjmp(void);
void _kthreadinit(void);
void _kthreadsetproc(Proc*);
Proc* _kthreadgetproc(void);
void _kthreadstartproc(Proc*);
void _kthreadexitproc(char*);
void _kthreadexitallproc(char*);
void _threadinternalproc(void);
void _threadbackgroundinit(void);
void _kmaininit(void);
extern int _threadmainpid;
extern int _threadnprocs; extern int _threadnprocs;
extern int _threaddebuglevel; extern int _threaddebuglevel;
extern char* _threadexitsallstatus; extern char* _threadexitsallstatus;
@ -222,8 +236,6 @@ extern void _threadstacklimit(void*, void*);
extern void _procdelthread(Proc*, Thread*); extern void _procdelthread(Proc*, Thread*);
extern void _procaddthread(Proc*, Thread*); extern void _procaddthread(Proc*, Thread*);
extern void _threadafterexec(void);
extern void _threadmaininit(void); extern void _threadmaininit(void);
extern void _threadfirstexec(void);
extern int _threadexec(Channel*, int[3], char*, char*[], int); extern int _threadexec(Channel*, int[3], char*, char*[], int);
extern int _callthreadexec(Channel*, int[3], char*, char*[], int); extern int _kthreadexec(Channel*, int[3], char*, char*[], int);