2 #include "../port/lib.h"
6 #include "../port/error.h"
/*
 * Short type names for the per-stream state (Stream) and the record
 * handed to iocom()/ioq() (Iocom). The struct bodies are not part of
 * this excerpt.
 */
8 typedef struct Stream Stream;
9 typedef struct Iocom Iocom;
/*
 * BOFF(b): lvalue for a vlong kept in the sizeof(vlong) bytes that sit
 * immediately before b->rp. Blocks are allocated with that much extra
 * room in front of the payload, and the code in this file stores and
 * compares file offsets through it.
 */
65 #define BOFF(b) (*(vlong*)((b)->rp - sizeof(vlong)))
/*
 * Block flag, bit 14. It is cleared together with BDONE when a block is
 * allocated and is tested on queued blocks in streamreader().
 */
67 #define BERROR (1<<14)
/*
 * Fragment of a block-allocation routine whose header is outside this
 * excerpt: the block gets sizeof(vlong) bytes of headroom in addition
 * to one iounit of payload, which is the slot BOFF() addresses.
 */
74 b = allocb(sizeof(vlong)+s->iounit);
/* a fresh block is neither completed nor in error */
75 b->flag &= ~(BDONE|BERROR);
/* move the write pointer past the reserved offset slot */
76 b->wp += sizeof(vlong);
/*
 * iocom: callback that ioq() installs as up->iocomfun.
 *
 * arg      - opaque argument (its use is on lines not shown here)
 * complete - non-zero on the branches that handle a finished request
 *
 * Only a few lines of the body are visible. According to the comment in
 * ioq(), this is also where the QLock taken by ioq() is released.
 */
