new venti library.
This commit is contained in:
parent
9df487d720
commit
056fe1ba7f
28 changed files with 4635 additions and 0 deletions
212
src/libventi/send.c
Normal file
212
src/libventi/send.c
Normal file
|
|
@ -0,0 +1,212 @@
|
|||
#include <u.h>
|
||||
#include <libc.h>
|
||||
#include <venti.h>
|
||||
#include "queue.h"
|
||||
|
||||
static int
|
||||
_vtsend(VtConn *z, Packet *p)
|
||||
{
|
||||
IOchunk ioc;
|
||||
int n;
|
||||
uchar buf[2];
|
||||
|
||||
if(z->state != VtStateConnected) {
|
||||
werrstr("session not connected");
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* add framing */
|
||||
n = packetsize(p);
|
||||
if(n >= (1<<16)) {
|
||||
werrstr("packet too large");
|
||||
packetfree(p);
|
||||
return -1;
|
||||
}
|
||||
buf[0] = n>>8;
|
||||
buf[1] = n;
|
||||
packetprefix(p, buf, 2);
|
||||
|
||||
for(;;){
|
||||
n = packetfragments(p, &ioc, 1, 0);
|
||||
if(n == 0)
|
||||
break;
|
||||
if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){
|
||||
packetfree(p);
|
||||
return 0;
|
||||
}
|
||||
packetconsume(p, nil, ioc.len);
|
||||
}
|
||||
packetfree(p);
|
||||
return 1;
|
||||
}
|
||||
|
||||
static Packet*
|
||||
_vtrecv(VtConn *z)
|
||||
{
|
||||
uchar buf[10], *b;
|
||||
int n;
|
||||
Packet *p;
|
||||
int size, len;
|
||||
|
||||
if(z->state != VtStateConnected) {
|
||||
werrstr("session not connected");
|
||||
return nil;
|
||||
}
|
||||
|
||||
p = z->part;
|
||||
/* get enough for head size */
|
||||
size = packetsize(p);
|
||||
while(size < 2) {
|
||||
b = packettrailer(p, MaxFragSize);
|
||||
assert(b != nil);
|
||||
n = read(z->infd, b, MaxFragSize);
|
||||
if(n <= 0)
|
||||
goto Err;
|
||||
size += n;
|
||||
packettrim(p, 0, size);
|
||||
}
|
||||
|
||||
if(packetconsume(p, buf, 2) < 0)
|
||||
goto Err;
|
||||
len = (buf[0] << 8) | buf[1];
|
||||
size -= 2;
|
||||
|
||||
while(size < len) {
|
||||
n = len - size;
|
||||
if(n > MaxFragSize)
|
||||
n = MaxFragSize;
|
||||
b = packettrailer(p, n);
|
||||
if(readn(z->infd, b, n) != n)
|
||||
goto Err;
|
||||
size += n;
|
||||
}
|
||||
p = packetsplit(p, len);
|
||||
return p;
|
||||
Err:
|
||||
return nil;
|
||||
}
|
||||
|
||||
/*
|
||||
* If you fork off two procs running vtrecvproc and vtsendproc,
|
||||
* then vtrecv/vtsend (and thus vtrpc) will never block except on
|
||||
* rendevouses, which is nice when it's running in one thread of many.
|
||||
*/
|
||||
void
|
||||
vtrecvproc(void *v)
|
||||
{
|
||||
Packet *p;
|
||||
VtConn *z;
|
||||
Queue *q;
|
||||
|
||||
z = v;
|
||||
q = _vtqalloc();
|
||||
|
||||
qlock(&z->lk);
|
||||
z->readq = q;
|
||||
qlock(&z->inlk);
|
||||
rwakeup(&z->rpcfork);
|
||||
qunlock(&z->lk);
|
||||
|
||||
while((p = _vtrecv(z)) != nil)
|
||||
if(_vtqsend(q, p) < 0){
|
||||
packetfree(p);
|
||||
break;
|
||||
}
|
||||
qunlock(&z->inlk);
|
||||
qlock(&z->lk);
|
||||
_vtqhangup(q);
|
||||
while((p = _vtnbqrecv(q)) != nil)
|
||||
packetfree(p);
|
||||
vtfree(q);
|
||||
z->readq = nil;
|
||||
rwakeup(&z->rpcfork);
|
||||
qunlock(&z->lk);
|
||||
vthangup(z);
|
||||
}
|
||||
|
||||
void
|
||||
vtsendproc(void *v)
|
||||
{
|
||||
Queue *q;
|
||||
Packet *p;
|
||||
VtConn *z;
|
||||
|
||||
z = v;
|
||||
q = _vtqalloc();
|
||||
|
||||
qlock(&z->lk);
|
||||
z->writeq = q;
|
||||
qlock(&z->outlk);
|
||||
rwakeup(&z->rpcfork);
|
||||
qunlock(&z->lk);
|
||||
|
||||
while((p = _vtqrecv(q)) != nil)
|
||||
if(_vtsend(z, p) < 0)
|
||||
break;
|
||||
qunlock(&z->outlk);
|
||||
qlock(&z->lk);
|
||||
_vtqhangup(q);
|
||||
while((p = _vtnbqrecv(q)) != nil)
|
||||
packetfree(p);
|
||||
vtfree(q);
|
||||
z->writeq = nil;
|
||||
rwakeup(&z->rpcfork);
|
||||
qunlock(&z->lk);
|
||||
return;
|
||||
}
|
||||
|
||||
Packet*
|
||||
vtrecv(VtConn *z)
|
||||
{
|
||||
Packet *p;
|
||||
|
||||
qlock(&z->lk);
|
||||
if(z->state != VtStateConnected){
|
||||
werrstr("not connected");
|
||||
qunlock(&z->lk);
|
||||
return nil;
|
||||
}
|
||||
if(z->readq){
|
||||
qunlock(&z->lk);
|
||||
return _vtqrecv(z->readq);
|
||||
}
|
||||
|
||||
qlock(&z->inlk);
|
||||
qunlock(&z->lk);
|
||||
p = _vtrecv(z);
|
||||
qunlock(&z->inlk);
|
||||
if(!p)
|
||||
vthangup(z);
|
||||
return p;
|
||||
}
|
||||
|
||||
int
|
||||
vtsend(VtConn *z, Packet *p)
|
||||
{
|
||||
qlock(&z->lk);
|
||||
if(z->state != VtStateConnected){
|
||||
packetfree(p);
|
||||
werrstr("not connected");
|
||||
qunlock(&z->lk);
|
||||
return -1;
|
||||
}
|
||||
if(z->writeq){
|
||||
qunlock(&z->lk);
|
||||
if(_vtqsend(z->writeq, p) < 0){
|
||||
packetfree(p);
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
qlock(&z->outlk);
|
||||
qunlock(&z->lk);
|
||||
if(_vtsend(z, p) < 0){
|
||||
qunlock(&z->outlk);
|
||||
vthangup(z);
|
||||
return -1;
|
||||
}
|
||||
qunlock(&z->outlk);
|
||||
return 0;
|
||||
}
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue