2 #include "../port/lib.h"
6 #include "../port/error.h"
8 static ulong padblockcnt;
9 static ulong concatblockcnt;
10 static ulong pullupblockcnt;
11 static ulong copyblockcnt;
12 static ulong consumecnt;
13 static ulong producecnt;
14 static ulong qcopycnt;
21 typedef struct Queue Queue;
27 Block* bfirst; /* buffer */
30 int len; /* bytes allocated to queue */
31 int dlen; /* data bytes in queue */
32 int limit; /* max bytes in queue */
33 int inilim; /* initial limit */
35 int noblock; /* true if writes return immediately when q full */
36 int eof; /* number of eofs read by user */
38 void (*kick)(void*); /* restart output */
39 void (*bypass)(void*, Block*); /* bypass queue altogether */
40 void* arg; /* argument to kick */
42 QLock rlock; /* mutex for reading processes */
43 Rendez rr; /* process waiting to read */
44 QLock wlock; /* mutex for writing processes */
45 Rendez wr; /* process waiting to write */
55 uint qiomaxatomic = Maxatomic;
58 * free a list of blocks
65 for(; b != nil; b = next){
73 * pad a block to the front (or the back if size is negative)
76 padblock(Block *bp, int size)
82 panic("padblock %#p", getcallerpc(&bp));
84 QDEBUG checkb(bp, "padblock 1");
86 if(bp->rp - bp->base >= size){
95 memmove(nbp->wp, bp->rp, n);
100 if(bp->lim - bp->wp >= size)
104 nbp = allocb(size+n);
105 memmove(nbp->wp, bp->rp, n);
110 QDEBUG checkb(nbp, "padblock 1");
115 * return count of bytes in a string of blocks
131 * return count of space in blocks
134 blockalloclen(Block *bp)
147 * copy the string of blocks into
148 * a single block and free the string
151 concatblock(Block *bp)
159 nb = allocb(blocklen(bp));
160 for(; bp != nil; bp = next) {
163 memmove(nb->wp, bp->rp, len);
167 concatblockcnt += BLEN(nb);
168 QDEBUG checkb(nb, "concatblock 1");
173 * make sure the first block has at least n bytes
176 pullupblock(Block *bp, int n)
182 * this should almost always be true, it's
183 * just to avoid every caller checking.
189 * if not enough room in the first block,
190 * add another to the front of the list.
192 if(bp->lim - bp->rp < n){
199 * copy bytes from the trailing blocks into the first
202 while((nbp = bp->next) != nil){
206 memmove(bp->wp, nbp->rp, n);
209 QDEBUG checkb(bp, "pullupblock 1");
212 /* shouldn't happen but why crash if it does */
214 print("pullup negative length packet, called from %#p\n",
218 memmove(bp->wp, nbp->rp, i);
220 bp->next = nbp->next;
225 QDEBUG checkb(bp, "pullupblock 2");
235 * make sure the first block has at least n bytes
238 pullupqueue(Queue *q, int n)
242 if(BLEN(q->bfirst) >= n)
244 q->bfirst = pullupblock(q->bfirst, n);
245 for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
252 * trim to len bytes starting at offset
255 trimblock(Block *bp, int offset, int len)
260 QDEBUG checkb(bp, "trimblock 1");
261 if(blocklen(bp) < offset+len) {
266 while((l = BLEN(bp)) < offset) {
277 while((l = BLEN(bp)) < len) {
282 bp->wp -= (BLEN(bp) - len);
284 if(bp->next != nil) {
293 * copy 'count' bytes into a new block
296 copyblock(Block *bp, int count)
301 QDEBUG checkb(bp, "copyblock 0");
303 for(; count > 0 && bp != nil; bp = bp->next){
307 memmove(nbp->wp, bp->rp, l);
312 memset(nbp->wp, 0, count);
316 QDEBUG checkb(nbp, "copyblock 1");
322 adjustblock(Block* bp, int len)
332 if(bp->rp+len > bp->lim){
333 nbp = copyblock(bp, len);
335 QDEBUG checkb(nbp, "adjustblock 1");
342 memset(bp->wp, 0, len-n);
344 QDEBUG checkb(bp, "adjustblock 2");
351 * throw away up to count bytes from a
352 * list of blocks. Return count of bytes
356 pullblock(Block **bph, int count)
365 while(*bph != nil && count != 0) {
373 QDEBUG checkb(bp, "pullblock ");
384 * get next block from a queue, return null if nothing there
392 /* sync with qwrite */
401 QDEBUG checkb(b, "qget");
407 /* if writer flow controlled, restart */
408 if((q->state & Qflow) && q->len < q->limit/2){
423 * throw away the next 'len' bytes in the queue
426 qdiscard(Queue *q, int len)
428 Block *b, *tofree = nil;
429 int dowakeup, n, sofar;
432 for(sofar = 0; sofar < len; sofar += n){
436 QDEBUG checkb(b, "qdiscard");
438 if(n <= len - sofar){
443 /* remember to free this */
454 * if writer flow controlled, restart
457 * q->len < q->limit/2
458 * but it slows down tcp too much for certain write sizes.
459 * I really don't understand it completely. It may be
460 * due to the queue draining so fast that the transmission
461 * stalls waiting for the app to produce more data. - presotto
463 if((q->state & Qflow) && q->len < q->limit){
481 * Interrupt level copy out of a queue, return # bytes copied.
484 qconsume(Queue *q, void *vp, int len)
486 Block *b, *tofree = nil;
490 /* sync with qwrite */
500 QDEBUG checkb(b, "qconsume 1");
508 /* remember to free this */
516 memmove(p, b->rp, len);
520 /* discard the block if we're done with it */
521 if((q->state & Qmsg) || len == n){
526 /* remember to free this */
532 /* if writer flow controlled, restart */
533 if((q->state & Qflow) && q->len < q->limit/2){
551 qpass(Queue *q, Block *b)
555 /* sync with qread */
558 if(q->len >= q->limit){
563 if(q->state & Qclosed){
569 len = qaddlist(q, b);
571 if(q->len >= q->limit/2)
574 if(q->state & Qstarve){
575 q->state &= ~Qstarve;
587 qpassnolim(Queue *q, Block *b)
591 /* sync with qread */
595 if(q->state & Qclosed){
601 len = qaddlist(q, b);
603 if(q->len >= q->limit/2)
606 if(q->state & Qstarve){
607 q->state &= ~Qstarve;
619 * if the allocated space is way out of line with the used
620 * space, reallocate to a smaller block
628 for(l = &bp; (nbp = *l) != nil; l = &(*l)->next){
630 if((n<<2) < BALLOC(nbp)){
632 memmove((*l)->wp, nbp->rp, n);
634 (*l)->next = nbp->next;
643 qproduce(Queue *q, void *vp, int len)
653 /* sync with qread */
657 /* no waiting receivers, room in buffer? */
658 if(q->len >= q->limit){
666 memmove(b->wp, p, len);
670 if(q->state & Qstarve){
671 q->state &= ~Qstarve;
675 if(q->len >= q->limit)
686 * copy from offset in the queue
689 qcopy(Queue *q, int len, ulong offset)
695 b->wp += readblist(q->bfirst, b->wp, len, offset);
701 * called by non-interrupt code
704 qopen(int limit, int msg, void (*kick)(void*), void *arg)
708 q = malloc(sizeof(Queue));
712 q->limit = q->inilim = limit;
724 /* open a queue to be bypassed */
726 qbypass(void (*bypass)(void*, Block*), void *arg)
730 q = malloc(sizeof(Queue));
747 return (q->state & Qclosed) || q->bfirst != nil;
751 * wait for the queue to be non-empty or closed.
752 * called with q ilocked.
762 if(q->state & Qclosed){
765 if(*q->err && strcmp(q->err, Ehungup) != 0)
770 q->state |= Qstarve; /* flag requesting producer to wake me */
772 sleep(&q->rr, notempty, q);
779 * add a block list to a queue, return bytes added
782 qaddlist(Queue *q, Block *b)
786 QDEBUG checkb(b, "qaddlist 1");
788 /* queue the block */
796 while(b->next != nil){
798 QDEBUG checkb(b, "qaddlist 2");
810 * called with q ilocked
820 QDEBUG checkb(b, "qremove");
829 * copy the contents of a string of blocks into
830 * memory from an offset. blocklist kept unchanged.
831 * return number of copied bytes.
834 readblist(Block *b, uchar *p, long n, long o)
839 while(n > 0 && b != nil){
847 memmove(p, b->rp + o, m);
859 * put a block back to the front of the queue
860 * called with q ilocked
863 qputback(Queue *q, Block *b)
874 * cut off n bytes from the end of *h. return a new
875 * block with the tail and change *h to refer to the
879 splitblock(Block **h, int n)
888 memmove(b->wp, a->rp, m);
896 memmove(b->wp, a->wp, n);
903 * flow control, get producer going again
904 * called with q ilocked
907 qwakeup_iunlock(Queue *q)
911 /* if writer flow controlled, restart */
912 if((q->state & Qflow) && q->len < q->limit/2){
919 /* wake up flow controlled writers */
928 * get next block from a queue (up to a limit)
931 qbread(Queue *q, int len)
951 /* multiple reads on a closed queue */
956 /* if we get here, there's at least one block in the queue */
960 /* split block if it's too big and this is not a message queue */
963 if((q->state & Qmsg) == 0)
964 qputback(q, splitblock(&b, n));
969 /* restart producer */
979 * read a queue. if no data is queued, post a Block
980 * and wait on its Rendez.
983 qread(Queue *q, void *vp, int len)
985 Block *b, *first, **last;
1004 /* multiple reads on a closed queue */
1009 /* if we get here, there's at least one block in the queue */
1011 if(q->state & Qcoalesce){
1012 /* when coalescing, 0 length blocks just go away */
1020 /* grab the first block plus as many
1021 * following blocks as will partially
1028 if(n >= len || q->bfirst == nil)
1039 /* split last block if it's too big and this is not a message queue */
1040 if(n > len && (q->state & Qmsg) == 0)
1041 qputback(q, splitblock(last, n - len));
1043 /* restart producer */
1053 n = readblist(first, vp, len, 0);
1065 return q->len < q->limit || (q->state & Qclosed);
1069 * flow control, wait for queue to get below the limit
1075 if(q->noblock || qnotfull(q))
1087 sleep(&q->wr, qnotfull, q);
1094 * add a block to a queue obeying flow control
1097 qbwrite(Queue *q, Block *b)
1102 if(q->bypass != nil){
1104 (*q->bypass)(q->arg, b);
1115 /* give up if the queue is closed */
1116 if(q->state & Qclosed){
1121 /* don't queue over the limit */
1122 if(q->len >= q->limit && q->noblock){
1130 len = qaddlist(q, b);
1132 /* make sure other end gets awakened */
1133 if(q->state & Qstarve){
1134 q->state &= ~Qstarve;
1140 /* get output going again */
1141 if(q->kick != nil && (dowakeup || (q->state&Qkick)))
1144 /* wake up anyone consuming at the other end */
1148 /* if we just woke up a higher priority process, let it run */
1149 if(p != nil && p->priority > up->priority)
1154 * flow control, before allowing the process to continue and
1155 * queue more. We do this here so that postnote can only
1156 * interrupt us after the data has been queued. This means that
1157 * things like 9p flushes and ssl messages will not be disrupted
1158 * by software interrupts.
1166 * write to a queue. only Maxatomic bytes at a time are atomic.
1169 qwrite(Queue *q, void *vp, int len)
1176 print("qwrite hi %#p\n", getcallerpc(&q));
1178 /* stop queue bloat before allocating blocks */
1179 if(q->len/2 >= q->limit && q->noblock == 0 && q->bypass == nil){
1181 if(up->procctl == Proc_exitme || up->procctl == Proc_exitbig)
1199 memmove(b->wp, p+sofar, n);
1203 sofar += qbwrite(q, b);
1204 } while(sofar < len && (q->state & Qmsg) == 0);
1210 * used by print() to write to a queue. Since we may be splhi or not in
1211 * a process, don't qlock.
1214 qiwrite(Queue *q, void *vp, int len)
1216 int n, sofar, dowakeup;
1231 memmove(b->wp, p+sofar, n);
1236 /* we use an artificially high limit for kernel prints since anything
1237 * over the limit gets dropped
1239 if((q->state & Qclosed) != 0 || q->len/2 >= q->limit){
1247 if(q->state & Qstarve){
1248 q->state &= ~Qstarve;
1261 } while(sofar < len && (q->state & Qmsg) == 0);
1267 * be extremely careful when calling this,
1268 * as there is no reference accounting
1278 * Mark a queue as closed. No further IO is permitted.
1279 * All blocks are released.
1291 q->state |= Qclosed;
1292 q->state &= ~(Qflow|Qstarve);
1293 kstrcpy(q->err, Ehungup, ERRMAX);
1301 /* free queued blocks */
1304 /* wake up readers/writers */
1310 * Mark a queue as closed. Wake up any readers. Don't remove queued
1314 qhangup(Queue *q, char *msg)
1318 q->state |= Qclosed;
1319 if(msg == nil || *msg == '\0')
1321 kstrcpy(q->err, msg, ERRMAX);
1324 /* wake up readers/writers */
1330 * return non-zero if the q is hungup
1335 return q->state & Qclosed;
1339 * mark a queue as no longer hung up
1345 q->state &= ~Qclosed;
1346 q->state |= Qstarve;
1348 q->limit = q->inilim;
1353 * return bytes queued
1362 * return space remaining before flow control
1369 l = q->limit - q->len;
1376 * return true if we can read without blocking
1381 return q->bfirst != nil;
1385 * change queue limit
1388 qsetlimit(Queue *q, int limit)
1394 * set blocking/nonblocking
1397 qnoblock(Queue *q, int onoff)
1403 * flush the output queue
1418 /* free queued blocks */
1421 /* wake up writers */
1428 return q->state & Qflow;