2 #include "../port/lib.h"
6 #include "../port/error.h"
10 typedef struct Pipe Pipe;
37 ".", {Qdir,0,QTDIR}, 0, DMDIR|0500,
38 "data", {Qdata0}, 0, 0600,
39 "data1", {Qdata1}, 0, 0600,
46 if(conf.pipeqsize == 0){
48 conf.pipeqsize = 256*1024;
50 conf.pipeqsize = 32*1024;
55 * create a pipe; no streams are created until an open
58 pipeattach(char *spec)
63 c = devattach('|', spec);
68 p = malloc(sizeof(Pipe));
73 p->q[0] = qopen(conf.pipeqsize, 0, 0, 0);
78 p->q[1] = qopen(conf.pipeqsize, 0, 0, 0);
87 p->path = ++pipealloc.path;
89 p->perm = pipedir[Qdata0].perm;
91 mkqid(&c->qid, NETQID(2*p->path, Qdir), 0, QTDIR);
98 pipegen(Chan *c, char*, Dirtab *tab, int ntab, int i, Dir *dp)
105 devdir(c, c->qid, "#|", 0, eve, 0555, dp);
109 if(tab==0 || i>=ntab)
114 switch((ulong)tab->qid.path){
125 mkqid(&q, NETQID(NETID(c->qid.path), tab->qid.path), 0, QTFILE);
126 devdir(c, q, tab->name, len, eve, p->perm, dp);
132 pipewalk(Chan *c, Chan *nc, char **name, int nname)
137 wq = devwalk(c, nc, name, nname, pipedir, NPIPEDIR, pipegen);
138 if(wq != nil && wq->clone != nil && wq->clone != c){
143 print("channel open in pipewalk\n");
144 switch(NETTYPE(c->qid.path)){
159 pipestat(Chan *c, uchar *db, int n)
166 switch(NETTYPE(c->qid.path)){
168 devdir(c, c->qid, ".", 0, eve, 0555, &dir);
171 devdir(c, c->qid, "data", qlen(p->q[0]), eve, p->perm, &dir);
174 devdir(c, c->qid, "data1", qlen(p->q[1]), eve, p->perm, &dir);
179 n = convD2M(&dir, db, n);
186 pipewstat(Chan* c, uchar* db, int n)
193 if(strcmp(up->user, eve) != 0)
195 if(NETTYPE(c->qid.path) == Qdir)
198 dir = smalloc(sizeof(Dir)+n);
203 m = convM2D(db, n, &dir[0], (char*)&dir[1]);
206 if(!emptystr(dir[0].uid))
207 error("can't change owner");
208 if(dir[0].mode != ~0UL)
209 p->perm = dir[0].mode;
216 * if the stream doesn't exist, create it
219 pipeopen(Chan *c, int omode)
223 if(c->qid.type & QTDIR){
234 switch(NETTYPE(c->qid.path)){
244 c->mode = openmode(omode);
247 c->iounit = qiomaxatomic;
261 * closing either side hangs up the stream
263 switch(NETTYPE(c->qid.path)){
283 * if both sides are closed, they are reusable
285 if(p->qref[0] == 0 && p->qref[1] == 0){
291 * free the structure on last close
304 piperead(Chan *c, void *va, long n, vlong)
310 switch(NETTYPE(c->qid.path)){
312 return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);
314 return qread(p->q[0], va, n);
316 return qread(p->q[1], va, n);
320 return -1; /* not reached */
324 pipebread(Chan *c, long n, ulong offset)
330 switch(NETTYPE(c->qid.path)){
332 return qbread(p->q[0], n);
334 return qbread(p->q[1], n);
337 return devbread(c, n, offset);
341 * a write to a closed pipe causes a note to be sent to
345 pipewrite(Chan *c, void *va, long n, vlong)
350 print("pipewrite hi %#p\n", getcallerpc(&c));
352 /* avoid notes when pipe is a mounted queue */
353 if((c->flag & CMSG) == 0)
354 postnote(up, 1, "sys: write on closed pipe", NUser);
360 switch(NETTYPE(c->qid.path)){
362 n = qwrite(p->q[1], va, n);
366 n = qwrite(p->q[0], va, n);
378 pipebwrite(Chan *c, Block *bp, ulong)
384 /* avoid notes when pipe is a mounted queue */
385 if((c->flag & CMSG) == 0)
386 postnote(up, 1, "sys: write on closed pipe", NUser);
391 switch(NETTYPE(c->qid.path)){
393 n = qbwrite(p->q[1], bp);
397 n = qbwrite(p->q[0], bp);