2 * Rebuild the index from scratch, in place.
/* Upper bound on the per-section I/O buffer size (4 MB). */
11 MaxBufSize = 4*1024*1024,
/* Completion channels: arena workers signal arenadonechan, index-section
 * workers signal isectdonechan (created and drained in threadmain below). */
24 Channel *arenadonechan;
25 Channel *isectdonechan;
32 static int shouldprocess(ISect*);
33 static void isectproc(void*);
34 static void arenapartproc(void*);
/* usage: print invocation synopsis and terminate all threads. */
39 fprint(2, "usage: buildindex [-b] [-i isect]... [-M imem] venti.conf\n");
40 threadexitsall("usage");
/*
 * threadmain: parse options, load the venti config, then rebuild the
 * index: spawn one proc per index section and one per arena partition,
 * wait for the arena procs, tell the section procs to finish, and wait
 * for them before writing the bloom filter back out.
 * NOTE(review): this listing is gapped — lines are missing between the
 * numbered entries, so control structure between fragments is inferred.
 */
44 threadmain(int argc, char *argv[])
46 int fd, i, napart, nfinish, maxdisks;
58 case 'd': /* debugging - make sure to run all 3 passes */
/* -i: accumulate named index sections to (re)build; others are skipped. */
62 isect = vtrealloc(isect, (nisect+1)*sizeof(isect[0]));
63 isect[nisect++] = EARGF(usage());
/* -M: memory budget, parsed with unit suffixes (unittoull). */
66 imem = unittoull(EARGF(usage()));
68 case 'm': /* temporary - might go away */
69 maxdisks = atoi(EARGF(usage()));
79 if(initventi(argv[0], &conf) < 0)
80 sysfatal("can't init venti: %r");
/* Bloom filter handling: only touched when rebuilding all sections. */
82 if(nisect == 0 && ix->bloom)
84 if(bloom && ix->bloom && resetbloom(ix->bloom) < 0)
85 sysfatal("loadbloom: %r");
86 if(bloom && !ix->bloom)
87 sysfatal("-b specified but no bloom filter");
/* Split the memory budget evenly across index sections. */
90 isectmem = imem/ix->nsects;
93 * safety first - only need read access to arenas
96 for(i=0; i<ix->narenas; i++){
97 if(ix->arenas[i]->part != p){
98 p = ix->arenas[i]->part;
99 if((fd = open(p->filename, OREAD)) < 0)
100 sysfatal("cannot reopen %s: %r", p->filename);
107 * need a block for every arena
109 bcmem = maxblocksize * (mainindex->narenas + 16);
110 if(0) fprint(2, "initialize %d bytes of disk block cache\n", bcmem);
/* Totals drive the pass-sizing arithmetic in isectproc. */
114 for(i=0; i<ix->narenas; i++)
115 totalclumps += ix->arenas[i]->diskstats.clumps;
118 for(i=0; i<ix->nsects; i++)
119 totalbuckets += ix->sects[i]->blocks;
120 fprint(2, "%,lld clumps, %,lld buckets\n", totalclumps, totalbuckets);
122 /* start index procs */
123 fprint(2, "%T read index\n");
124 isectdonechan = chancreate(sizeof(void*), 0);
125 for(i=0; i<ix->nsects; i++){
126 if(shouldprocess(ix->sects[i])){
127 ix->sects[i]->writechan = chancreate(sizeof(IEntry), 0);
128 vtproc(isectproc, ix->sects[i]);
/* Warn about -i names that matched no section (see shouldprocess). */
132 for(i=0; i<nisect; i++)
134 fprint(2, "warning: did not find index section %s\n", isect[i]);
136 /* start arena procs */
140 arenadonechan = chancreate(sizeof(void*), 0);
141 for(i=0; i<ix->narenas; i++){
142 if(ix->arenas[i]->part != p){
143 p = ix->arenas[i]->part;
144 vtproc(arenapartproc, p);
/* -m throttle: once maxdisks procs are live, wait for one to finish
 * before starting the next partition. */
145 if(++napart >= maxdisks){
146 recvp(arenadonechan);
152 /* wait for arena procs to finish */
153 for(nfinish=0; nfinish<napart; nfinish++)
154 recvp(arenadonechan);
156 /* tell index procs to finish */
157 for(i=0; i<ix->nsects; i++)
158 if(ix->sects[i]->writechan)
159 send(ix->sects[i]->writechan, nil);
161 /* wait for index procs to finish */
162 for(i=0; i<ix->nsects; i++)
163 if(ix->sects[i]->writechan)
164 recvp(isectdonechan);
166 if(ix->bloom && writebloom(ix->bloom) < 0)
167 fprint(2, "writing bloom filter: %r\n");
169 fprint(2, "%T done arenaentries=%,lld indexed=%,lld (nskip=%,lld)\n",
170 arenaentries, indexentries, skipentries);
/*
 * shouldprocess: decide whether index section is was named by a -i flag
 * (string match against the isect[] list). Presumably returns true for
 * all sections when no -i was given — TODO confirm; the early lines of
 * the function are missing from this listing.
 */
175 shouldprocess(ISect *is)
182 for(i=0; i<nisect; i++)
183 if(isect[i] && strcmp(isect[i], is->name) == 0){
/* add: accumulate n into the shared counter *a (body not visible here;
 * callers pass the global arenaentries/indexentries/skipentries tallies). */
191 add(u64int *a, u64int n)
201 * Read through an arena partition and send each of its IEntries
202 * to the appropriate index section. When finished, send on
/* Number of ClumpInfos fetched per readclumpinfos call. */
207 ClumpChunks = 32*1024,
210 arenapartproc(void *v)
212 int i, j, n, nskip, x;
221 threadsetname("arenaproc %s", p->name);
225 cis = MKN(ClumpInfo, ClumpChunks);
/* Walk every arena, processing only those on this proc's partition p
 * (the filter line is not visible in this gapped listing). */
226 for(i=0; i<ix->narenas; i++){
230 if(a->memstats.clumps)
231 fprint(2, "%T arena %s: %d entries\n",
232 a->name, a->memstats.clumps);
234 * Running the loop backwards accesses the
235 * clump info blocks forwards, since they are
236 * stored in reverse order at the end of the arena.
237 * This speeds things slightly.
/* Start past the used portion and subtract each clump's size while
 * walking the directory backwards; addr should land on amap[i].start. */
239 addr = ix->amap[i].start + a->memstats.used;
240 for(clump=a->memstats.clumps; clump > 0; clump-=n){
244 if(readclumpinfos(a, clump-n, cis, n) != n){
245 fprint(2, "%T arena %s: directory read: %r\n", a->name);
249 for(j=n-1; j>=0; j--){
/* Build the in-memory index entry for this clump. */
251 ie.ia.type = ci->type;
252 ie.ia.size = ci->uncsize;
253 addr -= ci->size + ClumpSize;
255 ie.ia.blocks = (ci->size + ClumpSize + (1<<ABlockLog)-1) >> ABlockLog;
256 scorecp(ie.score, ci->score);
/* Corrupt clumps are presumably skipped/counted in nskip — the
 * branch body is missing here; TODO confirm. */
257 if(ci->type == VtCorruptType)
/* Route the entry to its index section by score. */
261 x = indexsect(ix, ie.score);
262 assert(0 <= x && x < ix->nsects);
263 if(ix->sects[x]->writechan)
264 send(ix->sects[x]->writechan, &ie);
266 markbloomfilter(ix->bloom, ie.score);
270 if(addr != ix->amap[i].start)
271 fprint(2, "%T arena %s: clump miscalculation %lld != %lld\n", a->name, addr, ix->amap[i].start);
/* Publish totals and signal threadmain that this partition is done. */
273 add(&arenaentries, tot);
274 add(&skipentries, nskip);
275 sendp(arenadonechan, p);
279 * Convert score into relative bucket number in isect.
280 * Can pass a packed ientry instead of score - score is first.
283 score2bucket(ISect *is, uchar *score)
/* Top 32 hash bits of the score, scaled down by the index divisor,
 * give the absolute bucket; must fall inside this section's range. */
287 b = hashbits(score, 32)/ix->div;
288 if(b < is->start || b >= is->stop){
289 fprint(2, "score2bucket: score=%V div=%d b=%ud start=%ud stop=%ud\n",
290 score, ix->div, b, is->start, is->stop);
292 assert(is->start <= b && b < is->stop);
/* Return the bucket number relative to the section's start. */
293 return b - is->start;
297 * Convert offset in index section to bucket number.
300 offset2bucket(ISect *is, u64int offset)
/* Inverse of bucket2offset: strip the block base, divide by blocksize. */
304 assert(is->blockbase <= offset);
305 offset -= is->blockbase;
306 b = offset/is->blocksize;
307 assert(b < is->stop-is->start);
312 * Convert bucket number to offset.
315 bucket2offset(ISect *is, u32int b)
/* b may equal stop-start here (one past the end) to get the end offset. */
317 assert(b <= is->stop-is->start);
/* u64int cast prevents 32-bit overflow in the multiply. */
318 return is->blockbase + (u64int)b*is->blocksize;
322 * IEntry buffers to hold initial round of spraying.
324 typedef struct Buf Buf;
/* One Buf covers a contiguous range [boffset,eoffset) of the index
 * partition; entries are packed into the in-memory block [bp,ep) at wp
 * and flushed to woffset when the block fills. */
327 Part *part; /* partition being written */
328 uchar *bp; /* current block */
329 uchar *ep; /* end of block */
330 uchar *wp; /* write position in block */
331 u64int boffset; /* start offset */
332 u64int woffset; /* next write offset */
333 u64int eoffset; /* end offset */
334 u32int nentry; /* number of entries written */
/* Flush helper (its signature line is missing from this listing):
 * write the current block at woffset, advance, and reset the block. */
342 if(buf->woffset >= buf->eoffset)
343 sysfatal("buf index chunk overflow - need bigger index");
344 bufsize = buf->ep - buf->bp;
345 if(writepart(buf->part, buf->woffset, buf->bp, bufsize) < 0){
346 fprint(2, "write %s: %r\n", buf->part->name);
349 buf->woffset += bufsize;
/* Zero the block so stale entries never leak into the next flush. */
350 memset(buf->bp, 0, bufsize);
/* bwrite: pack one index entry into buf's current block, flushing first
 * when fewer than IEntrySize bytes remain (flush call line not visible). */
355 bwrite(Buf *buf, IEntry *ie)
357 if(buf->wp+IEntrySize > buf->ep)
359 assert(buf->bp <= buf->wp && buf->wp < buf->ep);
360 packientry(ie, buf->wp);
361 buf->wp += IEntrySize;
362 assert(buf->bp <= buf->wp && buf->wp <= buf->ep);
367 * Minibuffer. In-memory data structure holds our place
368 * in the buffer but has no block data. We are writing and
369 * reading the minibuffers at the same time. (Careful!)
371 typedef struct Minibuf Minibuf;
/* Disk range [boffset,eoffset); roffset chases unread entries while
 * woffset tracks rewritten ones. Invariant used elsewhere:
 * woffset <= roffset (see ipoolloadblock's asserts). */
374 u64int boffset; /* start offset */
375 u64int roffset; /* read offset */
376 u64int woffset; /* write offset */
377 u64int eoffset; /* end offset */
378 u32int nentry; /* # entries left to read */
379 u32int nwentry; /* # entries written */
383 * Index entry pool. Used when trying to shuffle around
384 * the entries in a big buffer into the corresponding M minibuffers.
385 * Sized to hold M*EntriesPerBlock entries, so that there will always
386 * either be room in the pool for another block worth of entries
387 * or there will be an entire block worth of sorted entries to
390 typedef struct IEntryLink IEntryLink;
391 typedef struct IPool IPool;
/* One pooled entry: raw packed IEntry bytes plus a free-list/bucket link. */
395 uchar ie[IEntrySize]; /* raw IEntry */
396 IEntryLink *next; /* next in chain */
402 u32int buck0; /* first bucket in pool */
403 u32int mbufbuckets; /* buckets per minibuf */
404 IEntryLink *entry; /* all IEntryLinks */
405 u32int nentry; /* # of IEntryLinks */
406 IEntryLink *free; /* free list */
407 u32int nfree; /* # on free list */
408 Minibuf *mbuf; /* all minibufs */
409 u32int nmbuf; /* # of minibufs */
410 IEntryLink **mlist; /* lists for each minibuf */
411 u32int *mcount; /* # on each mlist[i] */
412 u32int bufsize; /* block buffer size */
413 uchar *rbuf; /* read buffer */
414 uchar *wbuf; /* write buffer */
415 u32int epbuf; /* entries per block buffer */
/* Pool consistency check (function header missing from this listing):
 * sums the free list plus every per-minibuf list count into n and
 * verifies it accounts for all nentry links. Debug-print heavy. */
426 for(i=0; i<p->nmbuf; i++)
430 print("free %ud:", p->nfree);
431 for(i=0; i<p->nmbuf; i++)
432 print(" %ud", p->mcount[i]);
433 print(" = %lld nentry: %ud\n", n, p->nentry);
435 return n == p->nentry;
/*
 * mkipool: allocate an IPool plus all its arrays and I/O buffers in one
 * ezmalloc, then carve the single allocation into entry links, per-minibuf
 * lists/counts, and bufsize-aligned read/write buffers.
 */
440 mkipool(ISect *isect, Minibuf *mbuf, u32int nmbuf,
441 u32int mbufbuckets, u32int bufsize)
/* One block's worth of entries per minibuf, plus one spare block. */
448 nentry = (nmbuf+1)*bufsize / IEntrySize;
/* NOTE(review): the links stored at p->entry are IEntryLink, but this
 * sizes the region with sizeof(IEntry) — verify the two types are the
 * same size, or that this over-allocates; the listing hides the rest. */
449 p = ezmalloc(sizeof(IPool)
450 +nentry*sizeof(IEntry)
451 +nmbuf*sizeof(IEntryLink*)
452 +nmbuf*sizeof(u32int)
456 p->mbufbuckets = mbufbuckets;
457 p->bufsize = bufsize;
/* Carve trailing storage: links first, then mlist, then mcount. */
458 p->entry = (IEntryLink*)(p+1);
460 p->mlist = (IEntryLink**)(p->entry+nentry);
461 p->mcount = (u32int*)(p->mlist+nmbuf);
/* Round data up to the next bufsize boundary so rbuf/wbuf are aligned. */
464 data = (uchar*)(p->mcount+nmbuf);
465 data += bufsize - (uintptr)data%bufsize;
467 p->wbuf = data+bufsize;
468 p->epbuf = bufsize/IEntrySize;
/* Thread every link onto the free list (loop body not visible). */
470 for(i=0; i<p->nentry; i++){
480 * Add the index entry ie to the pool p.
481 * Caller must know there is room.
484 ipoolinsert(IPool *p, uchar *ie)
489 assert(p->free != nil);
/* ie is a packed entry; score2bucket reads the score at its front
 * (see its comment: "Can pass a packed ientry instead of score"). */
491 buck = score2bucket(p->isect, ie);
/* Map absolute bucket to the owning minibuffer index. */
492 x = (buck-p->buck0) / p->mbufbuckets;
494 fprint(2, "buck=%ud mbufbucket=%ud x=%ud\n",
495 buck, p->mbufbuckets, x);
497 assert(x < p->nmbuf);
/* Pop a link from the free list (pop lines not visible) and chain it
 * onto minibuffer x's pending list. */
502 memmove(l->ie, ie, IEntrySize);
503 l->next = p->mlist[x];
509 * Pull out a block containing as many
510 * entries as possible for minibuffer x.
513 ipoolgetbuf(IPool *p, u32int x)
520 ep = p->wbuf + p->bufsize;
522 assert(x < p->nmbuf);
/* Drain mlist[x] into wbuf until the block is full or the list empties;
 * drained links presumably return to the free list (lines not visible). */
523 for(wp=bp; wp+IEntrySize<=ep && p->mlist[x]; wp+=IEntrySize){
525 p->mlist[x] = l->next;
527 memmove(wp, l->ie, IEntrySize);
/* Zero the unused tail so the on-disk block has no stale bytes. */
533 memset(wp, 0, ep-wp);
538 * Read a block worth of entries from the minibuf
539 * into the pool. Caller must know there is room.
542 ipoolloadblock(IPool *p, Minibuf *mb)
546 assert(mb->nentry > 0);
/* Read pointer never lags the write pointer: reading only happens in
 * the not-yet-rewritten region of the minibuffer. */
547 assert(mb->roffset >= mb->woffset);
548 assert(mb->roffset < mb->eoffset);
550 n = p->bufsize/IEntrySize;
/* Read error is reported but not fatal; the entries read (or garbage,
 * on failure) are inserted regardless — intentional best-effort. */
553 if(readpart(p->isect->part, mb->roffset, p->rbuf, p->bufsize) < 0)
554 fprint(2, "readpart %s: %r\n", p->isect->part->name);
557 ipoolinsert(p, p->rbuf+i*IEntrySize);
560 mb->roffset += p->bufsize;
564 * Write out a block worth of entries to minibuffer x.
565 * If necessary, pick up the data there before overwriting it.
568 ipoolflush0(IPool *pool, u32int x)
574 bufsize = pool->bufsize;
/* Gather a block of x's entries into wbuf; count them as written. */
575 mb->nwentry += ipoolgetbuf(pool, x);
/* About to overwrite the block at woffset == roffset: read it into the
 * pool first so no unread entries are clobbered. */
576 if(mb->nentry > 0 && mb->roffset == mb->woffset){
577 assert(pool->nfree >= pool->bufsize/IEntrySize);
579 * There will be room in the pool -- we just
580 * removed a block worth.
582 ipoolloadblock(pool, mb);
584 if(writepart(pool->isect->part, mb->woffset, pool->wbuf, bufsize) < 0)
585 fprint(2, "writepart %s: %r\n", pool->isect->part->name);
586 mb->woffset += bufsize;
590 * Write out some full block of entries.
591 * (There must be one -- the pool is almost full!)
594 ipoolflush1(IPool *pool)
598 assert(pool->nfree <= pool->epbuf);
/* Find any minibuffer holding at least a block's worth and flush it;
 * by the pigeonhole sizing of the pool one must exist. */
600 for(i=0; i<pool->nmbuf; i++){
601 if(pool->mcount[i] >= pool->epbuf){
602 ipoolflush0(pool, i);
606 /* can't be reached - someone must be full */
607 sysfatal("ipoolflush1");
611 * Flush all the entries in the pool out to disk.
612 * Nothing more to read from disk.
615 ipoolflush(IPool *pool)
619 for(i=0; i<pool->nmbuf; i++)
620 while(pool->mlist[i])
621 ipoolflush0(pool, i);
/* Every link must be back on the free list when the pool is drained. */
622 assert(pool->nfree == pool->nentry);
626 * Third pass. Pick up each minibuffer from disk into
627 * memory and then write out the buckets.
631 * Compare two packed index entries.
632 * Usual ordering except break ties by putting higher
633 * index addresses first (assumes have duplicates
634 * due to corruption in the lower addresses).
/* qsort comparator over raw IEntrySize records (see sortminibuffer). */
637 ientrycmpaddr(const void *va, const void *vb)
/* Negated memcmp on the 8-byte address field: descending address order
 * for score ties, so the highest (most trusted) copy sorts first. */
647 return -memcmp(a+IEntryAddrOff, b+IEntryAddrOff, 8);
/* zerorange: write zeros over partition bytes [o, e), in chunks bounded
 * by the static zero[] buffer (chunking lines not visible here). */
651 zerorange(Part *p, u64int o, u64int e)
653 static uchar zero[MaxIoSize];
660 if(writepart(p, o, zero, n) < 0)
661 fprint(2, "writepart %s: %r\n", p->name);
666 * Load a minibuffer into memory and write out the
667 * corresponding buckets.
670 sortminibuffer(ISect *is, Minibuf *mb, uchar *buf, u32int nbuf, u32int bufsize)
672 uchar *buckdata, *p, *q, *ep;
673 u32int b, lastb, memsize, n;
679 buckdata = emalloc(is->blocksize);
685 * read entire buffer.
687 assert(mb->nwentry*IEntrySize <= mb->woffset-mb->boffset);
688 assert(mb->woffset-mb->boffset <= nbuf);
689 if(readpart(part, mb->boffset, buf, mb->woffset-mb->boffset) < 0){
690 fprint(2, "readpart %s: %r\n", part->name);
/* Guard against reading back the 0xa5 debug fill pattern — presumably
 * written elsewhere to mark uninitialized space; TODO confirm. */
694 assert(*(uint*)buf != 0xa5a5a5a5);
697 * remove fragmentation due to IEntrySize
698 * not evenly dividing Bufsize
/* Compact each bufsize-strided chunk's used prefix (memsize bytes)
 * down so the entries form one contiguous array. */
700 memsize = (bufsize/IEntrySize)*IEntrySize;
701 for(o=mb->boffset, p=q=buf; o<mb->woffset; o+=bufsize){
702 memmove(p, q, memsize);
706 ep = buf + mb->nwentry*IEntrySize;
707 assert(ep <= buf+nbuf);
/* Sort all entries; comparator orders by score, ties by address
 * descending (see ientrycmpaddr above). */
712 qsort(buf, mb->nwentry, IEntrySize, ientrycmpaddr);
/* Walk runs of equal-bucket entries, emitting one packed bucket block
 * per run and zero-filling any skipped bucket range in between. */
718 lastb = offset2bucket(is, mb->boffset);
719 for(p=buf; p<ep; p=q){
720 b = score2bucket(is, p);
721 for(q=p; q<ep && score2bucket(is, q)==b; q+=IEntrySize)
723 if(lastb+1 < b && zero)
724 zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, b));
725 if(IBucketSize+(q-p) > is->blocksize)
726 sysfatal("bucket overflow - make index bigger");
727 memmove(buckdata+IBucketSize, p, q-p);
728 ib.n = (q-p)/IEntrySize;
730 packibucket(&ib, buckdata, is->bucketmagic);
731 if(writepart(part, bucket2offset(is, b), buckdata, is->blocksize) < 0)
732 fprint(2, "write %s: %r\n", part->name);
/* Zero the tail buckets after the last one written. */
735 if(lastb+1 < is->stop-is->start && zero)
736 zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, is->stop - is->start));
739 fprint(2, "sortminibuffer bug: n=%ud nwentry=%ud have=%zd\n", n, mb->nwentry, (ep-buf)/IEntrySize);
/*
 * isectproc (function header line missing from this listing): per-index-
 * section worker. Pass 1 sprays incoming IEntries into nbuf sequential
 * disk sections; pass 2 shuffles each section into minibuffers via an
 * IPool; pass 3 sorts each minibuffer and writes final buckets. Signals
 * isectdonechan when complete.
 */
747 u32int buck, bufbuckets, bufsize, epbuf, i, j;
748 u32int mbufbuckets, n, nbucket, nn, space;
749 u32int nbuf, nminibuf, xminiclump, prod;
750 u64int blocksize, offset, xclump;
759 blocksize = is->blocksize;
760 nbucket = is->stop - is->start;
764 * pass 1 - write index entries from arenas into
765 * large sequential sections on index disk.
766 * requires nbuf * bufsize memory.
768 * pass 2 - split each section into minibufs.
769 * requires nminibuf * bufsize memory.
771 * pass 3 - read each minibuf into memory and
773 * requires entries/minibuf * IEntrySize memory.
775 * The larger we set bufsize the less seeking hurts us.
777 * The fewer sections and minibufs we have, the less
780 * The fewer sections and minibufs we have, the
781 * more entries we end up with in each minibuf
784 * Shoot for using half our memory to hold each
785 * minibuf. The chance of a random distribution
786 * getting off by 2x is quite low.
788 * Once that is decided, figure out the smallest
789 * nminibuf and nsection/biggest bufsize we can use
790 * and still fit in the memory constraints.
793 /* expected number of clump index entries we'll see */
794 xclump = nbucket * (double)totalclumps/totalbuckets;
796 /* number of clumps we want to see in a minibuf */
797 xminiclump = isectmem/2/IEntrySize;
799 /* total number of minibufs we need */
800 prod = (xclump+xminiclump-1) / xminiclump;
802 /* if possible, skip second pass */
803 if(!dumb && prod*MinBufSize < isectmem){
807 /* otherwise use nsection = sqrt(nmini) */
808 for(nbuf=1; nbuf*nbuf<prod; nbuf++)
810 if(nbuf*MinBufSize > isectmem)
811 sysfatal("not enough memory");
815 fprint(2, "%s: brand-new index, no work to do\n", argv0);
819 /* size buffer to use extra memory */
820 bufsize = MinBufSize;
821 while(bufsize*2*nbuf <= isectmem && bufsize < MaxBufSize)
823 data = emalloc(nbuf*bufsize);
824 epbuf = bufsize/IEntrySize;
825 fprint(2, "%T %s: %,ud buckets, %,ud groups, %,ud minigroups, %,ud buffer\n",
826 is->part->name, nbucket, nbuf, nminibuf, bufsize);
828 * Accept index entries from arena procs.
/* Pass 1 setup: each Buf owns bufbuckets consecutive buckets' worth of
 * disk; the last Buf's eoffset is clamped to the section end. */
830 buf = MKNZ(Buf, nbuf);
832 offset = is->blockbase;
833 bufbuckets = (nbucket+nbuf-1)/nbuf;
834 for(i=0; i<nbuf; i++){
835 buf[i].part = is->part;
840 buf[i].boffset = offset;
841 buf[i].woffset = offset;
843 offset += bufbuckets*blocksize;
844 buf[i].eoffset = offset;
846 offset = is->blockbase + nbucket*blocksize;
847 buf[i].eoffset = offset;
850 assert(p == data+nbuf*bufsize);
/* Pass 1 main loop: nil entry from threadmain ends the stream (recv
 * returning 1 means got an entry; the nil check line is not visible). */
853 while(recv(is->writechan, &ie) == 1){
856 buck = score2bucket(is, ie.score);
859 bwrite(&buf[i], &ie);
862 add(&indexentries, n);
865 for(i=0; i<nbuf; i++){
873 fprint(2, "isectproc bug: n=%ud nn=%ud\n", n, nn);
877 fprint(2, "%T %s: reordering\n", is->part->name);
880 * Rearrange entries into minibuffers and then
881 * split each minibuffer into buckets.
882 * The minibuffer must be sized so that it is
883 * a multiple of blocksize -- ipoolloadblock assumes
884 * that each minibuf starts aligned on a blocksize
887 mbuf = MKN(Minibuf, nminibuf);
888 mbufbuckets = (bufbuckets+nminibuf-1)/nminibuf;
/* Grow mbufbuckets until a minibuf is a whole number of bufsize blocks
 * (increment line not visible). */
889 while(mbufbuckets*blocksize % bufsize)
891 for(i=0; i<nbuf; i++){
893 * Set up descriptors.
897 offset = buf[i].boffset;
898 memset(mbuf, 0, nminibuf*sizeof(mbuf[0]));
899 for(j=0; j<nminibuf; j++){
901 mb->boffset = offset;
902 offset += mbufbuckets*blocksize;
903 if(offset > buf[i].eoffset)
904 offset = buf[i].eoffset;
905 mb->eoffset = offset;
906 mb->roffset = mb->boffset;
907 mb->woffset = mb->boffset;
/* Capacity in entries; the final minibuf takes whatever remains. */
908 mb->nentry = epbuf * (mb->eoffset - mb->boffset)/bufsize;
909 if(mb->nentry > buf[i].nentry)
910 mb->nentry = buf[i].nentry;
911 buf[i].nentry -= mb->nentry;
/* NOTE(review): stray double semicolon at end of this line. */
915 fprint(2, "isectproc bug2: n=%ud nn=%ud (i=%d)\n", n, nn, i);;
/* With a single minibuf the shuffle is a no-op: adopt pass-1 results. */
919 if(!dumb && nminibuf == 1){
920 mbuf[0].nwentry = mbuf[0].nentry;
921 mbuf[0].woffset = buf[i].woffset;
923 ipool = mkipool(is, mbuf, nminibuf, mbufbuckets, bufsize);
924 ipool->buck0 = bufbuckets*i;
925 for(j=0; j<nminibuf; j++){
927 while(mb->nentry > 0){
928 if(ipool->nfree < epbuf){
930 /* ipoolflush1 might change mb->nentry */
933 assert(ipool->nfree >= epbuf);
934 ipoolloadblock(ipool, mb);
939 for(j=0; j<nminibuf; j++)
940 nn += mbuf[j].nwentry;
942 fprint(2, "isectproc bug3: n=%ud nn=%ud (i=%d)\n", n, nn, i);
/* Pass 3: size a buffer for the largest minibuf, then sort each one. */
950 for(j=0; j<nminibuf; j++)
951 if(space < mbuf[j].woffset - mbuf[j].boffset)
952 space = mbuf[j].woffset - mbuf[j].boffset;
954 data = emalloc(space);
955 for(j=0; j<nminibuf; j++){
957 sortminibuffer(is, mb, data, space, bufsize);
/* Tell threadmain this section is complete. */
962 sendp(isectdonechan, is);