File system stuff.

This commit is contained in:
rsc 2003-12-06 18:08:52 +00:00
parent e97ceade5e
commit d3df308747
29 changed files with 3316 additions and 0 deletions

27
src/libmux/COPYRIGHT Normal file
View file

@ -0,0 +1,27 @@
This software was developed as part of a project at MIT:
/sys/src/libmux/*
/sys/include/mux.h
Copyright (c) 2003 Russ Cox,
Massachusetts Institute of Technology
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

136
src/libmux/io.c Normal file
View file

@ -0,0 +1,136 @@
/* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */
/* See COPYRIGHT */
#include <u.h>
#include <libc.h>
#include <mux.h>
/*
* If you fork off two procs running muxrecvproc and muxsendproc,
* then muxrecv/muxsend (and thus muxrpc) will never block except on
* rendevouses, which is nice when it's running in one thread of many.
*/
void
_muxrecvproc(void *v)
{
void *p;
Mux *mux;
Muxqueue *q;
mux = v;
q = _muxqalloc();
qlock(&mux->lk);
mux->readq = q;
qlock(&mux->inlk);
rwakeup(&mux->rpcfork);
qunlock(&mux->lk);
while((p = mux->recv(mux)) != nil)
if(_muxqsend(q, p) < 0){
free(p);
break;
}
qunlock(&mux->inlk);
qlock(&mux->lk);
_muxqhangup(q);
while((p = _muxnbqrecv(q)) != nil)
free(p);
free(q);
mux->readq = nil;
rwakeup(&mux->rpcfork);
qunlock(&mux->lk);
}
void
_muxsendproc(void *v)
{
Muxqueue *q;
void *p;
Mux *mux;
mux = v;
q = _muxqalloc();
qlock(&mux->lk);
mux->writeq = q;
qlock(&mux->outlk);
rwakeup(&mux->rpcfork);
qunlock(&mux->lk);
while((p = _muxqrecv(q)) != nil)
if(mux->send(mux, p) < 0)
break;
qunlock(&mux->outlk);
qlock(&mux->lk);
_muxqhangup(q);
while((p = _muxnbqrecv(q)) != nil)
free(p);
free(q);
mux->writeq = nil;
rwakeup(&mux->rpcfork);
qunlock(&mux->lk);
return;
}
void*
_muxrecv(Mux *mux)
{
void *p;
qlock(&mux->lk);
/*
if(mux->state != VtStateConnected){
werrstr("not connected");
qunlock(&mux->lk);
return nil;
}
*/
if(mux->readq){
qunlock(&mux->lk);
return _muxqrecv(mux->readq);
}
qlock(&mux->inlk);
qunlock(&mux->lk);
p = mux->recv(mux);
qunlock(&mux->inlk);
/*
if(!p)
vthangup(mux);
*/
return p;
}
int
_muxsend(Mux *mux, void *p)
{
qlock(&mux->lk);
/*
if(mux->state != VtStateConnected){
packetfree(p);
werrstr("not connected");
qunlock(&mux->lk);
return -1;
}
*/
if(mux->writeq){
qunlock(&mux->lk);
if(_muxqsend(mux->writeq, p) < 0){
free(p);
return -1;
}
return 0;
}
qlock(&mux->outlk);
qunlock(&mux->lk);
if(mux->send(mux, p) < 0){
qunlock(&mux->outlk);
/* vthangup(mux); */
return -1;
}
qunlock(&mux->outlk);
return 0;
}

16
src/libmux/mkfile Normal file
View file

@ -0,0 +1,16 @@
PLAN9=../..
<$PLAN9/src/mkhdr
LIB=libmux.a
OFILES=\
io.$O\
mux.$O\
queue.$O\
thread.$O\
HFILES=\
$PLAN9/include/mux.h\
<$PLAN9/src/mksyslib

152
src/libmux/mux.c Normal file
View file

