In non-blocking recv functions in libmux and libdraw,

distinguish between "cannot receive without blocking"
and "EOF on connection".

In libmux, do not elect async guys muxers, so that
synchronous RPC calls run in the main event loop
(e.g., in eresized) do not get stuck.

Fixes problem reported by Lu Xuxiao, namely that
jpg etc. would spin at 100% cpu usage.
This commit is contained in:
rsc 2006-11-04 18:46:00 +00:00
parent d3864abaee
commit 3a19470202
7 changed files with 88 additions and 61 deletions

View file

@ -19,6 +19,7 @@ struct Muxrpc
uint tag; uint tag;
void *p; void *p;
int waiting; int waiting;
int async;
}; };
struct Mux struct Mux
@ -27,7 +28,7 @@ struct Mux
uint maxtag; uint maxtag;
int (*send)(Mux*, void*); int (*send)(Mux*, void*);
void *(*recv)(Mux*); void *(*recv)(Mux*);
void *(*nbrecv)(Mux*); int (*nbrecv)(Mux*, void**);
int (*gettag)(Mux*, void*); int (*gettag)(Mux*, void*);
int (*settag)(Mux*, void*, uint); int (*settag)(Mux*, void*, uint);
void *aux; /* for private use by client */ void *aux; /* for private use by client */
@ -52,18 +53,18 @@ void muxinit(Mux*);
void* muxrpc(Mux*, void*); void* muxrpc(Mux*, void*);
void muxprocs(Mux*); void muxprocs(Mux*);
Muxrpc* muxrpcstart(Mux*, void*); Muxrpc* muxrpcstart(Mux*, void*);
void* muxrpccanfinish(Muxrpc*); int muxrpccanfinish(Muxrpc*, void**);
/* private */ /* private */
int _muxsend(Mux*, void*); int _muxsend(Mux*, void*);
void* _muxrecv(Mux*, int); int _muxrecv(Mux*, int, void**);
void _muxsendproc(void*); void _muxsendproc(void*);
void _muxrecvproc(void*); void _muxrecvproc(void*);
Muxqueue *_muxqalloc(void); Muxqueue *_muxqalloc(void);
int _muxqsend(Muxqueue*, void*); int _muxqsend(Muxqueue*, void*);
void *_muxqrecv(Muxqueue*); void *_muxqrecv(Muxqueue*);
void _muxqhangup(Muxqueue*); void _muxqhangup(Muxqueue*);
void *_muxnbqrecv(Muxqueue*); int _muxnbqrecv(Muxqueue*, void**);
#if defined(__cplusplus) #if defined(__cplusplus)
} }

View file

@ -47,7 +47,8 @@ static int
xioerror(XDisplay *d) xioerror(XDisplay *d)
{ {
/*print("X I/O error\n"); */ /*print("X I/O error\n"); */
sysfatal("X I/O error\n"); exit(0);
/*sysfatal("X I/O error\n");*/
abort(); abort();
return -1; return -1;
} }

View file

@ -13,7 +13,7 @@ int chattydrawclient;
static int drawgettag(Mux *mux, void *vmsg); static int drawgettag(Mux *mux, void *vmsg);
static void* drawrecv(Mux *mux); static void* drawrecv(Mux *mux);
static void* drawnbrecv(Mux *mux); static int drawnbrecv(Mux *mux, void**);
static int drawsend(Mux *mux, void *vmsg); static int drawsend(Mux *mux, void *vmsg);
static int drawsettag(Mux *mux, void *vmsg, uint tag); static int drawsettag(Mux *mux, void *vmsg, uint tag);
static int canreadfd(int); static int canreadfd(int);
@ -83,40 +83,46 @@ drawsend(Mux *mux, void *vmsg)
return write(d->srvfd, msg, n); return write(d->srvfd, msg, n);
} }
static void* static int
_drawrecv(Mux *mux, int nb) _drawrecv(Mux *mux, int canblock, void **vp)
{ {
int n; int n;
uchar buf[4], *p; uchar buf[4], *p;
Display *d; Display *d;
d = mux->aux; d = mux->aux;
if(nb && !canreadfd(d->srvfd)) *vp = nil;
return nil; if(!canblock && !canreadfd(d->srvfd))
return 0;
if((n=readn(d->srvfd, buf, 4)) != 4) if((n=readn(d->srvfd, buf, 4)) != 4)
return nil; return 1;
GET(buf, n); GET(buf, n);
p = malloc(n); p = malloc(n);
if(p == nil){ if(p == nil){
fprint(2, "out of memory allocating %d in drawrecv\n", n); fprint(2, "out of memory allocating %d in drawrecv\n", n);
return nil; return 1;
} }
memmove(p, buf, 4); memmove(p, buf, 4);
if(readn(d->srvfd, p+4, n-4) != n-4) if(readn(d->srvfd, p+4, n-4) != n-4){
return nil; free(p);
return p; return 1;
}
*vp = p;
return 1;
} }
static void* static void*
drawrecv(Mux *mux) drawrecv(Mux *mux)
{ {
return _drawrecv(mux, 0); void *p;
_drawrecv(mux, 1, &p);
return p;
} }
static void* static int
drawnbrecv(Mux *mux) drawnbrecv(Mux *mux, void **vp)
{ {
return _drawrecv(mux, 1); return _drawrecv(mux, 0, vp);
} }
static int static int

