Implement write buffer.

This commit is contained in:
rsc 2004-03-11 19:14:09 +00:00
parent d49a2e4801
commit 2499885177
12 changed files with 358 additions and 48 deletions

View file

@ -144,6 +144,7 @@ writeclumpinfo(Arena *arena, int clump, ClumpInfo *ci)
cib = getcib(arena, clump, 1, &r); cib = getcib(arena, clump, 1, &r);
if(cib == nil) if(cib == nil)
return -1; return -1;
dirtydblock(cib->data, DirtyArenaCib);
packclumpinfo(ci, &cib->data->data[cib->offset]); packclumpinfo(ci, &cib->data->data[cib->offset]);
putcib(arena, cib); putcib(arena, cib);
return 0; return 0;
@ -239,11 +240,13 @@ writearena(Arena *arena, u64int aa, u8int *clbuf, u32int n)
qunlock(&arena->lock); qunlock(&arena->lock);
return -1; return -1;
} }
dirtydblock(b, DirtyArena);
m = blocksize - off; m = blocksize - off;
if(m > n - nn) if(m > n - nn)
m = n - nn; m = n - nn;
memmove(&b->data[off], &clbuf[nn], m); memmove(&b->data[off], &clbuf[nn], m);
ok = writepart(arena->part, a, b->data, blocksize); // ok = writepart(arena->part, a, b->data, blocksize);
ok = 0;
putdblock(b); putdblock(b);
if(ok < 0){ if(ok < 0){
qunlock(&arena->lock); qunlock(&arena->lock);
@ -302,12 +305,13 @@ writeaclump(Arena *arena, Clump *c, u8int *clbuf)
qunlock(&arena->lock); qunlock(&arena->lock);
return TWID64; return TWID64;
} }
dirtydblock(b, DirtyArena);
m = blocksize - off; m = blocksize - off;
if(m > n - nn) if(m > n - nn)
m = n - nn; m = n - nn;
memmove(&b->data[off], &clbuf[nn], m); memmove(&b->data[off], &clbuf[nn], m);
print("writing\n"); // ok = writepart(arena->part, a, b->data, blocksize);
ok = writepart(arena->part, a, b->data, blocksize); ok = 0;
putdblock(b); putdblock(b);
if(ok < 0){ if(ok < 0){
qunlock(&arena->lock); qunlock(&arena->lock);
@ -352,6 +356,7 @@ static void
sealarena(Arena *arena) sealarena(Arena *arena)
{ {
flushciblocks(arena); flushciblocks(arena);
flushdcache();
arena->sealed = 1; arena->sealed = 1;
wbarena(arena); wbarena(arena);
backsumarena(arena); backsumarena(arena);
@ -439,6 +444,8 @@ ReadErr:
/* /*
* check for no checksum or the same * check for no checksum or the same
*
* the writepart is okay because we flushed the dcache in sealarena
*/ */
if(scorecmp(score, &b->data[bs - VtScoreSize]) != 0){ if(scorecmp(score, &b->data[bs - VtScoreSize]) != 0){
if(scorecmp(zeroscore, &b->data[bs - VtScoreSize]) != 0) if(scorecmp(zeroscore, &b->data[bs - VtScoreSize]) != 0)
@ -585,6 +592,7 @@ getcib(Arena *arena, int clump, int writing, CIBlock *rock)
block = clump / arena->clumpmax; block = clump / arena->clumpmax;
off = (clump - block * arena->clumpmax) * ClumpInfoSize; off = (clump - block * arena->clumpmax) * ClumpInfoSize;
/*
if(arena->cib.block == block if(arena->cib.block == block
&& arena->cib.data != nil){ && arena->cib.data != nil){
arena->cib.offset = off; arena->cib.offset = off;
@ -596,6 +604,8 @@ getcib(Arena *arena, int clump, int writing, CIBlock *rock)
cib = &arena->cib; cib = &arena->cib;
}else }else
cib = rock; cib = rock;
*/
cib = rock;
qlock(&stats.lock); qlock(&stats.lock);
stats.cireads++; stats.cireads++;
@ -620,6 +630,8 @@ putcib(Arena *arena, CIBlock *cib)
/* /*
* must be called with arena locked * must be called with arena locked
*
* cache turned off now that dcache does write caching too.
*/ */
int int
flushciblocks(Arena *arena) flushciblocks(Arena *arena)
@ -631,8 +643,8 @@ flushciblocks(Arena *arena)
qlock(&stats.lock); qlock(&stats.lock);
stats.ciwrites++; stats.ciwrites++;
qunlock(&stats.lock); qunlock(&stats.lock);
ok = writepart(arena->part, arena->base + arena->size - (arena->cib.block + 1) * arena->blocksize, arena->cib.data->data, arena->blocksize); // ok = writepart(arena->part, arena->base + arena->size - (arena->cib.block + 1) * arena->blocksize, arena->cib.data->data, arena->blocksize);
ok = 0;
if(ok < 0) if(ok < 0)
seterr(EAdmin, "failed writing arena directory block"); seterr(EAdmin, "failed writing arena directory block");
putdblock(arena->cib.data); putdblock(arena->cib.data);

View file

@ -115,6 +115,13 @@ enum
MaxClumpBlocks = (VtMaxLumpSize + ClumpSize + (1 << ABlockLog) - 1) >> ABlockLog, MaxClumpBlocks = (VtMaxLumpSize + ClumpSize + (1 << ABlockLog) - 1) >> ABlockLog,
/*
* dirty flags
*/
DirtyArena = 1,
DirtyIndex,
DirtyArenaCib,
VentiZZZZZZZZ VentiZZZZZZZZ
}; };
@ -142,6 +149,7 @@ struct Part
u64int size; /* size of the partiton */ u64int size; /* size of the partiton */
u32int blocksize; /* block size for reads and writes */ u32int blocksize; /* block size for reads and writes */
char *name; char *name;
Channel *writechan; /* chan[dcache.nblock](DBlock*) */
}; };
/* /*
@ -156,6 +164,8 @@ struct DBlock
Part *part; /* partition in which cached */ Part *part; /* partition in which cached */
u64int addr; /* base address on the partition */ u64int addr; /* base address on the partition */
u16int size; /* amount of data available, not amount allocated; should go away */ u16int size; /* amount of data available, not amount allocated; should go away */
u32int dirty;
u32int dirtying;
DBlock *next; /* doubly linked hash chains */ DBlock *next; /* doubly linked hash chains */
DBlock *prev; DBlock *prev;
u32int heap; /* index in heap table */ u32int heap; /* index in heap table */
@ -163,6 +173,8 @@ struct DBlock
u32int used2; u32int used2;
u32int ref; /* reference count */ u32int ref; /* reference count */
QLock lock; /* for access to data only */ QLock lock; /* for access to data only */
Channel writedonechan;
void* chanbuf[1]; /* buffer for the chan! */
}; };
/* /*
@ -486,6 +498,10 @@ struct Stats
long iclookups; /* index cache lookups */ long iclookups; /* index cache lookups */
long ichits; /* hits in the cache */ long ichits; /* hits in the cache */
long icfills; /* successful fills from index */ long icfills; /* successful fills from index */
long absorbedwrites; /* disk writes absorbed by dcache */
long dirtydblocks; /* blocks dirtied */
long dcacheflushes; /* times dcache has flushed */
long dcacheflushwrites; /* blocks written by those flushes */
}; };
extern Index *mainindex; extern Index *mainindex;

