new thread library

This commit is contained in:
rsc 2004-12-25 22:00:11 +00:00
parent 8bbb2f6492
commit 3286afda88
2 changed files with 189 additions and 187 deletions

View file

@ -422,80 +422,75 @@ extern void needstack(int);
#define pow10 p9pow10 #define pow10 p9pow10
#endif #endif
/*
* just enough information so that libc can be
* properly locked without dragging in all of libthread
*/
typedef struct _Thread _Thread;
typedef struct _Threadlist _Threadlist;
struct _Threadlist
{
_Thread *head;
_Thread *tail;
};
extern _Thread *(*threadnow)(void);
/* /*
* synchronization * synchronization
*/ */
typedef struct Lock Lock; typedef struct Lock Lock;
struct Lock struct Lock
{ {
#ifdef PLAN9_PTHREADS #ifdef PLAN9PORT_USING_PTHREADS
int init; int init;
pthread_mutex_t mutex; pthread_mutex_t mutex;
#else
int val;
#endif #endif
int held;
}; };
extern int _tas(int*);
extern void lock(Lock*); extern void lock(Lock*);
extern void unlock(Lock*); extern void unlock(Lock*);
extern int canlock(Lock*); extern int canlock(Lock*);
extern int (*_lock)(Lock*, int, ulong);
extern void (*_unlock)(Lock*, ulong);
/* typedef struct QLock QLock;
* Used to implement process sleep and wakeup,
* either in terms of pthreads or our own primitives.
* This will be more portable than writing our own
* per-system implementations, and on some systems
* non-pthreads threading implementations break libc
* (cough, Linux, cough).
*/
typedef struct _Procrend _Procrend;
struct _Procrend
{
int asleep;
Lock *l;
void *arg;
int pid;
#ifdef PLAN9_PTHREADS
pthread_cond_t cond;
#endif
};
extern void _procsleep(_Procrend*);
extern void _procwakeup(_Procrend*);
typedef struct QLp QLp;
struct QLp
{
Lock inuse;
QLp *next;
_Procrend rend;
char state;
};
typedef
struct QLock struct QLock
{ {
Lock lock; Lock l;
int locked; _Thread *owner;
QLp *head; _Threadlist waiting;
QLp *tail; };
} QLock;
extern void qlock(QLock*); extern void qlock(QLock*);
extern void qunlock(QLock*); extern void qunlock(QLock*);
extern int canqlock(QLock*); extern int canqlock(QLock*);
extern void _qlockinit(void(*)(_Procrend*), void(*)(_Procrend*)); /* called only by the thread library */ extern int (*_qlock)(QLock*, int, ulong); /* do not use */
extern void (*_qunlock)(QLock*, ulong);
typedef typedef struct Rendez Rendez;
struct Rendez
{
QLock *l;
_Threadlist waiting;
};
extern void rsleep(Rendez*); /* unlocks r->l, sleeps, locks r->l again */
extern int rwakeup(Rendez*);
extern int rwakeupall(Rendez*);
extern void (*_rsleep)(Rendez*, ulong); /* do not use */
extern int (*_rwakeup)(Rendez*, int, ulong);
typedef struct RWLock RWLock;
struct RWLock struct RWLock
{ {
Lock lock; Lock l;
int readers; /* number of readers */ int readers;
int writer; /* number of writers */ _Thread *writer;
QLp *head; /* list of waiting processes */ _Threadlist rwaiting;
QLp *tail; _Threadlist wwaiting;
} RWLock; };
extern void rlock(RWLock*); extern void rlock(RWLock*);
extern void runlock(RWLock*); extern void runlock(RWLock*);
@ -503,18 +498,14 @@ extern int canrlock(RWLock*);
extern void wlock(RWLock*); extern void wlock(RWLock*);
extern void wunlock(RWLock*); extern void wunlock(RWLock*);
extern int canwlock(RWLock*); extern int canwlock(RWLock*);
extern int (*_rlock)(RWLock*, int, ulong); /* do not use */
extern int (*_wlock)(RWLock*, int, ulong);
extern void (*_runlock)(RWLock*, ulong);
extern void (*_wunlock)(RWLock*, ulong);
typedef /*
struct Rendez * per-process private data
{ */
QLock *l;
QLp *head;
QLp *tail;
} Rendez;
extern void rsleep(Rendez*); /* unlocks r->l, sleeps, locks r->l again */
extern int rwakeup(Rendez*);
extern int rwakeupall(Rendez*);
extern void** privalloc(void); extern void** privalloc(void);
extern void privfree(void**); extern void privfree(void**);
@ -589,7 +580,7 @@ extern void freenetconninfo(NetConnInfo*);
#define OTRUNC 16 /* or'ed in (except for exec), truncate file first */ #define OTRUNC 16 /* or'ed in (except for exec), truncate file first */
#define OCEXEC 32 /* or'ed in, close on exec */ #define OCEXEC 32 /* or'ed in, close on exec */
#define ORCLOSE 64 /* or'ed in, remove on close */ #define ORCLOSE 64 /* or'ed in, remove on close */
#define ODIRECT 128 /* or'ed in, bypass the cache */ #define ODIRECT 128 /* or'ed in, direct access */
#define OEXCL 0x1000 /* or'ed in, exclusive use (create only) */ #define OEXCL 0x1000 /* or'ed in, exclusive use (create only) */
#define OLOCK 0x2000 /* or'ed in, lock after opening */ #define OLOCK 0x2000 /* or'ed in, lock after opening */
@ -724,6 +715,10 @@ extern int unmount(char*, char*);
*/ */
extern int noted(int); extern int noted(int);
extern int notify(void(*)(void*, char*)); extern int notify(void(*)(void*, char*));
extern void notifyenable(char*);
extern void notifydisable(char*);
extern void notifyon(char*);
extern void notifyoff(char*);
extern int p9open(char*, int); extern int p9open(char*, int);
extern int fd2path(int, char*, int); extern int fd2path(int, char*, int);
extern int p9pipe(int*); extern int p9pipe(int*);
@ -772,6 +767,7 @@ extern ulong rendezvous(ulong, ulong);
#define rfork p9rfork #define rfork p9rfork
/* #define access p9access */ /* #define access p9access */
#define create p9create #define create p9create
#undef open
#define open p9open #define open p9open
#define pipe p9pipe #define pipe p9pipe
#endif #endif
@ -801,8 +797,14 @@ extern int post9pservice(int, char*);
#endif #endif
/* compiler directives on plan 9 */ /* compiler directives on plan 9 */
#define USED(x) if(x){}else{}
#define SET(x) ((x)=0) #define SET(x) ((x)=0)
#define USED(x) if(x){}else{}
#ifdef __GNUC__
# if __GNUC__ >= 3
# undef USED
# define USED(x) { ulong __y __attribute__ ((unused)); __y = (ulong)(x); }
# endif
#endif
/* command line */ /* command line */
extern char *argv0; extern char *argv0;

