Add support for user-level 9P servers/clients and various bug fixes to go with them.

This commit is contained in:
rsc 2003-12-11 17:48:38 +00:00
parent ac244f8d28
commit 32f69c36e0
60 changed files with 965 additions and 485 deletions

View file

@ -2,6 +2,8 @@
#include <libc.h>
#include <fcall.h>
#include <thread.h>
#include <poll.h>
#include <errno.h>
enum
{
@ -38,6 +40,7 @@ struct Msg
int ref;
int ctag;
int tag;
int isopenfd;
Fcall tx;
Fcall rx;
Fid *fid;
@ -52,6 +55,8 @@ struct Msg
struct Conn
{
int fd;
int fdmode;
Fid *fdfid;
int nmsg;
int nfid;
Channel *inc;
@ -89,7 +94,7 @@ void *erealloc(void*, int);
Queue *qalloc(void);
int sendq(Queue*, void*);
void *recvq(Queue*);
void selectthread(void*);
void pollthread(void*);
void connthread(void*);
void connoutthread(void*);
void listenthread(void*);
@ -100,6 +105,10 @@ int tlisten(char*, char*);
int taccept(int, char*);
int iolisten(Ioproc*, char*, char*);
int ioaccept(Ioproc*, int, char*);
int iorecvfd(Ioproc*, int);
int iosendfd(Ioproc*, int, int);
void mainproc(void*);
int ignorepipe(void*, char*);
void
usage(void)
@ -110,14 +119,13 @@ usage(void)
}
uchar vbuf[128];
extern int _threaddebuglevel;
void
threadmain(int argc, char **argv)
{
char *file;
int n;
Fcall f;
if(verbose) fprint(2, "9pserve running\n");
ARGBEGIN{
default:
usage();
@ -142,6 +150,20 @@ threadmain(int argc, char **argv)
if((afd = announce(addr, adir)) < 0)
sysfatal("announce %s: %r", addr);
proccreate(mainproc, nil, STACK);
threadexits(0);
}
void
mainproc(void *v)
{
int n;
Fcall f;
USED(v);
yield(); /* let threadmain exit */
atnotify(ignorepipe, 1);
fmtinstall('D', dirfmt);
fmtinstall('M', dirmodefmt);
fmtinstall('F', fcallfmt);
@ -150,10 +172,6 @@ threadmain(int argc, char **argv)
outq = qalloc();
inq = qalloc();
// threadcreateidle(selectthread, nil, STACK);
threadcreate(inputthread, nil, STACK);
threadcreate(outputthread, nil, STACK);
f.type = Tversion;
f.version = "9P2000";
f.msize = 8192;
@ -165,7 +183,22 @@ threadmain(int argc, char **argv)
if(convM2S(vbuf, n, &f) != n)
sysfatal("convM2S failure");
if(verbose > 1) fprint(2, "* -> %F\n", &f);
threadcreate(inputthread, nil, STACK);
threadcreate(outputthread, nil, STACK);
threadcreate(listenthread, nil, STACK);
threadcreateidle(pollthread, nil, STACK);
threadexits(0);
}
int
ignorepipe(void *v, char *s)
{
USED(v);
if(strcmp(s, "sys: write on closed pipe") == 0)
return 1;
fprint(2, "msg: %s\n", s);
return 0;
}
void
@ -178,10 +211,6 @@ listenthread(void *arg)
USED(arg);
for(;;){
c = emalloc(sizeof(Conn));
c->inc = chancreate(sizeof(void*), 0);
c->internal = chancreate(sizeof(void*), 0);
c->inq = qalloc();
c->outq = qalloc();
c->fd = iolisten(io, adir, c->dir);
if(c->fd < 0){
if(verbose) fprint(2, "listen: %r\n");
@ -189,13 +218,17 @@ listenthread(void *arg)
free(c);
return;
}
c->inc = chancreate(sizeof(void*), 0);
c->internal = chancreate(sizeof(void*), 0);
c->inq = qalloc();
c->outq = qalloc();
if(verbose) fprint(2, "incoming call on %s\n", c->dir);
threadcreate(connthread, c, STACK);
}
}
void
sendmsg(Msg *m)
send9pmsg(Msg *m)
{
int n, nn;
@ -226,7 +259,7 @@ err(Msg *m, char *ename)
m->rx.type = Rerror;
m->rx.ename = ename;
m->rx.tag = m->tx.tag;
sendmsg(m);
send9pmsg(m);
}
void
@ -250,7 +283,7 @@ connthread(void *arg)
c->fd = fd;
threadcreate(connoutthread, c, STACK);
while((m = mread9p(io, c->fd)) != nil){
if(verbose > 1) fprint(2, "%s -> %F\n", c->dir, &m->tx);
if(verbose > 1) fprint(2, "fd#%d -> %F\n", c->fd, &m->tx);
m->c = c;
m->ctag = m->tx.tag;
c->nmsg++;
@ -267,13 +300,13 @@ connthread(void *arg)
m->rx.msize = 8192;
m->rx.version = "9P2000";
m->rx.type = Rversion;
sendmsg(m);
send9pmsg(m);
continue;
case Tflush:
if((m->oldm = gethash(c->tag, m->tx.oldtag)) == nil){
m->rx.tag = m->tx.tag;
m->rx.type = Rflush;
sendmsg(m);
send9pmsg(m);
continue;
}
m->oldm->ref++;
@ -318,6 +351,15 @@ connthread(void *arg)
}
m->afid->ref++;
break;
case Topenfd:
if(m->tx.mode != OREAD && (m->tx.mode&~OTRUNC) != OWRITE){
err(m, "openfd mode must be OREAD or OWRITE");
continue;
}
m->isopenfd = 1;
m->tx.type = Topen;
m->tpkt[4] = Topen;
/* fall through */
case Tcreate:
case Topen:
case Tclunk:
@ -363,6 +405,7 @@ connthread(void *arg)
m = msgnew();
m->internal = 1;
m->c = c;
c->nmsg++;
m->tx.type = Tflush;
m->tx.tag = m->tag;
m->tx.oldtag = om->tag;
@ -371,7 +414,9 @@ connthread(void *arg)
m->ref++; /* for outq */
sendomsg(m);
recvp(c->internal);
msgput(m);
msgput(m); /* got from recvp */
msgput(m); /* got from msgnew */
msgput(om); /* got from hash table */
}
}
@ -382,6 +427,7 @@ connthread(void *arg)
m = msgnew();
m->internal = 1;
m->c = c;
c->nmsg++;
m->tx.type = Tclunk;
m->tx.tag = m->tag;
m->tx.fid = f->fid;
@ -390,7 +436,9 @@ connthread(void *arg)
m->ref++;
sendomsg(m);
recvp(c->internal);
msgput(m);
msgput(m); /* got from recvp */
msgput(m); /* got from msgnew */
fidput(f); /* got from hash table */
}
}
@ -398,9 +446,157 @@ out:
assert(c->nmsg == 0);
assert(c->nfid == 0);
close(c->fd);
chanfree(c->internal);
c->internal = 0;
chanfree(c->inc);
c->inc = 0;
free(c->inq);
c->inq = 0;
free(c->outq);
c->outq = 0;
free(c);
}
static void
openfdthread(void *v)
{
Conn *c;
Fid *fid;
Msg *m;
int n;
vlong tot;
Ioproc *io;
char buf[1024];
c = v;
fid = c->fdfid;
io = ioproc();
tot = 0;
if(c->fdmode == OREAD){
for(;;){
if(verbose) fprint(2, "tread...");
m = msgnew();
m->internal = 1;
m->c = c;
m->tx.type = Tread;
m->tx.count = 8192;
m->tx.fid = fid->fid;
m->tx.tag = m->tag;
m->tx.offset = tot;
m->fid = fid;
fid->ref++;
m->ref++;
sendomsg(m);
recvp(c->internal);
if(m->rx.type == Rerror)
break;
if(m->rx.count == 0)
break;
tot += m->rx.count;
if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count)
break;
msgput(m);
msgput(m);
}
}else{
for(;;){
if(verbose) fprint(2, "twrite...");
if((n=ioread(io, c->fd, buf, sizeof buf)) <= 0){
m = nil;
break;
}
m = msgnew();
m->internal = 1;
m->c = c;
m->tx.type = Twrite;
m->tx.fid = fid->fid;
m->tx.data = buf;
m->tx.count = n;
m->tx.tag = m->tag;
m->tx.offset = tot;
m->fid = fid;
fid->ref++;
m->ref++;
sendomsg(m);
recvp(c->internal);
if(m->rx.type == Rerror)
break;
tot = n;
msgput(m);
msgput(m);
}
}
if(verbose) fprint(2, "eof on %d fid %d\n", c->fd, fid->fid);
close(c->fd);
closeioproc(io);
if(m){
msgput(m);
msgput(m);
}
m = msgnew();
m->internal = 1;
m->c = c;
m->tx.type = Tclunk;
m->tx.fid = fid->fid;
m->fid = fid;
fid->ref++;
m->ref++;
sendomsg(m);
recvp(c->internal);
msgput(m);
msgput(m);
fidput(fid);
c->fdfid = nil;
chanfree(c->internal);
c->internal = 0;
free(c);
}
int
xopenfd(Msg *m)
{
char errs[ERRMAX];
int n, p[2];
Conn *nc;
if(pipe(p) < 0){
rerrstr(errs, sizeof errs);
err(m, errs);
}
if(verbose) fprint(2, "xopen pipe %d %d...", p[0], p[1]);
/* now we're committed. */
/* a new connection for this fid */
nc = emalloc(sizeof(Conn));
nc->internal = chancreate(sizeof(void*), 0);
/* a ref for us */
nc->fdfid = m->fid;
m->fid->ref++;
nc->fdmode = m->tx.mode;
nc->fd = p[0];
/* clunk fid from other connection */
if(delhash(m->c->fid, m->fid->cfid, m->fid) == 0)
fidput(m->fid);
/* a thread to tend the pipe */
threadcreate(openfdthread, nc, STACK);
/* rewrite as Ropenfd */
m->rx.type = Ropenfd;
n = GBIT32(m->rpkt);
m->rpkt = erealloc(m->rpkt, n+4);
PBIT32(m->rpkt+n, p[1]);
n += 4;
PBIT32(m->rpkt, n);
m->rpkt[4] = Ropenfd;
m->rx.unixfd = p[1];
return 0;
}
void
connoutthread(void *arg)
{
@ -413,6 +609,9 @@ connoutthread(void *arg)
io = ioproc();
while((m = recvq(c->outq)) != nil){
err = m->tx.type+1 != m->rx.type;
if(!err && m->isopenfd)
if(xopenfd(m) < 0)
continue;
switch(m->tx.type){
case Tflush:
om = m->oldm;
@ -446,7 +645,7 @@ connoutthread(void *arg)
}
if(delhash(m->c->tag, m->ctag, m) == 0)
msgput(m);
if(verbose > 1) fprint(2, "%s <- %F\n", c->dir, &m->rx);
if(verbose > 1) fprint(2, "fd#%d <- %F\n", c->fd, &m->rx);
rewritehdr(&m->rx, m->rpkt);
if(mwrite9p(io, c->fd, m->rpkt) < 0)
if(verbose) fprint(2, "write error: %r\n");
@ -473,6 +672,8 @@ outputthread(void *arg)
msgput(m);
}
closeioproc(io);
fprint(2, "output eof\n");
threadexitsall(0);
}
void
@ -483,6 +684,7 @@ inputthread(void *arg)
Msg *m;
Ioproc *io;
if(verbose) fprint(2, "input thread\n");
io = ioproc();
USED(arg);
while((pkt = read9ppkt(io, 0)) != nil){
@ -514,6 +716,8 @@ inputthread(void *arg)
sendq(m->c->outq, m);
}
closeioproc(io);
fprint(2, "input eof\n");
threadexitsall(0);
}
void*
@ -626,15 +830,20 @@ msgput(Msg *m)
m->c->nmsg--;
m->c = nil;
fidput(m->fid);
fidput(m->afid);
fidput(m->newfid);
free(m->tpkt);
free(m->rpkt);
m->fid = nil;
fidput(m->afid);
m->afid = nil;
fidput(m->newfid);
m->newfid = nil;
free(m->tpkt);
m->tpkt = nil;
free(m->rpkt);
m->rpkt = nil;
if(m->rx.type == Ropenfd)
close(m->rx.unixfd);
m->rx.unixfd = -1;
m->isopenfd = 0;
m->internal = 0;
m->next = freemsg;
freemsg = m;
}
@ -649,6 +858,7 @@ msgget(int n)
m = msgtab[n];
if(m->ref == 0)
return nil;
if(verbose) fprint(2, "msgget %d = %p\n", n, m);
m->ref++;
return m;
}
@ -768,6 +978,12 @@ read9ppkt(Ioproc *io, int fd)
free(pkt);
return nil;
}
/* would do this if we ever got one of these, but we only generate them
if(pkt[4] == Ropenfd){
newfd = iorecvfd(io, fd);
PBIT32(pkt+n-4, newfd);
}
*/
return pkt;
}
@ -795,7 +1011,7 @@ mread9p(Ioproc *io, int fd)
int
mwrite9p(Ioproc *io, int fd, uchar *pkt)
{
int n;
int n, nfd;
n = GBIT32(pkt);
if(verbose > 2) fprint(2, "write %d %d %.*H\n", fd, n, n, pkt);
@ -803,6 +1019,13 @@ mwrite9p(Ioproc *io, int fd, uchar *pkt)
fprint(2, "write error: %r\n");
return -1;
}
if(pkt[4] == Ropenfd){
nfd = GBIT32(pkt+n-4);
if(iosendfd(io, fd, nfd) < 0){
fprint(2, "send fd error: %r\n");
return -1;
}
}
return 0;
}
@ -871,42 +1094,212 @@ rewritehdr(Fcall *f, uchar *pkt)
#ifdef _LIB9_H_
/* unix select-based polling */
struct Ioproc
{
Channel *c;
Ioproc *next;
int index;
};
static struct Ioproc **pio;
static struct pollfd *pfd;
static int npfd;
static struct Ioproc *iofree;
Ioproc*
ioproc(void)
{
return nil;
Ioproc *io;
if(iofree == nil){
pfd = erealloc(pfd, (npfd+1)*sizeof(pfd[0]));
pfd[npfd].events = 0;
pfd[npfd].fd = -1;
iofree = emalloc(sizeof(Ioproc));
iofree->index = npfd;
iofree->c = chancreate(sizeof(ulong), 1);
pio = erealloc(pio, (npfd+1)*sizeof(pio[0]));
pio[npfd] = iofree;
npfd++;
}
io = iofree;
iofree = io->next;
return io;
}
void
closeioproc(Ioproc *io)
{
io->next = iofree;
iofree = io;
}
void
pollthread(void *v)
{
int i, n;
for(;;){
yield();
for(i=0; i<npfd; i++)
pfd[i].revents = 0;
if(verbose){
fprint(2, "poll:");
for(i=0; i<npfd; i++)
if(pfd[i].events)
fprint(2, " %d%c", pfd[i].fd, pfd[i].events==POLLIN ? 'r' : pfd[i].events==POLLOUT ? 'w' : '?');
fprint(2, "\n");
}
n = poll(pfd, npfd, -1);
if(n <= 0)
continue;
for(i=0; i<npfd; i++)
if(pfd[i].fd != -1 && pfd[i].revents){
pfd[i].fd = -1;
pfd[i].events = 0;
pfd[i].revents = 0;
nbsendul(pio[i]->c, 1);
}
}
}
static void
noblock(int fd)
{
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0)|O_NONBLOCK);
}
static void
xwait(Ioproc *io, int fd, int e)
{
if(verbose) fprint(2, "wait for %d%c\n", fd, e==POLLIN ? 'r' : 'w');
pfd[io->index].fd = fd;
pfd[io->index].events = e;
recvul(io->c);
if(verbose) fprint(2, "got %d\n", fd);
}
static void
rwait(Ioproc *io, int fd)
{
xwait(io, fd, POLLIN);
}
static void
wwait(Ioproc *io, int fd)
{
xwait(io, fd, POLLOUT);
}
long
ioread(Ioproc *io, int fd, void *v, long n)
{
long r;
USED(io);
xxx;
noblock(fd);
while((r=read(fd, v, n)) < 0 && errno == EWOULDBLOCK)
rwait(io, fd);
return r;
}
long
ioreadn(Ioproc *io, int fd, void *v, long n)
{
long tot, m;
uchar *u;
u = v;
for(tot=0; tot<n; tot+=m){
m = ioread(io, fd, u+tot, n-tot);
if(m <= 0){
if(tot)
break;
return m;
}
}
return tot;
}
int
iorecvfd(Ioproc *io, int fd)
{
int r;
noblock(fd);
while((r=recvfd(fd)) < 0 && errno == EWOULDBLOCK)
rwait(io, fd);
return r;
}
int
iosendfd(Ioproc *io, int s, int fd)
{
int r;
noblock(s);
while((r=sendfd(s, fd)) < 0 && errno == EWOULDBLOCK)
wwait(io, s);
if(r < 0) fprint(2, "sent %d, %d\n", s, fd);
return r;
}
static long
_iowrite(Ioproc *io, int fd, void *v, long n)
{
long r;
USED(io);
noblock(fd);
while((r=write(fd, v, n)) < 0 && errno == EWOULDBLOCK)
wwait(io, fd);
return r;
}
long
iowrite(Ioproc *io, int fd, void *v, long n)
{
USED(io);
long tot, m;
uchar *u;
xxx;
u = v;
for(tot=0; tot<n; tot+=m){
m = _iowrite(io, fd, u+tot, n-tot);
if(m <= 0){
if(tot)
break;
return m;
}
}
return tot;
}
int
iolisten(Ioproc *io, char *a, char *b)
iolisten(Ioproc *io, char *dir, char *ndir)
{
int fd;
int r;
extern int _p9netfd(char*);
USED(io);
xxx;
if((fd = _p9netfd(dir)) < 0)
return -1;
noblock(fd);
while((r=listen(dir, ndir)) < 0 && errno == EWOULDBLOCK)
rwait(io, fd);
return r;
}
int
ioaccept(Ioproc *io, int fd, char *dir)
{
int r;
USED(io);
xxx;
noblock(fd);
while((r=accept(fd, dir)) < 0 && errno == EWOULDBLOCK)
rwait(io, fd);
return r;
}
#else