git.lizzy.rs Git - plan9front.git/blob - sys/src/9/port/devstream.c
devmnt: deal with partial response for Tversion request in mntversion()
[plan9front.git] / sys / src / 9 / port / devstream.c
1 #include        "u.h"
2 #include        "../port/lib.h"
3 #include        "mem.h"
4 #include        "dat.h"
5 #include        "fns.h"
6 #include        "../port/error.h"
7
8 typedef struct Stream Stream;
9 typedef struct Iocom Iocom;
10
11 struct Stream
12 {
13         Ref;
14         Lock;
15
16         int     iounit;
17         int     noseek;
18
19         Ref     nrp;
20         Ref     nwp;
21         Ref     nwq;
22
23         Proc    *rp[4];
24         Proc    *wp[2];
25
26         Block   *rlist;
27
28         vlong   soff;
29         vlong   roff;
30         vlong   woff;
31         
32         QLock   rcl;
33         QLock   wcl;
34         QLock   rql;
35         QLock   wql;
36
37         Rendez  wz;
38
39         Queue   *rq;
40         Queue   *wq;
41         Chan    *f;
42 };
43
44 struct Iocom
45 {
46         Proc    *p;
47         QLock   *q;
48         Stream  *s;
49         Block   *b;
50 };
51
52 static void
53 putstream(Stream *s)
54 {
55         if(decref(s))
56                 return;
57         freeblist(s->rlist);
58         qfree(s->rq);
59         qfree(s->wq);
60         if(s->f != nil)
61                 cclose(s->f);
62         free(s);
63 }
64
/*
 * BOFF(b) is the vlong stored immediately in front of b->rp;
 * the stream code keeps the file offset of a block there
 * (sblock() reserves the space).
 */