View file

@ -4,150 +4,150 @@
extern "C" { extern "C" {
#endif #endif
/* avoid conflicts with socket library */ /*
#undef send * basic procs and threads
#define send _threadsend
#undef recv
#define recv _threadrecv
typedef struct Alt Alt;
typedef struct Channel Channel;
typedef struct Ref Ref;
/* Channel structure. S is the size of the buffer. For unbuffered channels
* s is zero. v is an array of s values. If s is zero, v is unused.
* f and n represent the state of the queue pointed to by v.
*/ */
int proccreate(void (*f)(void *arg), void *arg, unsigned int stacksize);
int threadcreate(void (*f)(void *arg), void *arg, unsigned int stacksize);
void threadexits(char *);
void threadexitsall(char *);
void threadsetname(char*, ...);
void threadsetstate(char*, ...);
void _threadready(_Thread*);
void _threadswitch(void);
void _threadsetsysproc(void);
void _threadsleep(Rendez*);
_Thread *_threadwakeup(Rendez*);
enum { /*
Nqwds = 2, * per proc and thread data
Nqshift = 5, // 2log #of bits in long */
Nqmask = - 1, void **procdata(void);
Nqbits = (1 << Nqshift) * 2,
};
struct Channel { /*
int s; // Size of the channel (may be zero) * supplied by user instead of main.
unsigned int f; // Extraction point (insertion pt: (f + n) % s) * mainstacksize is size of stack allocated to run threadmain
unsigned int n; // Number of values in the channel */
int e; // Element size void threadmain(int argc, char *argv[]);
int freed; // Set when channel is being deleted extern int mainstacksize;
volatile Alt **qentry; // Receivers/senders waiting (malloc)
volatile int nentry; // # of entries malloc-ed
unsigned char v[1]; // Array of s values in the channel
};
/*
* channel communication
*/
typedef struct Alt Alt;
typedef struct _Altarray _Altarray;
typedef struct Channel Channel;
/* Channel operations for alt: */ enum
typedef enum { {
CHANEND, CHANEND,
CHANSND, CHANSND,
CHANRCV, CHANRCV,
CHANNOP, CHANNOP,
CHANNOBLK, CHANNOBLK,
} ChanOp;
struct Alt {
Channel *c; /* channel */
void *v; /* pointer to value */
ChanOp op; /* operation */
/* the next variables are used internally to alt
* they need not be initialized
*/
struct Thread *thread; /* thread waiting on this alt */
int entryno; /* entry number */
}; };
struct Alt
{
void *v;
Channel *c;
uint op;
_Thread *thread;
Alt *xalt;
};
struct _Altarray
{
Alt **a;
uint n;
uint m;
};
struct Channel
{
uint bufsize;
uint elemsize;
uchar *buf;
uint nbuf;
uint off;
_Altarray asend;
_Altarray arecv;
char *name;
};
/* [Edit .+1,./^$/ |cfn -h $PLAN9/src/libthread/channel.c] */
int chanalt(Alt *alts);
Channel* chancreate(int elemsize, int elemcnt);
void chanfree(Channel *c);
int chaninit(Channel *c, int elemsize, int elemcnt);
int channbrecv(Channel *c, void *v);
void* channbrecvp(Channel *c);
ulong channbrecvul(Channel *c);
int channbsend(Channel *c, void *v);
int channbsendp(Channel *c, void *v);
int channbsendul(Channel *c, ulong v);
int chanrecv(Channel *c, void *v);
void* chanrecvp(Channel *c);
ulong chanrecvul(Channel *c);
int chansend(Channel *c, void *v);
int chansendp(Channel *c, void *v);
int chansendul(Channel *c, ulong v);
#define alt chanalt
#define nbrecv channbrecv
#define nbrecvp channbrecvp
#define nvrecvul channbrecvul
#define nbsend channbsend
#define nbsendp channbsendp
#define nbsendul channbsendul
#define recv chanrecv
#define recvp chanrecvp
#define recvul chanrecvul
#define send chansend
#define sendp chansendp
#define sendul chansendul
/*
* reference counts
*/
typedef struct Ref Ref;
struct Ref { struct Ref {
Lock lk; Lock lock;
long ref; long ref;
}; };
int alt(Alt alts[]); long decref(Ref *r);
Channel* chancreate(int elemsize, int bufsize); long incref(Ref *r);
int chaninit(Channel *c, int elemsize, int elemcnt);
void chanfree(Channel *c);
int chanprint(Channel *, char *, ...);
long decref(Ref *r); /* returns 0 iff value is now zero */
void incref(Ref *r);
int nbrecv(Channel *c, void *v);
void* nbrecvp(Channel *c);
unsigned long nbrecvul(Channel *c);
int nbsend(Channel *c, void *v);
int nbsendp(Channel *c, void *v);
int nbsendul(Channel *c, unsigned long v);
int proccreate(void (*f)(void *arg), void *arg, unsigned int stacksize);
int procrfork(void (*f)(void *arg), void *arg, unsigned int stacksize, int flag);
void** procdata(void);
void threadexec(Channel *, int[3], char *, char *[]);
void threadexecl(Channel *, int[3], char *, ...);
int threadspawn(int[3], char*, char*[]);
int recv(Channel *c, void *v);
void* recvp(Channel *c);
unsigned long recvul(Channel *c);
int send(Channel *c, void *v);
int sendp(Channel *c, void *v);
int sendul(Channel *c, unsigned long v);
int threadcreate(void (*f)(void *arg), void *arg, unsigned int stacksize);
int threadcreateidle(void (*f)(void*), void*, unsigned int);
void** threaddata(void);
void threadexits(char *);
void threadexitsall(char *);
void threadfdwait(int, int);
void threadfdwaitsetup(void);
int threadgetgrp(void); /* return thread group of current thread */
char* threadgetname(void);
void threadint(int); /* interrupt thread */
void threadintgrp(int); /* interrupt threads in grp */
void threadkill(int); /* kill thread */
void threadkillgrp(int); /* kill threads in group */
void threadmain(int argc, char *argv[]);
void threadfdnoblock(int);
void threadnonotes(void);
int threadnotify(int (*f)(void*, char*), int in);
int threadid(void);
int threadpid(int);
long threadread(int, void*, long);
long threadreadn(int, void*, long);
int threadread9pmsg(int, void*, uint);
int threadrecvfd(int);
long threadwrite(int, const void*, long);
int threadsendfd(int, int);
int threadsetgrp(int); /* set thread group, return old */
void threadsetname(char *fmt, ...);
void threadsleep(int);
Channel* threadwaitchan(void);
int threadannounce(char*, char*);
int threadlisten(char*, char*);
int threadaccept(int, char*);
int tprivalloc(void); /*
void tprivfree(int); * slave i/o processes
void **tprivaddr(int); */
int yield(void);
long threadstack(void);
extern int mainstacksize;
/* slave I/O processes */
typedef struct Ioproc Ioproc; typedef struct Ioproc Ioproc;
/* [Edit .+1,/^$/ |cfn -h $PLAN9/src/libthread/io*.c] */
void closeioproc(Ioproc *io);
long iocall(Ioproc *io, long (*op)(va_list*), ...);
int ioclose(Ioproc *io, int fd);
int iodial(Ioproc *io, char *addr, char *local, char *dir, int *cdfp);
void iointerrupt(Ioproc *io);
int ioopen(Ioproc *io, char *path, int mode);
Ioproc* ioproc(void); Ioproc* ioproc(void);
void closeioproc(Ioproc*); long ioread(Ioproc *io, int fd, void *a, long n);
void iointerrupt(Ioproc*); int ioread9pmsg(Ioproc*, int, void*, int);
long ioreadn(Ioproc *io, int fd, void *a, long n);
int iorecvfd(Ioproc *, int);
int iosendfd(Ioproc*, int, int);
int iosleep(Ioproc *io, long n);
long iowrite(Ioproc *io, int fd, void *a, long n);
int ioclose(Ioproc*, int); /*
int iodial(Ioproc*, char*, char*, char*, int*); * exec external programs
int ioopen(Ioproc*, char*, int); */
long ioread(Ioproc*, int, void*, long); void threadexec(Channel*, int[3], char*, char *[]);
long ioreadn(Ioproc*, int, void*, long); void threadexecl(Channel*, int[3], char*, ...);
long iowrite(Ioproc*, int, void*, long); int threadspawn(int[3], char*, char*[]);
int iosleep(Ioproc*, long); Channel* threadwaitchan(void);
long iocall(Ioproc*, long (*)(va_list*), ...);
void ioret(Ioproc*, int);
#if defined(__cplusplus) #if defined(__cplusplus)
} }