]> git.lizzy.rs Git - plan9front.git/blob - sys/src/cmd/ip/httpfile.c
rio, kbdfs: increase read buffer for high latency kbdfs support
[plan9front.git] / sys / src / cmd / ip / httpfile.c
1 /* contributed by 20h@r-36.net, September 2005 */
2
3 #include <u.h>
4 #include <libc.h>
5 #include <thread.h>
6 #include <fcall.h>
7 #include <9p.h>
8
9 enum
10 {
11         Blocksize = 64*1024,
12         Stacksize = 8192,
13 };
14
15 char *user, *url, *file;
16 char webconn[64];
17 int webctlfd = -1;
18
19 vlong size;
20 int usetls;
21 int debug;
22 int ncache;
23 int mcache;
24
25 void
26 usage(void)
27 {
28         fprint(2, "usage: httpfile [-Dd] [-c count] [-f file] [-m mtpt] [-s srvname] url\n");
29         exits("usage");
30 }
31
32 enum
33 {
34         Qroot,
35         Qfile,
36 };
37
38 #define PATH(type, n)           ((type)|((n)<<8))
39 #define TYPE(path)                      ((int)(path) & 0xFF)
40 #define NUM(path)                       ((uint)(path)>>8)
41
42 Channel *reqchan;
43 Channel *httpchan;
44 Channel *finishchan;
45 ulong time0;
46
47 typedef struct Block Block;
48 struct Block
49 {
50         uchar *p;
51         vlong off;
52         vlong len;
53         Block *link;
54         long lastuse;
55         Req *rq;
56         Req **erq;
57 };
58
59 typedef struct Blocklist Blocklist;
60 struct Blocklist
61 {
62         Block *first;
63         Block **end;
64 };
65
66 Blocklist cache;
67 Blocklist inprogress;
68
69 void
70 queuereq(Block *b, Req *r)
71 {
72         if(b->rq==nil)
73                 b->erq = &b->rq;
74         *b->erq = r;
75         r->aux = nil;
76         b->erq = (Req**)&r->aux;
77 }
78
79 void
80 addblock(Blocklist *l, Block *b)
81 {
82         if(debug)
83                 print("adding: %p %lld\n", b, b->off);
84
85         if(l->first == nil)
86                 l->end = &l->first;
87         b->lastuse = time(0);
88         b->link = nil;
89         *l->end = b;
90         l->end = &b->link;
91 }
92
93 void
94 delreq(Block *b, Req *r)
95 {
96         Req **l;
97
98         for(l = &b->rq; *l; l = (Req**)&(*l)->aux){
99                 if(*l == r){
100                         *l = r->aux;
101                         if(*l == nil)
102                                 b->erq = l;
103                         r->aux = nil;
104                         respond(r, "interrupted");
105                         return;
106                 }
107         }
108 }
109
110 void
111 evictblock(Blocklist *cache)
112 {
113         Block **l, **oldest, *b;
114
115         oldest = nil;
116         for(l=&cache->first; (b=*l) != nil; l=&b->link){
117                 if(b->rq != nil)        /* dont touch block when still requests queued */
118                         continue;
119                 if(oldest == nil || (*oldest)->lastuse > b->lastuse)
120                         oldest = l;
121         }
122
123         if(oldest == nil || *oldest == nil || (*oldest)->rq != nil)
124                 return;
125
126         b = *oldest;
127         if((*oldest = b->link) == nil)
128                 cache->end = oldest;
129
130         free(b->p);
131         free(b);
132         ncache--;
133 }
134
135 Block *
136 findblock(Blocklist *s, vlong off)
137 {
138         Block *b;
139
140         for(b = s->first; b != nil; b = b->link){
141                 if(off >= b->off && off < b->off + b->len){
142                         if(debug)
143                                 print("found: %lld -> %lld\n", off, b->off);
144                         b->lastuse = time(0);
145                         return b;
146                 }
147         }
148         return nil;
149 }
150
151 void
152 readfrom(Req *r, Block *b)
153 {
154         int d, n;
155
156         n = r->ifcall.count;
157         d = r->ifcall.offset - b->off;
158         if(d + n > b->len)
159                 n = b->len - d;
160         if(debug)
161                 print("Reading from: %p %d %d\n", b->p, d, n);
162         memmove(r->ofcall.data, b->p + d, n);
163         r->ofcall.count = n;
164
165         respond(r, nil);
166 }
167
168 void
169 hangupclient(Srv*)
170 {
171         if(debug)
172                 print("Hangup.\n");
173
174         threadexitsall("done");
175 }
176
177 static int
178 readfile(int fd, char *buf, int nbuf)
179 {
180         int r, n;
181
182         for(n = 0; n < nbuf; n += r)
183                 if((r = read(fd, buf + n, nbuf - n)) <= 0)
184                         break;
185         return n;
186 }
187
188 static int
189 readstring(int fd, char *buf, int nbuf)
190 {
191         int n;
192
193         if((n = readfile(fd, buf, nbuf-1)) < 0){
194                 buf[0] = '\0';
195                 return -1;
196         }
197         if(n > 0 && buf[n-1] == '\n')
198                 n--;
199         buf[n] = '\0';
200         return n;
201 }
202
203 uchar*
204 getrange(Block *b)
205 {
206         char buf[128];
207         int fd, cfd;
208         uchar *data;
209
210         if(debug)
211                 print("getrange: %lld %lld\n", b->off, b->len);
212
213         if(fprint(webctlfd, "url %s\n", url) < 0)
214                 return nil;
215         if(fprint(webctlfd, "request GET\n") < 0)
216                 return nil;
217         if(fprint(webctlfd, "headers Range: bytes=%lld-%lld\n", b->off, b->off+b->len-1) < 0)
218                 return nil;
219
220         /* start the request */
221         snprint(buf, sizeof(buf), "%s/body", webconn);
222         if((fd = open(buf, OREAD)) < 0)
223                 return nil;
224
225         /* verify content-range response header */ 
226         snprint(buf, sizeof(buf), "%s/contentrange", webconn);
227         if((cfd = open(buf, OREAD)) < 0){
228                 close(fd);
229                 return nil;
230         }
231
232         werrstr("bad contentrange header");
233         if(readstring(cfd, buf, sizeof(buf)) <= 0){
234 Badrange:
235                 close(cfd);
236                 close(fd);
237                 return nil;
238         }
239         if(cistrncmp(buf, "bytes ", 6) != 0)
240                 goto Badrange;
241         if(strtoll(buf + 6, nil, 10) != b->off)
242                 goto Badrange;
243         close(cfd);
244
245         /* read body data */
246         data = emalloc9p(b->len);
247         werrstr("body data truncated");
248         if(readfile(fd, (char*)data, b->len) != b->len){
249                 close(fd);
250                 free(data);
251                 return nil;
252         }
253         close(fd);
254         b->p = data;
255         return data;
256 }
257
258 void
259 httpfilereadproc(void*)
260 {
261         Block *b;
262
263         threadsetname("httpfilereadproc %s", url);
264
265         for(;;){
266                 b = recvp(httpchan);
267                 if(b == nil)
268                         continue;
269                 if(getrange(b) == nil)
270                         sysfatal("getrange: %r");
271                 sendp(finishchan, b);
272         }
273 }
274
275 typedef struct Tab Tab;
276 struct Tab
277 {
278         char *name;
279         ulong mode;
280 };
281
282 Tab tab[] =
283 {
284         "/",            DMDIR|0555,
285         nil,            0444,
286 };
287
288 static void
289 fillstat(Dir *d, uvlong path)
290 {
291         Tab *t;
292
293         memset(d, 0, sizeof(*d));
294         d->uid = estrdup9p(user);
295         d->gid = estrdup9p(user);
296         d->qid.path = path;
297         d->atime = d->mtime = time0;
298         t = &tab[TYPE(path)];
299         d->name = estrdup9p(t->name);
300         d->length = size;
301         d->qid.type = t->mode>>24;
302         d->mode = t->mode;
303 }
304
305 static void
306 fsattach(Req *r)
307 {
308         if(r->ifcall.aname && r->ifcall.aname[0]){
309                 respond(r, "invalid attach specifier");
310                 return;
311         }
312         r->fid->qid.path = PATH(Qroot, 0);
313         r->fid->qid.type = QTDIR;
314         r->fid->qid.vers = 0;
315         r->ofcall.qid = r->fid->qid;
316         respond(r, nil);
317 }
318
319 static void
320 fsstat(Req *r)
321 {
322         fillstat(&r->d, r->fid->qid.path);
323         respond(r, nil);
324 }
325
326 static int
327 rootgen(int i, Dir *d, void*)
328 {
329         i += Qroot + 1;
330         if(i <= Qfile){
331                 fillstat(d, i);
332                 return 0;
333         }
334         return -1;
335 }
336
337 static char*
338 fswalk1(Fid *fid, char *name, Qid *qid)
339 {
340         int i;
341         ulong path;
342
343         path = fid->qid.path;
344         if(!(fid->qid.type & QTDIR))
345                 return "walk in non-directory";
346
347         if(strcmp(name, "..") == 0){
348                 switch(TYPE(path)){
349                 case Qroot:
350                         return nil;
351                 default:
352                         return "bug in fswalk1";
353                 }
354         }
355
356         i = TYPE(path) + 1;
357         while(i < nelem(tab)){
358                 if(strcmp(name, tab[i].name) == 0){
359                         qid->path = PATH(i, NUM(path));
360                         qid->type = tab[i].mode>>24;
361                         return nil;
362                 }
363                 if(tab[i].mode & DMDIR)
364                         break;
365                 i++;
366         }
367         return "directory entry not found";
368 }
369
370 vlong
371 getfilesize(void)
372 {
373         char buf[128];
374         int fd, cfd;
375
376         if(fprint(webctlfd, "url %s\n", url) < 0)
377                 return -1;
378         if(fprint(webctlfd, "request HEAD\n") < 0)
379                 return -1;
380         snprint(buf, sizeof(buf), "%s/body", webconn);
381         if((fd = open(buf, OREAD)) < 0)
382                 return -1;
383         snprint(buf, sizeof(buf), "%s/contentlength", webconn);
384         cfd = open(buf, OREAD);
385         close(fd);
386         if(cfd < 0)
387                 return -1;
388         if(readstring(cfd, buf, sizeof(buf)) <= 0){
389                 close(cfd);
390                 return -1;
391         }
392         close(cfd);
393         return strtoll(buf, nil, 10);
394 }
395
396 void
397 fileread(Req *r)
398 {
399         Block *b;
400
401         if(r->ifcall.offset >= size){
402                 r->ofcall.count = 0;
403                 respond(r, nil);
404                 return;
405         }
406
407         if((b = findblock(&cache, r->ifcall.offset)) != nil){
408                 readfrom(r, b);
409                 return;
410         }
411         if((b = findblock(&inprogress, r->ifcall.offset)) == nil){
412                 b = emalloc9p(sizeof(Block));
413                 b->off = r->ifcall.offset - (r->ifcall.offset % Blocksize);
414                 b->len = Blocksize;
415                 if(b->off + b->len > size)
416                         b->len = size - b->off;
417                 addblock(&inprogress, b);
418                 if(inprogress.first == b)
419                         sendp(httpchan, b);
420         }
421         queuereq(b, r);
422 }
423
424 static void
425 fsopen(Req *r)
426 {
427         if(r->ifcall.mode != OREAD){
428                 respond(r, "permission denied");
429                 return;
430         }
431         respond(r, nil);
432 }
433
434 void
435 finishthread(void*)
436 {
437         Block *b;
438         Req *r;
439
440         threadsetname("finishthread");
441
442         for(;;){
443                 b = recvp(finishchan);
444                 assert(b == inprogress.first);
445                 inprogress.first = b->link;
446                 if(++ncache >= mcache)
447                         evictblock(&cache);
448                 addblock(&cache, b);
449                 while((r = b->rq) != nil){
450                         b->rq = r->aux;
451                         r->aux = nil;
452                         readfrom(r, b);
453                 }
454                 if(inprogress.first != nil)
455                         sendp(httpchan, inprogress.first);
456         }
457 }
458
459 void
460 fsnetproc(void*)
461 {
462         Req *r, *o;
463         Block *b;
464
465         threadcreate(finishthread, nil, 8192);
466
467         threadsetname("fsnetproc");
468
469         for(;;){
470                 r = recvp(reqchan);
471                 switch(r->ifcall.type){
472                 case Tflush:
473                         o = r->oldreq;
474                         if(o->ifcall.type == Tread){
475                                 b = findblock(&inprogress, o->ifcall.offset);
476                                 if(b != nil)
477                                         delreq(b, o);
478                         }
479                         respond(r, nil);
480                         break;
481                 case Tread:
482                         fileread(r);
483                         break;
484                 default:
485                         respond(r, "bug in fsthread");
486                         break;
487                 }
488         }
489 }
490
491 static void
492 fsflush(Req *r)
493 {
494         sendp(reqchan, r);
495 }
496
497 static void
498 fsread(Req *r)
499 {
500         char e[ERRMAX];
501         ulong path;
502
503         path = r->fid->qid.path;
504         switch(TYPE(path)){
505         case Qroot:
506                 dirread9p(r, rootgen, nil);
507                 respond(r, nil);
508                 break;
509         case Qfile:
510                 sendp(reqchan, r);
511                 break;
512         default:
513                 snprint(e, sizeof(e), "bug in fsread path=%lux", path);
514                 respond(r, e);
515                 break;
516         }
517 }
518
519 Srv fs = 
520 {
521 .attach=                fsattach,
522 .walk1=         fswalk1,
523 .open=          fsopen,
524 .read=          fsread,
525 .stat=          fsstat,
526 .flush=         fsflush,
527 .end=           hangupclient,
528 };
529
530 void
531 threadmain(int argc, char **argv)
532 {
533         char *mtpt, *srvname, *p;
534
535         mtpt = nil;
536         srvname = nil;
537         ARGBEGIN{
538         case 'D':
539                 chatty9p++;
540                 break;
541         case 'd':
542                 debug++;
543                 break;
544         case 's':
545                 srvname = EARGF(usage());
546                 break;
547         case 'm':
548                 mtpt = EARGF(usage());
549                 break;
550         case 'c':
551                 mcache = atoi(EARGF(usage()));
552                 break;
553         case 'f':
554                 file = EARGF(usage());
555                 break;
556         default:
557                 usage();
558         }ARGEND;
559
560         if(srvname == nil && mtpt == nil)
561                 mtpt = ".";
562
563         if(argc < 1)
564                 usage();
565         if(mcache <= 0)
566                 mcache = 32;
567
568         time0 = time(0);
569         url = estrdup9p(argv[0]);
570         if(file == nil){
571                 file = strrchr(url, '/');
572                 if(file == nil || file[1] == '\0')
573                         file = "index";
574                 else
575                         file++;
576         }
577
578         snprint(webconn, sizeof(webconn), "/mnt/web/clone");
579         if((webctlfd = open(webconn, ORDWR)) < 0)
580                 sysfatal("open: %r");
581         p = strrchr(webconn, '/')+1;
582         if(readstring(webctlfd, p, webconn+sizeof(webconn)-p) <= 0)
583                 sysfatal("read: %r");
584
585         tab[Qfile].name = file;
586         user = getuser();
587         size = getfilesize();
588         if(size < 0)
589                 sysfatal("getfilesize: %r");
590
591         reqchan = chancreate(sizeof(Req*), 0);
592         httpchan = chancreate(sizeof(Block*), 0);
593         finishchan = chancreate(sizeof(Block*), 0);
594
595         procrfork(fsnetproc, nil, Stacksize, RFNAMEG|RFNOTEG);
596         procrfork(httpfilereadproc, nil, Stacksize, RFNAMEG|RFNOTEG);
597
598         threadpostmountsrv(&fs, srvname, mtpt, MBEFORE);
599         threadexits(0);
600 }