2 #include "../port/lib.h"
6 #include "../port/error.h"
8 static ulong padblockcnt;
9 static ulong concatblockcnt;
10 static ulong pullupblockcnt;
11 static ulong copyblockcnt;
12 static ulong consumecnt;
13 static ulong producecnt;
14 static ulong qcopycnt;
23 typedef struct Queue Queue;
29 Block* bfirst; /* buffer */
32 int len; /* bytes allocated to queue */
33 int dlen; /* data bytes in queue */
34 int limit; /* max bytes in queue */
35 int inilim; /* initial limit */
37 int noblock; /* true if writes return immediately when q full */
38 int eof; /* number of eofs read by user */
40 void (*kick)(void*); /* restart output */
41 void (*bypass)(void*, Block*); /* bypass queue altogether */
42 void* arg; /* argument to kick */
44 QLock rlock; /* mutex for reading processes */
45 Rendez rr; /* process waiting to read */
46 QLock wlock; /* mutex for writing processes */
47 Rendez wr; /* process waiting to write */
57 uint qiomaxatomic = Maxatomic;
64 print("pad %lud, concat %lud, pullup %lud, copy %lud\n",
65 padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
66 print("consume %lud, produce %lud, qcopy %lud\n",
67 consumecnt, producecnt, qcopycnt);
71 * free a list of blocks
78 for(; b != 0; b = next){
87 * pad a block to the front (or the back if size is negative)
90 padblock(Block *bp, int size)
95 QDEBUG checkb(bp, "padblock 1");
97 if(bp->rp - bp->base >= size){
103 panic("padblock %#p", getcallerpc(&bp));
106 nbp = allocb(size+n);
109 memmove(nbp->wp, bp->rp, n);
117 panic("padblock %#p", getcallerpc(&bp));
119 if(bp->lim - bp->wp >= size)
124 nbp = allocb(size+n);
125 memmove(nbp->wp, bp->rp, n);
129 QDEBUG checkb(nbp, "padblock 1");
134 * return count of bytes in a string of blocks
150 * return count of space in blocks
153 blockalloclen(Block *bp)
166 * copy the string of blocks into
167 * a single block and free the string
170 concatblock(Block *bp)
178 nb = allocb(blocklen(bp));
179 for(f = bp; f; f = f->next) {
181 memmove(nb->wp, f->rp, len);
184 concatblockcnt += BLEN(nb);
186 QDEBUG checkb(nb, "concatblock 1");
191 * make sure the first block has at least n bytes
194 pullupblock(Block *bp, int n)
200 * this should almost always be true, it's
201 * just to avoid every caller checking.
207 * if not enough room in the first block,
208 * add another to the front of the list.
210 if(bp->lim - bp->rp < n){
217 * copy bytes from the trailing blocks into the first
220 while(nbp = bp->next){
223 memmove(bp->wp, nbp->rp, n);
227 QDEBUG checkb(bp, "pullupblock 1");
230 /* shouldn't happen but why crash if it does */
232 print("pullup negative length packet, called from %#p\n",
236 memmove(bp->wp, nbp->rp, i);
239 bp->next = nbp->next;
244 QDEBUG checkb(bp, "pullupblock 2");
254 * make sure the first block has at least n bytes
257 pullupqueue(Queue *q, int n)
261 if(BLEN(q->bfirst) >= n)
263 q->bfirst = pullupblock(q->bfirst, n);
264 for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
271 * trim to len bytes starting at offset
274 trimblock(Block *bp, int offset, int len)
279 QDEBUG checkb(bp, "trimblock 1");
280 if(blocklen(bp) < offset+len) {
285 while((l = BLEN(bp)) < offset) {
296 while((l = BLEN(bp)) < len) {
301 bp->wp -= (BLEN(bp) - len);
312 * copy 'count' bytes into a new block
315 copyblock(Block *bp, int count)
320 QDEBUG checkb(bp, "copyblock 0");
322 for(; count > 0 && bp != 0; bp = bp->next){
326 memmove(nbp->wp, bp->rp, l);
331 memset(nbp->wp, 0, count);
335 QDEBUG checkb(nbp, "copyblock 1");
341 adjustblock(Block* bp, int len)
351 if(bp->rp+len > bp->lim){
352 nbp = copyblock(bp, len);
354 QDEBUG checkb(nbp, "adjustblock 1");
361 memset(bp->wp, 0, len-n);
363 QDEBUG checkb(bp, "adjustblock 2");
370 * throw away up to count bytes from a
371 * list of blocks. Return count of bytes
375 pullblock(Block **bph, int count)
384 while(*bph != nil && count != 0) {
392 QDEBUG checkb(bp, "pullblock ");
403 * get next block from a queue, return null if nothing there
411 /* sync with qwrite */
424 QDEBUG checkb(b, "qget");
426 /* if writer flow controlled, restart */
427 if((q->state & Qflow) && q->len < q->limit/2){
442 * throw away the next 'len' bytes in the queue
445 qdiscard(Queue *q, int len)
448 int dowakeup, n, sofar;
451 for(sofar = 0; sofar < len; sofar += n){
455 QDEBUG checkb(b, "qdiscard");
457 if(n <= len - sofar){
471 * if writer flow controlled, restart
474 * q->len < q->limit/2
475 * but it slows down tcp too much for certain write sizes.
476 * I really don't understand it completely. It may be
477 * due to the queue draining so fast that the transmission
478 * stalls waiting for the app to produce more data. - presotto
480 if((q->state & Qflow) && q->len < q->limit){
495 * Interrupt level copy out of a queue, return # bytes copied.
498 qconsume(Queue *q, void *vp, int len)
505 /* sync with qwrite */
515 QDEBUG checkb(b, "qconsume 1");
523 /* remember to free this */
530 memmove(p, b->rp, len);
535 /* discard the block if we're done with it */
536 if((q->state & Qmsg) || len == n){
542 /* remember to free this */
547 /* if writer flow controlled, restart */
548 if((q->state & Qflow) && q->len < q->limit/2){
566 qpass(Queue *q, Block *b)
568 int dlen, len, dowakeup;
570 /* sync with qread */
573 if(q->len >= q->limit){
578 if(q->state & Qclosed){
585 /* add buffer to queue */
592 QDEBUG checkb(b, "qpass");
595 QDEBUG checkb(b, "qpass");
603 if(q->len >= q->limit/2)
606 if(q->state & Qstarve){
607 q->state &= ~Qstarve;
619 qpassnolim(Queue *q, Block *b)
621 int dlen, len, dowakeup;
623 /* sync with qread */
627 if(q->state & Qclosed){
633 /* add buffer to queue */
640 QDEBUG checkb(b, "qpass");
643 QDEBUG checkb(b, "qpass");
651 if(q->len >= q->limit/2)
654 if(q->state & Qstarve){
655 q->state &= ~Qstarve;
667 * if the allocated space is way out of line with the used
668 * space, reallocate to a smaller block
676 for(l = &bp; *l; l = &(*l)->next){
679 if((n<<2) < BALLOC(nbp)){
681 memmove((*l)->wp, nbp->rp, n);
683 (*l)->next = nbp->next;
692 qproduce(Queue *q, void *vp, int len)
698 /* sync with qread */
702 /* no waiting receivers, room in buffer? */
703 if(q->len >= q->limit){
715 memmove(b->wp, p, len);
723 /* b->next = 0; done by iallocb() */
726 QDEBUG checkb(b, "qproduce");
728 if(q->state & Qstarve){
729 q->state &= ~Qstarve;
733 if(q->len >= q->limit)
744 * copy from offset in the queue
747 qcopy(Queue *q, int len, ulong offset)
760 for(sofar = 0; ; sofar += n){
766 if(sofar + n > offset){
767 p = b->rp + offset - sofar;
771 QDEBUG checkb(b, "qcopy");
775 /* copy bytes from there */
776 for(sofar = 0; sofar < len;){
779 memmove(nb->wp, p, n);
795 * called by non-interrupt code
798 qopen(int limit, int msg, void (*kick)(void*), void *arg)
802 q = malloc(sizeof(Queue));
806 q->limit = q->inilim = limit;
818 /* open a queue to be bypassed */
820 qbypass(void (*bypass)(void*, Block*), void *arg)
824 q = malloc(sizeof(Queue));
841 return (q->state & Qclosed) || q->bfirst != 0;
845 * wait for the queue to be non-empty or closed.
846 * called with q ilocked.
856 if(q->state & Qclosed){
859 if(*q->err && strcmp(q->err, Ehungup) != 0)
864 q->state |= Qstarve; /* flag requesting producer to wake me */
866 sleep(&q->rr, notempty, q);
873 * add a block list to a queue
876 qaddlist(Queue *q, Block *b)
878 /* queue the block */
883 q->len += blockalloclen(b);
884 q->dlen += blocklen(b);
891 * called with q ilocked
905 QDEBUG checkb(b, "qremove");
910 * copy the contents of a string of blocks into
911 * memory. emptied blocks are freed. return
912 * pointer to first unconsumed block.
915 bl2mem(uchar *p, Block *b, int n)
920 for(; b != nil; b = next){
923 memmove(p, b->rp, n);
927 memmove(p, b->rp, i);
938 * copy the contents of memory into a string of blocks.
939 * return nil on error.
942 mem2bl(uchar *p, int len)
945 Block *b, *first, **l;
959 setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
960 memmove(b->wp, p, n);
972 * put a block back to the front of the queue
973 * called with q ilocked
976 qputback(Queue *q, Block *b)
987 * flow control, get producer going again
988 * called with q ilocked
991 qwakeup_iunlock(Queue *q)
995 /* if writer flow controlled, restart */
996 if((q->state & Qflow) && q->len < q->limit/2){
1003 /* wakeup flow controlled writers */
1012 * get next block from a queue (up to a limit)
1015 qbread(Queue *q, int len)
1035 /* multiple reads on a closed queue */
1040 /* if we get here, there's at least one block in the queue */
1044 /* split block if it's too big and this is not a message queue */
1047 if((q->state&Qmsg) == 0){
1050 memmove(b->wp, nb->rp+len, n);
1054 nb->wp = nb->rp + len;
1057 /* restart producer */
1066 * read a queue. if no data is queued, post a Block
1067 * and wait on its Rendez.
1070 qread(Queue *q, void *vp, int len)
1072 Block *b, *first, **l;
1091 /* multiple reads on a closed queue */
1096 /* if we get here, there's at least one block in the queue */
1097 if(q->state & Qcoalesce){
1098 /* when coalescing, 0 length blocks just go away */
1105 /* grab the first block plus as many
1106 * following blocks as will completely
1129 /* copy to user space outside of the ilock */
1131 b = bl2mem(vp, first, len);
1134 /* take care of any left over partial block */
1143 /* restart producer */
1156 return q->len < q->limit || (q->state & Qclosed);
1160 * add a block to a queue obeying flow control
1163 qbwrite(Queue *q, Block *b)
1171 (*q->bypass)(q->arg, b);
1182 /* give up if the queue is closed */
1183 if(q->state & Qclosed){
1188 /* don't queue over the limit */
1189 if(q->len >= q->limit){
1196 if(q->len >= q->limit*10){
1202 /* queue the block */
1209 q->len += BALLOC(b);
1211 QDEBUG checkb(b, "qbwrite");
1213 /* make sure other end gets awakened */
1214 if(q->state & Qstarve){
1215 q->state &= ~Qstarve;
1221 /* get output going again */
1222 if(q->kick && (dowakeup || (q->state&Qkick)))
1225 /* wakeup anyone consuming at the other end */
1229 /* if we just wokeup a higher priority process, let it run */
1230 if(p != nil && p->priority > up->priority)
1235 * flow control, wait for queue to get below the limit
1236 * before allowing the process to continue and queue
1237 * more. We do this here so that postnote can only
1238 * interrupt us after the data has been queued. This
1239 * means that things like 9p flushes and ssl messages
1240 * will not be disrupted by software interrupts.
1242 * Note - this is moderately dangerous since a process
1243 * that keeps getting interrupted and rewriting will
1244 * queue up to 10 times the queue limit before failing.
1247 if(q->noblock || qnotfull(q))
1259 sleep(&q->wr, qnotfull, q);
1268 * write to a queue. only Maxatomic bytes at a time is atomic.
1271 qwrite(Queue *q, void *vp, int len)
1278 print("qwrite hi %#p\n", getcallerpc(&q));
1287 setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
1292 memmove(b->wp, p+sofar, n);
1299 } while(sofar < len && (q->state & Qmsg) == 0);
1305 * used by print() to write to a queue. Since we may be splhi or not in
1306 * a process, don't qlock.
1308 * this routine merges adjacent blocks if block n+1 will fit into
1309 * the free space of block n.
1312 qiwrite(Queue *q, void *vp, int len)
1314 int n, sofar, dowakeup;
1329 memmove(b->wp, p+sofar, n);
1334 /* we use an artificially high limit for kernel prints since anything
1335 * over the limit gets dropped
1337 if(q->dlen >= 16*1024){
1343 QDEBUG checkb(b, "qiwrite");
1349 q->len += BALLOC(b);
1352 if(q->state & Qstarve){
1353 q->state &= ~Qstarve;
1366 } while(sofar < len && (q->state & Qmsg) == 0);
1372 * be extremely careful when calling this,
1373 * as there is no reference accounting
1383 * Mark a queue as closed. No further IO is permitted.
1384 * All blocks are released.
1396 q->state |= Qclosed;
1397 q->state &= ~(Qflow|Qstarve);
1398 strcpy(q->err, Ehungup);
1406 /* free queued blocks */
1409 /* wake up readers/writers */
1415 * Mark a queue as closed. Wakeup any readers. Don't remove queued
1419 qhangup(Queue *q, char *msg)
1423 q->state |= Qclosed;
1424 if(msg == 0 || *msg == 0)
1425 strcpy(q->err, Ehungup);
1427 strncpy(q->err, msg, ERRMAX-1);
1430 /* wake up readers/writers */
1436 * return non-zero if the q is hungup
1441 return q->state & Qclosed;
1445 * mark a queue as no longer hung up
1451 q->state &= ~Qclosed;
1452 q->state |= Qstarve;
1454 q->limit = q->inilim;
1459 * return bytes queued
1468 * return space remaining before flow control
1475 l = q->limit - q->len;
1482 * return true if we can read without blocking
1487 return q->bfirst!=0;
1491 * change queue limit
1494 qsetlimit(Queue *q, int limit)
1500 * set blocking/nonblocking
1503 qnoblock(Queue *q, int onoff)
1509 * flush the output queue
1524 /* free queued blocks */
1527 /* wake up readers/writers */
1534 return q->state & Qflow;