2 #include "../port/lib.h"
6 #include "../port/error.h"
10 typedef struct Pipe Pipe;
37 ".", {Qdir,0,QTDIR}, 0, DMDIR|0500,
38 "data", {Qdata0}, 0, 0600,
39 "data1", {Qdata1}, 0, 0600,
46 if(conf.pipeqsize == 0){
48 conf.pipeqsize = 256*1024;
50 conf.pipeqsize = 32*1024;
55 * create a pipe; no streams are created until an open
58 pipeattach(char *spec)
63 c = devattach('|', spec);
64 p = malloc(sizeof(Pipe));
69 p->q[0] = qopen(conf.pipeqsize, 0, 0, 0);
74 p->q[1] = qopen(conf.pipeqsize, 0, 0, 0);
82 p->path = ++pipealloc.path;
84 p->perm = pipedir[Qdata0].perm;
86 mkqid(&c->qid, NETQID(2*p->path, Qdir), 0, QTDIR);
93 pipegen(Chan *c, char*, Dirtab *tab, int ntab, int i, Dir *dp)
100 devdir(c, c->qid, "#|", 0, eve, DMDIR|0555, dp);
104 if(tab==0 || i>=ntab)
109 switch((ulong)tab->qid.path){
120 mkqid(&q, NETQID(NETID(c->qid.path), tab->qid.path), 0, QTFILE);
121 devdir(c, q, tab->name, len, eve, p->perm, dp);
127 pipewalk(Chan *c, Chan *nc, char **name, int nname)
132 wq = devwalk(c, nc, name, nname, pipedir, NPIPEDIR, pipegen);
133 if(wq != nil && wq->clone != nil && wq->clone != c){
138 print("channel open in pipewalk\n");
139 switch(NETTYPE(c->qid.path)){
154 pipestat(Chan *c, uchar *db, int n)
161 switch(NETTYPE(c->qid.path)){
163 devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir);
166 devdir(c, c->qid, "data", qlen(p->q[0]), eve, p->perm, &dir);
169 devdir(c, c->qid, "data1", qlen(p->q[1]), eve, p->perm, &dir);
174 n = convD2M(&dir, db, n);
181 pipewstat(Chan* c, uchar* db, int n)
188 if(strcmp(up->user, eve) != 0)
190 if(NETTYPE(c->qid.path) == Qdir)
193 dir = smalloc(sizeof(Dir)+n);
198 m = convM2D(db, n, &dir[0], (char*)&dir[1]);
201 if(!emptystr(dir[0].uid))
202 error("can't change owner");
203 if(dir[0].mode != ~0UL)
204 p->perm = dir[0].mode;
211 * if the stream doesn't exist, create it
214 pipeopen(Chan *c, int omode)
218 if(c->qid.type & QTDIR){
229 switch(NETTYPE(c->qid.path)){
239 c->mode = openmode(omode);
242 c->iounit = qiomaxatomic;
256 * closing either side hangs up the stream
258 switch(NETTYPE(c->qid.path)){
278 * if both sides are closed, they are reusable
280 if(p->qref[0] == 0 && p->qref[1] == 0){
286 * free the structure on last close
299 piperead(Chan *c, void *va, long n, vlong)
305 switch(NETTYPE(c->qid.path)){
307 return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);
309 return qread(p->q[0], va, n);
311 return qread(p->q[1], va, n);
315 return -1; /* not reached */
319 pipebread(Chan *c, long n, ulong offset)
325 switch(NETTYPE(c->qid.path)){
327 return qbread(p->q[0], n);
329 return qbread(p->q[1], n);
332 return devbread(c, n, offset);
336 * a write to a closed pipe causes a note to be sent to
340 pipewrite(Chan *c, void *va, long n, vlong)
345 print("pipewrite hi %#p\n", getcallerpc(&c));
347 /* avoid notes when pipe is a mounted queue */
348 if((c->flag & CMSG) == 0)
349 postnote(up, 1, "sys: write on closed pipe", NUser);
355 switch(NETTYPE(c->qid.path)){
357 n = qwrite(p->q[1], va, n);
361 n = qwrite(p->q[0], va, n);
373 pipebwrite(Chan *c, Block *bp, ulong)
379 /* avoid notes when pipe is a mounted queue */
380 if((c->flag & CMSG) == 0)
381 postnote(up, 1, "sys: write on closed pipe", NUser);
386 switch(NETTYPE(c->qid.path)){
388 n = qbwrite(p->q[1], bp);
392 n = qbwrite(p->q[0], bp);