82 iocom(void *arg, int complete)
90 if(complete && p == up){
96 if(q != nil && p == up){
/*
 * For a no-seek stream, stamp the block with the stream's running
 * offset and advance that offset by one full iounit.
 */
102 if(complete && s != nil && s->noseek){
105 BOFF(io->b) = s->soff;
106 s->soff += s->iounit;
/*
 * ioq: take QLock q and register iocom() as the current process's
 * iocomfun callback. The lock is not released here; see the comment on
 * the eqlock() call. Other setup lines are not part of this excerpt.
 */
112 ioq(Iocom *io, QLock *q)
114 eqlock(q); /* unlocked in iocom() above */
122 up->iocomfun = iocom;
/*
 * streamreader: body of the "streamreader" kernel process started by
 * streamqueue(); arg is the Stream. Large parts of the function are not
 * visible in this excerpt, so the notes below describe individual lines
 * only.
 */
126 streamreader(void *arg)
/* claim a slot index from the shared reader counter */
135 id = incref(&s->nrp) % nelem(s->rp);
/* hang up the read queue, passing along the current error string */
142 qhangup(s->rq, up->errstr);
/*
 * Read one iounit at a fixed, very large offset.
 * NOTE(review): presumably the path for no-seek files - confirm.
 */
147 n = devtab[f->type]->read(f, b->wp, s->iounit, 0x7fffffffffffffLL);
/* main loop: runs until the read queue has been closed */
163 while(!qisclosed(s->rq)) {
/* walk the block list through the link pointer ll */
165 while((l = *ll) != nil){
/* true when the block is not finished, or is not the one at s->roff */
166 if((l->flag & BDONE) == 0 || BOFF(l) != s->roff){
173 if((l->flag & BERROR) != 0)
/* advance the read offset: a whole iounit when no-seek, else the block length */
180 s->roff += s->noseek ? s->iounit : BLEN(l);
193 } else if(l == nil) {
/* a completed block with no data; NOTE(review): looks like the end-of-file case - confirm */
203 if((l->flag & BDONE) != 0 && BLEN(l) == 0)
/* o = offset just past l, assuming a full iounit while l is still in flight */
205 o = BOFF(l) + ((l->flag & BDONE) == 0 ? s->iounit : BLEN(l));
/* gap between the end of l and the start of the next block */
208 if(o < BOFF(l->next)){
209 n = BOFF(l->next) - o;
/* copy the error string into the block, truncated to iounit-1 and NUL-terminated */
228 strncpy((char*)b->wp, up->errstr, s->iounit-1);
229 b->wp[s->iounit-1] = 0;
/* read n bytes from the underlying channel at offset o */
232 n = devtab[f->type]->read(f, b->wp, n, o);
/*
 * streamwriter: body of the "streamwriter" kernel process started by
 * streamqueue(); arg is the Stream. Only part of the function is
 * visible in this excerpt.
 */
260 streamwriter(void *arg)
/* claim a slot index from the shared writer counter */
269 id = incref(&s->nwp) % nelem(s->wp);
/* main loop: runs until the write queue has been closed */
273 while(!qisclosed(s->wq)) {
/*
 * Bump nwq; when it matches nwp.ref and the queue is empty, wake
 * whoever sleeps on s->wz.
 */
274 if(incref(&s->nwq) == s->nwp.ref && qlen(s->wq) == 0)
275 wakeup(&s->wz); /* queue drained */
/* take up to one iounit of queued data */
281 b = qbread(s->wq, s->iounit);
/* hang up the write queue, passing along the current error string */
290 qhangup(s->wq, up->errstr);
/* a write that does not return exactly n is handled on lines not shown here */
298 if(devtab[f->type]->write(f, b->rp, n, o) != n)
/*
 * streamgen: directory generator passed to devwalk/devstat/devdirread.
 * The int parameter s is used as an index into the current process's
 * file descriptor table. Parts of the body are not in this excerpt.
 */
313 streamgen(Chan *c, char *, Dirtab*, int, int s, Dir *dp)
/* permission bits, indexed by the low two bits of the channel's open mode */
315 static int perm[] = { 0400, 0200, 0600, 0 };
316 Fgrp *fgrp = up->fgrp;
/* the directory itself, reported as "." */
321 devdir(c, c->qid, ".", 0, eve, DMDIR|0555, dp);
/* descriptor slot s holds no channel */
329 if((f=fgrp->fd[s]) == nil)
/* the entry is named "<fd>stream" */
331 sprint(up->genbuf, "%dstream", s);
/* qid path is fd+1; streamopen() subtracts 1 to get the fd back */
332 mkqid(&q, s+1, 0, QTFILE);
333 devdir(c, q, up->genbuf, 0, eve, perm[f->mode&3], dp);
/*
 * streamattach: attach to the stream device.
 *
 * spec - attach specifier, handed unchanged to devattach().
 *
 * The device character is the single rune U+00B6 (pilcrow). It had been
 * corrupted into two Greek letters by a UTF-8 -> ISO-8859-7 mis-decoding
 * of the source file, which is not a valid one-rune constant.
 */
338 streamattach(char *spec)
340 return devattach(L'¶', spec);
/*
 * streamwalk: walk entry point. Delegates to devwalk() with no static
 * Dirtab, so every entry comes from streamgen().
 */
344 streamwalk(Chan *c, Chan *nc, char **name, int nname)
346 return devwalk(c, nc, name, nname, (Dirtab *)0, 0, streamgen);
/*
 * streamstat: stat entry point. Delegates to devstat() with no static
 * Dirtab, so the entry is produced by streamgen(). db and n are the
 * output buffer and its size, handed through to devstat().
 */
350 streamstat(Chan *c, uchar *db, int n)
352 return devstat(c, db, n, (Dirtab *)0, 0L, streamgen);
/*
 * streamopen: open entry point. For a file entry it allocates a Stream
 * and binds it to the channel behind the file descriptor encoded in
 * c->qid.path. Error handling and the return are on lines not shown in
 * this excerpt.
 */
356 streamopen(Chan *c, int omode)
/* the directory is handled separately (body not visible) */
361 if(c->qid.type & QTDIR){
/* zero-filled Stream */
369 s = mallocz(sizeof(*s), 1);
377 omode = openmode(omode);
/* qid path is fd+1 (see streamgen), hence the -1 */
378 s->f = fdtochan(c->qid.path - 1, omode, 0, 1);
/* only plain files are accepted as the underlying channel */
379 if(s->f == nil || s->f->qid.type != QTFILE)
/* both directions start at the underlying channel's current offset */
382 s->roff = s->f->offset;
383 s->woff = s->f->offset;
/* take the channel's iounit, falling back to qiomaxatomic when unset or too large */
384 s->iounit = s->f->iounit;
385 if(s->iounit <= 0 || s->iounit > qiomaxatomic)
386 s->iounit = qiomaxatomic;
387 c->iounit = s->iounit;
/*
 * Fragment whose function header is outside this excerpt.
 * NOTE(review): presumably the isdrained() predicate passed to sleep() - confirm.
 *
 * While the write queue is still open, the result is true when the
 * queue is empty and the nwq counter equals nwp.ref.
 */
406 if(qisclosed(s->wq) == 0)
407 return qlen(s->wq) == 0 && s->nwq.ref == s->nwp.ref;
/* otherwise examine every writer slot (loop body not visible) */
409 for(i=0; i<nelem(s->wp); i++)
/*
 * Fragment whose function header is outside this excerpt: picks up the
 * Stream from c->aux and, on a line further down, sleeps on s->wz until
 * isdrained(s) holds.
 */
421 if((s = c->aux) == nil)
429 sleep(&s->wz, isdrained, s);
/*
 * Fragment whose function header is outside this excerpt.
 * NOTE(review): presumably the device's close routine - confirm.
 */
/* channels that were never opened, or carry no Stream, take this branch */
440 if((c->flag & COPEN) == 0 || (s = c->aux) == nil)
/* post a "streamclose" note to every reader slot */
444 for(i=0; i<nelem(s->rp); i++)
445 postnote(s->rp[i], 1, "streamclose", 0);
453 qclose(s->wq); /* discard the data */
/* post a "streamclose" note to every writer slot */
454 for(i=0; i<nelem(s->wp); i++)
455 postnote(s->wp[i], 1, "streamclose", 0);
/*
 * canpipeline: report whether channel f belongs to the device whose
 * device character is 'M'. streamqueue() uses the answer to choose
 * between a full set of worker processes and a single one. How mode is
 * used is on lines not shown in this excerpt.
 */
462 canpipeline(Chan *f, int mode)
466 return devtab[f->type]->dc == 'M';
/*
 * streamqueue: hand back the queue used for I/O on channel c in the
 * given mode (the callers pass OREAD or OWRITE). The visible lines
 * create the queue and start its worker processes; the conditions that
 * guard this and the return are on lines not shown in this excerpt.
 */
470 streamqueue(Chan *c, int mode)
/* only file entries with an attached Stream qualify */
476 if(s == nil || c->qid.type != QTFILE)
/* read side: queue sized by conf.pipeqsize */
487 s->rq = qopen(conf.pipeqsize, 0, 0, 0);
/* one reader per slot when pipelining is possible, otherwise a single reader */
492 n = canpipeline(s->f, mode) ? nelem(s->rp) : 1;
495 kproc("streamreader", streamreader, s);
/* wait until the reader counter reaches n */
497 while(s->nrp.ref != n)
/* write side: same scheme */
510 s->wq = qopen(conf.pipeqsize, 0, 0, 0);
515 n = canpipeline(s->f, mode) ? nelem(s->wp) : 1;
518 kproc("streamwriter", streamwriter, s);
/* wait until the writer counter reaches n */
520 while(s->nwp.ref != n)
/*
 * streamread: read entry point. Directory reads go through
 * devdirread() with streamgen(); file reads take up to n bytes into va
 * from the stream's read queue. The offset argument is unnamed and
 * unused.
 */
532 streamread(Chan *c, void *va, long n, vlong)
534 if(c->qid.type == QTDIR)
535 return devdirread(c, va, n, (Dirtab *)0, 0L, streamgen);
536 return qread(streamqueue(c, OREAD), va, n);
/*
 * streambread: block-read entry point; returns the result of qbread()
 * on the stream's read queue for up to n bytes. The offset argument is
 * unnamed and unused.
 */
540 streambread(Chan *c, long n, ulong)
542 return qbread(streamqueue(c, OREAD), n);
/*
 * streamwrite: write entry point; queues n bytes from va on the
 * stream's write queue and returns the qwrite() result. The offset
 * argument is unnamed and unused. Any checks ahead of the return are on
 * lines not shown in this excerpt.
 */
546 streamwrite(Chan *c, void *va, long n, vlong)
550 return qwrite(streamqueue(c, OWRITE), va, n);
/*
 * streambwrite: block-write entry point; passes block b to qbwrite() on
 * the stream's write queue and returns its result. The offset argument
 * is unnamed and unused. Any checks ahead of the return are on lines
 * not shown in this excerpt.
 */
554 streambwrite(Chan *c, Block *b, ulong)
558 return qbwrite(streamqueue(c, OWRITE), b);