#define BOFF(b)		(((vlong*)(b)->rp)[-1])
/* bits this file keeps in Block.flag */
#define BDONE		0x8000	/* the read into this block has finished */
#define BERROR		0x4000	/* the read failed; the block holds the error string */
68
69 static Block*
70 sblock(Stream *s)
71 {
72         Block *b;
73
74         b = allocb(sizeof(vlong)+s->iounit);
75         b->flag &= ~(BDONE|BERROR);
76         b->wp += sizeof(vlong);
77         b->rp = b->wp;
78         return b;
79 }
80
81 static void
82 iocom(void *arg, int complete)
83 {
84         Iocom *io = arg;
85         Stream *s;
86         QLock *q;
87         Proc *p;
88
89         p = io->p;
90         if(complete && p == up){
91                 up->iocomfun = nil;
92                 up->iocomarg = nil;
93         }
94
95         q = io->q;
96         if(q != nil && p == up){
97                 io->q = nil;
98                 qunlock(q);
99         }
100
101         s = io->s;
102         if(complete && s != nil && s->noseek){
103                 io->s = nil;
104                 lock(s);
105                 BOFF(io->b) = s->soff;
106                 s->soff += s->iounit;
107                 unlock(s);
108         }
109 }
110
111 static void
112 ioq(Iocom *io, QLock *q)
113 {
114         eqlock(q);      /* unlocked in iocom() above */
115
116         io->p = up;
117         io->q = q;
118         io->s = nil;
119         io->b = nil;
120
121         up->iocomarg = io;
122         up->iocomfun = iocom;
123 }
124
125 static void
126 streamreader(void *arg)
127 {
128         Stream *s = arg;
129         Iocom io;
130         Chan *f;
131         Block *b, *l, **ll;
132         vlong o;
133         int id, n;
134
135         id = incref(&s->nrp) % nelem(s->rp);
136         s->rp[id] = up;
137
138         f = s->f;
139         b = sblock(s);
140         qlock(&s->rql);
141         if(waserror()){
142                 qhangup(s->rq, up->errstr);
143                 goto Done;
144         }
145         if(s->noseek == -1){
146                 BOFF(b) = 0;
147                 n = devtab[f->type]->read(f, b->wp, s->iounit, 0x7fffffffffffffLL);
148
149                 if(n > 0){
150                         b->wp += n;
151                         b->flag |= BDONE;
152                         b->next = nil;
153                         s->rlist = b;
154                         s->soff = s->iounit;
155                         s->roff = 0;
156                         s->noseek = 1;
157
158                         b = sblock(s);
159                 } else {
160                         s->noseek = 0;
161                 }
162         }
163         while(!qisclosed(s->rq)) {
164                 ll = &s->rlist;
165                 while((l = *ll) != nil){
166                         if((l->flag & BDONE) == 0 || BOFF(l) != s->roff){
167                                 if(s->noseek){
168                                         ll = &l->next;
169                                         continue;
170                                 }
171                                 break;
172                         }
173                         if((l->flag & BERROR) != 0)
174                                 error((char*)l->rp);
175                         if(BLEN(l) == 0){
176                                 qhangup(s->rq, nil);
177                                 poperror();
178                                 goto Done;
179                         }
180                         s->roff += s->noseek ? s->iounit : BLEN(l);
181                         *ll = l->next;
182                         l->next = nil;
183                         qbwrite(s->rq, l);
184                 }
185
186                 n = s->iounit;
187                 o = s->roff;
188                 l = s->rlist;
189                 if(s->noseek) {
190                         o = 0;
191                         b->next = l;
192                         s->rlist = b;
193                 } else if(l == nil) {
194                         b->next = nil;
195                         s->rlist = b;
196                 } else {
197                         if(o < BOFF(l)){
198                                 n = BOFF(l) - o;
199                                 b->next = l;
200                                 s->rlist = b;
201                         } else {
202                                 for(;; l = l->next){
203                                         if((l->flag & BDONE) != 0 && BLEN(l) == 0)
204                                                 goto Done;
205                                         o = BOFF(l) + ((l->flag & BDONE) == 0 ? s->iounit : BLEN(l));
206                                         if(l->next == nil)
207                                                 break;
208                                         if(o < BOFF(l->next)){
209                                                 n = BOFF(l->next) - o;
210                                                 break;
211                                         }
212                                 }
213                                 b->next = l->next;
214                                 l->next = b;
215                         }
216                 }
217                 BOFF(b) = o;
218                 qunlock(&s->rql);
219
220                 if(waserror()){
221                         poperror();
222                         goto Exit;
223                 }
224                 ioq(&io, &s->rcl);
225                 io.b = b;
226                 io.s = s;
227                 if(waserror()){
228                         strncpy((char*)b->wp, up->errstr, s->iounit-1);
229                         b->wp[s->iounit-1] = 0;
230                         n = -1;
231                 } else {
232                         n = devtab[f->type]->read(f, b->wp, n, o);
233                         if(n < 0)
234                                 error(Eio);
235                         poperror();
236                 }
237                 iocom(&io, 1);
238                 poperror();
239
240                 l = b;
241                 b = sblock(s);
242                 qlock(&s->rql);
243                 if(n >= 0)
244                         l->wp += n;
245                 else
246                         l->flag |= BERROR;
247                 l->flag |= BDONE;
248         }
249         poperror();
250 Done:
251         qunlock(&s->rql);
252         freeb(b);
253 Exit:
254         s->rp[id] = nil;
255         putstream(s);
256         pexit("closed", 1);
257 }
258
259 static void
260 streamwriter(void *arg)
261 {
262         Stream *s = arg;
263         Iocom io;
264         Block *b;
265         Chan *f;
266         vlong o;
267         int id, n;
268
269         id = incref(&s->nwp) % nelem(s->wp);
270         s->wp[id] = up;
271
272         f = s->f;
273         while(!qisclosed(s->wq)) {
274                 if(incref(&s->nwq) == s->nwp.ref && qlen(s->wq) == 0)
275                         wakeup(&s->wz); /* queue drained */
276                 if(waserror()){
277                         decref(&s->nwq);
278                         break;
279                 }
280                 ioq(&io, &s->wcl);
281                 b = qbread(s->wq, s->iounit);
282                 decref(&s->nwq);
283                 if(b == nil){
284                         iocom(&io, 1);
285                         break;
286                 }
287                 poperror();
288
289                 if(waserror()){
290                         qhangup(s->wq, up->errstr);
291                         iocom(&io, 1);
292                         freeb(b);
293                         break;
294                 }
295                 n = BLEN(b);
296                 o = s->woff;
297                 s->woff += n;
298                 if(devtab[f->type]->write(f, b->rp, n, o) != n)
299                         error(Eio);
300                 iocom(&io, 1);
301                 freeb(b);
302                 poperror();
303         }
304
305         s->wp[id] = nil;
306         wakeup(&s->wz);
307
308         putstream(s);
309         pexit("closed", 1);
310 }
311
312 static int
313 streamgen(Chan *c, char *, Dirtab*, int, int s, Dir *dp)
314 {
315         static int perm[] = { 0400, 0200, 0600, 0 };
316         Fgrp *fgrp = up->fgrp;
317         Chan *f;
318         Qid q;
319
320         if(s == DEVDOTDOT){
321                 devdir(c, c->qid, ".", 0, eve, DMDIR|0555, dp);
322                 return 1;
323         }
324         if(s == 0)
325                 return 0;
326         s--;
327         if(s > fgrp->maxfd)
328                 return -1;
329         if((f=fgrp->fd[s]) == nil)
330                 return 0;
331         sprint(up->genbuf, "%dstream", s);
332         mkqid(&q, s+1, 0, QTFILE);
333         devdir(c, q, up->genbuf, 0, eve, perm[f->mode&3], dp);
334         return 1;
335 }
336
337 static Chan*
338 streamattach(char *spec)
339 {
340         return devattach(L'ΒΆ', spec);
341 }
342
343 static Walkqid*
344 streamwalk(Chan *c, Chan *nc, char **name, int nname)
345 {
346         return devwalk(c, nc, name, nname, (Dirtab *)0, 0, streamgen);
347 }
348
349 static int
350 streamstat(Chan *c, uchar *db, int n)
351 {
352         return devstat(c, db, n, (Dirtab *)0, 0L, streamgen);
353 }
354
355 static Chan*
356 streamopen(Chan *c, int omode)
357 {
358         Stream *s;
359
360         c->aux = nil;
361         if(c->qid.type & QTDIR){
362                 if(omode != 0)
363                         error(Eisdir);
364                 c->mode = 0;
365                 c->flag |= COPEN;
366                 c->offset = 0;
367                 return c;
368         }
369         s = mallocz(sizeof(*s), 1);
370         if(s == nil)
371                 error(Enomem);
372         incref(s);
373         if(waserror()){
374                 putstream(s);
375                 nexterror();
376         }
377         omode = openmode(omode);
378         s->f = fdtochan(c->qid.path - 1, omode, 0, 1);
379         if(s->f == nil || s->f->qid.type != QTFILE)
380                 error(Eperm);
381         s->noseek = -1;
382         s->roff = s->f->offset;
383         s->woff = s->f->offset;
384         s->iounit = s->f->iounit;
385         if(s->iounit <= 0 || s->iounit > qiomaxatomic)
386                 s->iounit = qiomaxatomic;
387         c->iounit = s->iounit;
388         c->aux = s;
389         c->mode = omode;
390         c->flag |= COPEN;
391         c->offset = 0;
392         poperror();
393         return c;
394 }
395
396 static int
397 isdrained(void *a)
398 {
399         Stream *s;
400         int i;
401
402         s = a;
403         if(s->wq == nil)
404                 return 1;
405
406         if(qisclosed(s->wq) == 0)
407                 return qlen(s->wq) == 0 && s->nwq.ref == s->nwp.ref;
408
409         for(i=0; i<nelem(s->wp); i++)
410                 if(s->wp[i] != nil)
411                         return 0;
412
413         return 1;
414 }
415
416 static void
417 streamdrain(Chan *c)
418 {
419         Stream *s;
420
421         if((s = c->aux) == nil)
422                 return;
423         eqlock(&s->wql);
424         if(waserror()){
425                 qunlock(&s->wql);
426                 nexterror();
427         }
428         while(!isdrained(s))
429                 sleep(&s->wz, isdrained, s);
430         qunlock(&s->wql);
431         poperror();
432 }
433
434 static void
435 streamclose(Chan *c)
436 {
437         Stream *s;
438         int i;
439
440         if((c->flag & COPEN) == 0 || (s = c->aux) == nil)
441                 return;
442         if(s->rq != nil){
443                 qclose(s->rq);
444                 for(i=0; i<nelem(s->rp); i++)
445                         postnote(s->rp[i], 1, "streamclose", 0);
446         }
447         if(s->wq != nil){
448                 qhangup(s->wq, nil);
449                 if(!waserror()){
450                         streamdrain(c);
451                         poperror();
452                 }
453                 qclose(s->wq);  /* discard the data */
454                 for(i=0; i<nelem(s->wp); i++)
455                         postnote(s->wp[i], 1, "streamclose", 0);
456         }
457         c->aux = nil;
458         putstream(s);
459 }
460
461 static int
462 canpipeline(Chan *f, int mode)
463 {
464         USED(mode);
465
466         return devtab[f->type]->dc == 'M';
467 }
468
469 static Queue*
470 streamqueue(Chan *c, int mode)
471 {
472         Stream *s;
473         int i, n;
474
475         s = c->aux;
476         if(s == nil || c->qid.type != QTFILE)
477                 error(Eperm);
478
479         switch(mode){
480         case OREAD:
481                 while(s->rq == nil){
482                         qlock(&s->rql);
483                         if(s->rq != nil){
484                                 qunlock(&s->rql);
485                                 break;
486                         }
487                         s->rq = qopen(conf.pipeqsize, 0, 0, 0);
488                         if(s->rq == nil){
489                                 qunlock(&s->rql);
490                                 error(Enomem);
491                         }
492                         n = canpipeline(s->f, mode) ? nelem(s->rp) : 1;
493                         for(i=0; i<n; i++){
494                                 incref(s);
495                                 kproc("streamreader", streamreader, s);
496                         }
497                         while(s->nrp.ref != n)
498                                 sched();
499                         qunlock(&s->rql);
500                         break;
501                 }
502                 return s->rq;
503         case OWRITE:
504                 while(s->wq == nil){
505                         qlock(&s->wql);
506                         if(s->wq != nil){
507                                 qunlock(&s->wql);
508                                 break;
509                         }
510                         s->wq = qopen(conf.pipeqsize, 0, 0, 0);
511                         if(s->wq == nil){
512                                 qunlock(&s->wql);
513                                 error(Enomem);
514                         }
515                         n = canpipeline(s->f, mode) ? nelem(s->wp) : 1;
516                         for(i=0; i<n; i++){
517                                 incref(s);
518                                 kproc("streamwriter", streamwriter, s);
519                         }
520                         while(s->nwp.ref != n)
521                                 sched();
522                         qunlock(&s->wql);
523                         break;
524                 }
525                 return s->wq;
526         }
527         error(Egreg);
528         return nil;
529 }
530
531 static long
532 streamread(Chan *c, void *va, long n, vlong)
533 {
534         if(c->qid.type == QTDIR)
535                 return devdirread(c, va, n, (Dirtab *)0, 0L, streamgen);
536         return qread(streamqueue(c, OREAD), va, n);
537 }
538
539 static Block*
540 streambread(Chan *c, long n, ulong)
541 {
542         return qbread(streamqueue(c, OREAD), n);
543 }
544
545 static long
546 streamwrite(Chan *c, void *va, long n, vlong)
547 {
548         if(n == 0)
549                 streamdrain(c);
550         return qwrite(streamqueue(c, OWRITE), va, n);
551 }
552
553 static long
554 streambwrite(Chan *c, Block *b, ulong)
555 {
556         if(BLEN(b) == 0)
557                 streamdrain(c);
558         return qbwrite(streamqueue(c, OWRITE), b);
559 }
560
561 Dev streamdevtab = {
562         L'ΒΆ',
563         "stream",
564
565         devreset,
566         devinit,
567         devshutdown,
568         streamattach,
569         streamwalk,
570         streamstat,
571         streamopen,
572         devcreate,
573         streamclose,
574         streamread,
575         streambread,
576         streamwrite,
577         streambwrite,
578         devremove,
579         devwstat,
580 };