View file

@ -214,10 +214,14 @@ static int
finishrpc(Muxrpc *r, Wsysmsg *w) finishrpc(Muxrpc *r, Wsysmsg *w)
{ {
uchar *p; uchar *p;
void *v;
int n; int n;
if((p = muxrpccanfinish(r)) == nil) if(!muxrpccanfinish(r, &v))
return 0; return 0;
p = v;
if(p == nil) /* eof on connection */
exit(0);
GET(p, n); GET(p, n);
convM2W(p, n, w); convM2W(p, n, w);
free(p); free(p);
@ -269,6 +273,9 @@ extract(int canblock)
if(eslave[i].rpc == nil) if(eslave[i].rpc == nil)
eslave[i].rpc = startrpc(Trdmouse); eslave[i].rpc = startrpc(Trdmouse);
if(eslave[i].rpc){ if(eslave[i].rpc){
/* if ready, don't block in select */
if(eslave[i].rpc->p)
canblock = 0;
FD_SET(display->srvfd, &rset); FD_SET(display->srvfd, &rset);
FD_SET(display->srvfd, &xset); FD_SET(display->srvfd, &xset);
if(display->srvfd > max) if(display->srvfd > max)
@ -278,6 +285,9 @@ extract(int canblock)
if(eslave[i].rpc == nil) if(eslave[i].rpc == nil)
eslave[i].rpc = startrpc(Trdkbd); eslave[i].rpc = startrpc(Trdkbd);
if(eslave[i].rpc){ if(eslave[i].rpc){
/* if ready, don't block in select */
if(eslave[i].rpc->p)
canblock = 0;
FD_SET(display->srvfd, &rset); FD_SET(display->srvfd, &rset);
FD_SET(display->srvfd, &xset); FD_SET(display->srvfd, &xset);
if(display->srvfd > max) if(display->srvfd > max)

View file

@ -34,7 +34,7 @@ _muxrecvproc(void *v)
qunlock(&mux->inlk); qunlock(&mux->inlk);
qlock(&mux->lk); qlock(&mux->lk);
_muxqhangup(q); _muxqhangup(q);
while((p = _muxnbqrecv(q)) != nil) while(_muxnbqrecv(q, &p))
free(p); free(p);
free(q); free(q);
mux->readq = nil; mux->readq = nil;
@ -64,7 +64,7 @@ _muxsendproc(void *v)
qunlock(&mux->outlk); qunlock(&mux->outlk);
qlock(&mux->lk); qlock(&mux->lk);
_muxqhangup(q); _muxqhangup(q);
while((p = _muxnbqrecv(q)) != nil) while(_muxnbqrecv(q, &p))
free(p); free(p);
free(q); free(q);
mux->writeq = nil; mux->writeq = nil;
@ -73,42 +73,39 @@ _muxsendproc(void *v)
return; return;
} }
void* int
_muxrecv(Mux *mux, int canblock) _muxrecv(Mux *mux, int canblock, void **vp)
{ {
void *p; void *p;
int ret;
qlock(&mux->lk); qlock(&mux->lk);
/*
if(mux->state != VtStateConnected){
werrstr("not connected");
qunlock(&mux->lk);
return nil;
}
*/
if(mux->readq){ if(mux->readq){
qunlock(&mux->lk); qunlock(&mux->lk);
if(canblock) if(canblock){
return _muxqrecv(mux->readq); *vp = _muxqrecv(mux->readq);
return _muxnbqrecv(mux->readq); return 1;
}
return _muxnbqrecv(mux->readq, vp);
} }
qlock(&mux->inlk); qlock(&mux->inlk);
qunlock(&mux->lk); qunlock(&mux->lk);
if(canblock) if(canblock){
p = mux->recv(mux); p = mux->recv(mux);
else{ ret = 1;
}else{
if(mux->nbrecv) if(mux->nbrecv)
p = mux->nbrecv(mux); ret = mux->nbrecv(mux, &p);
else else{
/* send eof, not "no packet ready" */
p = nil; p = nil;
ret = 1;
}
} }
qunlock(&mux->inlk); qunlock(&mux->inlk);
/* *vp = p;
if(!p && canblock) return ret;
vthangup(mux);
*/
return p;
} }
int int

View file

@ -1,4 +1,4 @@
/* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */ /* Copyright (C) 2003-2006 Russ Cox, Massachusetts Institute of Technology */
/* See COPYRIGHT */ /* See COPYRIGHT */
/* /*
@ -100,11 +100,16 @@ muxmsgandqlock(Mux *mux, void *p)
void void
electmuxer(Mux *mux) electmuxer(Mux *mux)
{ {
Muxrpc *rpc;
/* if there is anyone else sleeping, wake them to mux */ /* if there is anyone else sleeping, wake them to mux */
if(mux->sleep.next != &mux->sleep){ for(rpc=mux->sleep.next; rpc != &mux->sleep; rpc = rpc->next){
mux->muxer = mux->sleep.next; if(!rpc->async){
rwakeup(&mux->muxer->r); mux->muxer = rpc;
}else rwakeup(&rpc->r);
return;
}
}
mux->muxer = nil; mux->muxer = nil;
} }
@ -133,7 +138,7 @@ muxrpc(Mux *mux, void *tx)
mux->muxer = r; mux->muxer = r;
while(!r->p){ while(!r->p){
qunlock(&mux->lk); qunlock(&mux->lk);
p = _muxrecv(mux, 1); _muxrecv(mux, 1, &p);
if(p == nil){ if(p == nil){
/* eof -- just give up and pass the buck */ /* eof -- just give up and pass the buck */
qlock(&mux->lk); qlock(&mux->lk);
@ -144,7 +149,6 @@ muxrpc(Mux *mux, void *tx)
} }
electmuxer(mux); electmuxer(mux);
} }
/*print("finished %p\n", r); */
p = r->p; p = r->p;
puttag(mux, r); puttag(mux, r);
qunlock(&mux->lk); qunlock(&mux->lk);
@ -161,24 +165,29 @@ muxrpcstart(Mux *mux, void *tx)
if((r = allocmuxrpc(mux)) == nil) if((r = allocmuxrpc(mux)) == nil)
return nil; return nil;
r->async = 1;
if((tag = tagmuxrpc(r, tx)) < 0) if((tag = tagmuxrpc(r, tx)) < 0)
return nil; return nil;
return r; return r;
} }
void* int
muxrpccanfinish(Muxrpc *r) muxrpccanfinish(Muxrpc *r, void **vp)
{ {
char *p; void *p;
Mux *mux; Mux *mux;
int ret;
mux = r->mux; mux = r->mux;
qlock(&mux->lk); qlock(&mux->lk);
ret = 1;
if(!r->p && !mux->muxer){ if(!r->p && !mux->muxer){
mux->muxer = r; mux->muxer = r;
while(!r->p){ while(!r->p){
qunlock(&mux->lk); qunlock(&mux->lk);
p = _muxrecv(mux, 0); p = nil;
if(!_muxrecv(mux, 0, &p))
ret = 0;
if(p == nil){ if(p == nil){
qlock(&mux->lk); qlock(&mux->lk);
break; break;
@ -191,7 +200,8 @@ muxrpccanfinish(Muxrpc *r)
if(p) if(p)
puttag(mux, r); puttag(mux, r);
qunlock(&mux->lk); qunlock(&mux->lk);
return p; *vp = p;
return ret;
} }
static void static void

View file

@ -81,8 +81,8 @@ _muxqrecv(Muxqueue *q)
return p; return p;
} }
void* int
_muxnbqrecv(Muxqueue *q) _muxnbqrecv(Muxqueue *q, void **vp)
{ {
void *p; void *p;
Qel *e; Qel *e;
@ -90,14 +90,16 @@ _muxnbqrecv(Muxqueue *q)
qlock(&q->lk); qlock(&q->lk);
if(q->head == nil){ if(q->head == nil){
qunlock(&q->lk); qunlock(&q->lk);
return nil; *vp = nil;
return q->hungup;
} }
e = q->head; e = q->head;
q->head = e->next; q->head = e->next;
qunlock(&q->lk); qunlock(&q->lk);
p = e->p; p = e->p;
free(e); free(e);
return p; *vp = p;
return 1;
} }
void void