View file

@ -1,3 +1,7 @@
/*
* The locking here is getting a little out of hand.
*/
#include "stdinc.h" #include "stdinc.h"
#include "dat.h" #include "dat.h"
#include "fns.h" #include "fns.h"
@ -14,7 +18,12 @@ enum
struct DCache struct DCache
{ {
QLock lock; QLock lock;
RWLock dirtylock; /* must be held to inspect or set b->dirty */
u32int flushround;
Rendez anydirty;
Rendez full; Rendez full;
Rendez flush;
Rendez flushdone;
DBlock *free; /* list of available lumps */ DBlock *free; /* list of available lumps */
u32int now; /* ticks for usage timestamps */ u32int now; /* ticks for usage timestamps */
int size; /* max. size of any block; allocated to each block */ int size; /* max. size of any block; allocated to each block */
@ -23,7 +32,10 @@ struct DCache
DBlock **heap; /* heap for locating victims */ DBlock **heap; /* heap for locating victims */
int nblocks; /* number of blocks allocated */ int nblocks; /* number of blocks allocated */
DBlock *blocks; /* array of block descriptors */ DBlock *blocks; /* array of block descriptors */
DBlock **write; /* array of block pointers to be written */
u8int *mem; /* memory for all block descriptors */ u8int *mem; /* memory for all block descriptors */
int ndirty; /* number of dirty blocks */
int maxdirty; /* max. number of dirty blocks */
}; };
static DCache dcache; static DCache dcache;
@ -33,6 +45,10 @@ static int upheap(int i, DBlock *b);
static DBlock *bumpdblock(void); static DBlock *bumpdblock(void);
static void delheap(DBlock *db); static void delheap(DBlock *db);
static void fixheap(int i, DBlock *b); static void fixheap(int i, DBlock *b);
static void _flushdcache(void);
static void flushproc(void*);
static void flushtimerproc(void*);
static void writeproc(void*);
void void
initdcache(u32int mem) initdcache(u32int mem)
@ -47,14 +63,20 @@ initdcache(u32int mem)
sysfatal("no max. block size given for disk cache"); sysfatal("no max. block size given for disk cache");
blocksize = maxblocksize; blocksize = maxblocksize;
nblocks = mem / blocksize; nblocks = mem / blocksize;
if(0)
fprint(2, "initialize disk cache with %d blocks of %d bytes\n", nblocks, blocksize);
dcache.full.l = &dcache.lock; dcache.full.l = &dcache.lock;
dcache.flush.l = &dcache.lock;
dcache.anydirty.l = &dcache.lock;
dcache.flushdone.l = &dcache.lock;
dcache.nblocks = nblocks; dcache.nblocks = nblocks;
dcache.maxdirty = (nblocks * 3) / 4;
if(1)
fprint(2, "initialize disk cache with %d blocks of %d bytes, maximum %d dirty blocks\n",
nblocks, blocksize, dcache.maxdirty);
dcache.size = blocksize; dcache.size = blocksize;
dcache.heads = MKNZ(DBlock*, HashSize); dcache.heads = MKNZ(DBlock*, HashSize);
dcache.heap = MKNZ(DBlock*, nblocks); dcache.heap = MKNZ(DBlock*, nblocks);
dcache.blocks = MKNZ(DBlock, nblocks); dcache.blocks = MKNZ(DBlock, nblocks);
dcache.write = MKNZ(DBlock*, nblocks);
dcache.mem = MKNZ(u8int, nblocks * blocksize); dcache.mem = MKNZ(u8int, nblocks * blocksize);
last = nil; last = nil;
@ -62,11 +84,15 @@ initdcache(u32int mem)
b = &dcache.blocks[i]; b = &dcache.blocks[i];
b->data = &dcache.mem[i * blocksize]; b->data = &dcache.mem[i * blocksize];
b->heap = TWID32; b->heap = TWID32;
chaninit(&b->writedonechan, sizeof(void*), 1);
b->next = last; b->next = last;
last = b; last = b;
} }
dcache.free = last; dcache.free = last;
dcache.nheap = 0; dcache.nheap = 0;
vtproc(flushproc, nil);
vtproc(flushtimerproc, nil);
} }
static u32int static u32int
@ -178,25 +204,70 @@ putdblock(DBlock *b)
if(b == nil) if(b == nil)
return; return;
if(b->dirtying){
b->dirtying = 0;
runlock(&dcache.dirtylock);
}
qunlock(&b->lock); qunlock(&b->lock);
//checkdcache(); //checkdcache();
qlock(&dcache.lock); qlock(&dcache.lock);
if(--b->ref == 0){ if(b->dirty)
delheap(b);
else if(--b->ref == 0){
if(b->heap == TWID32) if(b->heap == TWID32)
upheap(dcache.nheap++, b); upheap(dcache.nheap++, b);
rwakeup(&dcache.full); rwakeupall(&dcache.full);
} }
qunlock(&dcache.lock); qunlock(&dcache.lock);
//checkdcache(); //checkdcache();
} }
void
dirtydblock(DBlock *b, int dirty)
{
int odirty;
Part *p;
fprint(2, "dirty %p\n", b);
rlock(&dcache.dirtylock);
assert(b->ref != 0);
assert(b->dirtying == 0);
b->dirtying = 1;
qlock(&stats.lock);
if(b->dirty)
stats.absorbedwrites++;
stats.dirtydblocks++;
qunlock(&stats.lock);
if(b->dirty)
assert(b->dirty == dirty);
odirty = b->dirty;
b->dirty = dirty;
p = b->part;
if(p->writechan == nil){
fprint(2, "allocate write proc for part %s\n", p->name);
/* XXX hope this doesn't fail! */
p->writechan = chancreate(sizeof(DBlock*), dcache.nblocks);
vtproc(writeproc, p);
}
qlock(&dcache.lock);
if(!odirty){
dcache.ndirty++;
rwakeupall(&dcache.anydirty);
}
qunlock(&dcache.lock);
}
/* /*
* remove some block from use and update the free list and counters * remove some block from use and update the free list and counters
*/ */
static DBlock* static DBlock*
bumpdblock(void) bumpdblock(void)
{ {
int flushed;
DBlock *b; DBlock *b;
ulong h; ulong h;
@ -206,14 +277,20 @@ bumpdblock(void)
return b; return b;
} }
if(dcache.ndirty >= dcache.maxdirty)
_flushdcache();
/* /*
* remove blocks until we find one that is unused * remove blocks until we find one that is unused
* referenced blocks are left in the heap even though * referenced blocks are left in the heap even though
* they can't be scavenged; this is simple a speed optimization * they can't be scavenged; this is simple a speed optimization
*/ */
flushed = 0;
for(;;){ for(;;){
if(dcache.nheap == 0) if(dcache.nheap == 0){
_flushdcache();
return nil; return nil;
}
b = dcache.heap[0]; b = dcache.heap[0];
delheap(b); delheap(b);
if(!b->ref) if(!b->ref)
@ -242,6 +319,8 @@ bumpdblock(void)
static void static void
delheap(DBlock *db) delheap(DBlock *db)
{ {
if(db->heap == TWID32)
return;
fixheap(db->heap, dcache.heap[--dcache.nheap]); fixheap(db->heap, dcache.heap[--dcache.nheap]);
db->heap = TWID32; db->heap = TWID32;
} }
@ -370,3 +449,186 @@ checkdcache(void)
sysfatal("dc: missing blocks: %d %d %d", dcache.nheap, refed, dcache.nblocks); sysfatal("dc: missing blocks: %d %d %d", dcache.nheap, refed, dcache.nblocks);
qunlock(&dcache.lock); qunlock(&dcache.lock);
} }
void
flushdcache(void)
{
u32int flushround;
qlock(&dcache.lock);
flushround = dcache.flushround;
rwakeupall(&dcache.flush);
while(flushround == dcache.flushround)
rsleep(&dcache.flushdone);
qunlock(&dcache.lock);
}
static void
_flushdcache(void)
{
rwakeupall(&dcache.flush);
}
static int
parallelwrites(DBlock **b, DBlock **eb, int dirty)
{
DBlock **p;
for(p=b; p<eb && (*p)->dirty == dirty; p++)
sendp((*p)->part->writechan, *p);
for(p=b; p<eb && (*p)->dirty == dirty; p++)
recvp(&(*p)->writedonechan);
return p-b;
}
/*
* Sort first by dirty flag, then by partition, then by address in partition.
*/
static int
writeblockcmp(const void *va, const void *vb)
{
DBlock *a, *b;
a = *(DBlock**)va;
b = *(DBlock**)vb;
if(a->dirty != b->dirty)
return a->dirty - b->dirty;
if(a->part != b->part){
if(a->part < b->part)
return -1;
if(a->part > b->part)
return 1;
}
if(a->addr < b->addr)
return -1;
return 1;
}
static void
flushtimerproc(void *v)
{
u32int round;
for(;;){
qlock(&dcache.lock);
while(dcache.ndirty == 0)
rsleep(&dcache.anydirty);
round = dcache.flushround;
qunlock(&dcache.lock);
sleep(60*1000);
qlock(&dcache.lock);
if(round == dcache.flushround){
rwakeupall(&dcache.flush);
while(round == dcache.flushround)
rsleep(&dcache.flushdone);
}
qunlock(&dcache.lock);
}
}
static void
flushproc(void *v)
{
int i, n;
DBlock *b, **write;
USED(v);
for(;;){
qlock(&dcache.lock);
dcache.flushround++;
rwakeupall(&dcache.flushdone);
rsleep(&dcache.flush);
qunlock(&dcache.lock);
fprint(2, "flushing dcache\n");
/*
* Because we don't record any dependencies at all, we must write out
* all blocks currently dirty. Thus we must lock all the blocks that
* are currently dirty.
*
* We grab dirtylock to stop the dirtying of new blocks.
* Then we wait until all the current blocks finish being dirtied.
* Now all the dirty blocks in the system are immutable (clean blocks
* might still get recycled), so we can plan our disk writes.
*
* In a better scheme, dirtiers might lock the block for writing in getdblock,
* so that flushproc could lock all the blocks here and then unlock them as it
* finishes with them.
*/
fprint(2, "flushproc: wlock\n");
wlock(&dcache.dirtylock);
fprint(2, "flushproc: build list\n");
write = dcache.write;
n = 0;
for(i=0; i<dcache.nblocks; i++){
b = &dcache.blocks[i];
if(b->dirty)
write[n++] = b;
}
qsort(write, n, sizeof(write[0]), writeblockcmp);
/*
* At the beginning of the array are the arena blocks.
*/
fprint(2, "flushproc: write arena blocks\n");
i = 0;
i += parallelwrites(write+i, write+n, DirtyArena);
/*
* Next are the index blocks.
*/
fprint(2, "flushproc: write index blocks\n");
i += parallelwrites(write+i, write+n, DirtyIndex);
/*
* Finally, the arena clump info blocks.
*/
fprint(2, "flushproc: write cib blocks\n");
i += parallelwrites(write+i, write+n, DirtyArenaCib);
assert(i == n);
fprint(2, "flushproc: update dirty bits\n");
qlock(&dcache.lock);
for(i=0; i<n; i++){
b = write[i];
b->dirty = 0;
--dcache.ndirty;
if(b->ref == 0 && b->heap == TWID32){
upheap(dcache.nheap++, b);
rwakeupall(&dcache.full);
}
}
qunlock(&dcache.lock);
wunlock(&dcache.dirtylock);
qlock(&stats.lock);
stats.dcacheflushes++;
stats.dcacheflushwrites += n;
qunlock(&stats.lock);
}
}
static void
writeproc(void *v)
{
DBlock *b;
Part *p;
p = v;
for(;;){
b = recvp(p->writechan);
if(writepart(p, b->addr, b->data, b->size) < 0)
fprint(2, "write error: %r\n"); /* XXX details! */
sendp(&b->writedonechan, b);
}
}

