Long-standing stability bugs fixed in 9pserve.

Update win to use acme interface directly instead of via pipes.
Add comment to pipe about lack of message boundaries.
This commit is contained in:
rsc 2004-03-02 19:21:48 +00:00
parent c4991217e1
commit 05b7f431f0
3 changed files with 154 additions and 69 deletions

View file

@ -74,7 +74,7 @@ char adir[40];
int isunix;
Queue *outq;
Queue *inq;
int verbose;
int verbose = 0;
int msize = 8192;
void *gethash(Hash**, uint);
@ -276,8 +276,8 @@ connthread(void *arg)
{
int i, fd;
Conn *c;
Hash *h;
Msg *m, *om;
Hash *h, *hnext;
Msg *m, *om, *mm;
Fid *f;
Ioproc *io;
@ -405,11 +405,16 @@ connthread(void *arg)
}
}
if(verbose) fprint(2, "%s eof\n", c->dir);
if(verbose) fprint(2, "fd#%d eof; flushing conn\n", c->fd);
/* flush the output queue */
sendq(c->outq, nil);
while(c->outq != nil)
yield();
/* flush all outstanding messages */
for(i=0; i<NHASH; i++){
for(h=c->tag[i]; h; h=h->next){
for(h=c->tag[i]; h; h=hnext){
om = h->v;
m = msgnew();
m->internal = 1;
@ -419,19 +424,22 @@ connthread(void *arg)
m->tx.tag = m->tag;
m->tx.oldtag = om->tag;
m->oldm = om;
om->ref++;
om->ref++; /* for m->oldm */
m->ref++; /* for outq */
sendomsg(m);
recvp(c->internal);
mm = recvp(c->internal);
assert(mm == m);
msgput(m); /* got from recvp */
msgput(m); /* got from msgnew */
msgput(om); /* got from hash table */
hnext = h->next;
free(h);
}
}
/* clunk all outstanding fids */
for(i=0; i<NHASH; i++){
for(h=c->fid[i]; h; h=h->next){
for(h=c->fid[i]; h; h=hnext){
f = h->v;
m = msgnew();
m->internal = 1;
@ -444,10 +452,13 @@ connthread(void *arg)
f->ref++;
m->ref++;
sendomsg(m);
recvp(c->internal);
mm = recvp(c->internal);
assert(mm == m);
msgput(m); /* got from recvp */
msgput(m); /* got from msgnew */
fidput(f); /* got from hash table */
hnext = h->next;
free(h);
}
}
@ -461,8 +472,6 @@ out:
c->inc = 0;
free(c->inq);
c->inq = 0;
free(c->outq);
c->outq = 0;
free(c);
}
@ -482,6 +491,7 @@ openfdthread(void *v)
io = ioproc();
tot = 0;
m = nil;
if(c->fdmode == OREAD){
for(;;){
if(verbose) fprint(2, "tread...");
@ -506,11 +516,12 @@ openfdthread(void *v)
break;
tot += m->rx.count;
if(iowrite(io, c->fd, m->rx.data, m->rx.count) != m->rx.count){
fprint(2, "pipe write error: %r\n");
// fprint(2, "pipe write error: %r\n");
break;
}
msgput(m);
msgput(m);
m = nil;
}
}else{
for(;;){
@ -521,7 +532,6 @@ openfdthread(void *v)
if((n=ioread(io, c->fd, buf, n)) <= 0){
if(n < 0)
fprint(2, "pipe read error: %r\n");
m = nil;
break;
}
m = msgnew();
@ -540,11 +550,11 @@ openfdthread(void *v)
recvp(c->internal);
if(m->rx.type == Rerror){
// fprint(2, "write error: %s\n", m->rx.ename);
continue;
}
tot = n;
tot += n;
msgput(m);
msgput(m);
m = nil;
}
}
if(verbose) fprint(2, "eof on %d fid %d\n", c->fd, fid->fid);
@ -559,6 +569,7 @@ openfdthread(void *v)
m->internal = 1;
m->c = c;
m->tx.type = Tclunk;
m->tx.tag = m->tag;
m->tx.fid = fid->fid;
m->fid = fid;
fid->ref++;
@ -635,12 +646,14 @@ connoutthread(void *arg)
{
int err;
Conn *c;
Queue *outq;
Msg *m, *om;
Ioproc *io;
c = arg;
outq = c->outq;
io = ioproc();
while((m = recvq(c->outq)) != nil){
while((m = recvq(outq)) != nil){
err = m->tx.type+1 != m->rx.type;
if(!err && m->isopenfd)
if(xopenfd(m) < 0)
@ -687,6 +700,8 @@ connoutthread(void *arg)
nbsendp(c->inc, 0);
}
closeioproc(io);
free(outq);
c->outq = nil;
}
void
@ -740,13 +755,16 @@ inputthread(void *arg)
msgput(m);
continue;
}
if(verbose > 1) fprint(2, "* -> %F\n", &m->rx);
if(verbose > 1) fprint(2, "* -> %F%s\n", &m->rx,
m->internal ? " (internal)" : "");
m->rpkt = pkt;
m->rx.tag = m->ctag;
if(m->internal)
sendp(m->c->internal, 0);
else
sendp(m->c->internal, m);
else if(m->c->outq)
sendq(m->c->outq, m);
else
msgput(m);
}
closeioproc(io);
//fprint(2, "input eof\n");
@ -856,12 +874,17 @@ msgnew(void)
void
msgput(Msg *m)
{
if(m == nil)
return;
if(verbose > 2) fprint(2, "msgput tag %d/%d ref %d\n", m->tag, m->ctag, m->ref);
assert(m->ref > 0);
if(--m->ref > 0)
return;
m->c->nmsg--;
m->c = nil;
msgput(m->oldm);
m->oldm = nil;
fidput(m->fid);
m->fid = nil;
fidput(m->afid);

View file

@ -40,33 +40,49 @@ struct Q
Q q;
int eventfd;
int addrfd;
int datafd;
int ctlfd;
int bodyfd;
Fid *eventfd;
Fid *addrfd;
Fid *datafd;
Fid *ctlfd;
// int bodyfd;
char *typing;
int ntypeb;
int ntyper;
int ntypebreak;
int debug;
char *name;
char **prog;
int p[2];
Channel *cpid;
Channel *cwait;
int pid = -1;
int label(char*, int);
void error(char*);
void stdinproc(void*);
void stdoutproc(void*);
void type(Event*, int, int, int);
void sende(Event*, int, int, int, int, int);
void type(Event*, int, Fid*, Fid*);
void sende(Event*, int, Fid*, Fid*, Fid*, int);
char *onestring(int, char**);
int delete(Event*);
void deltype(uint, uint);
void runproc(void*);
int
fsfidprint(Fid *fid, char *fmt, ...)
{
char buf[256];
va_list arg;
int n;
va_start(arg, fmt);
n = vsnprint(buf, sizeof buf, fmt, arg);
va_end(arg);
return fswrite(fid, buf, n);
}
void
usage(void)
{
@ -83,13 +99,19 @@ nopipes(void *v, char *msg)
return 0;
}
void
waitthread(void *v)
{
recvp(cwait);
threadexitsall(nil);
}
void
threadmain(int argc, char **argv)
{
int fd, id;
char buf[256];
char buf1[128];
char *name;
Fsys *fs;
ARGBEGIN{
@ -110,8 +132,8 @@ threadmain(int argc, char **argv)
threadnotify(nopipes, 1);
if((fs = nsmount("acme", "")) < 0)
sysfatal("nsmount acme: %r");
ctlfd = fsopenfd(fs, "new/ctl", ORDWR|OCEXEC);
if(ctlfd < 0 || read(ctlfd, buf, 12) != 12)
ctlfd = fsopen(fs, "new/ctl", ORDWR|OCEXEC);
if(ctlfd < 0 || fsread(ctlfd, buf, 12) != 12)
sysfatal("ctl: %r");
id = atoi(buf);
sprint(buf, "%d/tag", id);
@ -119,21 +141,27 @@ threadmain(int argc, char **argv)
write(fd, " Send Delete", 12);
close(fd);
sprint(buf, "%d/event", id);
eventfd = fsopenfd(fs, buf, ORDWR|OCEXEC);
eventfd = fsopen(fs, buf, ORDWR|OCEXEC);
sprint(buf, "%d/addr", id);
addrfd = fsopenfd(fs, buf, ORDWR|OCEXEC);
addrfd = fsopen(fs, buf, ORDWR|OCEXEC);
sprint(buf, "%d/data", id);
datafd = fsopenfd(fs, buf, ORDWR|OCEXEC);
datafd = fsopen(fs, buf, ORDWR|OCEXEC);
sprint(buf, "%d/body", id);
bodyfd = fsopenfd(fs, buf, ORDWR|OCEXEC);
/* bodyfd = fsopenfd(fs, buf, ORDWR|OCEXEC); */
if(eventfd==nil || addrfd==nil || datafd==nil)
sysfatal("data files: %r");
/*
if(eventfd<0 || addrfd<0 || datafd<0 || bodyfd<0)
sysfatal("data files: %r");
*/
fsunmount(fs);
if(pipe(p) < 0)
sysfatal("pipe: %r");
cpid = chancreate(sizeof(ulong), 1);
cwait = threadwaitchan();
threadcreate(waitthread, nil, STACK);
threadcreate(runproc, nil, STACK);
pid = recvul(cpid);
if(pid == -1)
@ -141,13 +169,13 @@ threadmain(int argc, char **argv)
getwd(buf1, sizeof buf1);
sprint(buf, "name %s/-%s\n0\n", buf1, name);
write(ctlfd, buf, strlen(buf));
fswrite(ctlfd, buf, strlen(buf));
sprint(buf, "dumpdir %s/\n", buf1);
write(ctlfd, buf, strlen(buf));
fswrite(ctlfd, buf, strlen(buf));
sprint(buf, "dump %s\n", onestring(argc, argv));
write(ctlfd, buf, strlen(buf));
fswrite(ctlfd, buf, strlen(buf));
// proccreate(stdoutproc, nil, STACK);
threadcreate(stdoutproc, nil, STACK);
stdinproc(nil);
}
@ -161,10 +189,10 @@ runproc(void *v)
USED(v);
fd[0] = p[1];
fd[1] = bodyfd;
fd[2] = bodyfd;
// fd[1] = p[1];
// fd[2] = p[1];
// fd[1] = bodyfd;
// fd[2] = bodyfd;
fd[1] = p[1];
fd[2] = p[1];
if(prog[0] == nil){
prog = shell;
@ -210,14 +238,14 @@ onestring(int argc, char **argv)
}
int
getec(int efd)
getec(Fid *efd)
{
static char buf[8192];
static char *bufp;
static int nbuf;
if(nbuf == 0){
nbuf = read(efd, buf, sizeof buf);
nbuf = fsread(efd, buf, sizeof buf);
if(nbuf <= 0)
error(nil);
bufp = buf;
@ -227,7 +255,7 @@ getec(int efd)
}
int
geten(int efd)
geten(Fid *efd)
{
int n, c;
@ -240,7 +268,7 @@ geten(int efd)
}
int
geter(int efd, char *buf, int *nb)
geter(Fid *efd, char *buf, int *nb)
{
Rune r;
int n;
@ -259,7 +287,7 @@ geter(int efd, char *buf, int *nb)
}
void
gete(int efd, Event *e)
gete(Fid *efd, Event *e)
{
int i, nb;
@ -297,10 +325,10 @@ nrunes(char *s, int nb)
void
stdinproc(void *v)
{
int cfd = ctlfd;
int efd = eventfd;
int dfd = datafd;
int afd = addrfd;
Fid *cfd = ctlfd;
Fid *efd = eventfd;
Fid *dfd = datafd;
Fid *afd = addrfd;
int fd0 = p[0];
Event e, e2, e3, e4;
@ -358,7 +386,7 @@ stdinproc(void *v)
}
if(e.flag&1 || (e.c2=='x' && e.nr==0 && e2.nr==0)){
/* send it straight back */
fprint(efd, "%c%c%d %d\n", e.c1, e.c2, e.q0, e.q1);
fsfidprint(efd, "%c%c%d %d\n", e.c1, e.c2, e.q0, e.q1);
break;
}
if(e.q0==e.q1 && (e.flag&2)){
@ -380,7 +408,7 @@ stdinproc(void *v)
/* just send it back */
if(e.flag & 2)
gete(efd, &e2);
fprint(efd, "%c%c%d %d\n", e.c1, e.c2, e.q0, e.q1);
fsfidprint(efd, "%c%c%d %d\n", e.c1, e.c2, e.q0, e.q1);
break;
case 'd':
@ -399,8 +427,8 @@ void
stdoutproc(void *v)
{
int fd1 = p[0];
int afd = addrfd;
int dfd = datafd;
Fid *afd = addrfd;
Fid *dfd = datafd;
int n, m, w, npart;
char *buf, *s, *t;
Rune r;
@ -411,7 +439,7 @@ stdoutproc(void *v)
buf = malloc(8192+UTFmax+1);
npart = 0;
for(;;){
n = read(fd1, buf+npart, 8192);
n = threadread(fd1, buf+npart, 8192);
if(n < 0)
error(nil);
if(n == 0)
@ -445,11 +473,13 @@ stdoutproc(void *v)
if(n > 0){
memmove(hold, buf+n, npart);
buf[n] = 0;
n = label(buf, n);
buf[n] = 0;
qlock(&q.lk);
m = sprint(x, "#%d", q.p);
if(write(afd, x, m) != m)
if(fswrite(afd, x, m) != m)
error("stdout writing address");
if(write(dfd, buf, n) != n)
if(fswrite(dfd, buf, n) != n)
error("stdout writing body");
q.p += nrunes(buf, n);
qunlock(&q.lk);
@ -458,6 +488,37 @@ stdoutproc(void *v)
}
}
char wdir[256];
int
label(char *sr, int n)
{
char *sl, *el, *er, *r;
er = sr+n;
for(r=er-1; r>=sr; r--)
if(*r == '\007')
break;
if(r < sr)
return n;
el = r+1;
if(el-sr > sizeof wdir)
sr = el - sizeof wdir;
for(sl=el-3; sl>=sr; sl--)
if(sl[0]=='\033' && sl[1]==']' && sl[2]==';')
break;
if(sl < sr)
return n;
*r = 0;
snprint(wdir, sizeof wdir, "name %s/-%s\n0\n", sl+3, name);
fswrite(ctlfd, wdir, strlen(wdir));
memmove(sl, el, er-el);
n -= (el-sl);
return n;
}
int
delete(Event *e)
{
@ -584,7 +645,7 @@ deltype(uint p0, uint p1)
}
void
type(Event *e, int fd0, int afd, int dfd)
type(Event *e, int fd0, Fid *afd, Fid *dfd)
{
int m, n, nr;
char buf[128];
@ -595,8 +656,8 @@ type(Event *e, int fd0, int afd, int dfd)
m = e->q0;
while(m < e->q1){
n = sprint(buf, "#%d", m);
write(afd, buf, n);
n = read(dfd, buf, sizeof buf);
fswrite(afd, buf, n);
n = fsread(dfd, buf, sizeof buf);
nr = nrunes(buf, n);
while(m+nr > e->q1){
do; while(n>0 && (buf[--n]&0xC0)==0x80);
@ -612,16 +673,16 @@ type(Event *e, int fd0, int afd, int dfd)
}
void
sende(Event *e, int fd0, int cfd, int afd, int dfd, int donl)
sende(Event *e, int fd0, Fid *cfd, Fid *afd, Fid *dfd, int donl)
{
int l, m, n, nr, lastc, end;
char abuf[16], buf[128];
end = q.p+ntyper;
l = sprint(abuf, "#%d", end);
write(afd, abuf, l);
fswrite(afd, abuf, l);
if(e->nr > 0){
write(dfd, e->b, e->nb);
fswrite(dfd, e->b, e->nb);
addtype(e->c1, ntyper, e->b, e->nb, e->nr);
lastc = e->r[e->nr-1];
}else{
@ -629,8 +690,8 @@ sende(Event *e, int fd0, int cfd, int afd, int dfd, int donl)
lastc = 0;
while(m < e->q1){
n = sprint(buf, "#%d", m);
write(afd, buf, n);
n = read(dfd, buf, sizeof buf);
fswrite(afd, buf, n);
n = fsread(dfd, buf, sizeof buf);
nr = nrunes(buf, n);
while(m+nr > e->q1){
do; while(n>0 && (buf[--n]&0xC0)==0x80);
@ -639,8 +700,8 @@ sende(Event *e, int fd0, int cfd, int afd, int dfd, int donl)
if(n == 0)
break;
l = sprint(abuf, "#%d", end);
write(afd, abuf, l);
write(dfd, buf, n);
fswrite(afd, abuf, l);
fswrite(dfd, buf, n);
addtype(e->c1, ntyper, buf, n, nr);
lastc = buf[n-1];
m += nr;
@ -648,9 +709,9 @@ sende(Event *e, int fd0, int cfd, int afd, int dfd, int donl)
}
}
if(donl && lastc!='\n'){
write(dfd, "\n", 1);
fswrite(dfd, "\n", 1);
addtype(e->c1, ntyper, "\n", 1, 1);
}
write(cfd, "dot=addr", 8);
fswrite(cfd, "dot=addr", 8);
sendtype(fd0);
}

View file

@ -3,6 +3,7 @@
#include <libc.h>
#include <sys/socket.h>
/* BUG: would like to preserve delimiters on systems that can */
int
p9pipe(int fd[2])
{