@ -0,0 +1,152 @@
/* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */
/* See COPYRIGHT */
/*
* Generic RPC packet multiplexor. Inspired by but not derived from
* Plan 9 kernel. Originally developed as part of Tra, later used in
* libnventi, and then finally split out into a generic library.
*/
#include <u.h>
#include <libc.h>
#include <mux.h>
static int gettag(Mux*, Muxrpc*);
static void puttag(Mux*, Muxrpc*);
static void enqueue(Mux*, Muxrpc*);
static void dequeue(Mux*, Muxrpc*);
void
muxinit(Mux *mux)
{
mux->tagrend.l = &mux->lk;
mux->sleep.next = &mux->sleep;
mux->sleep.prev = &mux->sleep;
}
void*
muxrpc(Mux *mux, void *tx)
{
uint tag;
Muxrpc *r, *r2;
void *p;
/* must malloc because stack could be private */
r = mallocz(sizeof(Muxrpc), 1);
if(r == nil)
return nil;
r->r.l = &mux->lk;
/* assign the tag */
tag = gettag(mux, r);
if(mux->settag(mux, tx, tag) < 0){
puttag(mux, r);
free(r);
return nil;
}
/* send the packet */
if(_muxsend(mux, tx) < 0){
puttag(mux, r);
free(r);
return nil;
}
/* add ourselves to sleep queue */
qlock(&mux->lk);
enqueue(mux, r);
/* wait for our packet */
while(mux->muxer && !r->p)
rsleep(&r->r);
/* if not done, there's no muxer: start muxing */
if(!r->p){
if(mux->muxer)
abort();
mux->muxer = 1;
while(!r->p){
qunlock(&mux->lk);
p = _muxrecv(mux);
if(p)
tag = mux->gettag(mux, p);
else
tag = ~0;
qlock(&mux->lk);
if(p == nil){ /* eof -- just give up and pass the buck */
dequeue(mux, r);
break;
}
/* hand packet to correct sleeper */
if(tag < 0 || tag >= mux->mwait){
fprint(2, "%s: bad rpc tag %ux\n", argv0, tag);
/* must leak packet! don't know how to free it! */
continue;
}
r2 = mux->wait[tag];
r2->p = p;
rwakeup(&r2->r);
}
mux->muxer = 0;
/* if there is anyone else sleeping, wake them to mux */
if(mux->sleep.next != &mux->sleep)
rwakeup(&mux->sleep.next->r);
}
p = r->p;
puttag(mux, r);
free(r);
qunlock(&mux->lk);
return p;
}
static void
enqueue(Mux *mux, Muxrpc *r)
{
r->next = mux->sleep.next;
r->prev = &mux->sleep;
r->next->prev = r;
r->prev->next = r;
}
static void
dequeue(Mux *mux, Muxrpc *r)
{
r->next->prev = r->prev;
r->prev->next = r->next;
r->prev = nil;
r->next = nil;
}
static int
gettag(Mux *mux, Muxrpc *r)
{
int i;
Again:
while(mux->nwait == mux->mwait)
rsleep(&mux->tagrend);
i=mux->freetag;
if(mux->wait[i] == 0)
goto Found;
for(i=0; i<mux->mwait; i++)
if(mux->wait[i] == 0){
Found:
mux->nwait++;
mux->wait[i] = r;
r->tag = i;
return i;
}
fprint(2, "libfs: nwait botch\n");
goto Again;
}
static void
puttag(Mux *mux, Muxrpc *r)
{
assert(mux->wait[r->tag] == r);
mux->wait[r->tag] = nil;
mux->nwait--;
mux->freetag = r->tag;
rwakeup(&mux->tagrend);
}

109
src/libmux/queue.c Normal file
View file

@ -0,0 +1,109 @@
/* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */
/* See COPYRIGHT */
#include <u.h>
#include <libc.h>
#include <mux.h>
typedef struct Qel Qel;
struct Qel
{
Qel *next;
void *p;
};
struct Muxqueue
{
int hungup;
QLock lk;
Rendez r;
Qel *head;
Qel *tail;
};
Muxqueue*
_muxqalloc(void)
{
Muxqueue *q;
q = mallocz(sizeof(Muxqueue), 1);
if(q == nil)
return nil;
q->r.l = &q->lk;
return q;
}
int
_muxqsend(Muxqueue *q, void *p)
{
Qel *e;
e = malloc(sizeof(Qel));
if(e == nil)
return -1;
qlock(&q->lk);
if(q->hungup){
werrstr("hungup queue");
qunlock(&q->lk);
return -1;
}
e->p = p;
e->next = nil;
if(q->head == nil)
q->head = e;
else
q->tail->next = e;
q->tail = e;
rwakeup(&q->r);
qunlock(&q->lk);
return 0;
}
void*
_muxqrecv(Muxqueue *q)
{
void *p;
Qel *e;
qlock(&q->lk);
while(q->head == nil && !q->hungup)
rsleep(&q->r);
if(q->hungup){
qunlock(&q->lk);
return nil;
}
e = q->head;
q->head = e->next;
qunlock(&q->lk);
p = e->p;
free(e);
return p;
}
void*
_muxnbqrecv(Muxqueue *q)
{
void *p;
Qel *e;
qlock(&q->lk);
if(q->head == nil){
qunlock(&q->lk);
return nil;
}
e = q->head;
q->head = e->next;
qunlock(&q->lk);
p = e->p;
free(e);
return p;
}
void
_muxqhangup(Muxqueue *q)
{
qlock(&q->lk);
q->hungup = 1;
rwakeupall(&q->r);
qunlock(&q->lk);
}

27
src/libmux/thread.c Normal file
View file

@ -0,0 +1,27 @@
/* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */
/* See COPYRIGHT */
#include <u.h>
#include <libc.h>
#include <thread.h>
#include <mux.h>
enum
{
STACK = 32768
};
void
muxthreads(Mux *mux)
{
proccreate(_muxrecvproc, mux, STACK);
qlock(&mux->lk);
while(!mux->writeq)
rsleep(&mux->rpcfork);
qunlock(&mux->lk);
proccreate(_muxsendproc, mux, STACK);
qlock(&mux->lk);
while(!mux->writeq)
rsleep(&mux->rpcfork);
qunlock(&mux->lk);
}