View file

@ -18,16 +18,6 @@ clumpinfoeq(ClumpInfo *c, ClumpInfo *d)
&& scorecmp(c->score, d->score)==0; && scorecmp(c->score, d->score)==0;
} }
/*
* synchronize the clump info directory with
* with the clumps actually stored in the arena.
* the directory should be at least as up to date
* as the arena's trailer.
*
* checks/updates at most n clumps.
*
* returns 1 if ok, -1 if an error occured, 0 if blocks were updated
*/
int int
findscore(Arena *arena, uchar *score) findscore(Arena *arena, uchar *score)
{ {

View file

@ -14,6 +14,7 @@ int clumpinfoeq(ClumpInfo *c, ClumpInfo *d);
int clumpinfoeq(ClumpInfo *c, ClumpInfo *d); int clumpinfoeq(ClumpInfo *c, ClumpInfo *d);
u32int clumpmagic(Arena *arena, u64int aa); u32int clumpmagic(Arena *arena, u64int aa);
int delarena(Arena *arena); int delarena(Arena *arena);
void dirtydblock(DBlock*, int);
void *emalloc(ulong); void *emalloc(ulong);
void *erealloc(void *, ulong); void *erealloc(void *, ulong);
char *estrdup(char*); char *estrdup(char*);
@ -21,6 +22,8 @@ void *ezmalloc(ulong);
Arena *findarena(char *name); Arena *findarena(char *name);
ISect *findisect(Index *ix, u32int buck); ISect *findisect(Index *ix, u32int buck);
int flushciblocks(Arena *arena); int flushciblocks(Arena *arena);
void flushdcache(void);
void flushqueue(void);
void fmtzbinit(Fmt *f, ZBlock *b); void fmtzbinit(Fmt *f, ZBlock *b);
void freearena(Arena *arena); void freearena(Arena *arena);
void freearenapart(ArenaPart *ap, int freearenas); void freearenapart(ArenaPart *ap, int freearenas);
@ -90,7 +93,6 @@ void printindex(int fd, Index *ix);
void printstats(void); void printstats(void);
void putdblock(DBlock *b); void putdblock(DBlock *b);
void putlump(Lump *b); void putlump(Lump *b);
void queueflush(void);
int queuewrite(Lump *b, Packet *p, int creator); int queuewrite(Lump *b, Packet *p, int creator);
u32int readarena(Arena *arena, u64int aa, u8int *buf, long n); u32int readarena(Arena *arena, u64int aa, u8int *buf, long n);
int readarenamap(AMapN *amn, Part *part, u64int base, u32int size); int readarenamap(AMapN *amn, Part *part, u64int base, u32int size);

View file

@ -137,12 +137,14 @@ httpproc(void *v)
c = v; c = v;
for(t = 15*60*1000; ; t = 15*1000){ for(t = 15*60*1000; ; t = 15*1000){
if(hparsereq(c, t) <= 0) fprint(2, "httpd: get headers\n");
if(hparsereq(c, t) < 0)
break; break;
ok = -1; ok = -1;
for(i = 0; i < MaxObjs && objs[i].name[0]; i++){ for(i = 0; i < MaxObjs && objs[i].name[0]; i++){
if(strcmp(c->req.uri, objs[i].name) == 0){ if(strcmp(c->req.uri, objs[i].name) == 0){
fprint(2, "httpd: call function %p\n", objs[i].f);
ok = (*objs[i].f)(c); ok = (*objs[i].f)(c);
break; break;
} }
@ -180,7 +182,10 @@ static int
preq(HConnect *c) preq(HConnect *c)
{ {
if(hparseheaders(c, 15*60*1000) < 0) if(hparseheaders(c, 15*60*1000) < 0)
{
fprint(2, "hparseheaders failed\n");
return -1; return -1;
}
if(strcmp(c->req.meth, "GET") != 0 if(strcmp(c->req.meth, "GET") != 0
&& strcmp(c->req.meth, "HEAD") != 0) && strcmp(c->req.meth, "HEAD") != 0)
return hunallowed(c, "GET, HEAD"); return hunallowed(c, "GET, HEAD");
@ -196,7 +201,7 @@ preqtext(HConnect *c)
int r; int r;
r = preq(c); r = preq(c);
if(r <= 0) if(r < 0)
return r; return r;
hout = &c->hout; hout = &c->hout;
@ -221,7 +226,7 @@ notfound(HConnect *c)
int r; int r;
r = preq(c); r = preq(c);
if(r <= 0) if(r < 0)
return r; return r;
return hfail(c, HNotFound, c->req.uri); return hfail(c, HNotFound, c->req.uri);
} }
@ -233,9 +238,13 @@ estats(HConnect *c)
int r; int r;
r = preqtext(c); r = preqtext(c);
if(r <= 0) if(r < 0)
{
fprint(2, "preqtext failed\n");
return r; return r;
}
fprint(2, "write stats\n");
hout = &c->hout; hout = &c->hout;
hprint(hout, "lump writes=%,ld\n", stats.lumpwrites); hprint(hout, "lump writes=%,ld\n", stats.lumpwrites);
hprint(hout, "lump reads=%,ld\n", stats.lumpreads); hprint(hout, "lump reads=%,ld\n", stats.lumpreads);
@ -268,13 +277,21 @@ estats(HConnect *c)
hprint(hout, "disk cache misses=%,ld\n", stats.pcmiss); hprint(hout, "disk cache misses=%,ld\n", stats.pcmiss);
hprint(hout, "disk cache reads=%,ld\n", stats.pcreads); hprint(hout, "disk cache reads=%,ld\n", stats.pcreads);
hprint(hout, "disk cache bytes read=%,lld\n", stats.pcbreads); hprint(hout, "disk cache bytes read=%,lld\n", stats.pcbreads);
fprint(2, "write new stats\n");
hprint(hout, "disk cache writes=%,ld\n", stats.dirtydblocks);
hprint(hout, "disk cache writes absorbed=%,ld %d%%\n", stats.absorbedwrites,
percent(stats.absorbedwrites, stats.dirtydblocks));
fprint(2, "back to old stats\n");
hprint(hout, "disk writes=%,ld\n", stats.diskwrites); hprint(hout, "disk writes=%,ld\n", stats.diskwrites);
hprint(hout, "disk bytes written=%,lld\n", stats.diskbwrites); hprint(hout, "disk bytes written=%,lld\n", stats.diskbwrites);
hprint(hout, "disk reads=%,ld\n", stats.diskreads); hprint(hout, "disk reads=%,ld\n", stats.diskreads);
hprint(hout, "disk bytes read=%,lld\n", stats.diskbreads); hprint(hout, "disk bytes read=%,lld\n", stats.diskbreads);
fprint(2, "hflush stats\n");
hflush(hout); hflush(hout);
fprint(2, "done with stats\n");
return 0; return 0;
} }
@ -288,7 +305,7 @@ sindex(HConnect *c)
int i, r, active; int i, r, active;
r = preqtext(c); r = preqtext(c);
if(r <= 0) if(r < 0)
return r; return r;
hout = &c->hout; hout = &c->hout;
@ -348,7 +365,7 @@ dindex(HConnect *c)
int i, r; int i, r;
r = preqtext(c); r = preqtext(c);
if(r <= 0) if(r < 0)
return r; return r;
hout = &c->hout; hout = &c->hout;
@ -376,7 +393,7 @@ xindex(HConnect *c)
int r; int r;
r = preq(c); r = preq(c);
if(r <= 0) if(r < 0)
return r; return r;
hout = &c->hout; hout = &c->hout;

View file

@ -624,12 +624,14 @@ storeientry(Index *ix, IEntry *ie)
h = bucklook(ie->score, ie->ia.type, ib.data, ib.n); h = bucklook(ie->score, ie->ia.type, ib.data, ib.n);
if(h & 1){ if(h & 1){
h ^= 1; h ^= 1;
dirtydblock(b, DirtyIndex);
packientry(ie, &ib.data[h]); packientry(ie, &ib.data[h]);
ok = writebucket(is, buck, &ib, b); ok = writebucket(is, buck, &ib, b);
break; break;
} }
if(ib.n < is->buckmax){ if(ib.n < is->buckmax){
dirtydblock(b, DirtyIndex);
memmove(&ib.data[h + IEntrySize], &ib.data[h], ib.n * IEntrySize - h); memmove(&ib.data[h + IEntrySize], &ib.data[h], ib.n * IEntrySize - h);
ib.n++; ib.n++;
@ -648,14 +650,19 @@ storeientry(Index *ix, IEntry *ie)
static int static int
writebucket(ISect *is, u32int buck, IBucket *ib, DBlock *b) writebucket(ISect *is, u32int buck, IBucket *ib, DBlock *b)
{ {
if(buck >= is->blocks) assert(b->dirty == DirtyIndex);
if(buck >= is->blocks){
seterr(EAdmin, "index write out of bounds: %d >= %d\n", seterr(EAdmin, "index write out of bounds: %d >= %d\n",
buck, is->blocks); buck, is->blocks);
return -1;
}
qlock(&stats.lock); qlock(&stats.lock);
stats.indexwrites++; stats.indexwrites++;
qunlock(&stats.lock); qunlock(&stats.lock);
packibucket(ib, b->data); packibucket(ib, b->data);
return writepart(is->part, is->blockbase + ((u64int)buck << is->blocklog), b->data, is->blocksize); // return writepart(is->part, is->blockbase + ((u64int)buck << is->blocklog), b->data, is->blocksize);
return 0;
} }
/* /*

View file

@ -97,7 +97,7 @@ queuewrite(Lump *u, Packet *p, int creator)
} }
void void
queueflush(void) flushqueue(void)
{ {
int i; int i;
LumpQueue *q; LumpQueue *q;

View file

@ -32,22 +32,15 @@ LIBOFILES=\
SLIB=libvs.a SLIB=libvs.a
LIB=$SLIB\ LIB=$SLIB
$PLAN9/lib/libventi.a\ SHORTLIB=venti httpd bin sec thread 9
$PLAN9/lib/libhttpd.a\
$PLAN9/lib/libbin.a\
$PLAN9/lib/libsec.a\
$PLAN9/lib/libthread.a\
$PLAN9/lib/lib9.a\
$PLAN9/lib/libfmt.a\
$PLAN9/lib/libutf.a\
HFILES= dat.h\ HFILES= dat.h\
fns.h\ fns.h\
stdinc.h\ stdinc.h\
TARG=\ TARG=\
# venti\ venti\
fmtarenas\ fmtarenas\
fmtisect\ fmtisect\
fmtindex\ fmtindex\
@ -64,6 +57,7 @@ TARG=\
read\ read\
write\ write\
copy\ copy\
wb\
BIN=$BIN/venti BIN=$BIN/venti
@ -82,4 +76,3 @@ ainstall:V: ${TARG:%=%.ainstall}
%.ainstall:V: $O.% %.ainstall:V: $O.%
scp $prereq amsterdam:/usr/local/bin/venti/$stem scp $prereq amsterdam:/usr/local/bin/venti/$stem
LDFLAGS=$LDFLAGS -l9

View file

@ -12,7 +12,7 @@ initpart(char *name, int writable)
Dir *dir; Dir *dir;
int how; int how;
part = MK(Part); part = MKZ(Part);
part->name = estrdup(name); part->name = estrdup(name);
if(!writable && readonly) if(!writable && readonly)
how = OREAD; how = OREAD;

View file

@ -57,8 +57,13 @@ printstats(void)
fprint(2, "disk cache reads=%,ld\n", stats.pcreads); fprint(2, "disk cache reads=%,ld\n", stats.pcreads);
fprint(2, "disk cache bytes read=%,lld\n", stats.pcbreads); fprint(2, "disk cache bytes read=%,lld\n", stats.pcbreads);
fprint(2, "disk cache writes=%,ld\n", stats.dirtydblocks);
fprint(2, "disk cache writes absorbed=%,ld %d%%\n", stats.absorbedwrites,
percent(stats.absorbedwrites, stats.dirtydblocks));
fprint(2, "disk writes=%,ld\n", stats.diskwrites); fprint(2, "disk writes=%,ld\n", stats.diskwrites);
fprint(2, "disk bytes written=%,lld\n", stats.diskbwrites); fprint(2, "disk bytes written=%,lld\n", stats.diskbwrites);
fprint(2, "disk reads=%,ld\n", stats.diskreads); fprint(2, "disk reads=%,ld\n", stats.diskreads);
fprint(2, "disk bytes read=%,lld\n", stats.diskbreads); fprint(2, "disk bytes read=%,lld\n", stats.diskbreads);
} }

View file

@ -139,6 +139,7 @@ ventiserver(char *addr)
Packet *p; Packet *p;
VtReq *r; VtReq *r;
VtSrv *s; VtSrv *s;
char err[ERRMAX];
s = vtlisten(addr); s = vtlisten(addr);
if(s == nil) if(s == nil)
@ -153,17 +154,22 @@ ventiserver(char *addr)
vtrerror(r, "unknown request"); vtrerror(r, "unknown request");
break; break;
case VtTread: case VtTread:
if((r->rx.data = readlump(r->tx.score, r->tx.dtype, r->tx.count)) == nil) if((r->rx.data = readlump(r->tx.score, r->tx.dtype, r->tx.count)) == nil){
vtrerror(r, gerrstr()); rerrstr(err, sizeof err);
vtrerror(r, err);
}
break; break;
case VtTwrite: case VtTwrite:
p = r->tx.data; p = r->tx.data;
r->tx.data = nil; r->tx.data = nil;
if(writelump(p, r->rx.score, r->tx.dtype, 0) < 0) if(writelump(p, r->rx.score, r->tx.dtype, 0) < 0){
vtrerror(r, gerrstr()); rerrstr(err, sizeof err);
vtrerror(r, err);
}
break; break;
case VtTsync: case VtTsync:
queueflush(); flushqueue();
flushdcache();
break; break;
} }
vtrespond(r); vtrespond(r);