git.lizzy.rs Git - plan9front.git/blob - sys/src/cmd/venti/srv/icachewrite.c
venti: fix memory layers
[plan9front.git] / sys / src / cmd / venti / srv / icachewrite.c
1 /*
2  * Write the dirty icache entries to disk.  Random seeks are
3  * so expensive that it makes sense to wait until we have
4  * a lot and then just make a sequential pass over the disk.
5  */
6 #include "stdinc.h"
7 #include "dat.h"
8 #include "fns.h"
9
10 static void icachewriteproc(void*);
11 static void icachewritecoord(void*);
12 static IEntry *iesort(IEntry*);
13
14 int icachesleeptime = 1000;     /* milliseconds */
15 int minicachesleeptime = 0;
16
17 enum
18 {
19         Bufsize = 8*1024*1024
20 };
21
22 typedef struct IWrite IWrite;
23 struct IWrite
24 {
25         Round round;
26         AState as;
27 };
28
29 static IWrite iwrite;
30
31 void
32 initicachewrite(void)
33 {
34         int i;
35         Index *ix;
36
37         initround(&iwrite.round, "icache", 120*60*1000);
38         ix = mainindex;
39         for(i=0; i<ix->nsects; i++){
40                 ix->sects[i]->writechan = chancreate(sizeof(ulong), 1);
41                 ix->sects[i]->writedonechan = chancreate(sizeof(ulong), 1);
42                 vtproc(icachewriteproc, ix->sects[i]);
43         }
44         vtproc(icachewritecoord, nil);
45         vtproc(delaykickroundproc, &iwrite.round);
46 }
47
48 static u64int
49 ie2diskaddr(Index *ix, ISect *is, IEntry *ie)
50 {
51         u64int bucket, addr;
52
53         bucket = hashbits(ie->score, 32)/ix->div;
54         addr = is->blockbase + ((bucket - is->start) << is->blocklog);
55         return addr;
56 }
57
58 static IEntry*
59 nextchunk(Index *ix, ISect *is, IEntry **pie, u64int *paddr, uint *pnbuf)
60 {
61         u64int addr, naddr;
62         uint nbuf;
63         int bsize;
64         IEntry *iefirst, *ie, **l;
65
66         bsize = 1<<is->blocklog;
67         iefirst = *pie;
68         addr = ie2diskaddr(ix, is, iefirst);
69         nbuf = 0;
70         for(l = &iefirst->nextdirty; (ie = *l) != nil; l = &(*l)->nextdirty){
71                 naddr = ie2diskaddr(ix, is, ie);
72                 if(naddr - addr >= Bufsize)
73                         break;
74                 nbuf = naddr - addr;
75         }
76         nbuf += bsize;
77
78         *l = nil;
79         *pie = ie;
80         *paddr = addr;
81         *pnbuf = nbuf;
82         return iefirst;
83 }
84         
85 static int
86 icachewritesect(Index *ix, ISect *is, u8int *buf)
87 {
88         int err, i, werr, h, bsize, t;
89         u32int lo, hi;
90         u64int addr, naddr;
91         uint nbuf, off;
92         DBlock *b;
93         IBucket ib;
94         IEntry *ie, *iedirty, **l, *chunk;
95
96         lo = is->start * ix->div;
97         if(TWID32/ix->div < is->stop)
98                 hi = TWID32;
99         else
100                 hi = is->stop * ix->div - 1;
101
102         trace(TraceProc, "icachewritesect enter %ud %ud %llud",
103                 lo, hi, iwrite.as.aa);
104
105         iedirty = icachedirty(lo, hi, iwrite.as.aa);
106         iedirty = iesort(iedirty);
107         bsize = 1 << is->blocklog;
108         err = 0;
109
110         while(iedirty){
111                 disksched();
112                 while((t = icachesleeptime) == SleepForever){
113                         sleep(1000);
114                         disksched();
115                 }
116                 if(t < minicachesleeptime)
117                         t = minicachesleeptime;
118                 if(t > 0)
119                         sleep(t);
120                 trace(TraceProc, "icachewritesect nextchunk");
121                 chunk = nextchunk(ix, is, &iedirty, &addr, &nbuf);
122
123                 trace(TraceProc, "icachewritesect readpart 0x%llux+0x%ux",
124                         addr, nbuf);
125                 if(readpart(is->part, addr, buf, nbuf) < 0){
126                         fprint(2, "%s: part %s addr 0x%llux: icachewritesect "
127                                 "readpart: %r\n", argv0, is->part->name, addr);
128                         err  = -1;
129                         continue;
130                 }
131                 trace(TraceProc, "icachewritesect updatebuf");
132                 addstat(StatIsectReadBytes, nbuf);
133                 addstat(StatIsectRead, 1);
134
135                 for(l=&chunk; (ie=*l)!=nil; l=&ie->nextdirty){
136 again:
137                         naddr = ie2diskaddr(ix, is, ie);
138                         off = naddr - addr;
139                         if(off+bsize > nbuf){
140                                 fprint(2, "%s: whoops! addr=0x%llux nbuf=%ud "
141                                         "addr+nbuf=0x%llux naddr=0x%llux\n",
142                                         argv0, addr, nbuf, addr+nbuf, naddr);
143                                 assert(off+bsize <= nbuf);
144                         }
145                         unpackibucket(&ib, buf+off, is->bucketmagic);
146                         if(okibucket(&ib, is) < 0){
147                                 fprint(2, "%s: bad bucket XXX\n", argv0);
148                                 goto skipit;
149                         }
150                         trace(TraceProc, "icachewritesect add %V at 0x%llux",
151                                 ie->score, naddr);
152                         h = bucklook(ie->score, ie->ia.type, ib.data, ib.n);
153                         if(h & 1){
154                                 h ^= 1;
155                                 packientry(ie, &ib.data[h]);
156                         }else if(ib.n < is->buckmax){
157                                 memmove(&ib.data[h + IEntrySize], &ib.data[h],
158                                         ib.n*IEntrySize - h);
159                                 ib.n++;
160                                 packientry(ie, &ib.data[h]);
161                         }else{
162                                 fprint(2, "%s: bucket overflow XXX\n", argv0);
163 skipit:
164                                 err = -1;
165                                 *l = ie->nextdirty;
166                                 ie = *l;
167                                 if(ie)
168                                         goto again;
169                                 else
170                                         break;
171                         }
172                         packibucket(&ib, buf+off, is->bucketmagic);
173                 }
174
175                 diskaccess(1);
176
177                 trace(TraceProc, "icachewritesect writepart", addr, nbuf);
178                 werr = 0;
179                 if(writepart(is->part, addr, buf, nbuf) < 0 || flushpart(is->part) < 0)
180                         werr = -1;
181
182                 for(i=0; i<nbuf; i+=bsize){
183                         if((b = _getdblock(is->part, addr+i, ORDWR, 0)) != nil){
184                                 memmove(b->data, buf+i, bsize);
185                                 putdblock(b);
186                         }
187                 }
188
189                 if(werr < 0){
190                         fprint(2, "%s: part %s addr 0x%llux: icachewritesect "
191                                 "writepart: %r\n", argv0, is->part->name, addr);
192                         err = -1;
193                         continue;
194                 }
195                 
196                 addstat(StatIsectWriteBytes, nbuf);
197                 addstat(StatIsectWrite, 1);
198                 icacheclean(chunk);
199         }
200
201         trace(TraceProc, "icachewritesect done");
202         return err;
203 }
204
205 static void
206 icachewriteproc(void *v)
207 {
208         int ret;
209         uint bsize;
210         ISect *is;
211         Index *ix;
212         u8int *buf;
213
214         ix = mainindex;
215         is = v;
216         threadsetname("icachewriteproc:%s", is->part->name);
217
218         bsize = 1<<is->blocklog;
219         buf = vtmalloc(Bufsize+bsize);
220         buf = (u8int*)(((uintptr)buf+bsize-1)&~(uintptr)(bsize-1));
221
222         for(;;){
223                 trace(TraceProc, "icachewriteproc recv");
224                 recv(is->writechan, 0);
225                 trace(TraceWork, "start");
226                 ret = icachewritesect(ix, is, buf);
227                 trace(TraceProc, "icachewriteproc send");
228                 trace(TraceWork, "finish");
229                 sendul(is->writedonechan, ret);
230         }
231 }
232
233 static void
234 icachewritecoord(void *v)
235 {
236         int i, err;
237         Index *ix;
238         AState as;
239
240         USED(v);
241
242         threadsetname("icachewritecoord");
243
244         ix = mainindex;
245         iwrite.as = icachestate();
246
247         for(;;){
248                 trace(TraceProc, "icachewritecoord sleep");
249                 waitforkick(&iwrite.round);
250                 trace(TraceWork, "start");
251                 as = icachestate();
252                 if(as.arena==iwrite.as.arena && as.aa==iwrite.as.aa){
253                         /* will not be able to do anything more than last flush - kick disk */
254                         trace(TraceProc, "icachewritecoord kick dcache");
255                         kickdcache();
256                         trace(TraceProc, "icachewritecoord kicked dcache");
257                         goto SkipWork;  /* won't do anything; don't bother rewriting bloom filter */
258                 }
259                 iwrite.as = as;
260
261                 trace(TraceProc, "icachewritecoord start flush");
262                 if(iwrite.as.arena){
263                         for(i=0; i<ix->nsects; i++)
264                                 send(ix->sects[i]->writechan, 0);
265                         if(ix->bloom)
266                                 send(ix->bloom->writechan, 0);
267                 
268                         err = 0;
269                         for(i=0; i<ix->nsects; i++)
270                                 err |= recvul(ix->sects[i]->writedonechan);
271                         if(ix->bloom)
272                                 err |= recvul(ix->bloom->writedonechan);
273
274                         trace(TraceProc, "icachewritecoord donewrite err=%d", err);
275                         if(err == 0){
276                                 setatailstate(&iwrite.as);
277                         }
278                 }
279         SkipWork:
280                 icacheclean(nil);       /* wake up anyone waiting */
281                 trace(TraceWork, "finish");
282                 addstat(StatIcacheFlush, 1);
283         }
284 }
285
286 void
287 flushicache(void)
288 {
289         trace(TraceProc, "flushicache enter");
290         kickround(&iwrite.round, 1);
291         trace(TraceProc, "flushicache exit");
292 }
293
294 void
295 kickicache(void)
296 {
297         kickround(&iwrite.round, 0);
298 }
299
300 void
301 delaykickicache(void)
302 {
303         delaykickround(&iwrite.round);
304 }
305
306 static IEntry*
307 iesort(IEntry *ie)
308 {
309         int cmp;
310         IEntry **l;
311         IEntry *ie1, *ie2, *sorted;
312
313         if(ie == nil || ie->nextdirty == nil)
314                 return ie;
315
316         /* split the lists */
317         ie1 = ie;
318         ie2 = ie;
319         if(ie2)
320                 ie2 = ie2->nextdirty;
321         if(ie2)
322                 ie2 = ie2->nextdirty;
323         while(ie1 && ie2){
324                 ie1 = ie1->nextdirty;
325                 ie2 = ie2->nextdirty;
326                 if(ie2)
327                         ie2 = ie2->nextdirty;
328         }
329         if(ie1){
330                 ie2 = ie1->nextdirty;
331                 ie1->nextdirty = nil;
332         }
333
334         /* sort the lists */
335         ie1 = iesort(ie);
336         ie2 = iesort(ie2);
337
338         /* merge the lists */
339         sorted = nil;
340         l = &sorted;
341         cmp = 0;
342         while(ie1 || ie2){
343                 if(ie1 && ie2)
344                         cmp = scorecmp(ie1->score, ie2->score);
345                 if(ie1==nil || (ie2 && cmp > 0)){
346                         *l = ie2;
347                         l = &ie2->nextdirty;
348                         ie2 = ie2->nextdirty;
349                 }else{
350                         *l = ie1;
351                         l = &ie1->nextdirty;
352                         ie1 = ie1->nextdirty;
353                 }
354         }
355         *l = nil;
356         return sorted;
357 }
358