/*
 * Rebuild the index from scratch, in place.
 */
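/*
 * Usage: buildindex [-b] [-d] [-i isect]... [-M imem] [-m maxdisks] venti.conf
 * Flags (from the option parsing below):
 *	-b	also rebuild the bloom filter
 *	-d	debugging: force all three passes to run
 *	-i	limit the rebuild to the named index section (repeatable)
 *	-M	memory budget for index buffers (default 256M)
 *	-m	limit on concurrently scanned arena disks
 */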
#include "stdinc.h"
#include "dat.h"
#include "fns.h"

enum
{
	MinBufSize = 64*1024,
	MaxBufSize = 4*1024*1024,
};

int		dumb;
int		errors;
char		**isect;
int		nisect;
int		bloom;
int		zero;

u32int	isectmem;
u64int	totalbuckets;
u64int	totalclumps;
Channel	*arenadonechan;
Channel	*isectdonechan;
Index	*ix;

u64int	arenaentries;
u64int	skipentries;
u64int	indexentries;

static int	shouldprocess(ISect*);
static void	isectproc(void*);
static void	arenapartproc(void*);

void
usage(void)
{
	fprint(2, "usage: buildindex [-b] [-i isect]... [-M imem] venti.conf\n");
	threadexitsall("usage");
}

void
threadmain(int argc, char *argv[])
{
	int fd, i, napart, nfinish, maxdisks;
	u32int bcmem, imem;
	Config conf;
	Part *p;

	maxdisks = 100000;
	ventifmtinstall();
	imem = 256*1024*1024;
	ARGBEGIN{
	case 'b':
		bloom = 1;
		break;
	case 'd':	/* debugging - make sure to run all 3 passes */
		dumb = 1;
		break;
	case 'i':
		isect = vtrealloc(isect, (nisect+1)*sizeof(isect[0]));
		isect[nisect++] = EARGF(usage());
		break;
	case 'M':
		imem = unittoull(EARGF(usage()));
		break;
	case 'm':	/* temporary - might go away */
		maxdisks = atoi(EARGF(usage()));
		break;
	default:
		usage();
		break;
	}ARGEND

	if(argc != 1)
		usage();

	if(initventi(argv[0], &conf) < 0)
		sysfatal("can't init venti: %r");
	ix = mainindex;
	if(nisect == 0 && ix->bloom)
		bloom = 1;
	if(bloom && ix->bloom && resetbloom(ix->bloom) < 0)
		sysfatal("resetbloom: %r");
	if(bloom && !ix->bloom)
		sysfatal("-b specified but no bloom filter");
	if(!bloom)
		ix->bloom = nil;
	isectmem = imem/ix->nsects;

	/*
	 * safety first - only need read access to arenas
	 */
	p = nil;
	for(i=0; i<ix->narenas; i++){
		if(ix->arenas[i]->part != p){
			p = ix->arenas[i]->part;
			if((fd = open(p->filename, OREAD)) < 0)
				sysfatal("cannot reopen %s: %r", p->filename);
			dup(fd, p->fd);
			close(fd);
		}
	}

	/*
	 * need a block for every arena
	 */
	bcmem = maxblocksize * (mainindex->narenas + 16);
	if(0) fprint(2, "initialize %d bytes of disk block cache\n", bcmem);
	initdcache(bcmem);

	totalclumps = 0;
	for(i=0; i<ix->narenas; i++)
		totalclumps += ix->arenas[i]->diskstats.clumps;

	totalbuckets = 0;
	for(i=0; i<ix->nsects; i++)
		totalbuckets += ix->sects[i]->blocks;
	fprint(2, "%,lld clumps, %,lld buckets\n", totalclumps, totalbuckets);

	/* start index procs */
	fprint(2, "%T read index\n");
	isectdonechan = chancreate(sizeof(void*), 0);
	for(i=0; i<ix->nsects; i++){
		if(shouldprocess(ix->sects[i])){
			ix->sects[i]->writechan = chancreate(sizeof(IEntry), 0);
			vtproc(isectproc, ix->sects[i]);
		}
	}

	for(i=0; i<nisect; i++)
		if(isect[i])
			fprint(2, "warning: did not find index section %s\n", isect[i]);

	/* start arena procs */
	p = nil;
	napart = 0;
	nfinish = 0;
	arenadonechan = chancreate(sizeof(void*), 0);
	for(i=0; i<ix->narenas; i++){
		if(ix->arenas[i]->part != p){
			p = ix->arenas[i]->part;
			vtproc(arenapartproc, p);
			if(++napart >= maxdisks){
				recvp(arenadonechan);
				nfinish++;
			}
		}
	}

	/* wait for arena procs to finish */
	for(; nfinish<napart; nfinish++)
		recvp(arenadonechan);

	/* tell index procs to finish */
	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan)
			send(ix->sects[i]->writechan, nil);

	/* wait for index procs to finish */
	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan)
			recvp(isectdonechan);

	if(ix->bloom && writebloom(ix->bloom) < 0)
		fprint(2, "writing bloom filter: %r\n");

	fprint(2, "%T done arenaentries=%,lld indexed=%,lld (nskip=%,lld)\n",
		arenaentries, indexentries, skipentries);
	threadexitsall(nil);
}

static int
shouldprocess(ISect *is)
{
	int i;

	if(nisect == 0)
		return 1;

	for(i=0; i<nisect; i++)
		if(isect[i] && strcmp(isect[i], is->name) == 0){
			isect[i] = nil;
			return 1;
		}
	return 0;
}

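/*
 * Atomically add n to a shared counter; the arena and index
 * procs update the global entry tallies concurrently.
 */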
static void
add(u64int *a, u64int n)
{
	static Lock l;

	lock(&l);
	*a += n;
	unlock(&l);
}

/*
 * Read through an arena partition and send each of its IEntries
 * to the appropriate index section.  When finished, send on
 * arenadonechan.
 */
enum
{
	ClumpChunks = 32*1024,
};
static void
arenapartproc(void *v)
{
	int i, j, n, nskip, x;
	u32int clump;
	u64int addr, tot;
	Arena *a;
	ClumpInfo *ci, *cis;
	IEntry ie;
	Part *p;

	p = v;
	threadsetname("arenaproc %s", p->name);

	nskip = 0;
	tot = 0;
	cis = MKN(ClumpInfo, ClumpChunks);
	for(i=0; i<ix->narenas; i++){
		a = ix->arenas[i];
		if(a->part != p)
			continue;
		if(a->memstats.clumps)
			fprint(2, "%T arena %s: %d entries\n",
				a->name, a->memstats.clumps);
		/*
		 * Running the loop backwards accesses the
		 * clump info blocks forwards, since they are
		 * stored in reverse order at the end of the arena.
		 * This speeds things slightly.
		 */
		addr = ix->amap[i].start + a->memstats.used;
		for(clump=a->memstats.clumps; clump > 0; clump-=n){
			n = ClumpChunks;
			if(n > clump)
				n = clump;
			if(readclumpinfos(a, clump-n, cis, n) != n){
				fprint(2, "%T arena %s: directory read: %r\n", a->name);
				errors = 1;
				break;
			}
			for(j=n-1; j>=0; j--){
				ci = &cis[j];
				ie.ia.type = ci->type;
				ie.ia.size = ci->uncsize;
				addr -= ci->size + ClumpSize;
				ie.ia.addr = addr;
				ie.ia.blocks = (ci->size + ClumpSize + (1<<ABlockLog)-1) >> ABlockLog;
				scorecp(ie.score, ci->score);
				if(ci->type == VtCorruptType)
					nskip++;
				else{
					tot++;
					x = indexsect(ix, ie.score);
					assert(0 <= x && x < ix->nsects);
					if(ix->sects[x]->writechan)
						send(ix->sects[x]->writechan, &ie);
					if(ix->bloom)
						markbloomfilter(ix->bloom, ie.score);
				}
			}
		}
		if(addr != ix->amap[i].start)
			fprint(2, "%T arena %s: clump miscalculation %lld != %lld\n", a->name, addr, ix->amap[i].start);
	}
	free(cis);
	add(&arenaentries, tot);
	add(&skipentries, nskip);
	sendp(arenadonechan, p);
}

/*
 * Convert score into relative bucket number in isect.
 * Can pass a packed ientry instead of score - score is first.
 */
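/*
 * (ix->div is roughly 2^32 divided by the total number of
 * buckets, so with about a million buckets div is about
 * 4096 and the top ~20 bits of the score select the bucket.)
 */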
static u32int
score2bucket(ISect *is, uchar *score)
{
	u32int b;

	b = hashbits(score, 32)/ix->div;
	if(b < is->start || b >= is->stop){
		fprint(2, "score2bucket: score=%V div=%d b=%ud start=%ud stop=%ud\n",
			score, ix->div, b, is->start, is->stop);
	}
	assert(is->start <= b && b < is->stop);
	return b - is->start;
}

/*
 * Convert offset in index section to bucket number.
 */
static u32int
offset2bucket(ISect *is, u64int offset)
{
	u32int b;

	assert(is->blockbase <= offset);
	offset -= is->blockbase;
	b = offset/is->blocksize;
	assert(b < is->stop-is->start);
	return b;
}

/*
 * Convert bucket number to offset.
 */
static u64int
bucket2offset(ISect *is, u32int b)
{
	assert(b <= is->stop-is->start);
	return is->blockbase + (u64int)b*is->blocksize;
}

/*
 * IEntry buffers to hold initial round of spraying.
 */
typedef struct Buf Buf;
struct Buf
{
	Part *part;			/* partition being written */
	uchar *bp;			/* current block */
	uchar *ep;			/* end of block */
	uchar *wp;			/* write position in block */
	u64int boffset;			/* start offset */
	u64int woffset;			/* next write offset */
	u64int eoffset;			/* end offset */
	u32int nentry;			/* number of entries written */
};

static void
bflush(Buf *buf)
{
	u32int bufsize;

	if(buf->woffset >= buf->eoffset)
		sysfatal("buf index chunk overflow - need bigger index");
	bufsize = buf->ep - buf->bp;
	if(writepart(buf->part, buf->woffset, buf->bp, bufsize) < 0){
		fprint(2, "write %s: %r\n", buf->part->name);
		errors = 1;
	}
	buf->woffset += bufsize;
	memset(buf->bp, 0, bufsize);
	buf->wp = buf->bp;
}

static void
bwrite(Buf *buf, IEntry *ie)
{
	if(buf->wp+IEntrySize > buf->ep)
		bflush(buf);
	assert(buf->bp <= buf->wp && buf->wp < buf->ep);
	packientry(ie, buf->wp);
	buf->wp += IEntrySize;
	assert(buf->bp <= buf->wp && buf->wp <= buf->ep);
	buf->nentry++;
}

/*
 * Minibuffer.  In-memory data structure holds our place
 * in the buffer but has no block data.  We are writing and
 * reading the minibuffers at the same time.  (Careful!)
 */
typedef struct Minibuf Minibuf;
struct Minibuf
{
	u64int boffset;			/* start offset */
	u64int roffset;			/* read offset */
	u64int woffset;			/* write offset */
	u64int eoffset;			/* end offset */
	u32int nentry;			/* # entries left to read */
	u32int nwentry;			/* # entries written */
};
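/*
 * The reads stay ahead of the writes: roffset never falls
 * behind woffset, and when they meet, ipoolflush0 loads the
 * block at roffset into the pool before overwriting it.
 */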

/*
 * Index entry pool.  Used when trying to shuffle around
 * the entries in a big buffer into the corresponding M minibuffers.
 * Sized to hold (M+1)*EntriesPerBlock entries, so that there will
 * always either be room in the pool for another block worth of
 * entries or there will be an entire block worth of sorted entries
 * to write out.
 */
typedef struct IEntryLink IEntryLink;
typedef struct IPool IPool;

struct IEntryLink
{
	uchar ie[IEntrySize];		/* raw IEntry */
	IEntryLink *next;		/* next in chain */
};

struct IPool
{
	ISect *isect;
	u32int buck0;			/* first bucket in pool */
	u32int mbufbuckets;		/* buckets per minibuf */
	IEntryLink *entry;		/* all IEntryLinks */
	u32int nentry;			/* # of IEntryLinks */
	IEntryLink *free;		/* free list */
	u32int nfree;			/* # on free list */
	Minibuf *mbuf;			/* all minibufs */
	u32int nmbuf;			/* # of minibufs */
	IEntryLink **mlist;		/* lists for each minibuf */
	u32int *mcount;			/* # on each mlist[i] */
	u32int bufsize;			/* block buffer size */
	uchar *rbuf;			/* read buffer */
	uchar *wbuf;			/* write buffer */
	u32int epbuf;			/* entries per block buffer */
};
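
/*
 * Pool sizing argument: with (nmbuf+1) blocks' worth of links,
 * whenever fewer than one block's worth are free, the nmbuf
 * minibuf lists together hold more than nmbuf blocks' worth,
 * so by pigeonhole some single list holds at least a full
 * block and ipoolflush1 can always write one out.
 */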

/*
static int
countsokay(IPool *p)
{
	int i;
	u64int n;

	n = 0;
	for(i=0; i<p->nmbuf; i++)
		n += p->mcount[i];
	n += p->nfree;
	if(n != p->nentry){
		print("free %ud:", p->nfree);
		for(i=0; i<p->nmbuf; i++)
			print(" %ud", p->mcount[i]);
		print(" = %lld nentry: %ud\n", n, p->nentry);
	}
	return n == p->nentry;
}
*/

static IPool*
mkipool(ISect *isect, Minibuf *mbuf, u32int nmbuf,
	u32int mbufbuckets, u32int bufsize)
{
	u32int i, nentry;
	uchar *data;
	IPool *p;
	IEntryLink *l;

	nentry = (nmbuf+1)*bufsize / IEntrySize;
	p = ezmalloc(sizeof(IPool)
		+nentry*sizeof(IEntryLink)
		+nmbuf*sizeof(IEntryLink*)
		+nmbuf*sizeof(u32int)
		+3*bufsize);

	p->isect = isect;
	p->mbufbuckets = mbufbuckets;
	p->bufsize = bufsize;
	p->entry = (IEntryLink*)(p+1);
	p->nentry = nentry;
	p->mlist = (IEntryLink**)(p->entry+nentry);
	p->mcount = (u32int*)(p->mlist+nmbuf);
	p->nmbuf = nmbuf;
	p->mbuf = mbuf;
	data = (uchar*)(p->mcount+nmbuf);
	data += bufsize - (uintptr)data%bufsize;
	p->rbuf = data;
	p->wbuf = data+bufsize;
	p->epbuf = bufsize/IEntrySize;

	for(i=0; i<p->nentry; i++){
		l = &p->entry[i];
		l->next = p->free;
		p->free = l;
		p->nfree++;
	}
	return p;
}

/*
 * Add the index entry ie to the pool p.
 * Caller must know there is room.
 */
static void
ipoolinsert(IPool *p, uchar *ie)
{
	u32int buck, x;
	IEntryLink *l;

	assert(p->free != nil);

	buck = score2bucket(p->isect, ie);
	x = (buck-p->buck0) / p->mbufbuckets;
	if(x >= p->nmbuf){
		fprint(2, "buck=%ud mbufbucket=%ud x=%ud\n",
			buck, p->mbufbuckets, x);
	}
	assert(x < p->nmbuf);

	l = p->free;
	p->free = l->next;
	p->nfree--;
	memmove(l->ie, ie, IEntrySize);
	l->next = p->mlist[x];
	p->mlist[x] = l;
	p->mcount[x]++;
}

/*
 * Pull out a block containing as many
 * entries as possible for minibuffer x.
 */
static u32int
ipoolgetbuf(IPool *p, u32int x)
{
	uchar *bp, *ep, *wp;
	IEntryLink *l;
	u32int n;

	bp = p->wbuf;
	ep = p->wbuf + p->bufsize;
	n = 0;
	assert(x < p->nmbuf);
	for(wp=bp; wp+IEntrySize<=ep && p->mlist[x]; wp+=IEntrySize){
		l = p->mlist[x];
		p->mlist[x] = l->next;
		p->mcount[x]--;
		memmove(wp, l->ie, IEntrySize);
		l->next = p->free;
		p->free = l;
		p->nfree++;
		n++;
	}
	memset(wp, 0, ep-wp);
	return n;
}

/*
 * Read a block worth of entries from the minibuf
 * into the pool.  Caller must know there is room.
 */
static void
ipoolloadblock(IPool *p, Minibuf *mb)
{
	u32int i, n;

	assert(mb->nentry > 0);
	assert(mb->roffset >= mb->woffset);
	assert(mb->roffset < mb->eoffset);

	n = p->bufsize/IEntrySize;
	if(n > mb->nentry)
		n = mb->nentry;
	if(readpart(p->isect->part, mb->roffset, p->rbuf, p->bufsize) < 0)
		fprint(2, "readpart %s: %r\n", p->isect->part->name);
	else{
		for(i=0; i<n; i++)
			ipoolinsert(p, p->rbuf+i*IEntrySize);
	}
	mb->nentry -= n;
	mb->roffset += p->bufsize;
}
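
/*
 * Note that the read advances roffset by a full block even
 * when fewer than a block's worth of entries remain; nentry
 * bounds how many slots in the block hold real entries.
 */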

/*
 * Write out a block worth of entries to minibuffer x.
 * If necessary, pick up the data there before overwriting it.
 */
static void
ipoolflush0(IPool *pool, u32int x)
{
	u32int bufsize;
	Minibuf *mb;

	mb = pool->mbuf+x;
	bufsize = pool->bufsize;
	mb->nwentry += ipoolgetbuf(pool, x);
	if(mb->nentry > 0 && mb->roffset == mb->woffset){
		assert(pool->nfree >= pool->bufsize/IEntrySize);
		/*
		 * There will be room in the pool -- we just
		 * removed a block worth.
		 */
		ipoolloadblock(pool, mb);
	}
	if(writepart(pool->isect->part, mb->woffset, pool->wbuf, bufsize) < 0)
		fprint(2, "writepart %s: %r\n", pool->isect->part->name);
	mb->woffset += bufsize;
}

/*
 * Write out some full block of entries.
 * (There must be one -- the pool is almost full!)
 */
static void
ipoolflush1(IPool *pool)
{
	u32int i;

	assert(pool->nfree <= pool->epbuf);

	for(i=0; i<pool->nmbuf; i++){
		if(pool->mcount[i] >= pool->epbuf){
			ipoolflush0(pool, i);
			return;
		}
	}
	/* can't be reached - someone must be full */
	sysfatal("ipoolflush1");
}

/*
 * Flush all the entries in the pool out to disk.
 * Nothing more to read from disk.
 */
static void
ipoolflush(IPool *pool)
{
	u32int i;

	for(i=0; i<pool->nmbuf; i++)
		while(pool->mlist[i])
			ipoolflush0(pool, i);
	assert(pool->nfree == pool->nentry);
}

/*
 * Third pass.  Pick up each minibuffer from disk into
 * memory and then write out the buckets.
 */

/*
 * Compare two packed index entries.
 * Usual ordering, except ties are broken by putting higher
 * index addresses first (any duplicates are assumed to be
 * due to corruption at the lower addresses).
 */
static int
ientrycmpaddr(const void *va, const void *vb)
{
	int i;
	uchar *a, *b;

	a = (uchar*)va;
	b = (uchar*)vb;
	i = ientrycmp(a, b);
	if(i)
		return i;
	return -memcmp(a+IEntryAddrOff, b+IEntryAddrOff, 8);
}

static void
zerorange(Part *p, u64int o, u64int e)
{
	static uchar zero[MaxIoSize];
	u32int n;

	for(; o<e; o+=n){
		n = sizeof zero;
		if(o+n > e)
			n = e-o;
		if(writepart(p, o, zero, n) < 0)
			fprint(2, "writepart %s: %r\n", p->name);
	}
}

/*
 * Load a minibuffer into memory and write out the
 * corresponding buckets.
 */
static void
sortminibuffer(ISect *is, Minibuf *mb, uchar *buf, u32int nbuf, u32int bufsize)
{
	uchar *buckdata, *p, *q, *ep;
	u32int b, lastb, memsize, n;
	u64int o;
	IBucket ib;
	Part *part;

	part = is->part;

	if(mb->nwentry == 0)
		return;

	buckdata = emalloc(is->blocksize);

	/*
	 * read entire buffer.
	 */
	assert(mb->nwentry*IEntrySize <= mb->woffset-mb->boffset);
	assert(mb->woffset-mb->boffset <= nbuf);
	if(readpart(part, mb->boffset, buf, mb->woffset-mb->boffset) < 0){
		fprint(2, "readpart %s: %r\n", part->name);
		errors = 1;
		return;
	}
	assert(*(uint*)buf != 0xa5a5a5a5);

	/*
	 * remove fragmentation due to IEntrySize
	 * not evenly dividing bufsize
	 */
	memsize = (bufsize/IEntrySize)*IEntrySize;
	for(o=mb->boffset, p=q=buf; o<mb->woffset; o+=bufsize){
		memmove(p, q, memsize);
		p += memsize;
		q += bufsize;
	}
	ep = buf + mb->nwentry*IEntrySize;
	assert(ep <= buf+nbuf);

	/*
	 * sort entries
	 */
	qsort(buf, mb->nwentry, IEntrySize, ientrycmpaddr);

	/*
	 * write buckets out
	 */
	n = 0;
	lastb = offset2bucket(is, mb->boffset);
	for(p=buf; p<ep; p=q){
		b = score2bucket(is, p);
		for(q=p; q<ep && score2bucket(is, q)==b; q+=IEntrySize)
			;
		if(lastb+1 < b && zero)
			zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, b));
		if(IBucketSize+(q-p) > is->blocksize)
			sysfatal("bucket overflow - make index bigger");
		memmove(buckdata+IBucketSize, p, q-p);
		ib.n = (q-p)/IEntrySize;
		n += ib.n;
		packibucket(&ib, buckdata, is->bucketmagic);
		if(writepart(part, bucket2offset(is, b), buckdata, is->blocksize) < 0)
			fprint(2, "write %s: %r\n", part->name);
		lastb = b;
	}
	if(lastb+1 < is->stop-is->start && zero)
		zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, is->stop - is->start));

	if(n != mb->nwentry)
		fprint(2, "sortminibuffer bug: n=%ud nwentry=%ud have=%zd\n", n, mb->nwentry, (ep-buf)/IEntrySize);

	free(buckdata);
}

static void
isectproc(void *v)
{
	u32int buck, bufbuckets, bufsize, epbuf, i, j;
	u32int mbufbuckets, n, nbucket, nn, space;
	u32int nbuf, nminibuf, xminiclump, prod;
	u64int blocksize, offset, xclump;
	uchar *data, *p;
	Buf *buf;
	IEntry ie;
	IPool *ipool;
	ISect *is;
	Minibuf *mbuf, *mb;

	is = v;
	blocksize = is->blocksize;
	nbucket = is->stop - is->start;

	/*
	 * Three passes:
	 *	pass 1 - write index entries from arenas into
	 *		large sequential sections on index disk.
	 *		requires nbuf * bufsize memory.
	 *
	 *	pass 2 - split each section into minibufs.
	 *		requires nminibuf * bufsize memory.
	 *
	 *	pass 3 - read each minibuf into memory and
	 *		write buckets out.
	 *		requires entries/minibuf * IEntrySize memory.
	 *
	 * The larger we set bufsize the less seeking hurts us.
	 *
	 * The fewer sections and minibufs we have, the less
	 * seeking hurts us.
	 *
	 * The fewer sections and minibufs we have, the
	 * more entries we end up with in each minibuf
	 * at the end.
	 *
	 * Shoot for using half our memory to hold each
	 * minibuf.  The chance of a random distribution
	 * getting off by 2x is quite low.
	 *
	 * Once that is decided, figure out the smallest
	 * nminibuf and nsection/biggest bufsize we can use
	 * and still fit in the memory constraints.
	 */
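	/*
	 * Worked example (illustrative numbers only): with
	 * isectmem = 256MB and IEntrySize around 40 bytes,
	 * xminiclump is about 3.3M entries.  An expected 50M
	 * entries gives prod = 15; 15*MinBufSize is well under
	 * 256MB, so the second pass is skipped with nbuf = 15
	 * and nminibuf = 1.
	 */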

	/* expected number of clump index entries we'll see */
	xclump = nbucket * (double)totalclumps/totalbuckets;

	/* number of clumps we want to see in a minibuf */
	xminiclump = isectmem/2/IEntrySize;

	/* total number of minibufs we need */
	prod = (xclump+xminiclump-1) / xminiclump;

	/* if possible, skip second pass */
	if(!dumb && prod*MinBufSize < isectmem){
		nbuf = prod;
		nminibuf = 1;
	}else{
		/* otherwise use nsection = sqrt(nmini) */
		for(nbuf=1; nbuf*nbuf<prod; nbuf++)
			;
		if(nbuf*MinBufSize > isectmem)
			sysfatal("not enough memory");
		nminibuf = nbuf;
	}
	if(nbuf == 0){
		fprint(2, "%s: brand-new index, no work to do\n", argv0);
		threadexitsall(nil);
	}

	/* size buffer to use extra memory */
	bufsize = MinBufSize;
	while(bufsize*2*nbuf <= isectmem && bufsize < MaxBufSize)
		bufsize *= 2;
	data = emalloc(nbuf*bufsize);
	epbuf = bufsize/IEntrySize;
	fprint(2, "%T %s: %,ud buckets, %,ud groups, %,ud minigroups, %,ud buffer\n",
		is->part->name, nbucket, nbuf, nminibuf, bufsize);
	/*
	 * Accept index entries from arena procs.
	 */
	buf = MKNZ(Buf, nbuf);
	p = data;
	offset = is->blockbase;
	bufbuckets = (nbucket+nbuf-1)/nbuf;
	for(i=0; i<nbuf; i++){
		buf[i].part = is->part;
		buf[i].bp = p;
		buf[i].wp = p;
		p += bufsize;
		buf[i].ep = p;
		buf[i].boffset = offset;
		buf[i].woffset = offset;
		if(i < nbuf-1){
			offset += bufbuckets*blocksize;
			buf[i].eoffset = offset;
		}else{
			offset = is->blockbase + nbucket*blocksize;
			buf[i].eoffset = offset;
		}
	}
	assert(p == data+nbuf*bufsize);

	n = 0;
	while(recv(is->writechan, &ie) == 1){
		if(ie.ia.addr == 0)
			break;
		buck = score2bucket(is, ie.score);
		i = buck/bufbuckets;
		assert(i < nbuf);
		bwrite(&buf[i], &ie);
		n++;
	}
	add(&indexentries, n);

	nn = 0;
	for(i=0; i<nbuf; i++){
		bflush(&buf[i]);
		buf[i].bp = nil;
		buf[i].ep = nil;
		buf[i].wp = nil;
		nn += buf[i].nentry;
	}
	if(n != nn)
		fprint(2, "isectproc bug: n=%ud nn=%ud\n", n, nn);

	free(data);

	fprint(2, "%T %s: reordering\n", is->part->name);

	/*
	 * Rearrange entries into minibuffers and then
	 * split each minibuffer into buckets.
	 * Each minibuf must span a whole number of bufsize
	 * blocks -- ipoolloadblock assumes that each minibuf
	 * starts aligned on a bufsize boundary.
	 */
	mbuf = MKN(Minibuf, nminibuf);
	mbufbuckets = (bufbuckets+nminibuf-1)/nminibuf;
	while(mbufbuckets*blocksize % bufsize)
		mbufbuckets++;
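	/*
	 * e.g. with blocksize = 8K and bufsize = 1MB, mbufbuckets
	 * is rounded up to a multiple of 128 (1MB/8K), so each
	 * minibuf starts on a bufsize boundary.
	 */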
	for(i=0; i<nbuf; i++){
		/*
		 * Set up descriptors.
		 */
		n = buf[i].nentry;
		nn = 0;
		offset = buf[i].boffset;
		memset(mbuf, 0, nminibuf*sizeof(mbuf[0]));
		for(j=0; j<nminibuf; j++){
			mb = &mbuf[j];
			mb->boffset = offset;
			offset += mbufbuckets*blocksize;
			if(offset > buf[i].eoffset)
				offset = buf[i].eoffset;
			mb->eoffset = offset;
			mb->roffset = mb->boffset;
			mb->woffset = mb->boffset;
			mb->nentry = epbuf * (mb->eoffset - mb->boffset)/bufsize;
			if(mb->nentry > buf[i].nentry)
				mb->nentry = buf[i].nentry;
			buf[i].nentry -= mb->nentry;
			nn += mb->nentry;
		}
		if(n != nn)
			fprint(2, "isectproc bug2: n=%ud nn=%ud (i=%d)\n", n, nn, i);
		/*
		 * Rearrange.
		 */
		if(!dumb && nminibuf == 1){
			mbuf[0].nwentry = mbuf[0].nentry;
			mbuf[0].woffset = buf[i].woffset;
		}else{
			ipool = mkipool(is, mbuf, nminibuf, mbufbuckets, bufsize);
			ipool->buck0 = bufbuckets*i;
			for(j=0; j<nminibuf; j++){
				mb = &mbuf[j];
				while(mb->nentry > 0){
					if(ipool->nfree < epbuf){
						ipoolflush1(ipool);
						/* ipoolflush1 might change mb->nentry */
						continue;
					}
					assert(ipool->nfree >= epbuf);
					ipoolloadblock(ipool, mb);
				}
			}
			ipoolflush(ipool);
			nn = 0;
			for(j=0; j<nminibuf; j++)
				nn += mbuf[j].nwentry;
			if(n != nn)
				fprint(2, "isectproc bug3: n=%ud nn=%ud (i=%d)\n", n, nn, i);
			free(ipool);
		}

		/*
		 * Make buckets.
		 */
		space = 0;
		for(j=0; j<nminibuf; j++)
			if(space < mbuf[j].woffset - mbuf[j].boffset)
				space = mbuf[j].woffset - mbuf[j].boffset;

		data = emalloc(space);
		for(j=0; j<nminibuf; j++){
			mb = &mbuf[j];
			sortminibuffer(is, mb, data, space, bufsize);
		}
		free(data);
	}

	sendp(isectdonechan, is);
}