2 #include "../port/lib.h"
6 #include "../port/error.h"
8 static ulong padblockcnt;
9 static ulong concatblockcnt;
10 static ulong pullupblockcnt;
11 static ulong copyblockcnt;
12 static ulong consumecnt;
13 static ulong producecnt;
20 typedef struct Queue Queue;
26 Block* bfirst; /* buffer */
29 int len; /* bytes allocated to queue */
30 int dlen; /* data bytes in queue */
31 int limit; /* max bytes in queue */
32 int inilim; /* initial limit */
34 int noblock; /* true if writes return immediately when q full */
35 int eof; /* number of eofs read by user */
37 void (*kick)(void*); /* restart output */
38 void (*bypass)(void*, Block*); /* bypass queue altogether */
39 void* arg; /* argument to kick */
41 QLock rlock; /* mutex for reading processes */
42 Rendez rr; /* process waiting to read */
43 QLock wlock; /* mutex for writing processes */
44 Rendez wr; /* process waiting to write */
54 uint qiomaxatomic = Maxatomic;
57 * free a list of blocks
64 for(; b != nil; b = next){
72 * pad a block to the front (or the back if size is negative)
75 padblock(Block *bp, int size)
80 QDEBUG checkb(bp, "padblock 0");
82 if(bp->rp - bp->base >= size){
90 memmove(nbp->wp, bp->rp, n);
95 if(bp->lim - bp->wp >= size)
99 memmove(nbp->wp, bp->rp, n);
102 nbp->next = bp->next;
105 QDEBUG checkb(nbp, "padblock 1");
110 * return count of bytes in a string of blocks
126 * return count of space in blocks
129 blockalloclen(Block *bp)
142 * copy the string of blocks into
143 * a single block and free the string
146 concatblock(Block *bp)
153 concatblockcnt += len;
154 return pullupblock(bp, len);
158 * make sure the first block has at least n bytes
161 pullupblock(Block *bp, int n)
167 * this should almost always be true, it's
168 * just to avoid every caller checking.
174 * if not enough room in the first block,
175 * add another to the front of the list.
177 if(bp->lim - bp->rp < n){
184 * copy bytes from the trailing blocks into the first
187 while((nbp = bp->next) != nil){
191 memmove(bp->wp, nbp->rp, n);
194 QDEBUG checkb(bp, "pullupblock 1");
197 /* shouldn't happen but why crash if it does */
199 print("pullup negative length packet, called from %#p\n",
203 memmove(bp->wp, nbp->rp, i);
205 bp->next = nbp->next;
210 QDEBUG checkb(bp, "pullupblock 2");
220 * make sure the first block in the queue has at least n bytes
223 pullupqueue(Queue *q, int n)
227 if(BLEN(q->bfirst) >= n)
229 q->bfirst = pullupblock(q->bfirst, n);
230 for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
237 * trim to len bytes starting at offset
240 trimblock(Block *bp, int offset, int len)
245 QDEBUG checkb(bp, "trimblock 1");
247 if(offset == 0 && len == l)
254 while((l = BLEN(bp)) < offset) {
265 while((l = BLEN(bp)) < len) {
270 bp->wp -= (BLEN(bp) - len);
272 if(bp->next != nil) {
281 * copy 'count' bytes into a new block
284 copyblock(Block *bp, int count)
289 QDEBUG checkb(bp, "copyblock 0");
291 for(; count > 0 && bp != nil; bp = bp->next){
295 memmove(nbp->wp, bp->rp, l);
300 memset(nbp->wp, 0, count);
304 QDEBUG checkb(nbp, "copyblock 1");
310 adjustblock(Block* bp, int len)
320 if(bp->rp+len > bp->lim){
321 nbp = copyblock(bp, len);
323 QDEBUG checkb(nbp, "adjustblock 1");
330 memset(bp->wp, 0, len-n);
332 QDEBUG checkb(bp, "adjustblock 2");
339 * throw away up to count bytes from a
340 * list of blocks. Return count of bytes
344 pullblock(Block **bph, int count)
353 while(*bph != nil && count != 0) {
361 QDEBUG checkb(bp, "pullblock ");
372 * get next block from a queue, return null if nothing there
380 /* sync with qwrite */
389 QDEBUG checkb(b, "qget");
395 /* if writer flow controlled, restart */
396 if((q->state & Qflow) && q->len < q->limit/2){
411 * throw away the next 'len' bytes in the queue
414 qdiscard(Queue *q, int len)
416 Block *b, *tofree = nil;
417 int dowakeup, n, sofar;
420 for(sofar = 0; sofar < len; sofar += n){
424 QDEBUG checkb(b, "qdiscard");
426 if(n <= len - sofar){
431 /* remember to free this */
442 * if writer flow controlled, restart
445 * q->len < q->limit/2
446 * but it slows down tcp too much for certain write sizes.
447 * I really don't understand it completely. It may be
448 * due to the queue draining so fast that the transmission
449 * stalls waiting for the app to produce more data. - presotto
451 if((q->state & Qflow) && q->len < q->limit){
469 * Interrupt level copy out of a queue, return # bytes copied.
472 qconsume(Queue *q, void *vp, int len)
474 Block *b, *tofree = nil;
478 /* sync with qwrite */
488 QDEBUG checkb(b, "qconsume 1");
496 /* remember to free this */
504 memmove(p, b->rp, len);
508 /* discard the block if we're done with it */
509 if((q->state & Qmsg) || len == n){
514 /* remember to free this */
520 /* if writer flow controlled, restart */
521 if((q->state & Qflow) && q->len < q->limit/2){
539 qpass(Queue *q, Block *b)
543 /* sync with qread */
546 if(q->len >= q->limit){
551 if(q->state & Qclosed){
557 len = qaddlist(q, b);
559 if(q->len >= q->limit/2)
562 if(q->state & Qstarve){
563 q->state &= ~Qstarve;
575 qpassnolim(Queue *q, Block *b)
579 /* sync with qread */
583 if(q->state & Qclosed){
589 len = qaddlist(q, b);
591 if(q->len >= q->limit/2)
594 if(q->state & Qstarve){
595 q->state &= ~Qstarve;
607 * if the allocated space is way out of line with the used
608 * space, reallocate to a smaller block
616 for(l = &bp; (nbp = *l) != nil; l = &(*l)->next){
618 if((n<<2) < BALLOC(nbp)){
620 memmove((*l)->wp, nbp->rp, n);
622 (*l)->next = nbp->next;
631 qproduce(Queue *q, void *vp, int len)
641 /* sync with qread */
645 /* no waiting receivers, room in buffer? */
646 if(q->len >= q->limit){
654 memmove(b->wp, p, len);
658 if(q->state & Qstarve){
659 q->state &= ~Qstarve;
663 if(q->len >= q->limit)
674 * copy from offset in the queue
677 qcopy(Queue *q, int len, ulong offset)
683 b->wp += readblist(q->bfirst, b->wp, len, offset);
689 * called by non-interrupt code
692 qopen(int limit, int msg, void (*kick)(void*), void *arg)
696 q = malloc(sizeof(Queue));
700 q->limit = q->inilim = limit;
712 /* open a queue to be bypassed */
714 qbypass(void (*bypass)(void*, Block*), void *arg)
718 q = malloc(sizeof(Queue));
735 return (q->state & Qclosed) || q->bfirst != nil;
739 * wait for the queue to be non-empty or closed.
740 * called with q ilocked.
750 if(q->state & Qclosed){
753 if(*q->err && strcmp(q->err, Ehungup) != 0)
758 q->state |= Qstarve; /* flag requesting producer to wake me */
760 sleep(&q->rr, notempty, q);
767 * add a block list to a queue, return bytes added
770 qaddlist(Queue *q, Block *b)
774 QDEBUG checkb(b, "qaddlist 1");
776 /* queue the block */
784 while(b->next != nil){
786 QDEBUG checkb(b, "qaddlist 2");
798 * called with q ilocked
808 QDEBUG checkb(b, "qremove");
817 * copy the contents of a string of blocks into
818 * memory from an offset. blocklist kept unchanged.
819 * return number of copied bytes.
822 readblist(Block *b, uchar *p, long n, ulong o)
827 while(n > 0 && b != nil){
835 memmove(p, b->rp + o, m);
847 * put a block back to the front of the queue
848 * called with q ilocked
851 qputback(Queue *q, Block *b)
862 * cut off n bytes from the end of *h. return a new
863 * block with the tail and change *h to refer to the
867 splitblock(Block **h, int n)
876 memmove(b->wp, a->rp, m);
884 memmove(b->wp, a->wp, n);
891 * flow control, get producer going again
892 * called with q ilocked
895 qwakeup_iunlock(Queue *q)
899 /* if writer flow controlled, restart */
900 if((q->state & Qflow) && q->len < q->limit/2){
907 /* wakeup flow controlled writers */
916 * get next block from a queue (up to a limit)
919 qbread(Queue *q, int len)
939 /* multiple reads on a closed queue */
944 /* if we get here, there's at least one block in the queue */
948 /* split block if it's too big and this is not a message queue */
951 if((q->state & Qmsg) == 0)
952 qputback(q, splitblock(&b, n));
957 /* restart producer */
967 * read a queue. if no data is queued, post a Block
968 * and wait on its Rendez.
971 qread(Queue *q, void *vp, int len)
973 Block *b, *first, **last;
992 /* multiple reads on a closed queue */
997 /* if we get here, there's at least one block in the queue */
999 if(q->state & Qcoalesce){
1000 /* when coalescing, 0 length blocks just go away */
1008 /* grab the first block plus as many
1009 * following blocks as will partially
1016 if(n >= len || q->bfirst == nil)
1027 /* split last block if it's too big and this is not a message queue */
1028 if(n > len && (q->state & Qmsg) == 0)
1029 qputback(q, splitblock(last, n - len));
1031 /* restart producer */
1041 n = readblist(first, vp, len, 0);
1053 return q->len < q->limit || (q->state & Qclosed);
1057 * flow control, wait for queue to get below the limit
1063 if(q->noblock || qnotfull(q))
1075 sleep(&q->wr, qnotfull, q);
1082 * add a block to a queue obeying flow control
1085 qbwrite(Queue *q, Block *b)
1090 if(q->bypass != nil){
1092 (*q->bypass)(q->arg, b);
1103 /* give up if the queue is closed */
1104 if(q->state & Qclosed){
1109 /* don't queue over the limit */
1110 if(q->len >= q->limit && q->noblock){
1118 len = qaddlist(q, b);
1120 /* make sure other end gets awakened */
1121 if(q->state & Qstarve){
1122 q->state &= ~Qstarve;
1128 /* get output going again */
1129 if(q->kick != nil && (dowakeup || (q->state&Qkick)))
1132 /* wakeup anyone consuming at the other end */
1136 /* if we just woke up a higher priority process, let it run */
1137 if(p != nil && p->priority > up->priority)
1142 * flow control, before allowing the process to continue and
1143 * queue more. We do this here so that postnote can only
1144 * interrupt us after the data has been queued. This means that
1145 * things like 9p flushes and ssl messages will not be disrupted
1146 * by software interrupts.
1154 * write to a queue. at most Maxatomic bytes at a time are written atomically.
1157 qwrite(Queue *q, void *vp, int len)
1164 print("qwrite hi %#p\n", getcallerpc(&q));
1166 /* stop queue bloat before allocating blocks */
1167 if(q->len/2 >= q->limit && q->noblock == 0 && q->bypass == nil){
1169 if(up->procctl == Proc_exitme || up->procctl == Proc_exitbig)
1187 memmove(b->wp, p+sofar, n);
1191 sofar += qbwrite(q, b);
1192 } while(sofar < len && (q->state & Qmsg) == 0);
1198 * used by print() to write to a queue. Since we may be splhi or not in
1199 * a process, don't qlock.
1202 qiwrite(Queue *q, void *vp, int len)
1204 int n, sofar, dowakeup;
1219 memmove(b->wp, p+sofar, n);
1224 /* kernel prints use an artificially high limit (twice q->limit, see the
1225 * test below) since anything over the limit gets dropped
1227 if((q->state & Qclosed) != 0 || q->len/2 >= q->limit){
1235 if(q->state & Qstarve){
1236 q->state &= ~Qstarve;
1249 } while(sofar < len && (q->state & Qmsg) == 0);
1255 * be extremely careful when calling this,
1256 * as there is no reference accounting
1266 * Mark a queue as closed. No further IO is permitted.
1267 * All blocks are released.
1279 q->state |= Qclosed;
1280 q->state &= ~(Qflow|Qstarve);
1281 kstrcpy(q->err, Ehungup, ERRMAX);
1289 /* free queued blocks */
1292 /* wake up readers/writers */
1298 * Mark a queue as closed. Wake up any readers. Don't remove queued
1302 qhangup(Queue *q, char *msg)
1306 q->state |= Qclosed;
1307 if(msg == nil || *msg == '\0')
1309 kstrcpy(q->err, msg, ERRMAX);
1312 /* wake up readers/writers */
1318 * return non-zero if the q is hungup
1323 return q->state & Qclosed;
1327 * mark a queue as no longer hung up
1333 q->state &= ~Qclosed;
1334 q->state |= Qstarve;
1336 q->limit = q->inilim;
1341 * return bytes queued
1350 * return space remaining before flow control
1357 l = q->limit - q->len;
1364 * return true if we can read without blocking
1369 return q->bfirst != nil;
1373 * change queue limit
1376 qsetlimit(Queue *q, int limit)
1382 * set blocking/nonblocking
1385 qnoblock(Queue *q, int onoff)
1391 * flush the output queue
1406 /* free queued blocks */
1409 /* wake up writers */
1416 return q->state & Qflow;