assorted changes from Plan 9

This commit is contained in:
rsc 2006-07-18 15:26:33 +00:00
parent 686bd37d9d
commit 28b49df354
39 changed files with 4542 additions and 363 deletions

View file

@ -20,6 +20,7 @@ static void sumproc(void *);
static QLock sumlock;
static Rendez sumwait;
static ASum *sumq;
static ASum *sumqtail;
static uchar zero[8192];
int arenasumsleeptime;
@ -257,7 +258,6 @@ writearena(Arena *arena, u64int aa, u8int *clbuf, u32int n)
if(m > n - nn)
m = n - nn;
memmove(&b->data[off], &clbuf[nn], m);
/* ok = writepart(arena->part, a, b->data, blocksize); */
ok = 0;
putdblock(b);
if(ok < 0){
@ -329,7 +329,6 @@ writeaclump(Arena *arena, Clump *c, u8int *clbuf, u64int start, u64int *pa)
if(m > n - nn)
m = n - nn;
memmove(&b->data[off], &clbuf[nn], m);
/* ok = writepart(arena->part, a, b->data, blocksize); */
ok = 0;
putdblock(b);
if(ok < 0){
@ -356,6 +355,7 @@ writeaclump(Arena *arena, Clump *c, u8int *clbuf, u64int start, u64int *pa)
arena->ctime = arena->wtime;
writeclumpinfo(arena, clump, &c->info);
wbarena(arena);
/* set up for call to setdcachestate */
as.arena = arena;
@ -410,6 +410,9 @@ setatailstate(AState *as)
trace(0, "setatailstate %s 0x%llux clumps %d", as->arena->name, as->aa, as->stats.clumps);
/*
* Look up as->arena to find index.
*/
ix = mainindex;
for(i=0; i<ix->narenas; i++)
if(ix->arenas[i] == as->arena)
@ -419,6 +422,9 @@ setatailstate(AState *as)
return;
}
/*
* Walk backward until we find the last time these were in sync.
*/
for(j=i; --j>=0; ){
a = ix->arenas[j];
if(atailcmp(&a->diskstats, &a->memstats) == 0)
@ -464,8 +470,12 @@ backsumarena(Arena *arena)
return;
qlock(&sumlock);
as->arena = arena;
as->next = sumq;
as->next = nil;
if(sumq)
sumqtail->next = as;
else
sumq = as;
sumqtail = as;
rwakeup(&sumwait);
qunlock(&sumlock);
}
@ -499,6 +509,7 @@ sumarena(Arena *arena)
DigestState s;
u64int a, e;
u32int bs;
int t;
u8int score[VtScoreSize];
bs = MaxIoSize;
@ -512,7 +523,12 @@ sumarena(Arena *arena)
b = alloczblock(bs, 0, arena->part->blocksize);
e = arena->base + arena->size;
for(a = arena->base - arena->blocksize; a + arena->blocksize <= e; a += bs){
sleep(arenasumsleeptime);
disksched();
while((t=arenasumsleeptime) == SleepForever){
sleep(1000);
disksched();
}
sleep(t);
if(a + bs > e)
bs = arena->blocksize;
if(readpart(arena->part, a, b->data, bs) < 0)
@ -595,7 +611,7 @@ wbarenahead(Arena *arena)
b = alloczblock(arena->blocksize, 1, arena->part->blocksize);
if(b == nil){
logerr(EAdmin, "can't write arena header: %r");
/*/ZZZ add error message? */
/* ZZZ add error message? */
return -1;
}
/*
@ -681,18 +697,22 @@ okarena(Arena *arena)
ok = 0;
dsize = arenadirsize(arena, arena->diskstats.clumps);
if(arena->diskstats.used + dsize > arena->size){
seterr(ECorrupt, "arena used > size");
seterr(ECorrupt, "arena %s used > size", arena->name);
ok = -1;
}
if(arena->diskstats.cclumps > arena->diskstats.clumps)
logerr(ECorrupt, "arena has more compressed clumps than total clumps");
logerr(ECorrupt, "arena %s has more compressed clumps than total clumps", arena->name);
/*
* This need not be true if some of the disk is corrupted.
*
if(arena->diskstats.uncsize + arena->diskstats.clumps * ClumpSize + arena->blocksize < arena->diskstats.used)
logerr(ECorrupt, "arena uncompressed size inconsistent with used space %lld %d %lld", arena->diskstats.uncsize, arena->diskstats.clumps, arena->diskstats.used);
logerr(ECorrupt, "arena %s uncompressed size inconsistent with used space %lld %d %lld", arena->name, arena->diskstats.uncsize, arena->diskstats.clumps, arena->diskstats.used);
*/
if(arena->ctime > arena->wtime)
logerr(ECorrupt, "arena creation time after last write time");
logerr(ECorrupt, "arena %s creation time after last write time", arena->name);
return ok;
}

View file

@ -214,7 +214,7 @@ wbarenapart(ArenaPart *ap)
return -1;
b = alloczblock(HeadSize, 1, 0);
if(b == nil)
/*ZZZ set error message? */
/* ZZZ set error message? */
return -1;
if(packarenapart(ap, b->data) < 0){
@ -337,8 +337,8 @@ wbarenamap(AMap *am, int n, Part *part, u64int base, u64int size)
/*
* amap: n '\n' amapelem * n
* n: u32int
* amapelem: name '\t' astart '\t' asize '\n'
* astart, asize: u64int
* amapelem: name '\t' astart '\t' astop '\n'
* astart, astop: u64int
*/
int
parseamap(IFile *f, AMapN *amn)

View file

@ -7,6 +7,8 @@
#include "dat.h"
#include "fns.h"
int ignorebloom;
int
bloominit(Bloom *b, vlong vsize, u8int *data)
{
@ -24,6 +26,7 @@ bloominit(Bloom *b, vlong vsize, u8int *data)
if(unpackbloomhead(b, data) < 0)
return -1;
fprint(2, "bloom size %lud nhash %d\n", b->size, b->nhash);
b->mask = b->size-1;
b->data = data;
return 0;
@ -38,11 +41,7 @@ wbbloomhead(Bloom *b)
Bloom*
readbloom(Part *p)
{
int i, n;
uint ones;
uchar buf[512];
uchar *data;
u32int *a;
Bloom *b;
b = vtmallocz(sizeof *b);
@ -52,14 +51,40 @@ readbloom(Part *p)
vtfree(b);
return nil;
}
b->part = p;
return b;
}
int
resetbloom(Bloom *b)
{
uchar *data;
data = vtmallocz(b->size);
if(readpart(p, 0, data, b->size) < 0){
fprint(2, "bloom data %lud\n", b->size);
b->data = data;
if(b->size == MaxBloomSize) /* 2^32 overflows ulong */
addstat(StatBloomBits, b->size*8-1);
else
addstat(StatBloomBits, b->size*8);
return 0;
}
int
loadbloom(Bloom *b)
{
int i, n;
uint ones;
uchar *data;
u32int *a;
data = vtmallocz(b->size);
if(readpart(b->part, 0, data, b->size) < 0){
vtfree(b);
vtfree(data);
return nil;
return -1;
}
b->data = data;
b->part = p;
a = (u32int*)b->data;
n = b->size/4;
@ -73,7 +98,7 @@ readbloom(Part *p)
else
addstat(StatBloomBits, b->size*8);
return b;
return 0;
}
int
@ -101,6 +126,8 @@ gethashes(u8int *score, ulong *h)
a ^= *(u32int*)(score+i);
b ^= *(u32int*)(score+i+4);
}
if(i+4 <= VtScoreSize) /* 20 is not 4-aligned */
a ^= *(u32int*)(score+i);
for(i=0; i<BloomMaxHash; i++, a+=b)
h[i] = a < BloomHeadSize*8 ? BloomHeadSize*8 : a;
}
@ -154,14 +181,17 @@ inbloomfilter(Bloom *b, u8int *score)
int r;
uint ms;
if(b == nil)
if(b == nil || b->data == nil)
return 1;
if(ignorebloom)
return 1;
ms = msec();
rlock(&b->lk);
r = _inbloomfilter(b, score);
runlock(&b->lk);
ms = msec() - ms;
ms = ms - msec();
addstat2(StatBloomLookup, 1, StatBloomLookupTime, ms);
if(r)
addstat(StatBloomMiss, 1);
@ -173,7 +203,7 @@ inbloomfilter(Bloom *b, u8int *score)
void
markbloomfilter(Bloom *b, u8int *score)
{
if(b == nil)
if(b == nil || b->data == nil)
return;
rlock(&b->lk);
@ -186,14 +216,18 @@ markbloomfilter(Bloom *b, u8int *score)
static void
bloomwriteproc(void *v)
{
int ret;
Bloom *b;
threadsetname("bloomwriteproc");
b = v;
for(;;){
recv(b->writechan, 0);
if(writebloom(b) < 0)
if((ret=writebloom(b)) < 0)
fprint(2, "oops! writing bloom: %r\n");
send(b->writedonechan, 0);
else
ret = 0;
sendul(b->writedonechan, ret);
}
}

View file

@ -21,7 +21,7 @@ initiestream(Part *part, u64int off, u64int clumps, u32int size)
{
IEStream *ies;
/*ZZZ out of memory? */
/* out of memory? */
ies = MKZ(IEStream);
ies->buf = MKN(u8int, size);
ies->epos = ies->buf;
@ -61,7 +61,7 @@ peekientry(IEStream *ies)
nn -= n;
if(nn == 0)
return nil;
/*fprint(2, "peek %d from %llud into %p\n", nn, ies->off, ies->epos); */
//fprint(2, "peek %d from %llud into %p\n", nn, ies->off, ies->epos);
if(readpart(ies->part, ies->off, ies->epos, nn) < 0){
seterr(EOk, "can't read sorted index entries: %r");
return nil;
@ -101,7 +101,7 @@ buildbucket(Index *ix, IEStream *ies, IBucket *ib, uint maxdata)
b = peekientry(ies);
if(b == nil)
return TWID32;
/*fprint(2, "b=%p ies->n=%lld ib.n=%d buck=%d score=%V\n", b, ies->n, ib->n, iebuck(ix, b, ib, ies), b); */
/* fprint(2, "b=%p ies->n=%lld ib.n=%d buck=%d score=%V\n", b, ies->n, ib->n, iebuck(ix, b, ib, ies), b); */
if(ib->n == 0)
buck = iebuck(ix, b, ib, ies);
else{

File diff suppressed because it is too large Load diff

View file

@ -109,7 +109,7 @@ checkindex(Index *ix, Part *part, u64int off, u64int clumps, int zero)
int ok, bok;
u64int found = 0;
/*ZZZ make buffer size configurable */
/* ZZZ make buffer size configurable */
b = alloczblock(ix->blocksize, 0, ix->blocksize);
z = alloczblock(ix->blocksize, 1, ix->blocksize);
ies = initiestream(part, off, clumps, 64*1024);
@ -260,6 +260,8 @@ threadmain(int argc, char *argv[])
if(initventi(argv[0], &conf) < 0)
sysfatal("can't init venti: %r");
if(mainindex->bloom && loadbloom(mainindex->bloom) < 0)
sysfatal("can't load bloom filter: %r");
oldbloom = mainindex->bloom;
newbloom = nil;
if(oldbloom){

View file

@ -91,7 +91,7 @@ clumpmagic(Arena *arena, u64int aa)
{
u8int buf[U32Size];
if(readarena(arena, aa, buf, U32Size) < 0)
if(readarena(arena, aa, buf, U32Size) == TWID32)
return TWID32;
return unpackmagic(buf);
}
@ -138,6 +138,11 @@ loadclump(Arena *arena, u64int aa, int blocks, Clump *cl, u8int *score, int veri
freezblock(cb);
return nil;
}
if(cl->info.type == VtCorruptType){
seterr(EOk, "clump is marked corrupt");
freezblock(cb);
return nil;
}
n -= ClumpSize;
if(n < cl->info.size){
freezblock(cb);

View file

@ -23,7 +23,7 @@ static struct {
ArenaHeadMagic, "ArenaHeadMagic",
ArenaMagic, "ArenaMagic",
ISectMagic, "ISectMagic",
BloomMagic, "BloomMagic"
BloomMagic, "BloomMagic",
};
static char*
@ -138,9 +138,6 @@ unpackarena(Arena *arena, u8int *buf)
p += U64Size;
arena->diskstats.sealed = U8GET(p);
p += U8Size;
arena->memstats = arena->diskstats;
switch(arena->version){
case ArenaVersion4:
sz = ArenaSize4;
@ -153,6 +150,35 @@ unpackarena(Arena *arena, u8int *buf)
seterr(ECorrupt, "arena has bad version number %d", arena->version);
return -1;
}
/*
* Additional fields for the memstats version of the stats.
* Diskstats reflects what is committed to the index.
* Memstats reflects what is in the arena. Originally intended
* this to be a version 5 extension, but might as well use for
* all the existing version 4 arenas too.
*
* To maintain backwards compatibility with existing venti
* installations using the older format, we define that if
* memstats == diskstats, then the extension fields are not
* included (see packarena below). That is, only partially
* indexed arenas have these fields. Fully indexed arenas
* (in particular, sealed arenas) do not.
*/
if(U8GET(p) == 1){
sz += ArenaSize5a-ArenaSize5;
p += U8Size;
arena->memstats.clumps = U32GET(p);
p += U32Size;
arena->memstats.cclumps = U32GET(p);
p += U32Size;
arena->memstats.used = U64GET(p);
p += U64Size;
arena->memstats.uncsize = U64GET(p);
p += U64Size;
arena->memstats.sealed = U8GET(p);
p += U8Size;
}else
arena->memstats = arena->diskstats;
if(buf + sz != p)
sysfatal("unpackarena unpacked wrong amount");
@ -161,6 +187,12 @@ unpackarena(Arena *arena, u8int *buf)
int
packarena(Arena *arena, u8int *buf)
{
return _packarena(arena, buf, 0);
}
int
_packarena(Arena *arena, u8int *buf, int forceext)
{
int sz;
u8int *p;
@ -208,6 +240,30 @@ packarena(Arena *arena, u8int *buf)
U8PUT(p, arena->diskstats.sealed);
p += U8Size;
/*
* Extension fields; see above.
*/
if(forceext
|| arena->memstats.clumps != arena->diskstats.clumps
|| arena->memstats.cclumps != arena->diskstats.cclumps
|| arena->memstats.used != arena->diskstats.used
|| arena->memstats.uncsize != arena->diskstats.uncsize
|| arena->memstats.sealed != arena->diskstats.sealed){
sz += ArenaSize5a - ArenaSize5;
U8PUT(p, 1);
p += U8Size;
U32PUT(p, arena->memstats.clumps);
p += U32Size;
U32PUT(p, arena->memstats.cclumps);
p += U32Size;
U64PUT(p, arena->memstats.used, t32);
p += U64Size;
U64PUT(p, arena->memstats.uncsize, t32);
p += U64Size;
U8PUT(p, arena->memstats.sealed);
p += U8Size;
}
if(buf + sz != p)
sysfatal("packarena packed wrong amount");
@ -525,6 +581,8 @@ unpackientry(IEntry *ie, u8int *buf)
p += U32Size;
ie->train = U16GET(p);
p += U16Size;
if(p - buf != IEntryAddrOff)
sysfatal("unpackentry bad IEntryAddrOff amount");
ie->ia.addr = U64GET(p);
if(ie->ia.addr>>56) print("%.8H => %llux\n", p, ie->ia.addr);
p += U64Size;

View file

@ -75,23 +75,17 @@ enum
/*
* magic numbers on disk
*/
/* _ClumpMagic = 0xd15cb10cU, / * clump header, deprecated */
#define _ClumpMagic 0xd15cb10cU
_ClumpMagic = 0xd15cb10cU, /* clump header, deprecated */
ClumpFreeMagic = 0, /* free clump; terminates active clump log */
/* ArenaPartMagic = 0xa9e4a5e7U, / * arena partition header */
/* ArenaMagic = 0xf2a14eadU, / * arena trailer */
/* ArenaHeadMagic = 0xd15c4eadU, / * arena header */
#define ArenaPartMagic 0xa9e4a5e7U
#define ArenaMagic 0xf2a14eadU
#define ArenaHeadMagic 0xd15c4eadU
ArenaPartMagic = 0xa9e4a5e7U, /* arena partition header */
ArenaMagic = 0xf2a14eadU, /* arena trailer */
ArenaHeadMagic = 0xd15c4eadU, /* arena header */
/* BloomMagic = 0xb1004eadU, / * bloom filter header */
#define BloomMagic 0xb1004eadU
BloomMagic = 0xb1004eadU, /* bloom filter header */
BloomMaxHash = 32,
/* ISectMagic = 0xd15c5ec7U, / * index header */
#define ISectMagic 0xd15c5ec7U
ISectMagic = 0xd15c5ec7U, /* index header */
ArenaPartVersion = 3,
ArenaVersion4 = 4,
@ -120,6 +114,7 @@ enum
ArenaPartSize = 4 * U32Size,
ArenaSize4 = 2 * U64Size + 6 * U32Size + ANameSize + U8Size,
ArenaSize5 = ArenaSize4 + U32Size,
ArenaSize5a = ArenaSize5 + 2 * U8Size + 2 * U32Size + 2 * U64Size,
ArenaHeadSize4 = U64Size + 3 * U32Size + ANameSize,
ArenaHeadSize5 = ArenaHeadSize4 + U32Size,
BloomHeadSize = 4 * U32Size,
@ -137,10 +132,14 @@ enum
*/
IBucketSize = U32Size + U16Size,
IEntrySize = U64Size + U32Size + 2*U16Size + 2*U8Size + VtScoreSize,
IEntryTypeOff = VtScoreSize + U64Size + U32Size + 2 * U16Size,
IEntryTypeOff = VtScoreSize + U32Size + U16Size + U64Size + U16Size,
IEntryAddrOff = VtScoreSize + U32Size + U16Size,
MaxClumpBlocks = (VtMaxLumpSize + ClumpSize + (1 << ABlockLog) - 1) >> ABlockLog,
IcacheFrac = 1000000, /* denominator */
SleepForever = 1000000000, /* magic value for sleep time */
/*
* dirty flags - order controls disk write order
*/
@ -356,13 +355,11 @@ struct Arena
int blocksize; /* size of block to read or write */
u64int base; /* base address on disk */
u64int size; /* total space in the arena */
u64int limit; /* storage limit for clumps */
u8int score[VtScoreSize]; /* score of the entire sealed & summed arena */
int clumpmax; /* ClumpInfos per block */
AState mem;
int inqueue;
DigestState sha1;
/*
* fields stored on disk
@ -477,6 +474,8 @@ struct ISect
u32int tabsize; /* max. bytes in index config */
Channel *writechan;
Channel *writedonechan;
void *ig; /* used by buildindex only */
int ng;
/*
* fields stored on disk
@ -716,7 +715,18 @@ extern int writestodevnull; /* dangerous - for performance debugging */
extern int collectstats;
extern QLock memdrawlock;
extern int icachesleeptime;
extern int minicachesleeptime;
extern int arenasumsleeptime;
extern int manualscheduling;
extern int l0quantum;
extern int l1quantum;
extern int ignorebloom;
extern int icacheprefetch;
extern int syncwrites;
extern Stats *stathist;
extern int nstathist;
extern ulong stattime;
#ifndef PLAN9PORT
#pragma varargck type "V" uchar*

View file

@ -34,7 +34,7 @@ enum
{
HashLog = 9,
HashSize = 1<<HashLog,
HashMask = HashSize - 1
HashMask = HashSize - 1,
};
struct DCache
@ -212,8 +212,6 @@ return;
lastmiss.part = part;
lastmiss.addr = addr;
}
/* fprint(2, "%s %llx %s\n", part->name, addr, miss ? "miss" : "hit"); */
}
int
@ -230,6 +228,7 @@ rareadpart(Part *part, u64int addr, u8int *buf, uint n, int load)
}
if(load != 2 || addr >= part->size){ /* addr >= part->size: let readpart do the error */
runlock(&ralock);
diskaccess(0);
return readpart(part, addr, buf, n);
}
@ -239,6 +238,7 @@ fprint(2, "raread %s %llx\n", part->name, addr);
nn = dcache.ramax;
if(addr+nn > part->size)
nn = part->size - addr;
diskaccess(0);
if(readpart(part, addr, dcache.rabuf, nn) < 0){
wunlock(&ralock);
return -1;
@ -297,7 +297,6 @@ _getdblock(Part *part, u64int addr, int mode, int load)
/*
* look for the block in the cache
*/
/*checkdcache(); */
qlock(&dcache.lock);
again:
for(b = dcache.heads[h]; b != nil; b = b->next){
@ -367,7 +366,6 @@ found:
fixheap(b->heap, b);
qunlock(&dcache.lock);
/*checkdcache(); */
trace(TraceBlock, "getdblock lock");
addstat(StatDblockStall, 1);
@ -427,7 +425,6 @@ putdblock(DBlock *b)
else
wunlock(&b->lock);
/*checkdcache(); */
qlock(&dcache.lock);
if(--b->ref == 0 && !b->dirty){
if(b->heap == TWID32)
@ -435,7 +432,6 @@ putdblock(DBlock *b)
rwakeupall(&dcache.full);
}
qunlock(&dcache.lock);
/*checkdcache(); */
}
void
@ -474,6 +470,25 @@ dirtydblock(DBlock *b, int dirty)
qunlock(&dcache.lock);
}
static void
unchain(DBlock *b)
{
ulong h;
/*
* unchain the block
*/
if(b->prev == nil){
h = pbhash(b->addr);
if(dcache.heads[h] != b)
sysfatal("bad hash chains in disk cache");
dcache.heads[h] = b->next;
}else
b->prev->next = b->next;
if(b->next != nil)
b->next->prev = b->prev;
}
/*
* remove some block from use and update the free list and counters
*/
@ -481,7 +496,6 @@ static DBlock*
bumpdblock(void)
{
DBlock *b;
ulong h;
trace(TraceBlock, "bumpdblock enter");
b = dcache.free;
@ -512,22 +526,28 @@ bumpdblock(void)
trace(TraceBlock, "bumpdblock bumping %s 0x%llux", b->part->name, b->addr);
/*
* unchain the block
*/
if(b->prev == nil){
h = pbhash(b->addr);
if(dcache.heads[h] != b)
sysfatal("bad hash chains in disk cache");
dcache.heads[h] = b->next;
}else
b->prev->next = b->next;
if(b->next != nil)
b->next->prev = b->prev;
unchain(b);
return b;
}
void
emptydcache(void)
{
DBlock *b;
qlock(&dcache.lock);
while(dcache.nheap > 0){
b = dcache.heap[0];
delheap(b);
if(!b->ref && !b->dirty){
unchain(b);
b->next = dcache.free;
dcache.free = b;
}
}
qunlock(&dcache.lock);
}
/*
* delete an arbitrary block from the heap
*/
@ -683,6 +703,7 @@ static int
parallelwrites(DBlock **b, DBlock **eb, int dirty)
{
DBlock **p, **q;
for(p=b; p<eb && (*p)->dirty == dirty; p++){
assert(b<=p && p<eb);
sendp((*p)->part->writechan, *p);
@ -803,6 +824,7 @@ writeproc(void *v)
trace(TraceProc, "wlock %s 0x%llux", p->name, b->addr);
wlock(&b->lock);
trace(TraceProc, "writepart %s 0x%llux", p->name, b->addr);
diskaccess(0);
if(writepart(p, b->addr, b->data, b->size) < 0)
fprint(2, "write error: %r\n"); /* XXX details! */
addstat(StatApartWrite, 1);

View file

@ -0,0 +1,88 @@
#include "stdinc.h"
#include "dat.h"
#include "fns.h"
ulong lasttime[2];
int manualscheduling;
int l0quantum = 120;
int l1quantum = 120;
ulong lasticachechange;
void
disksched(void)
{
int p, nwrite, nflush, ndirty, tdirty, toflush;
ulong t;
vlong cflush;
Stats *prev;
/*
* no locks because all the data accesses are atomic.
*/
t = time(0);
if(manualscheduling){
lasticachechange = t;
return;
}
if(t-lasttime[0] < l0quantum){
/* level-0 disk access going on */
p = icachedirtyfrac();
if(p < IcacheFrac*5/10){ /* can wait */
icachesleeptime = SleepForever;
lasticachechange = t;
}else if(p > IcacheFrac*9/10){ /* can't wait */
icachesleeptime = 0;
lasticachechange = t;
}else if(t-lasticachechange > 60){
/* have minute worth of data for current rate */
prev = &stathist[(stattime-60+nstathist)%nstathist];
/* # entries written to index cache */
nwrite = stats.n[StatIcacheWrite] - prev->n[StatIcacheWrite];
/* # dirty entries in index cache */
ndirty = stats.n[StatIcacheDirty] - prev->n[StatIcacheDirty];
/* # entries flushed to disk */
nflush = nwrite - ndirty;
/* want to stay around 70% dirty */
tdirty = (vlong)stats.n[StatIcacheSize]*700/1000;
/* assume nflush*icachesleeptime is a constant */
cflush = (vlong)nflush*(icachesleeptime+1);
/* computer number entries to write in next minute */
toflush = nwrite + (stats.n[StatIcacheDirty] - tdirty);
/* schedule for that many */
if(toflush <= 0 || cflush/toflush > 100000)
icachesleeptime = SleepForever;
else
icachesleeptime = cflush/toflush;
}
arenasumsleeptime = SleepForever;
return;
}
if(t-lasttime[1] < l1quantum){
/* level-1 disk access (icache flush) going on */
icachesleeptime = 0;
arenasumsleeptime = SleepForever;
return;
}
/* no disk access going on - no holds barred*/
icachesleeptime = 0;
arenasumsleeptime = 0;
}
void
diskaccess(int level)
{
if(level < 0 || level >= nelem(lasttime)){
fprint(2, "bad level in diskaccess; caller=%lux\n", getcallerpc(&level));
return;
}
lasttime[level] = time(0);
}

View file

@ -27,7 +27,7 @@ findscore(Arena *arena, uchar *score)
u32int clump;
int i, n, found;
/*ZZZ remove fprint? */
//ZZZ remove fprint?
if(arena->memstats.clumps)
fprint(2, "reading directory for arena=%s with %d entries\n", arena->name, arena->memstats.clumps);

File diff suppressed because it is too large Load diff

View file

@ -24,8 +24,13 @@ void delaykickicache(void);
void delaykickround(Round*);
void delaykickroundproc(void*);
void dirtydblock(DBlock*, int);
void diskaccess(int);
void disksched(void);
AState diskstate(void);
void *emalloc(ulong);
void emptydcache(void);
void emptyicache(void);
void emptylumpcache(void);
void *erealloc(void *, ulong);
char *estrdup(char*);
void *ezmalloc(ulong);
@ -49,6 +54,7 @@ u32int hashbits(u8int *score, int nbits);
int httpdinit(char *address, char *webroot);
int iaddrcmp(IAddr *ia1, IAddr *ia2);
IEntry* icachedirty(u32int, u32int, u64int);
ulong icachedirtyfrac(void);
void icacheclean(IEntry*);
int ientrycmp(const void *vie1, const void *vie2);
char *ifileline(IFile *f);
@ -77,6 +83,7 @@ int insertscore(u8int *score, IAddr *ia, int write);
void kickdcache(void);
void kickicache(void);
void kickround(Round*, int wait);
int loadbloom(Bloom*);
ZBlock *loadclump(Arena *arena, u64int aa, int blocks, Clump *cl, u8int *score, int verify);
DBlock *loadibucket(Index *index, u8int *score, ISect **is, u32int *buck, IBucket *ib);
int loadientry(Index *index, u8int *score, int type, IEntry *ie);
@ -98,6 +105,7 @@ int okamap(AMap *am, int n, u64int start, u64int stop, char *what);
int okibucket(IBucket*, ISect*);
int outputamap(Fmt *f, AMap *am, int n);
int outputindex(Fmt *f, Index *ix);
int _packarena(Arena *arena, u8int *buf, int);
int packarena(Arena *arena, u8int *buf);
int packarenahead(ArenaHead *head, u8int *buf);
int packarenapart(ArenaPart *as, u8int *buf);
@ -129,6 +137,7 @@ ZBlock *readfile(char *name);
int readifile(IFile *f, char *name);
Packet *readlump(u8int *score, int type, u32int size, int *cached);
int readpart(Part *part, u64int addr, u8int *buf, u32int n);
int resetbloom(Bloom*);
int runconfig(char *config, Config*);
int scorecmp(u8int *, u8int *);
void scoremem(u8int *score, u8int *buf, int size);

View file

@ -55,7 +55,11 @@ ginit(void)
first = 0;
memimageinit();
#ifdef PLAN9PORT
smallfont = openmemsubfont(unsharp("#9/font/lucsans/lstr.10"));
#else
smallfont = openmemsubfont("/lib/font/bit/lucidasans/lstr.10");
#endif
black = memblack;
blue = allocrepl(DBlue);
red = allocrepl(DRed);
@ -121,7 +125,7 @@ statgraph(Graph *g)
if(g->wid > nelem(bin))
g->wid = nelem(bin);
if(g->fill < 0)
g->fill = ((uint)(uintptr)g->arg>>8)%nelem(lofill);
g->fill = ((uint)g->arg>>8)%nelem(lofill);
if(g->fill > nelem(lofill))
g->fill %= nelem(lofill);
@ -151,7 +155,7 @@ statgraph(Graph *g)
qlock(&memdrawlock);
ginit();
if(smallfont==nil || black==nil || blue==nil || red==nil || hifill==nil || lofill==nil){
werrstr("graphics initialization failed");
werrstr("graphics initialization failed: %r");
qunlock(&memdrawlock);
return nil;
}
@ -186,12 +190,12 @@ statgraph(Graph *g)
if(0)
if(lastlo != -1){
if(lastlo < lo)
memimagedraw(m, Rect(x-1, lastlo, x, lo), hifill[g->fill], ZP, memopaque, ZP, S);
memimagedraw(m, Rect(x-1, lastlo, x, lo), hifill[g->fill%nelem(hifill)], ZP, memopaque, ZP, S);
else if(lastlo > lo)
memimagedraw(m, Rect(x-1, lo, x, lastlo), hifill[g->fill], ZP, memopaque, ZP, S);
memimagedraw(m, Rect(x-1, lo, x, lastlo), hifill[g->fill%nelem(hifill)], ZP, memopaque, ZP, S);
}
memimagedraw(m, Rect(x, hi, x+1,lo), hifill[g->fill], ZP, memopaque, ZP, S);
memimagedraw(m, Rect(x, lo, x+1, r.max.y), lofill[g->fill], ZP, memopaque, ZP, S);
memimagedraw(m, Rect(x, hi, x+1,lo), hifill[g->fill%nelem(hifill)], ZP, memopaque, ZP, S);
memimagedraw(m, Rect(x, lo, x+1, r.max.y), lofill[g->fill%nelem(lofill)], ZP, memopaque, ZP, S);
lastlo = lo;
}

View file

@ -9,7 +9,7 @@ extern QLock memdrawlock;
enum
{
ObjNameSize = 64,
MaxObjs = 16
MaxObjs = 64
};
struct HttpObj
@ -28,6 +28,12 @@ static int dindex(HConnect *c);
static int xindex(HConnect *c);
static int xlog(HConnect *c);
static int sindex(HConnect *c);
static int hempty(HConnect *c);
static int hlcacheempty(HConnect *c);
static int hdcacheempty(HConnect *c);
static int hicacheempty(HConnect *c);
static int hicachekick(HConnect *c);
static int hdcachekick(HConnect *c);
static int hicacheflush(HConnect *c);
static int hdcacheflush(HConnect *c);
static int notfound(HConnect *c);
@ -53,10 +59,17 @@ httpdinit(char *address, char *dir)
httpdobj("/xindex", xindex);
httpdobj("/flushicache", hicacheflush);
httpdobj("/flushdcache", hdcacheflush);
httpdobj("/kickicache", hicachekick);
httpdobj("/kickdcache", hdcachekick);
httpdobj("/graph/", xgraph);
httpdobj("/set", xset);
httpdobj("/set/", xset);
httpdobj("/log", xlog);
httpdobj("/log/", xlog);
httpdobj("/empty", hempty);
httpdobj("/emptyicache", hicacheempty);
httpdobj("/emptylumpcache", hlcacheempty);
httpdobj("/emptydcache", hdcacheempty);
if(vtproc(listenproc, address) < 0)
return -1;
@ -105,8 +118,6 @@ listenproc(void *vaddress)
char *address, ndir[NETPATHLEN], dir[NETPATHLEN];
int ctl, nctl, data;
/*sleep(1000); // let strace find us */
address = vaddress;
ctl = announce(address, dir);
if(ctl < 0){
@ -148,7 +159,6 @@ httpproc(void *v)
HConnect *c;
int ok, i, n;
/*sleep(1000); // let strace find us */
c = v;
for(;;){
@ -182,7 +192,7 @@ httpproc(void *v)
}
static int
percent(long v, long total)
percent(ulong v, ulong total)
{
if(total == 0)
total = 1;
@ -239,6 +249,31 @@ preqtext(HConnect *c)
return preqtype(c, "text/plain");
}
static int
herror(HConnect *c)
{
int n;
Hio *hout;
hout = &c->hout;
n = snprint(c->xferbuf, HBufSize, "<html><head><title>Error</title></head>\n<body><h1>Error</h1>\n<pre>%r</pre>\n</body></html>");
hprint(hout, "%s %s\r\n", hversion, "400 Bad Request");
hprint(hout, "Date: %D\r\n", time(nil));
hprint(hout, "Server: Venti\r\n");
hprint(hout, "Content-Type: text/html\r\n");
hprint(hout, "Content-Length: %d\r\n", n);
if(c->head.closeit)
hprint(hout, "Connection: close\r\n");
else if(!http11(c))
hprint(hout, "Connection: Keep-Alive\r\n");
hprint(hout, "\r\n");
if(c->req.meth == nil || strcmp(c->req.meth, "HEAD") != 0)
hwrite(hout, c->xferbuf, n);
return hflush(hout);
}
static int
notfound(HConnect *c)
{
@ -325,21 +360,53 @@ static struct
"logging", &ventilogging,
"stats", &collectstats,
"icachesleeptime", &icachesleeptime,
"minicachesleeptime", &minicachesleeptime,
"arenasumsleeptime", &arenasumsleeptime,
"l0quantum", &l0quantum,
"l1quantum", &l1quantum,
"manualscheduling", &manualscheduling,
"ignorebloom", &ignorebloom,
"syncwrites", &syncwrites,
"icacheprefetch", &icacheprefetch,
0
};
static int
xsetlist(HConnect *c)
{
int i;
if(preqtype(c, "text/plain") < 0)
return -1;
for(i=0; namedints[i].name; i++)
print("%s = %d\n", namedints[i].name, *namedints[i].p);
hflush(&c->hout);
return 0;
}
static int
xset(HConnect *c)
{
int i, nf, r;
char *f[10], *s;
if(strcmp(c->req.uri, "/set") == 0 || strcmp(c->req.uri, "/set/") == 0)
return xsetlist(c);
s = estrdup(c->req.uri);
nf = getfields(s+strlen("/set/"), f, nelem(f), 1, "/");
if(nf < 1)
return notfound(c);
if(nf < 1){
r = preqtext(c);
if(r < 0)
return r;
for(i=0; namedints[i].name; i++)
hprint(&c->hout, "%s = %d\n", namedints[i].name, *namedints[i].p);
hflush(&c->hout);
return 0;
}
for(i=0; namedints[i].name; i++){
if(strcmp(f[0], namedints[i].name) == 0){
if(nf >= 2)
@ -494,6 +561,108 @@ darena(Hio *hout, Arena *arena)
arena->diskstats.used + arena->diskstats.clumps * ClumpInfoSize);
}
static int
hempty(HConnect *c)
{
Hio *hout;
int r;
r = preqtext(c);
if(r < 0)
return r;
hout = &c->hout;
emptylumpcache();
emptydcache();
emptyicache();
hprint(hout, "emptied all caches\n");
hflush(hout);
return 0;
}
static int
hlcacheempty(HConnect *c)
{
Hio *hout;
int r;
r = preqtext(c);
if(r < 0)
return r;
hout = &c->hout;
emptylumpcache();
hprint(hout, "emptied lumpcache\n");
hflush(hout);
return 0;
}
static int
hicacheempty(HConnect *c)
{
Hio *hout;
int r;
r = preqtext(c);
if(r < 0)
return r;
hout = &c->hout;
emptyicache();
hprint(hout, "emptied icache\n");
hflush(hout);
return 0;
}
static int
hdcacheempty(HConnect *c)
{
Hio *hout;
int r;
r = preqtext(c);
if(r < 0)
return r;
hout = &c->hout;
emptydcache();
hprint(hout, "emptied dcache\n");
hflush(hout);
return 0;
}
static int
hicachekick(HConnect *c)
{
Hio *hout;
int r;
r = preqtext(c);
if(r < 0)
return r;
hout = &c->hout;
kickicache();
hprint(hout, "kicked icache\n");
hflush(hout);
return 0;
}
static int
hdcachekick(HConnect *c)
{
Hio *hout;
int r;
r = preqtext(c);
if(r < 0)
return r;
hout = &c->hout;
kickdcache();
hprint(hout, "kicked dcache\n");
hflush(hout);
return 0;
}
static int
hicacheflush(HConnect *c)
{
@ -569,6 +738,7 @@ rawgraph(Stats *s, Stats *t, void *va)
{
Arg *a;
USED(s);
a = va;
return t->n[a->index];
}
@ -587,6 +757,7 @@ pctgraph(Stats *s, Stats *t, void *va)
{
Arg *a;
USED(s);
a = va;
return percent(t->n[a->index], t->n[a->index2]);
}
@ -722,7 +893,7 @@ static char* graphname[] =
"isectwritebyte",
"sumread",
"sumreadbyte"
"sumreadbyte",
};
static int
@ -733,7 +904,6 @@ findname(char *s)
for(i=0; i<nelem(graphname); i++)
if(strcmp(graphname[i], s) == 0)
return i;
fprint(2, "no name '%s'\n", s);
return -1;
}
@ -769,10 +939,14 @@ xgraph(HConnect *c)
if(0) fprint(2, "graph %s\n" ,s);
memset(&g, 0, sizeof g);
nf = getfields(s+strlen("/graph/"), f, nelem(f), 1, "/");
if(nf < 1)
goto notfound;
if((arg.index = findname(f[0])) == -1 && strcmp(f[0], "*") != 0)
goto notfound;
if(nf < 1){
werrstr("bad syntax -- not enough fields");
goto error;
}
if((arg.index = findname(f[0])) == -1 && strcmp(f[0], "*") != 0){
werrstr("unknown name %s", f[0]);
goto error;
}
g.arg = &arg;
g.t0 = -120;
g.t1 = 0;
@ -793,14 +967,18 @@ if(0) fprint(2, "graph %s\n" ,s);
else if(strncmp(f[i], "max=", 4) == 0)
g.max = atoi(f[i]+4);
else if(strncmp(f[i], "pct=", 4) == 0){
if((arg.index2 = findname(f[i]+4)) == -1)
goto notfound;
if((arg.index2 = findname(f[i]+4)) == -1){
werrstr("unknown name %s", f[i]+4);
goto error;
}
g.fn = pctgraph;
g.min = 0;
g.max = 100;
}else if(strncmp(f[i], "pctdiff=", 8) == 0){
if((arg.index2 = findname(f[i]+8)) == -1)
goto notfound;
if((arg.index2 = findname(f[i]+8)) == -1){
werrstr("unknown name %s", f[i]+8);
goto error;
}
g.fn = pctdiffgraph;
g.min = 0;
g.max = 100;
@ -830,7 +1008,7 @@ if(0) fprint(2, "graph %s\n" ,s);
m = statgraph(&g);
if(m == nil)
goto notfound;
goto error;
if(preqtype(c, "image/png") < 0)
return -1;
@ -843,9 +1021,9 @@ if(0) fprint(2, "graph %s\n" ,s);
free(s);
return 0;
notfound:
error:
free(s);
return notfound(c);
return herror(c);
}
static int
@ -944,7 +1122,6 @@ vtloghdump(Hio *h, VtLog *l)
name = l ? l->name : "&lt;nil&gt;";
fprint(2, "hdump xfer %d\n", h->xferenc);
hprint(h, "<html><head>\n");
hprint(h, "<title>Venti Server Log: %s</title>\n", name);
hprint(h, "</head><body>\n");

View file

@ -11,6 +11,7 @@ struct ICache
int bits; /* bits to use for indexing heads */
u32int size; /* number of heads; == 1 << bits, should be < entries */
IEntry *base; /* all allocated hash table entries */
IEntry *free;
u32int entries; /* elements in base */
IEntry *dirty; /* chain of dirty elements */
u32int ndirty;
@ -23,6 +24,8 @@ struct ICache
int nlast;
};
int icacheprefetch = 1;
static ICache icache;
static IEntry *icachealloc(IAddr *ia, u8int *score);
@ -45,6 +48,12 @@ initicache(int bits, int depth)
setstat(StatIcacheSize, icache.entries);
}
ulong
icachedirtyfrac(void)
{
return (vlong)icache.ndirty*IcacheFrac / icache.entries;
}
u32int
hashbits(u8int *sc, int bits)
{
@ -141,6 +150,7 @@ lookupscore(u8int *score, int type, IAddr *ia, int *rac)
* load the table of contents for that arena into the cache.
*/
ie = icachealloc(&d.ia, score);
if(icacheprefetch){
icache.last[icache.nlast++%nelem(icache.last)] = amapitoa(mainindex, ie->ia.addr, &aa);
aa = ie->ia.addr - aa; /* compute base addr of arena */
for(i=0; i<nelem(icache.last); i++)
@ -150,6 +160,7 @@ lookupscore(u8int *score, int type, IAddr *ia, int *rac)
load = icache.last[0];
icache.lastload = load;
}
}
found:
ie->next = icache.heads[h];
@ -250,6 +261,11 @@ icachealloc(IAddr *ia, u8int *score)
goto Found;
}
if((ie = icache.free) != nil){
icache.free = ie->next;
goto Found;
}
h = icache.stolen;
for(i=0;; i++){
h++;
@ -346,3 +362,21 @@ icacheclean(IEntry *ie)
trace(TraceProc, "icachedirty exit");
}
void
emptyicache(void)
{
int i;
IEntry *ie, **lie;
qlock(&icache.lock);
for(i=0; i<icache.size; i++)
for(lie=&icache.heads[i]; (ie=*lie); ){
if(ie->dirty == 0){
*lie = ie->next;
ie->next = icache.free;
icache.free = ie;
}else
lie = &ie->next;
}
qunlock(&icache.lock);
}

View file

@ -12,6 +12,7 @@ static void icachewritecoord(void*);
static IEntry *iesort(IEntry*);
int icachesleeptime = 1000; /* milliseconds */
int minicachesleeptime = 50;
enum
{
@ -74,7 +75,7 @@ nextchunk(Index *ix, ISect *is, IEntry **pie, u64int *paddr, uint *pnbuf)
static int
icachewritesect(Index *ix, ISect *is, u8int *buf)
{
int err, h, bsize;
int err, h, bsize, t;
u32int lo, hi;
u64int addr, naddr;
uint nbuf, off;
@ -96,7 +97,14 @@ icachewritesect(Index *ix, ISect *is, u8int *buf)
err = 0;
while(iedirty){
sleep(icachesleeptime);
disksched();
while((t=icachesleeptime) == SleepForever){
sleep(1000);
disksched();
}
if(t < minicachesleeptime)
t = minicachesleeptime;
sleep(t);
trace(TraceProc, "icachewritesect nextchunk");
chunk = nextchunk(ix, is, &iedirty, &addr, &nbuf);
@ -146,12 +154,15 @@ icachewritesect(Index *ix, ISect *is, u8int *buf)
break;
}
packibucket(&ib, buf+off, is->bucketmagic);
/* XXX not right - must update cache after writepart */
if((b = _getdblock(is->part, naddr, ORDWR, 0)) != nil){
memmove(b->data, buf+off, bsize);
putdblock(b);
}
}
diskaccess(1);
trace(TraceProc, "icachewritesect writepart", addr, nbuf);
if(writepart(is->part, addr, buf, nbuf) < 0){
/* XXX */
@ -171,6 +182,7 @@ icachewritesect(Index *ix, ISect *is, u8int *buf)
static void
icachewriteproc(void *v)
{
int ret;
uint bsize;
ISect *is;
Index *ix;
@ -188,17 +200,17 @@ icachewriteproc(void *v)
trace(TraceProc, "icachewriteproc recv");
recv(is->writechan, 0);
trace(TraceWork, "start");
icachewritesect(ix, is, buf);
ret = icachewritesect(ix, is, buf);
trace(TraceProc, "icachewriteproc send");
trace(TraceWork, "finish");
send(is->writedonechan, 0);
sendul(is->writedonechan, ret);
}
}
static void
icachewritecoord(void *v)
{
int i;
int i, err;
Index *ix;
AState as;
@ -216,9 +228,9 @@ icachewritecoord(void *v)
as = diskstate();
if(as.arena==iwrite.as.arena && as.aa==iwrite.as.aa){
/* will not be able to do anything more than last flush - kick disk */
trace(TraceProc, "icachewritecoord flush dcache");
trace(TraceProc, "icachewritecoord kick dcache");
kickdcache();
trace(TraceProc, "icachewritecoord flushed dcache");
trace(TraceProc, "icachewritecoord kicked dcache");
}
iwrite.as = as;
@ -229,12 +241,14 @@ icachewritecoord(void *v)
if(ix->bloom)
send(ix->bloom->writechan, 0);
err = 0;
for(i=0; i<ix->nsects; i++)
recv(ix->sects[i]->writedonechan, 0);
err |= recvul(ix->sects[i]->writedonechan);
if(ix->bloom)
recv(ix->bloom->writedonechan, 0);
err |= recvul(ix->bloom->writedonechan);
trace(TraceProc, "icachewritecoord donewrite");
trace(TraceProc, "icachewritecoord donewrite err=%d", err);
if(err == 0)
setatailstate(&iwrite.as);
}
icacheclean(nil); /* wake up anyone waiting */

View file

@ -23,17 +23,11 @@
#include "dat.h"
#include "fns.h"
/*static int bucklook(u8int *score, int type, u8int *data, int n); */
/*static int writebucket(ISect *is, u32int buck, IBucket *ib, DBlock *b); */
/*static int okibucket(IBucket *ib, ISect *is); */
static int initindex1(Index*);
static ISect *initisect1(ISect *is);
/*static int splitiblock(Index *ix, DBlock *b, ISect *is, u32int buck, IBucket *ib); */
#define KEY(k,d) ((d) ? (k)>>(32-(d)) : 0)
/*static QLock indexlock; //ZZZ */
static char IndexMagic[] = "venti index configuration";
Index*
@ -375,6 +369,8 @@ initisect(Part *part)
seterr(EAdmin, "can't read index section header: %r");
return nil;
}
print("read %s at %d: %.2ux %.2ux %.2ux %.2ux\n",
part->name, PartBlank, b->data[0], b->data[1], b->data[2], b->data[3]);
is = MKZ(ISect);
if(is == nil){
@ -457,9 +453,10 @@ initisect1(ISect *is)
v = is->part->size & ~(u64int)(is->blocksize - 1);
if(is->blockbase + (u64int)is->blocks * is->blocksize != v){
seterr(ECorrupt, "invalid blocks in index section %s", is->name);
/*ZZZZZZZZZ */
/* freeisect(is); */
/* return nil; */
/* ZZZ what to do?
freeisect(is);
return nil;
*/
}
if(is->stop - is->start > is->blocks){
@ -482,9 +479,10 @@ wbisect(ISect *is)
ZBlock *b;
b = alloczblock(HeadSize, 1, 0);
if(b == nil)
/*ZZZ set error? */
if(b == nil){
/* ZZZ set error? */
return -1;
}
if(packisect(is, b->data) < 0){
seterr(ECorrupt, "can't make index section header: %r");
@ -789,7 +787,7 @@ loadibucket0(Index *ix, u32int buck, ISect **pis, u32int *pbuck, IBucket *ib, in
/*
* find the number of the index section holding score
*/
static int
int
indexsect1(Index *ix, u8int *score)
{
return indexsect0(ix, hashbits(score, 32) / ix->div);

View file

@ -2,6 +2,7 @@
#include "dat.h"
#include "fns.h"
int syncwrites = 0;
int queuewrites = 0;
int writestodevnull = 0;
@ -45,7 +46,7 @@ readlump(u8int *score, int type, u32int size, int *cached)
*cached = 0;
if(lookupscore(score, type, &ia, &rac) < 0){
/*ZZZ place to check for someone trying to guess scores */
/* ZZZ place to check for someone trying to guess scores */
seterr(EOk, "no block with score %V/%d exists", score, type);
putlump(u);
@ -92,7 +93,15 @@ writelump(Packet *p, u8int *score, int type, u32int creator, uint ms)
if(u->data != nil){
ok = 0;
if(packetcmp(p, u->data) != 0){
seterr(EStrange, "score collision");
uchar nscore[VtScoreSize];
packetsha1(u->data, nscore);
if(scorecmp(u->score, score) != 0)
seterr(EStrange, "lookuplump returned bad score %V not %V", u->score, score);
else if(scorecmp(u->score, nscore) != 0)
seterr(EStrange, "lookuplump returned bad data %V not %V", nscore, u->score);
else
seterr(EStrange, "score collision %V", score);
ok = -1;
}
packetfree(p);
@ -138,7 +147,13 @@ writeqlump(Lump *u, Packet *p, int creator, uint ms)
if(old != nil){
ok = 0;
if(packetcmp(p, old) != 0){
seterr(EStrange, "score collision");
uchar nscore[VtScoreSize];
packetsha1(old, nscore);
if(scorecmp(u->score, nscore) != 0)
seterr(EStrange, "readilump returned bad data %V not %V", nscore, u->score);
else
seterr(EStrange, "score collision %V", u->score);
ok = -1;
}
packetfree(p);
@ -161,6 +176,12 @@ writeqlump(Lump *u, Packet *p, int creator, uint ms)
else
packetfree(p);
if(syncwrites){
flushdcache();
flushicache();
flushdcache();
}
ms = msec() - ms;
addstat2(StatRpcWriteNew, 1, StatRpcWriteNewTime, ms);
return ok;

View file

@ -11,7 +11,7 @@ enum
{
HashLog = 9,
HashSize = 1<<HashLog,
HashMask = HashSize - 1
HashMask = HashSize - 1,
};
struct LumpCache
@ -175,7 +175,6 @@ again:
* remove it from the heap, and fix up the heap.
*/
size = packetasize(p);
/*ZZZ */
while(lumpcache.avail < size){
trace(TraceLump, "insertlump bump");
CHECK(checklumpcache());
@ -277,6 +276,15 @@ bumplump(void)
return b;
}
void
emptylumpcache(void)
{
qlock(&lumpcache.lock);
while(bumplump())
;
qunlock(&lumpcache.lock);
}
/*
* delete an arbitrary block from the heap
*/
@ -415,3 +423,4 @@ checklumpcache(void)
if(lumpcache.nheap + nfree + refed != lumpcache.nblocks)
sysfatal("lc: missing blocks: %d %d %d %d", lumpcache.nheap, refed, nfree, lumpcache.nblocks);
}

View file

@ -58,22 +58,6 @@ initlumpqueues(int nq)
seterr(EOk, "can't start write queue slave: %r");
return -1;
}
if(vtproc(queueproc, q) < 0){
seterr(EOk, "can't start write queue slave: %r");
return -1;
}
if(vtproc(queueproc, q) < 0){
seterr(EOk, "can't start write queue slave: %r");
return -1;
}
if(vtproc(queueproc, q) < 0){
seterr(EOk, "can't start write queue slave: %r");
return -1;
}
if(vtproc(queueproc, q) < 0){
seterr(EOk, "can't start write queue slave: %r");
return -1;
}
}
return 0;

View file

@ -0,0 +1,464 @@
/*
* Mirror one arena partition onto another.
* Be careful to copy only new data.
*/
#include "stdinc.h"
#include "dat.h"
#include "fns.h"
Channel *writechan;
typedef struct Write Write;
struct Write
{
uchar *p;
int n;
uvlong o;
int error;
};
Part *src;
Part *dst;
int force;
int verbose;
char *status;
uvlong astart, aend;
void
usage(void)
{
fprint(2, "usage: mirrorarenas [-v] src dst [ranges]\n");
threadexitsall("usage");
}
int
ereadpart(Part *p, u64int offset, u8int *buf, u32int count)
{
if(readpart(p, offset, buf, count) != count){
print("%T readpart %s at %#llux+%ud: %r\n", p->name, offset, count);
return -1;
}
return 0;
}
int
ewritepart(Part *p, u64int offset, u8int *buf, u32int count)
{
if(writepart(p, offset, buf, count) != count){
print("%T writepart %s at %#llux+%ud: %r\n", p->name, offset, count);
return -1;
}
return 0;
}
/*
* Extra proc to do writes to dst, so that we can overlap reading
* src with writing dst during copy. This is an easy factor of two
* (almost) in performance.
*/
static void
writeproc(void *v)
{
Write *w;
USED(v);
while((w = recvp(writechan)) != nil){
if(w->n == 0)
continue;
if(ewritepart(dst, w->o, w->p, w->n) < 0)
w->error = 1;
}
}
int
copy(uvlong start, uvlong end, char *what, DigestState *ds)
{
int i, n;
uvlong o;
static uchar tmp[2][1024*1024];
Write w[2];
assert(start <= end);
assert(astart <= start && start < aend);
assert(astart <= end && end <= aend);
if(verbose && start != end)
print("%T copy %,llud-%,llud %s\n", start, end, what);
i = 0;
memset(w, 0, sizeof w);
for(o=start; o<end; o+=n){
if(w[i].error)
goto error;
n = sizeof tmp[i];
if(o+n > end)
n = end - o;
if(ereadpart(src, o, tmp[i], n) < 0)
goto error;
w[i].p = tmp[i];
w[i].o = o;
w[i].n = n;
w[i].error = 0;
sendp(writechan, &w[i]);
if(ds)
sha1(tmp[i], n, nil, ds);
i = 1-i;
}
if(w[i].error)
goto error;
/*
* wait for queued write to finish
*/
w[i].p = nil;
w[i].o = 0;
w[i].n = 0;
w[i].error = 0;
sendp(writechan, &w[i]);
i = 1-i;
if(w[i].error)
return -1;
return 0;
error:
/*
* sync with write proc
*/
w[i].p = nil;
w[i].o = 0;
w[i].n = 0;
w[i].error = 0;
sendp(writechan, &w[i]);
return -1;
}
/* single-threaded, for reference */
int
copy1(uvlong start, uvlong end, char *what, DigestState *ds)
{
int n;
uvlong o;
static uchar tmp[1024*1024];
assert(start <= end);
assert(astart <= start && start < aend);
assert(astart <= end && end <= aend);
if(verbose && start != end)
print("%T copy %,llud-%,llud %s\n", start, end, what);
for(o=start; o<end; o+=n){
n = sizeof tmp;
if(o+n > end)
n = end - o;
if(ereadpart(src, o, tmp, n) < 0)
return -1;
if(ds)
sha1(tmp, n, nil, ds);
if(ewritepart(dst, o, tmp, n) < 0)
return -1;
}
return 0;
}
int
asha1(Part *p, uvlong start, uvlong end, DigestState *ds)
{
int n;
uvlong o;
static uchar tmp[1024*1024];
if(start == end)
return 0;
assert(start < end);
if(verbose)
print("%T sha1 %,llud-%,llud\n", start, end);
for(o=start; o<end; o+=n){
n = sizeof tmp;
if(o+n > end)
n = end - o;
if(ereadpart(p, o, tmp, n) < 0)
return -1;
sha1(tmp, n, nil, ds);
}
return 0;
}
uvlong
rdown(uvlong a, int b)
{
return a-a%b;
}
uvlong
rup(uvlong a, int b)
{
if(a%b == 0)
return a;
return a+b-a%b;
}
void
mirror(Arena *sa, Arena *da)
{
vlong v, si, di, end;
int clumpmax, blocksize;
static uchar buf[MaxIoSize];
ArenaHead h;
DigestState xds, *ds;
vlong shaoff, base;
base = sa->base;
blocksize = sa->blocksize;
end = sa->base + sa->size;
astart = base - blocksize;
aend = end + blocksize;
shaoff = 0;
if(force){
copy(astart, aend, "all", nil);
return;
}
if(verbose)
print("%T %s (%,llud-%,llud)\n", sa->name, astart, aend);
if(sa->diskstats.sealed && da->diskstats.sealed && scorecmp(da->score, zeroscore) != 0){
if(scorecmp(sa->score, da->score) == 0)
return;
print("%T arena %s: sealed score mismatch %V vs %V\n", sa->name, sa->score, da->score);
status = "errors";
return;
}
if(da->diskstats.sealed && scorecmp(da->score, zeroscore) != 0){
print("%T arena %s: dst is sealed, src is not\n", sa->name);
status = "errors";
return;
}
if(sa->diskstats.used < da->diskstats.used){
print("%T arena %s: src used %,lld < dst used %,lld\n", sa->name, sa->diskstats.used, da->diskstats.used);
status = "errors";
return;
}
if(da->clumpmagic != sa->clumpmagic){
/*
* Write this now to reduce the window in which
* the head and tail disagree about clumpmagic.
*/
da->clumpmagic = sa->clumpmagic;
memset(buf, 0, sizeof buf);
packarena(da, buf);
if(ewritepart(dst, end, buf, blocksize) < 0)
return;
}
memset(&h, 0, sizeof h);
h.version = da->version;
strcpy(h.name, da->name);
h.blocksize = da->blocksize;
h.size = da->size + 2*da->blocksize;
h.clumpmagic = da->clumpmagic;
memset(buf, 0, sizeof buf);
packarenahead(&h, buf);
if(ewritepart(dst, base - blocksize, buf, blocksize) < 0)
return;
ds = nil;
if(sa->diskstats.sealed && scorecmp(sa->score, zeroscore) != 0){
/* start sha1 state with header */
memset(&xds, 0, sizeof xds);
ds = &xds;
sha1(buf, blocksize, nil, ds);
shaoff = base;
}
if(sa->diskstats.used != da->diskstats.used){
di = base+rdown(da->diskstats.used, blocksize);
si = base+rup(sa->diskstats.used, blocksize);
if(ds && asha1(dst, shaoff, di, ds) < 0)
return;
if(copy(di, si, "data", ds) < 0)
return;
shaoff = si;
}
clumpmax = sa->clumpmax;
di = end - da->diskstats.clumps/clumpmax * blocksize;
si = end - (sa->diskstats.clumps+clumpmax-1)/clumpmax * blocksize;
if(sa->diskstats.sealed){
/*
* might be a small hole between the end of the
* data and the beginning of the directory.
*/
v = base+rup(sa->diskstats.used, blocksize);
if(ds && asha1(dst, shaoff, v, ds) < 0)
return;
if(copy(v, si, "hole", ds) < 0)
return;
shaoff = si;
}
if(da->diskstats.clumps != sa->diskstats.clumps){
if(ds && asha1(dst, shaoff, si, ds) < 0)
return;
if(copy(si, di, "directory", ds) < 0) /* si < di because clumpinfo blocks grow down */
return;
shaoff = di;
}
da->ctime = sa->ctime;
da->wtime = sa->wtime;
da->diskstats = sa->diskstats;
da->diskstats.sealed = 0;
memset(buf, 0, sizeof buf);
packarena(da, buf);
if(ewritepart(dst, end, buf, blocksize) < 0)
return;
if(ds){
asha1(dst, shaoff, end, ds);
da->diskstats.sealed = 1;
memset(buf, 0, sizeof buf);
packarena(da, buf);
sha1(buf, blocksize, da->score, ds);
if(scorecmp(sa->score, da->score) == 0){
if(verbose)
print("%T arena %s: %V\n", sa->name, da->score);
scorecp(buf+blocksize-VtScoreSize, da->score);
if(ewritepart(dst, end, buf, blocksize) < 0)
return;
}else{
print("%T arena %s: sealing dst: score mismatch: %V vs %V\n", sa->name, sa->score, da->score);
memset(&xds, 0, sizeof xds);
asha1(dst, base-blocksize, end, &xds);
sha1(buf, blocksize, da->score, &xds);
print("%T reseal: %V\n", da->score);
status = "errors";
}
}
}
void
mirrormany(ArenaPart *sp, ArenaPart *dp, char *range)
{
int i, lo, hi;
char *s, *t;
Arena *sa, *da;
if(range == nil){
for(i=0; i<sp->narenas; i++){
sa = sp->arenas[i];
da = dp->arenas[i];
mirror(sa, da);
}
return;
}
if(strcmp(range, "none") == 0)
return;
for(s=range; *s; s=t){
t = strchr(s, ',');
if(t)
*t++ = 0;
else
t = s+strlen(s);
if(*s == '-')
lo = 0;
else
lo = strtol(s, &s, 0);
hi = lo;
if(*s == '-'){
s++;
if(*s == 0)
hi = sp->narenas-1;
else
hi = strtol(s, &s, 0);
}
if(*s != 0){
print("%T bad arena range: %s\n", s);
continue;
}
for(i=lo; i<=hi; i++){
sa = sp->arenas[i];
da = dp->arenas[i];
mirror(sa, da);
}
}
}
void
threadmain(int argc, char **argv)
{
int i;
Arena *sa, *da;
ArenaPart *s, *d;
char *ranges;
ventifmtinstall();
ARGBEGIN{
case 'F':
force = 1;
break;
case 'v':
verbose++;
break;
default:
usage();
}ARGEND
if(argc != 2 && argc != 3)
usage();
ranges = nil;
if(argc == 3)
ranges = argv[2];
if((src = initpart(argv[0], OREAD)) == nil)
sysfatal("initpart %s: %r", argv[0]);
if((dst = initpart(argv[1], ORDWR)) == nil)
sysfatal("initpart %s: %r", argv[1]);
if((s = initarenapart(src)) == nil)
sysfatal("initarenapart %s: %r", argv[0]);
for(i=0; i<s->narenas; i++)
delarena(s->arenas[i]);
if((d = initarenapart(dst)) == nil)
sysfatal("loadarenapart %s: %r", argv[1]);
for(i=0; i<d->narenas; i++)
delarena(d->arenas[i]);
/*
* The arena geometries must match or all bets are off.
*/
if(s->narenas != d->narenas)
sysfatal("arena count mismatch: %d vs %d", s->narenas, d->narenas);
for(i=0; i<s->narenas; i++){
sa = s->arenas[i];
da = d->arenas[i];
if(sa->version != da->version)
sysfatal("arena %d: version mismatch: %d vs %d", i, sa->version, da->version);
if(sa->blocksize != da->blocksize)
sysfatal("arena %d: blocksize mismatch: %d vs %d", i, sa->blocksize, da->blocksize);
if(sa->size != da->size)
sysfatal("arena %d: size mismatch: %,lld vs %,lld", i, sa->size, da->size);
if(strcmp(sa->name, da->name) != 0)
sysfatal("arena %d: name mismatch: %s vs %s", i, sa->name, da->name);
}
/*
* Mirror one arena at a time.
*/
writechan = chancreate(sizeof(void*), 0);
vtproc(writeproc, nil);
mirrormany(s, d, ranges);
sendp(writechan, nil);
threadexitsall(status);
}

View file

@ -11,6 +11,7 @@ LIBOFILES=\
config.$O\
conv.$O\
dcache.$O\
disksched.$O\
dump.$O\
graph.$O\
httpd.$O\
@ -52,11 +53,13 @@ TARG=\
fmtbloom\
fmtisect\
fmtindex\
fixarenas\
buildindex\
checkarenas\
checkindex\
clumpstats\
findscore\
mirrorarenas\
rdarena\
wrarena\
syncindex\

View file

@ -145,8 +145,6 @@ initpart(char *name, int mode)
if(hi == 0)
hi = dir->length;
part->size = hi - part->offset;
fprint(2, "part %s: file %s offset %,lld size %,lld\n",
name, file, part->offset, part->size);
#ifdef CANBLOCKSIZE
{
struct statfs sfs;
@ -203,10 +201,32 @@ prwb(char *name, int fd, int isread, u64int offset, void *vbuf, u32int count, u3
u32int c, delta, icount, opsize;
int r;
icount = count;
buf = vbuf;
#ifndef PLAN9PORT
op = isread ? "read" : "write";
dst = buf;
freetmp = nil;
while(count > 0){
opsize = min(count, 131072 /* blocksize */);
if(isread)
r = pread(fd, dst, opsize, offset);
else
r = pwrite(fd, dst, opsize, offset);
if(r <= 0)
goto Error;
offset += r;
count -= r;
dst += r;
if(r != opsize)
goto Error;
}
return icount;
#endif
tmp = nil;
freetmp = nil;
icount = count;
opsize = blocksize;
if(count == 0){
@ -313,7 +333,7 @@ print("FAILED isread=%d r=%d count=%d blocksize=%d\n", isread, r, count, blocksi
memmove(buf, tmp, count);
else{
memmove(tmp, buf, count);
if(pwrite(fd, tmp, blocksize, offset) != blocksize){
if(pwrite(fd, tmp, opsize, offset) != blocksize){
dst = tmp;
op = "write";
goto Error;
@ -332,9 +352,16 @@ Error:
return -1;
}
#ifndef PLAN9PORT
static int sdreset(Part*);
static int reopen(Part*);
static int threadspawnl(int[3], char*, char*, ...);
#endif
int
rwpart(Part *part, int isread, u64int offset, u8int *buf, u32int count)
{
int n, try;
u32int blocksize;
trace(TraceDisk, "%s %s %ud at 0x%llx",
@ -351,9 +378,33 @@ rwpart(Part *part, int isread, u64int offset, u8int *buf, u32int count)
if(blocksize == 0)
blocksize = 4096;
return prwb(part->filename, part->fd, isread, part->offset+offset, buf, count, blocksize);
}
for(try=0;; try++){
n = prwb(part->filename, part->fd, isread, part->offset+offset, buf, count, blocksize);
if(n >= 0 || try > 10)
break;
#ifndef PLAN9PORT
{
char err[ERRMAX];
/*
* This happens with the sdmv disks frustratingly often.
* Try to fix things up and continue.
*/
rerrstr(err, sizeof err);
if(strstr(err, "i/o timeout") || strstr(err, "i/o error")){
if(sdreset(part) >= 0)
reopen(part);
continue;
}else if(strstr(err, "partition has changed")){
reopen(part);
continue;
}
}
#endif
break;
}
return n;
}
int
readpart(Part *part, u64int offset, u8int *buf, u32int count)
{
@ -391,3 +442,200 @@ readfile(char *name)
return b;
}
#ifndef PLAN9PORT
static int
sdreset(Part *part)
{
char *name, *p;
int i, fd, xfd[3], rv;
static QLock resetlk;
Dir *d, *dd;
fprint(2, "sdreset %s\n", part->name);
name = emalloc(strlen(part->filename)+20);
strcpy(name, part->filename);
p = strrchr(name, '/');
if(p)
p++;
else
p = name;
strcpy(p, "ctl");
d = dirstat(name);
if(d == nil){
free(name);
return -1;
}
/*
* We don't need multiple people resetting the disk.
*/
qlock(&resetlk);
if((fd = open(name, OWRITE)) < 0)
goto error;
dd = dirfstat(fd);
if(d && dd && d->qid.vers != dd->qid.vers){
fprint(2, "sdreset %s: got scooped\n", part->name);
/* Someone else got here first. */
if(access(part->filename, AEXIST) >= 0)
goto ok;
goto error;
}
/*
* Write "reset" to the ctl file to cause the chipset
* to reinitialize itself (specific to sdmv driver).
* Ignore error in case using other disk.
*/
fprint(2, "sdreset %s: reset ctl\n", part->name);
write(fd, "reset", 5);
if(access(part->filename, AEXIST) >= 0)
goto ok;
/*
* Re-run fdisk and prep. Don't use threadwaitchan
* to avoid coordinating for it. Reopen ctl because
* we reset the disk.
*/
strcpy(p, "ctl");
close(fd);
if((fd = open(name, OWRITE)) < 0)
goto error;
strcpy(p, "data");
xfd[0] = open("/dev/null", OREAD);
xfd[1] = dup(fd, -1);
xfd[2] = dup(2, -1);
fprint(2, "sdreset %s: run fdisk %s\n", part->name, name);
if(threadspawnl(xfd, "/bin/disk/fdisk", "disk/fdisk", "-p", name, nil) < 0){
close(xfd[0]);
close(xfd[1]);
close(xfd[2]);
goto error;
}
strcpy(p, "plan9");
for(i=0; i<=20; i++){
sleep(i*100);
if(access(part->filename, AEXIST) >= 0)
goto ok;
if(access(name, AEXIST) >= 0)
goto prep;
}
goto error;
prep:
strcpy(p, "ctl");
close(fd);
if((fd = open(name, OWRITE)) < 0)
goto error;
strcpy(p, "plan9");
xfd[0] = open("/dev/null", OREAD);
xfd[1] = dup(fd, -1);
xfd[2] = dup(2, -1);
fprint(2, "sdreset %s: run prep\n", part->name);
if(threadspawnl(xfd, "/bin/disk/prep", "disk/prep", "-p", name, nil) < 0){
close(xfd[0]);
close(xfd[1]);
close(xfd[2]);
goto error;
}
for(i=0; i<=20; i++){
sleep(i*100);
if(access(part->filename, AEXIST) >= 0)
goto ok;
}
error:
fprint(2, "sdreset %s: error: %r\n", part->name);
rv = -1;
if(fd >= 0)
close(fd);
goto out;
ok:
fprint(2, "sdreset %s: all okay\n", part->name);
rv = 0;
goto out;
out:
free(name);
qunlock(&resetlk);
return rv;
}
static int
reopen(Part *part)
{
int fd;
fprint(2, "reopen %s\n", part->filename);
if((fd = open(part->filename, ORDWR)) < 0){
fprint(2, "reopen %s: %r\n", part->filename);
return -1;
}
if(fd != part->fd){
dup(fd, part->fd);
close(fd);
}
return 0;
}
typedef struct Spawn Spawn;
struct Spawn
{
Channel *c;
int fd[3];
char *file;
char **argv;
};
static void
spawnproc(void *v)
{
int i, *fd;
Spawn *s;
rfork(RFFDG);
s = v;
fd = s->fd;
for(i=0; i<3; i++)
dup(fd[i], i);
if(fd[0] > 2)
close(fd[0]);
if(fd[1] > 2 && fd[1] != fd[0])
close(fd[1]);
if(fd[2] > 2 && fd[2] != fd[1] && fd[2] != fd[0])
close(fd[2]);
procexec(s->c, s->file, s->argv);
}
static int
threadspawnl(int fd[3], char *file, char *argv0, ...)
{
int pid;
Spawn s;
s.c = chancreate(sizeof(void*), 0);
memmove(s.fd, fd, sizeof(s.fd));
s.file = file;
s.argv = &argv0;
vtproc(spawnproc, &s);
pid = recvul(s.c);
if(pid < 0)
return -1;
close(fd[0]);
if(fd[1] != fd[0])
close(fd[1]);
if(fd[2] != fd[1] && fd[2] != fd[0])
close(fd[2]);
return pid;
}
#endif

View file

@ -0,0 +1,160 @@
#include "stdinc.h"
#include "dat.h"
#include "fns.h"
uchar buf[64*1024];
void
usage(void)
{
fprint(2, "usage: printarenapart arenafile [offset]\n");
threadexitsall("usage");
}
static void
rdarena(Arena *arena, u64int offset)
{
u64int a, aa, e;
u32int magic;
Clump cl;
uchar score[VtScoreSize];
ZBlock *lump;
printarena(2, arena);
a = arena->base;
e = arena->base + arena->size;
if(offset != ~(u64int)0) {
if(offset >= e-a)
sysfatal("bad offset %llud >= %llud\n",
offset, e-a);
aa = offset;
} else
aa = 0;
for(; aa < e; aa += ClumpSize+cl.info.size) {
magic = clumpmagic(arena, aa);
if(magic == ClumpFreeMagic)
break;
if(magic != arena->clumpmagic) {
fprint(2, "illegal clump magic number %#8.8ux offset %llud\n",
magic, aa);
break;
}
lump = loadclump(arena, aa, 0, &cl, score, 0);
if(lump == nil) {
fprint(2, "clump %llud failed to read: %r\n", aa);
break;
}
if(cl.info.type != VtCorruptType) {
scoremem(score, lump->data, cl.info.uncsize);
if(scorecmp(cl.info.score, score) != 0) {
fprint(2, "clump %llud has mismatched score\n", aa);
break;
}
if(vttypevalid(cl.info.type) < 0) {
fprint(2, "clump %llud has bad type %d\n", aa, cl.info.type);
break;
}
}
print("%22llud %V %3d %5d\n", aa, score, cl.info.type, cl.info.uncsize);
freezblock(lump);
}
print("end offset %llud\n", aa);
}
void
threadmain(int argc, char *argv[])
{
char *file, *p, *name;
char *table;
u64int offset;
Part *part;
ArenaPart ap;
ArenaHead head;
Arena tail;
char ct[40], mt[40];
readonly = 1; /* for part.c */
ARGBEGIN{
default:
usage();
break;
}ARGEND
switch(argc) {
default:
usage();
case 1:
file = argv[0];
}
ventifmtinstall();
statsinit();
part = initpart(file, OREAD|ODIRECT);
if(part == nil)
sysfatal("can't open file %s: %r", file);
if(readpart(part, PartBlank, buf, sizeof buf) < 0)
sysfatal("can't read file %s: %r", file);
if(unpackarenapart(&ap, buf) < 0)
sysfatal("corrupted arena part header: %r");
print("# arena part version=%d blocksize=%d arenabase=%d\n",
ap.version, ap.blocksize, ap.arenabase);
ap.tabbase = (PartBlank+HeadSize+ap.blocksize-1)&~(ap.blocksize-1);
ap.tabsize = ap.arenabase - ap.tabbase;
print("A");
table = malloc(ap.tabsize+1);
if(readpart(part, ap.tabbase, (uchar*)table, ap.tabsize) < 0)
sysfatal("read %s: %r", file);
table[ap.tabsize] = 0;
print("A");
partblocksize(part, ap.blocksize);
initdcache(8 * MaxDiskBlock);
print("A");
/* XXX - read the number of arenas from the first line */
for(p=table; p && *p; p=strchr(p, '\n')){
if(*p == '\n')
p++;
name = p;
p = strpbrk(p, " \t");
if(p == nil){
fprint(2, "bad line: %s\n", name);
break;
}
print("%p\n", p);
offset = strtoull(p, nil, 0);
if(readpart(part, offset, buf, sizeof buf) < 0){
fprint(2, "%s: read %s: %r\n", argv0, file);
continue;
}
if(unpackarenahead(&head, buf) < 0){
fprint(2, "%s: unpackarenahead: %r\n", argv0);
continue;
}
if(readpart(part, offset+head.size-head.blocksize, buf, head.blocksize) < 0){
fprint(2, "%s: read %s: %r\n", argv0, file);
continue;
}
if(unpackarena(&tail, buf) < 0){
fprint(2, "%s: unpackarena: %r\n", argv0);
continue;
}
print("arena %s %lld clumps=%,d cclumps=%,d used=%,lld uncsize=%,lld%s\n",
tail.name, offset,
tail.diskstats.clumps, tail.diskstats.cclumps,
tail.diskstats.used, tail.diskstats.uncsize,
tail.diskstats.sealed ? " sealed" : "");
strcpy(ct, ctime(tail.ctime));
ct[28] = 0;
strcpy(mt, ctime(tail.wtime));
mt[28] = 0;
print("\tctime=%s\n\tmtime=%s\n", ct, mt);
}
threadexitsall(0);
}

View file

@ -36,7 +36,7 @@ shoulddump(char *name, int argc, char **argv)
enum
{
ClumpChunks = 32*1024
ClumpChunks = 32*1024,
};
void

View file

@ -61,7 +61,7 @@ sortrawientries(Index *ix, Part *tmp, u64int *base, Bloom *bloom)
u32int n;
int i, ok;
/*ZZZ should allow configuration of bits, bucket size */
/* ZZZ should allow configuration of bits, bucket size */
ib = initiebucks(tmp, 8, 64*1024);
if(ib == nil){
seterr(EOk, "can't create sorting buckets: %r");
@ -116,10 +116,7 @@ readarenainfo(IEBucks *ib, Arena *arena, u64int a, Bloom *b)
ClumpInfo *ci, *cis;
u32int clump;
int i, n, ok, nskip;
/* static Biobuf bout; */
/*ZZZ remove fprint? */
/*fprint(2, "ra %s %d %d\n", arena->name, arena->memstats.clumps, arena->diskstats.clumps); */
if(arena->memstats.clumps)
fprint(2, "\tarena %s: %d entries\n", arena->name, arena->memstats.clumps);
else
@ -129,7 +126,6 @@ readarenainfo(IEBucks *ib, Arena *arena, u64int a, Bloom *b)
ok = 0;
nskip = 0;
memset(&ie, 0, sizeof(IEntry));
/* Binit(&bout, 1, OWRITE); */
for(clump = 0; clump < arena->memstats.clumps; clump += n){
n = ClumpChunks;
if(n > arena->memstats.clumps - clump)
@ -148,18 +144,15 @@ readarenainfo(IEBucks *ib, Arena *arena, u64int a, Bloom *b)
a += ci->size + ClumpSize;
ie.ia.blocks = (ci->size + ClumpSize + (1 << ABlockLog) - 1) >> ABlockLog;
scorecp(ie.score, ci->score);
/* Bprint(&bout, "%22lld %V %3d %5d\n", */
/* ie.ia.addr, ie.score, ie.ia.type, ie.ia.size); */
if(ci->type == VtCorruptType){
/* print("! %V %22lld %3d %5d %3d\n", */
/* ie.score, ie.ia.addr, ie.ia.type, ie.ia.size, ie.ia.blocks); */
if(0) print("! %V %22lld %3d %5d %3d\n",
ie.score, ie.ia.addr, ie.ia.type, ie.ia.size, ie.ia.blocks);
nskip++;
}else
sprayientry(ib, &ie);
markbloomfilter(b, ie.score);
}
}
/* Bterm(&bout); */
free(cis);
if(ok < 0)
return TWID32;
@ -358,8 +351,8 @@ readiebuck(IEBucks *ib, int b)
m = ib->bucks[b].used;
if(m == 0)
m = ib->usable;
/* if(ib->bucks[b].total) */
/* fprint(2, "\tbucket %d: %d entries\n", b, ib->bucks[b].total/IEntrySize); */
if(0) if(ib->bucks[b].total)
fprint(2, "\tbucket %d: %d entries\n", b, ib->bucks[b].total/IEntrySize);
while(head != TWID32){
if(readpart(ib->part, (u64int)head * ib->size, &ib->buf[n], m+U32Size) < 0){
seterr(EOk, "can't read index sort bucket: %r");

View file

@ -80,7 +80,7 @@ Statdesc statdesc[NStat] =
{ "isect block write bytes", },
{ "sum reads", },
{ "sum read bytes", }
{ "sum read bytes", },
};
QLock statslock;

View file

@ -30,12 +30,11 @@ syncarena(Arena *arena, u64int start, u32int n, int zok, int fix)
ZBlock *lump;
Clump cl;
ClumpInfo ci;
static ClumpInfo zci = { -1 };
static ClumpInfo zci = { .type = -1 };
u8int score[VtScoreSize];
u64int uncsize, used, aa;
u32int clump, clumps, cclumps, magic;
int err, flush, broken;
AState as;
used = arena->memstats.used;
clumps = arena->memstats.clumps;
@ -133,19 +132,21 @@ syncarena(Arena *arena, u64int start, u32int n, int zok, int fix)
flushdcache();
}
fprint(2, "arena %s: start=%lld fix=%d flush=%d %lld->%lld %ud->%ud %ud->%ud %lld->%lld\n",
arena->name,
start,
fix,
flush,
used, arena->memstats.used,
clumps, arena->memstats.clumps,
cclumps, arena->memstats.cclumps,
uncsize, arena->memstats.uncsize);
if(used != arena->memstats.used
|| clumps != arena->memstats.clumps
|| cclumps != arena->memstats.cclumps
|| uncsize != arena->memstats.uncsize)
err |= SyncHeader;
if(start && (err&SyncHeader)){
trace(TraceProc, "syncarena setdcachestate");
as.arena = arena;
as.aa = start+arena->memstats.used;
as.stats = arena->memstats;
setdcachestate(&as);
}
return err;
}

View file

@ -48,6 +48,8 @@ threadmain(int argc, char *argv[])
ventifmtinstall();
if(initventi(argv[0], &conf) < 0)
sysfatal("can't init venti: %r");
if(mainindex->bloom && loadbloom(mainindex->bloom) < 0)
sysfatal("can't load bloom filter: %r");
if(bcmem < maxblocksize * (mainindex->narenas + mainindex->nsects * 4 + 16))
bcmem = maxblocksize * (mainindex->narenas + mainindex->nsects * 4 + 16);

View file

@ -121,6 +121,7 @@ int
syncindex(Index *ix, int fix, int mustflush, int check)
{
Arena *arena;
AState as;
u64int a;
u32int clump;
int i, e, e1, ok, ok1, flush;
@ -130,7 +131,12 @@ syncindex(Index *ix, int fix, int mustflush, int check)
for(i = 0; i < ix->narenas; i++){
trace(TraceProc, "syncindex start %d", i);
arena = ix->arenas[i];
clump = arena->memstats.clumps;
/*
* Syncarena will scan through the arena looking for blocks
* that have been forgotten. It will update arena->memstats.used,
* so save the currenct copy as the place to start the
* syncarenaindex scan.
*/
a = arena->memstats.used;
e = syncarena(arena, ix->amap[i].start, TWID32, fix, fix);
e1 = e;
@ -138,15 +144,23 @@ syncindex(Index *ix, int fix, int mustflush, int check)
e1 &= ~(SyncHeader|SyncCIZero|SyncCIErr);
if(e1 == SyncHeader)
fprint(2, "arena %s: header is out-of-date\n", arena->name);
clump = arena->diskstats.clumps;
if(e1)
ok = -1;
else{
ok1 = syncarenaindex(ix, arena, clump, a + ix->amap[i].start, fix, &flush, check);
if(ok1 < 0)
fprint(2, "syncarenaindex: %r\n");
fprint(2, "arena %s: wbarena in syncindex\n", arena->name);
if(fix && ok1==0 && (e & SyncHeader) && wbarena(arena) < 0)
fprint(2, "arena=%s header write failed: %r\n", arena->name);
ok |= ok1;
fprint(2, "arena %s: setdcachestate\n", arena->name);
as.arena = arena;
as.aa = ix->amap[i].start + arena->memstats.used;
as.stats = arena->memstats;
setdcachestate(&as);
}
}
if(missing || wrong)

View file

@ -23,7 +23,7 @@ static uchar lenval[1 << (DBigLenBits - 1)] =
static uchar lenbits[] =
{
0, 0, 0,
2, 3, 5, 5
2, 3, 5, 5,
};
static uchar offbits[16] =

View file

@ -148,6 +148,7 @@ emalloc(ulong n)
sysfatal("out of memory allocating %lud", n);
}
memset(p, 0xa5, n);
setmalloctag(p, getcallerpc(&n));
if(0)print("emalloc %p-%p by %lux\n", p, (char*)p+n, getcallerpc(&n));
return p;
}
@ -164,6 +165,7 @@ ezmalloc(ulong n)
sysfatal("out of memory allocating %lud", n);
}
memset(p, 0, n);
setmalloctag(p, getcallerpc(&n));
if(0)print("ezmalloc %p-%p by %lux\n", p, (char*)p+n, getcallerpc(&n));
return p;
}
@ -177,6 +179,7 @@ erealloc(void *p, ulong n)
abort();
sysfatal("out of memory allocating %lud", n);
}
setrealloctag(p, getcallerpc(&p));
if(0)print("erealloc %p-%p by %lux\n", p, (char*)p+n, getcallerpc(&p));
return p;
}
@ -190,6 +193,7 @@ estrdup(char *s)
n = strlen(s) + 1;
t = emalloc(n);
memmove(t, s, n);
setmalloctag(t, getcallerpc(&s));
if(0)print("estrdup %p-%p by %lux\n", t, (char*)t+n, getcallerpc(&s));
return t;
}
@ -231,6 +235,7 @@ ventifmtinstall(void)
fmtinstall('F', vtfcallfmt);
fmtinstall('H', encodefmt);
fmtinstall('I', ientryfmt);
fmtinstall('T', vttimefmt);
fmtinstall('V', vtscorefmt);
}

View file

@ -105,6 +105,8 @@ threadmain(int argc, char *argv[])
fprint(2, "conf...");
if(initventi(configfile, &config) < 0)
sysfatal("can't init server: %r");
if(mainindex->bloom && loadbloom(mainindex->bloom) < 0)
sysfatal("can't load bloom filter: %r");
if(mem == 0)
mem = config.mem;
@ -210,8 +212,8 @@ ventiserver(void *v)
trace(TraceRpc, "<- %F", &r->tx);
r->rx.msgtype = r->tx.msgtype+1;
addstat(StatRpcTotal, 1);
/* print("req (arenas[0]=%p sects[0]=%p) %F\n", */
/* mainindex->arenas[0], mainindex->sects[0], &r->tx); */
if(0) print("req (arenas[0]=%p sects[0]=%p) %F\n",
mainindex->arenas[0], mainindex->sects[0], &r->tx);
switch(r->tx.msgtype){
default:
vtrerror(r, "unknown request");

View file

@ -3,65 +3,102 @@
#include "fns.h"
static int verbose;
static int fd;
static uchar *data;
static int blocksize;
static int sleepms;
void
usage(void)
{
fprint(2, "usage: verifyarena [-v]\n");
fprint(2, "usage: verifyarena [-b blocksize] [-s ms] [-v] [arenapart [name...]]\n");
threadexitsall(0);
}
static void
static int
preadblock(uchar *buf, int n, vlong off)
{
int nr, m;
for(nr = 0; nr < n; nr += m){
m = n - nr;
m = pread(fd, &buf[nr], m, off+nr);
if(m <= 0){
if(m == 0)
werrstr("early eof");
return -1;
}
}
return 0;
}
static int
readblock(uchar *buf, int n)
{
int nr, m;
for(nr = 0; nr < n; nr += m){
m = n - nr;
m = read(0, &buf[nr], m);
if(m <= 0)
sysfatal("can't read arena from standard input: %r");
m = read(fd, &buf[nr], m);
if(m <= 0){
if(m == 0)
werrstr("early eof");
return -1;
}
}
return 0;
}
static void
verifyarena(void)
verifyarena(char *name, vlong len)
{
Arena arena;
ArenaHead head;
ZBlock *b;
DigestState s;
u64int n, e;
u32int bs;
u8int score[VtScoreSize];
fprint(2, "verify arena from standard input\n");
fprint(2, "verify %s\n", name);
memset(&arena, 0, sizeof arena);
memset(&s, 0, sizeof s);
/*
* read the little bit, which will included the header
* read a little bit, which will include the header
*/
bs = MaxIoSize;
b = alloczblock(bs, 0, 0);
readblock(b->data, HeadSize);
sha1(b->data, HeadSize, nil, &s);
if(unpackarenahead(&head, b->data) < 0)
sysfatal("corrupted arena header: %r");
if(readblock(data, HeadSize) < 0){
fprint(2, "%s: reading header: %r\n", name);
return;
}
sha1(data, HeadSize, nil, &s);
if(unpackarenahead(&head, data) < 0){
fprint(2, "%s: corrupt arena header: %r\n", name);
return;
}
if(head.version != ArenaVersion4 && head.version != ArenaVersion5)
fprint(2, "warning: unknown arena version %d\n", head.version);
fprint(2, "%s: warning: unknown arena version %d\n", name, head.version);
if(len != 0 && len != head.size)
fprint(2, "%s: warning: unexpected length %lld != %lld\n", name, head.size, len);
if(strcmp(name, "<stdin>") != 0 && strcmp(head.name, name) != 0)
fprint(2, "%s: warning: unexpected name %s\n", name, head.name);
/*
* now we know how much to read
* read everything but the last block, which is special
*/
e = head.size - head.blocksize;
bs = blocksize;
for(n = HeadSize; n < e; n += bs){
if(n + bs > e)
bs = e - n;
readblock(b->data, bs);
sha1(b->data, bs, nil, &s);
if(readblock(data, bs) < 0){
fprint(2, "%s: read data: %r\n", name);
return;
}
sha1(data, bs, nil, &s);
if(sleepms)
sleep(sleepms);
}
/*
@ -69,8 +106,11 @@ verifyarena(void)
* the sum is calculated assuming the slot for the sum is zero.
*/
bs = head.blocksize;
readblock(b->data, bs);
sha1(b->data, bs-VtScoreSize, nil, &s);
if(readblock(data, bs) < 0){
fprint(2, "%s: read last block: %r\n", name);
return;
}
sha1(data, bs-VtScoreSize, nil, &s);
sha1(zeroscore, VtScoreSize, nil, &s);
sha1(nil, 0, score, &s);
@ -78,37 +118,73 @@ verifyarena(void)
* validity check on the trailer
*/
arena.blocksize = head.blocksize;
if(unpackarena(&arena, b->data) < 0)
sysfatal("corrupted arena trailer: %r");
scorecp(arena.score, &b->data[arena.blocksize - VtScoreSize]);
if(unpackarena(&arena, data) < 0){
fprint(2, "%s: corrupt arena trailer: %r\n", name);
return;
}
scorecp(arena.score, &data[arena.blocksize - VtScoreSize]);
if(namecmp(arena.name, head.name) != 0)
sysfatal("arena header and trailer names clash: %s vs. %s\n", head.name, arena.name);
if(arena.version != head.version)
sysfatal("arena header and trailer versions clash: %d vs. %d\n", head.version, arena.version);
if(namecmp(arena.name, head.name) != 0){
fprint(2, "%s: wrong name in trailer: %s vs. %s\n",
name, head.name, arena.name);
return;
}
if(arena.version != head.version){
fprint(2, "%s: wrong version in trailer: %d vs. %d\n",
name, head.version, arena.version);
return;
}
arena.size = head.size - 2 * head.blocksize;
/*
* check for no checksum or the same
*/
if(scorecmp(score, arena.score) != 0){
if(scorecmp(zeroscore, arena.score) != 0)
fprint(2, "warning: mismatched checksums for arena=%s, found=%V calculated=%V",
arena.name, arena.score, score);
scorecp(arena.score, score);
}else
fprint(2, "matched score\n");
if(scorecmp(score, arena.score) == 0)
fprint(2, "%s: verified score\n", name);
else if(scorecmp(zeroscore, arena.score) == 0)
fprint(2, "%s: unsealed\n", name);
else{
fprint(2, "%s: mismatch checksum - found=%V calculated=%V\n",
name, arena.score, score);
return;
}
printarena(2, &arena);
}
static int
shouldcheck(char *name, char **s, int n)
{
int i;
if(n == 0)
return 1;
for(i=0; i<n; i++){
if(s[i] && strcmp(name, s[i]) == 0){
s[i] = nil;
return 1;
}
}
return 0;
}
void
threadmain(int argc, char *argv[])
{
ventifmtinstall();
statsinit();
int i, nline;
char *p, *q, *table, *f[10], line[256];
vlong start, stop;
ArenaPart ap;
ventifmtinstall();
blocksize = MaxIoSize;
ARGBEGIN{
case 'b':
blocksize = unittoull(EARGF(usage()));
break;
case 's':
sleepms = atoi(EARGF(usage()));
break;
case 'v':
verbose++;
break;
@ -117,11 +193,69 @@ threadmain(int argc, char *argv[])
break;
}ARGEND
readonly = 1;
data = vtmalloc(blocksize);
if(argc == 0){
fd = 0;
verifyarena("<stdin>", 0);
threadexitsall(nil);
}
if(argc != 0)
usage();
if((fd = open(argv[0], OREAD)) < 0)
sysfatal("open %s: %r", argv[0]);
verifyarena();
threadexitsall(0);
if(preadblock(data, 8192, PartBlank) < 0)
sysfatal("read arena part header: %r");
if(unpackarenapart(&ap, data) < 0)
sysfatal("corrupted arena part header: %r");
fprint(2, "# arena part version=%d blocksize=%d arenabase=%d\n",
ap.version, ap.blocksize, ap.arenabase);
ap.tabbase = (PartBlank+HeadSize+ap.blocksize-1)&~(ap.blocksize-1);
ap.tabsize = ap.arenabase - ap.tabbase;
table = malloc(ap.tabsize+1);
if(preadblock((uchar*)table, ap.tabsize, ap.tabbase) < 0)
sysfatal("reading arena part directory: %r");
table[ap.tabsize] = 0;
nline = atoi(table);
p = strchr(table, '\n');
if(p)
p++;
for(i=0; i<nline; i++){
if(p == nil){
fprint(2, "warning: unexpected arena table end\n");
break;
}
q = strchr(p, '\n');
if(q)
*q++ = 0;
if(strlen(p) >= sizeof line){
fprint(2, "warning: long arena table line: %s\n", p);
p = q;
continue;
}
strcpy(line, p);
memset(f, 0, sizeof f);
if(tokenize(line, f, nelem(f)) < 3){
fprint(2, "warning: bad arena table line: %s\n", p);
p = q;
continue;
}
p = q;
if(shouldcheck(f[0], argv+1, argc-1)){
start = strtoull(f[1], 0, 0);
stop = strtoull(f[2], 0, 0);
if(stop <= start){
fprint(2, "%s: bad start,stop %lld,%lld\n", f[0], stop, start);
continue;
}
if(seek(fd, start, 0) < 0)
fprint(2, "%s: seek to start: %r\n", f[0]);
verifyarena(f[0], stop - start);
}
}
for(i=1; i<argc; i++)
if(argv[i] != 0)
fprint(2, "%s: did not find arena\n", argv[i]);
threadexitsall(nil);
}

View file

@ -83,8 +83,8 @@ rdarena(Arena *arena, u64int offset)
if(magic == ClumpFreeMagic)
break;
if(magic != arena->clumpmagic) {
/* fprint(2, "illegal clump magic number %#8.8ux offset %llud\n", */
/* magic, aa); */
if(0) fprint(2, "illegal clump magic number %#8.8ux offset %llud\n",
magic, aa);
break;
}
lump = loadclump(arena, aa, 0, &cl, score, 0);

View file

@ -5,11 +5,13 @@
void
fmtzbinit(Fmt *f, ZBlock *b)
{
memset(f, 0, sizeof *f);
fmtlocaleinit(f, nil, nil, nil);
f->runes = 0;
f->start = b->data;
f->to = f->start;
f->stop = (char*)f->start + b->len;
f->flush = nil;
f->farg = nil;
f->nfmt = 0;
}
#define ROUNDUP(p, n) ((void*)(((uintptr)(p)+(n)-1)&~(uintptr)((n)-1)))

View file

@ -10,10 +10,6 @@ zeropart(Part *part, int blocksize)
int w;
fprint(2, "clearing the partition\n");
/*fprint(2, "NOT!\n"); */
/*return; */
/*b=alloczblock(MaxIoSize, 1, blocksize); */
/*freezblock(b); */
b = alloczblock(MaxIoSize, 1, blocksize);
w = 0;