2 * Write the dirty icache entries to disk. Random seeks are
3 * so expensive that it makes sense to wait until we have
4 * a lot and then just make a sequential pass over the disk.
/*
 * Forward declarations.  icachewriteproc and icachewritecoord are
 * started with vtproc below; iesort is applied to the dirty list
 * in icachewritesect.
 */
10 static void icachewriteproc(void*);
11 static void icachewritecoord(void*);
12 static IEntry *iesort(IEntry*);
/*
 * icachesleeptime is read by icachewritesect, which loops while it
 * equals SleepForever.
 */
14 int icachesleeptime = 1000; /* milliseconds */
/* lower bound applied in icachewritesect to the value read from icachesleeptime */
15 int minicachesleeptime = 0;
22 typedef struct IWrite IWrite;
37 initround(&iwrite.round, "icache", 120*60*1000);
39 for(i=0; i<ix->nsects; i++){
40 ix->sects[i]->writechan = chancreate(sizeof(ulong), 1);
41 ix->sects[i]->writedonechan = chancreate(sizeof(ulong), 1);
42 vtproc(icachewriteproc, ix->sects[i]);
44 vtproc(icachewritecoord, nil);
45 vtproc(delaykickroundproc, &iwrite.round);
/*
 * ie2diskaddr: compute the address, within index section is, of the
 * bucket that index entry ie belongs to.
 *
 * The bucket number is hashbits(ie->score, 32) divided by ix->div.
 * The address is is->blockbase plus the bucket's offset from the
 * section's first bucket (is->start), shifted left by is->blocklog.
 *
 * NOTE(review): the return statement is not visible in this excerpt;
 * callers assign the result to an address variable, so presumably
 * addr is returned -- confirm.
 */
49 ie2diskaddr(Index *ix, ISect *is, IEntry *ie)
53 bucket = hashbits(ie->score, 32)/ix->div;
54 addr = is->blockbase + ((bucket - is->start) << is->blocklog);
/*
 * nextchunk: (only part of the body is visible here)
 *
 * Computes the disk address of a first entry, iefirst, with
 * ie2diskaddr, then walks the nextdirty chain that follows it,
 * computing each entry's address and testing whether its distance
 * from the first address has reached Bufsize.
 *
 * icachewritesect calls this with the addresses of its dirty-list
 * head, an address and a byte count, and uses the return value as
 * the list of entries to process.
 * NOTE(review): the assignments to *pie, *paddr and *pnbuf and the
 * return statement are not visible in this excerpt -- confirm how
 * they are set.
 */
59 nextchunk(Index *ix, ISect *is, IEntry **pie, u64int *paddr, uint *pnbuf)
64 IEntry *iefirst, *ie, **l;
/* size in bytes of one block in this section */
66 bsize = 1<<is->blocklog;
68 addr = ie2diskaddr(ix, is, iefirst);
70 for(l = &iefirst->nextdirty; (ie = *l) != nil; l = &(*l)->nextdirty){
71 naddr = ie2diskaddr(ix, is, ie);
72 if(naddr - addr >= Bufsize)
/*
 * icachewritesect: write dirty index-cache entries belonging to index
 * section is back to the section's partition, using buf as the I/O
 * buffer.
 *
 * Visible steps:
 *  - compute the range lo..hi from is->start, is->stop and ix->div;
 *  - fetch the dirty entries in that range with icachedirty and pass
 *    them through iesort;
 *  - take a chunk of entries with nextchunk, which also yields the
 *    region addr/nbuf;
 *  - read that region with readpart, merge every entry of the chunk
 *    into its bucket inside buf, and write the region back with
 *    writepart followed by flushpart.
 *
 * The result is sent by icachewriteproc on is->writedonechan and
 * OR-ed into an error value by icachewritecoord.
 * NOTE(review): loop headers, several statements and the return are
 * not visible in this excerpt.
 */
86 icachewritesect(Index *ix, ISect *is, u8int *buf)
88 int err, i, werr, h, bsize, t;
94 IEntry *ie, *iedirty, **l, *chunk;
/*
 * Range handled by this section.  The comparison against
 * TWID32/ix->div detects when is->stop * ix->div would exceed TWID32
 * (the body of that branch is not visible here).
 */
96 lo = is->start * ix->div;
97 if(TWID32/ix->div < is->stop)
100 hi = is->stop * ix->div - 1;
102 trace(TraceProc, "icachewritesect enter %ud %ud %llud",
103 lo, hi, iwrite.as.aa);
105 iedirty = icachedirty(lo, hi, iwrite.as.aa);
106 iedirty = iesort(iedirty);
107 bsize = 1 << is->blocklog;
/* stay in this loop for as long as icachesleeptime is SleepForever */
112 while((t = icachesleeptime) == SleepForever){
/* never use a sleep time below minicachesleeptime */
116 if(t < minicachesleeptime)
117 t = minicachesleeptime;
120 trace(TraceProc, "icachewritesect nextchunk");
121 chunk = nextchunk(ix, is, &iedirty, &addr, &nbuf);
123 trace(TraceProc, "icachewritesect readpart 0x%llux+0x%ux",
125 if(readpart(is->part, addr, buf, nbuf) < 0){
126 fprint(2, "%s: part %s addr 0x%llux: icachewritesect "
127 "readpart: %r\n", argv0, is->part->name, addr);
131 trace(TraceProc, "icachewritesect updatebuf");
132 addstat(StatIsectReadBytes, nbuf);
133 addstat(StatIsectRead, 1);
/* merge each entry of the chunk into its bucket within buf */
135 for(l=&chunk; (ie=*l)!=nil; l=&ie->nextdirty){
137 naddr = ie2diskaddr(ix, is, ie);
/*
 * The bucket at offset off (computed on a line not shown here) must
 * lie entirely inside the nbuf bytes that were read; report the
 * addresses and assert otherwise.
 */
139 if(off+bsize > nbuf){
140 fprint(2, "%s: whoops! addr=0x%llux nbuf=%ud "
141 "addr+nbuf=0x%llux naddr=0x%llux\n",
142 argv0, addr, nbuf, addr+nbuf, naddr);
143 assert(off+bsize <= nbuf);
145 unpackibucket(&ib, buf+off, is->bucketmagic);
146 if(okibucket(&ib, is) < 0){
147 fprint(2, "%s: bad bucket XXX\n", argv0);
/*
 * h, the result of bucklook, is used below as the byte offset into
 * ib.data at which the entry is packed.
 */
150 trace(TraceProc, "icachewritesect add %V at 0x%llux",
152 h = bucklook(ie->score, ie->ia.type, ib.data, ib.n);
155 packientry(ie, &ib.data[h]);
/*
 * Bucket still has room (ib.n < is->buckmax): move the bytes from
 * offset h up by IEntrySize and pack the entry at h.
 */
156 }else if(ib.n < is->buckmax){
157 memmove(&ib.data[h + IEntrySize], &ib.data[h],
158 ib.n*IEntrySize - h);
160 packientry(ie, &ib.data[h]);
162 fprint(2, "%s: bucket overflow XXX\n", argv0);
172 packibucket(&ib, buf+off, is->bucketmagic);
/*
 * NOTE(review): the format string below has no verbs for the addr
 * and nbuf arguments that are passed to trace.
 */
177 trace(TraceProc, "icachewritesect writepart", addr, nbuf);
179 if(writepart(is->part, addr, buf, nbuf) < 0 || flushpart(is->part) < 0)
/*
 * For each block of the region, ask _getdblock for a block and, when
 * one is returned, copy the new contents from buf into its data.
 */
182 for(i=0; i<nbuf; i+=bsize){
183 if((b = _getdblock(is->part, addr+i, ORDWR, 0)) != nil){
184 memmove(b->data, buf+i, bsize);
190 fprint(2, "%s: part %s addr 0x%llux: icachewritesect "
191 "writepart: %r\n", argv0, is->part->name, addr);
196 addstat(StatIsectWriteBytes, nbuf);
197 addstat(StatIsectWrite, 1);
201 trace(TraceProc, "icachewritesect done");
/*
 * icachewriteproc: writer proc for one index section.  It is started
 * with vtproc, which passes an element of ix->sects as the argument.
 *
 * It names its thread after the section's partition, allocates an I/O
 * buffer, then receives on is->writechan, runs icachewritesect for
 * the section and sends the result on is->writedonechan.
 * NOTE(review): the enclosing loop and the assignments of is and ix
 * are not visible in this excerpt.
 */
206 icachewriteproc(void *v)
216 threadsetname("icachewriteproc:%s", is->part->name);
218 bsize = 1<<is->blocklog;
/*
 * Allocate bsize extra bytes, then round the pointer up to a multiple
 * of bsize (bsize is a power of two, being 1<<is->blocklog).
 */
219 buf = vtmalloc(Bufsize+bsize);
220 buf = (u8int*)(((uintptr)buf+bsize-1)&~(uintptr)(bsize-1));
223 trace(TraceProc, "icachewriteproc recv");
/* block until a message arrives on writechan; the value is discarded */
224 recv(is->writechan, 0);
225 trace(TraceWork, "start");
226 ret = icachewritesect(ix, is, buf);
227 trace(TraceProc, "icachewriteproc send");
228 trace(TraceWork, "finish");
229 sendul(is->writedonechan, ret);
/*
 * icachewritecoord: coordinator proc for index-cache flushes.
 *
 * It records icachestate() in iwrite.as and waits for a kick on
 * iwrite.round.  When the arena and aa fields of as equal those of
 * iwrite.as it jumps to SkipWork.  Otherwise it sends on the
 * writechan of every index section and of the bloom filter, then
 * collects their results from the matching writedonechan, OR-ing
 * them into err.  It finally calls setatailstate, wakes waiters with
 * icacheclean(nil) and bumps StatIcacheFlush.
 *
 * NOTE(review): the enclosing loop, the assignment of as, the
 * SkipWork label and any conditions guarding the bloom operations
 * and setatailstate are not visible in this excerpt.
 */
234 icachewritecoord(void *v)
242 threadsetname("icachewritecoord");
245 iwrite.as = icachestate();
248 trace(TraceProc, "icachewritecoord sleep");
249 waitforkick(&iwrite.round);
250 trace(TraceWork, "start");
252 if(as.arena==iwrite.as.arena && as.aa==iwrite.as.aa){
253 /* will not be able to do anything more than last flush - kick disk */
254 trace(TraceProc, "icachewritecoord kick dcache");
256 trace(TraceProc, "icachewritecoord kicked dcache");
257 goto SkipWork; /* won't do anything; don't bother rewriting bloom filter */
261 trace(TraceProc, "icachewritecoord start flush");
/* start every section writer */
263 for(i=0; i<ix->nsects; i++)
264 send(ix->sects[i]->writechan, 0);
266 send(ix->bloom->writechan, 0);
/* collect one result per section; any nonzero bit ends up in err */
269 for(i=0; i<ix->nsects; i++)
270 err |= recvul(ix->sects[i]->writedonechan);
272 err |= recvul(ix->bloom->writedonechan);
274 trace(TraceProc, "icachewritecoord donewrite err=%d", err);
276 setatailstate(&iwrite.as);
280 icacheclean(nil); /* wake up anyone waiting */
281 trace(TraceWork, "finish");
282 addstat(StatIcacheFlush, 1);
289 trace(TraceProc, "flushicache enter");
290 kickround(&iwrite.round, 1);
291 trace(TraceProc, "flushicache exit");
297 kickround(&iwrite.round, 0);
/*
 * delaykickicache: call delaykickround on iwrite.round, the round
 * that icachewritecoord waits on.
 */
301 delaykickicache(void)
303 delaykickround(&iwrite.round);
311 IEntry *ie1, *ie2, *sorted;
313 if(ie == nil || ie->nextdirty == nil)
316 /* split the lists */
320 ie2 = ie2->nextdirty;
322 ie2 = ie2->nextdirty;
324 ie1 = ie1->nextdirty;
325 ie2 = ie2->nextdirty;
327 ie2 = ie2->nextdirty;
330 ie2 = ie1->nextdirty;
331 ie1->nextdirty = nil;
338 /* merge the lists */
344 cmp = scorecmp(ie1->score, ie2->score);
345 if(ie1==nil || (ie2 && cmp > 0)){
348 ie2 = ie2->nextdirty;
352 ie1 = ie1->nextdirty;