This commit is contained in:
rsc 2006-06-25 21:04:52 +00:00
parent b733ffba4f
commit 2d2e5c71f7
2 changed files with 132 additions and 43 deletions

View file

@ -74,7 +74,7 @@ _muxsendproc(void *v)
} }
void* void*
_muxrecv(Mux *mux) _muxrecv(Mux *mux, int canblock)
{ {
void *p; void *p;
@ -88,15 +88,24 @@ _muxrecv(Mux *mux)
*/ */
if(mux->readq){ if(mux->readq){
qunlock(&mux->lk); qunlock(&mux->lk);
if(canblock)
return _muxqrecv(mux->readq); return _muxqrecv(mux->readq);
return _muxnbqrecv(mux->readq);
} }
qlock(&mux->inlk); qlock(&mux->inlk);
qunlock(&mux->lk); qunlock(&mux->lk);
if(canblock)
p = mux->recv(mux); p = mux->recv(mux);
else{
if(mux->nbrecv)
p = mux->nbrecv(mux);
else
p = nil;
}
qunlock(&mux->inlk); qunlock(&mux->inlk);
/* /*
if(!p) if(!p && canblock)
vthangup(mux); vthangup(mux);
*/ */
return p; return p;

View file

@ -26,12 +26,10 @@ muxinit(Mux *mux)
mux->sleep.prev = &mux->sleep; mux->sleep.prev = &mux->sleep;
} }
void* static Muxrpc*
muxrpc(Mux *mux, void *tx) allocmuxrpc(Mux *mux)
{ {
int tag; Muxrpc *r;
Muxrpc *r, *r2;
void *p;
/* must malloc because stack could be private */ /* must malloc because stack could be private */
r = mallocz(sizeof(Muxrpc), 1); r = mallocz(sizeof(Muxrpc), 1);
@ -39,8 +37,20 @@ muxrpc(Mux *mux, void *tx)
werrstr("mallocz: %r"); werrstr("mallocz: %r");
return nil; return nil;
} }
r->mux = mux;
r->r.l = &mux->lk; r->r.l = &mux->lk;
r->waiting = 1;
return r;
}
static int
tagmuxrpc(Muxrpc *r, void *tx)
{
int tag;
Mux *mux;
mux = r->mux;
/* assign the tag, add selves to response queue */ /* assign the tag, add selves to response queue */
qlock(&mux->lk); qlock(&mux->lk);
tag = gettag(mux, r); tag = gettag(mux, r);
@ -56,54 +66,83 @@ muxrpc(Mux *mux, void *tx)
dequeue(mux, r); dequeue(mux, r);
puttag(mux, r); puttag(mux, r);
qunlock(&mux->lk); qunlock(&mux->lk);
return nil; return -1;
} }
return 0;
}
qlock(&mux->lk); void
/* wait for our packet */ muxmsgandqlock(Mux *mux, void *p)
while(mux->muxer && !r->p){ {
rsleep(&r->r); int tag;
} Muxrpc *r2;
/* if not done, there's no muxer: start muxing */
if(!r->p){
if(mux->muxer)
abort();
mux->muxer = 1;
while(!r->p){
qunlock(&mux->lk);
p = _muxrecv(mux);
if(p)
tag = mux->gettag(mux, p) - mux->mintag; tag = mux->gettag(mux, p) - mux->mintag;
else
tag = ~0;
/*print("mux tag %d\n", tag); */ /*print("mux tag %d\n", tag); */
qlock(&mux->lk); qlock(&mux->lk);
if(p == nil){ /* eof -- just give up and pass the buck */
dequeue(mux, r);
break;
}
/* hand packet to correct sleeper */ /* hand packet to correct sleeper */
if(tag < 0 || tag >= mux->mwait){ if(tag < 0 || tag >= mux->mwait){
fprint(2, "%s: bad rpc tag %ux\n", argv0, tag); fprint(2, "%s: bad rpc tag %ux\n", argv0, tag);
/* must leak packet! don't know how to free it! */ /* must leak packet! don't know how to free it! */
continue; return;
} }
r2 = mux->wait[tag]; r2 = mux->wait[tag];
if(r2 == nil || r2->prev == nil){ if(r2 == nil || r2->prev == nil){
fprint(2, "%s: bad rpc tag %ux (no one waiting on that tag)\n", argv0, tag); fprint(2, "%s: bad rpc tag %ux (no one waiting on that tag)\n", argv0, tag);
/* must leak packet! don't know how to free it! */ /* must leak packet! don't know how to free it! */
continue; return;
} }
r2->p = p; r2->p = p;
dequeue(mux, r2); dequeue(mux, r2);
rwakeup(&r2->r); rwakeup(&r2->r);
} }
mux->muxer = 0;
void
electmuxer(Mux *mux)
{
/* if there is anyone else sleeping, wake them to mux */ /* if there is anyone else sleeping, wake them to mux */
if(mux->sleep.next != &mux->sleep) if(mux->sleep.next != &mux->sleep){
rwakeup(&mux->sleep.next->r); mux->muxer = mux->sleep.next;
rwakeup(&mux->muxer->r);
}else
mux->muxer = nil;
}
void*
muxrpc(Mux *mux, void *tx)
{
int tag;
Muxrpc *r;
void *p;
if((r = allocmuxrpc(mux)) == nil)
return nil;
if((tag = tagmuxrpc(r, tx)) < 0)
return nil;
qlock(&mux->lk);
/* wait for our packet */
while(mux->muxer && mux->muxer != r && !r->p)
rsleep(&r->r);
/* if not done, there's no muxer: start muxing */
if(!r->p){
if(mux->muxer != nil && mux->muxer != r)
abort();
mux->muxer = r;
while(!r->p){
qunlock(&mux->lk);
p = _muxrecv(mux, 1);
if(p == nil){
/* eof -- just give up and pass the buck */
qlock(&mux->lk);
dequeue(mux, r);
break;
}
muxmsgandqlock(mux, p);
}
electmuxer(mux);
} }
/*print("finished %p\n", r); */ /*print("finished %p\n", r); */
p = r->p; p = r->p;
@ -114,6 +153,47 @@ muxrpc(Mux *mux, void *tx)
return p; return p;
} }
Muxrpc*
muxrpcstart(Mux *mux, void *tx)
{
int tag;
Muxrpc *r;
if((r = allocmuxrpc(mux)) == nil)
return nil;
if((tag = tagmuxrpc(r, tx)) < 0)
return nil;
return r;
}
void*
muxrpccanfinish(Muxrpc *r)
{
char *p;
Mux *mux;
mux = r->mux;
qlock(&mux->lk);
if(!r->p && !mux->muxer){
mux->muxer = r;
while(!r->p){
qunlock(&mux->lk);
p = _muxrecv(mux, 0);
if(p == nil){
qlock(&mux->lk);
break;
}
muxmsgandqlock(mux, p);
}
electmuxer(mux);
}
p = r->p;
if(p)
puttag(mux, r);
qunlock(&mux->lk);
return p;
}
static void static void
enqueue(Mux *mux, Muxrpc *r) enqueue(Mux *mux, Muxrpc *r)
{ {