2 #include "../port/lib.h"
6 #include "../port/error.h"
10 typedef struct Pipe Pipe;
36 ".", {Qdir,0,QTDIR}, 0, DMDIR|0500,
37 "data", {Qdata0}, 0, 0600,
38 "data1", {Qdata1}, 0, 0600,
45 if(conf.pipeqsize == 0){
47 conf.pipeqsize = 256*1024;
49 conf.pipeqsize = 32*1024;
54 * create a pipe, no streams are created until an open
57 pipeattach(char *spec)
62 c = devattach('|', spec);
63 p = malloc(sizeof(Pipe));
68 p->q[0] = qopen(conf.pipeqsize, 0, 0, 0);
73 p->q[1] = qopen(conf.pipeqsize, 0, 0, 0);
81 p->path = ++pipealloc.path;
84 mkqid(&c->qid, NETQID(2*p->path, Qdir), 0, QTDIR);
91 pipegen(Chan *c, char*, Dirtab *tab, int ntab, int i, Dir *dp)
98 devdir(c, c->qid, "#|", 0, eve, DMDIR|0555, dp);
102 if(tab==0 || i>=ntab)
107 switch((ulong)tab->qid.path){
118 mkqid(&q, NETQID(NETID(c->qid.path), tab->qid.path), 0, QTFILE);
119 devdir(c, q, tab->name, len, eve, tab->perm, dp);
125 pipewalk(Chan *c, Chan *nc, char **name, int nname)
130 wq = devwalk(c, nc, name, nname, pipedir, NPIPEDIR, pipegen);
131 if(wq != nil && wq->clone != nil && wq->clone != c){
136 print("channel open in pipewalk\n");
137 switch(NETTYPE(c->qid.path)){
152 pipestat(Chan *c, uchar *db, int n)
159 switch(NETTYPE(c->qid.path)){
161 devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir);
164 devdir(c, c->qid, "data", qlen(p->q[0]), eve, 0600, &dir);
167 devdir(c, c->qid, "data1", qlen(p->q[1]), eve, 0600, &dir);
172 n = convD2M(&dir, db, n);
179 * if the stream doesn't exist, create it
182 pipeopen(Chan *c, int omode)
186 if(c->qid.type & QTDIR){
197 switch(NETTYPE(c->qid.path)){
207 c->mode = openmode(omode);
210 c->iounit = qiomaxatomic;
224 * closing either side hangs up the stream
226 switch(NETTYPE(c->qid.path)){
246 * if both sides are closed, they are reusable
248 if(p->qref[0] == 0 && p->qref[1] == 0){
254 * free the structure on last close
267 piperead(Chan *c, void *va, long n, vlong)
273 switch(NETTYPE(c->qid.path)){
275 return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);
277 return qread(p->q[0], va, n);
279 return qread(p->q[1], va, n);
283 return -1; /* not reached */
287 pipebread(Chan *c, long n, ulong offset)
293 switch(NETTYPE(c->qid.path)){
295 return qbread(p->q[0], n);
297 return qbread(p->q[1], n);
300 return devbread(c, n, offset);
304 * a write to a closed pipe causes a note to be sent to
308 pipewrite(Chan *c, void *va, long n, vlong)
313 print("pipewrite hi %#p\n", getcallerpc(&c));
315 /* avoid notes when pipe is a mounted queue */
316 if((c->flag & CMSG) == 0)
317 postnote(up, 1, "sys: write on closed pipe", NUser);
323 switch(NETTYPE(c->qid.path)){
325 n = qwrite(p->q[1], va, n);
329 n = qwrite(p->q[0], va, n);
341 pipebwrite(Chan *c, Block *bp, ulong)
347 /* avoid notes when pipe is a mounted queue */
348 if((c->flag & CMSG) == 0)
349 postnote(up, 1, "sys: write on closed pipe", NUser);
354 switch(NETTYPE(c->qid.path)){
356 n = qbwrite(p->q[1], bp);
360 n = qbwrite(p->q[0], bp);