2 #include "../port/lib.h"
6 #include "../port/error.h"
8 static ulong padblockcnt;
9 static ulong concatblockcnt;
10 static ulong pullupblockcnt;
11 static ulong copyblockcnt;
12 static ulong consumecnt;
13 static ulong producecnt;
20 typedef struct Queue Queue;
26 Block* bfirst; /* buffer */
29 int len; /* bytes allocated to queue */
30 int dlen; /* data bytes in queue */
31 int limit; /* max bytes in queue */
32 int inilim; /* initial limit */
34 int noblock; /* true if writes return immediately when q full */
35 int eof; /* number of eofs read by user */
37 void (*kick)(void*); /* restart output */
38 void (*bypass)(void*, Block*); /* bypass queue altogether */
39 void* arg; /* argument to kick */
41 QLock rlock; /* mutex for reading processes */
42 Rendez rr; /* process waiting to read */
43 QLock wlock; /* mutex for writing processes */
44 Rendez wr; /* process waiting to write */
54 uint qiomaxatomic = Maxatomic;
57 * free a list of blocks
64 for(; b != nil; b = next){
72 * pad a block to the front (or the back if size is negative)
75 padblock(Block *bp, int size)
80 QDEBUG checkb(bp, "padblock 0");
82 if(bp->rp - bp->base >= size){
90 memmove(nbp->wp, bp->rp, n);
95 if(bp->lim - bp->wp >= size)
99 memmove(nbp->wp, bp->rp, n);
102 nbp->next = bp->next;
105 QDEBUG checkb(nbp, "padblock 1");
110 * return count of bytes in a string of blocks
126 * return count of space in blocks
129 blockalloclen(Block *bp)
142 * copy the string of blocks into
143 * a single block and free the string
146 concatblock(Block *bp)
153 concatblockcnt += len;
154 return pullupblock(bp, len);
158 * make sure the first block has at least n bytes
161 pullupblock(Block *bp, int n)
167 * this should almost always be true, it's
168 * just to avoid every caller checking.
174 * if not enough room in the first block,
175 * add another to the front of the list.
177 if(bp->lim - bp->rp < n){
184 * copy bytes from the trailing blocks into the first
187 while((nbp = bp->next) != nil){
191 memmove(bp->wp, nbp->rp, n);
194 QDEBUG checkb(bp, "pullupblock 1");
197 /* shouldn't happen but why crash if it does */
199 print("pullup negative length packet, called from %#p\n",
203 memmove(bp->wp, nbp->rp, i);
205 bp->next = nbp->next;
210 QDEBUG checkb(bp, "pullupblock 2");
220 * make sure the first block has at least n bytes
223 pullupqueue(Queue *q, int n)
227 if(BLEN(q->bfirst) >= n)
229 q->bfirst = pullupblock(q->bfirst, n);
230 for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
237 * trim to len bytes starting at offset
240 trimblock(Block *bp, int offset, int len)
245 QDEBUG checkb(bp, "trimblock 1");
246 if(blocklen(bp) < offset+len) {
251 while((l = BLEN(bp)) < offset) {
262 while((l = BLEN(bp)) < len) {
267 bp->wp -= (BLEN(bp) - len);
269 if(bp->next != nil) {
278 * copy 'count' bytes into a new block
281 copyblock(Block *bp, int count)
286 QDEBUG checkb(bp, "copyblock 0");
288 for(; count > 0 && bp != nil; bp = bp->next){
292 memmove(nbp->wp, bp->rp, l);
297 memset(nbp->wp, 0, count);
301 QDEBUG checkb(nbp, "copyblock 1");
307 adjustblock(Block* bp, int len)
317 if(bp->rp+len > bp->lim){
318 nbp = copyblock(bp, len);
320 QDEBUG checkb(nbp, "adjustblock 1");
327 memset(bp->wp, 0, len-n);
329 QDEBUG checkb(bp, "adjustblock 2");
336 * throw away up to count bytes from a
337 * list of blocks. Return count of bytes
341 pullblock(Block **bph, int count)
350 while(*bph != nil && count != 0) {
358 QDEBUG checkb(bp, "pullblock ");
369 * get next block from a queue, return null if nothing there
377 /* sync with qwrite */
386 QDEBUG checkb(b, "qget");
392 /* if writer flow controlled, restart */
393 if((q->state & Qflow) && q->len < q->limit/2){
408 * throw away the next 'len' bytes in the queue
411 qdiscard(Queue *q, int len)
413 Block *b, *tofree = nil;
414 int dowakeup, n, sofar;
417 for(sofar = 0; sofar < len; sofar += n){
421 QDEBUG checkb(b, "qdiscard");
423 if(n <= len - sofar){
428 /* remember to free this */
439 * if writer flow controlled, restart
442 * q->len < q->limit/2
443 * but it slows down tcp too much for certain write sizes.
444 * I really don't understand it completely. It may be
445 * due to the queue draining so fast that the transmission
446 * stalls waiting for the app to produce more data. - presotto
448 if((q->state & Qflow) && q->len < q->limit){
466 * Interrupt level copy out of a queue, return # bytes copied.
469 qconsume(Queue *q, void *vp, int len)
471 Block *b, *tofree = nil;
475 /* sync with qwrite */
485 QDEBUG checkb(b, "qconsume 1");
493 /* remember to free this */
501 memmove(p, b->rp, len);
505 /* discard the block if we're done with it */
506 if((q->state & Qmsg) || len == n){
511 /* remember to free this */
517 /* if writer flow controlled, restart */
518 if((q->state & Qflow) && q->len < q->limit/2){
536 qpass(Queue *q, Block *b)
540 /* sync with qread */
543 if(q->len >= q->limit){
548 if(q->state & Qclosed){
554 len = qaddlist(q, b);
556 if(q->len >= q->limit/2)
559 if(q->state & Qstarve){
560 q->state &= ~Qstarve;
572 qpassnolim(Queue *q, Block *b)
576 /* sync with qread */
580 if(q->state & Qclosed){
586 len = qaddlist(q, b);
588 if(q->len >= q->limit/2)
591 if(q->state & Qstarve){
592 q->state &= ~Qstarve;
604 * if the allocated space is way out of line with the used
605 * space, reallocate to a smaller block
613 for(l = &bp; (nbp = *l) != nil; l = &(*l)->next){
615 if((n<<2) < BALLOC(nbp)){
617 memmove((*l)->wp, nbp->rp, n);
619 (*l)->next = nbp->next;
628 qproduce(Queue *q, void *vp, int len)
638 /* sync with qread */
642 /* no waiting receivers, room in buffer? */
643 if(q->len >= q->limit){
651 memmove(b->wp, p, len);
655 if(q->state & Qstarve){
656 q->state &= ~Qstarve;
660 if(q->len >= q->limit)
671 * copy from offset in the queue
674 qcopy(Queue *q, int len, ulong offset)
680 b->wp += readblist(q->bfirst, b->wp, len, offset);
686 * called by non-interrupt code
689 qopen(int limit, int msg, void (*kick)(void*), void *arg)
693 q = malloc(sizeof(Queue));
697 q->limit = q->inilim = limit;
709 /* open a queue to be bypassed */
711 qbypass(void (*bypass)(void*, Block*), void *arg)
715 q = malloc(sizeof(Queue));
732 return (q->state & Qclosed) || q->bfirst != nil;
736 * wait for the queue to be non-empty or closed.
737 * called with q ilocked.
747 if(q->state & Qclosed){
750 if(*q->err && strcmp(q->err, Ehungup) != 0)
755 q->state |= Qstarve; /* flag requesting producer to wake me */
757 sleep(&q->rr, notempty, q);
764 * add a block list to a queue, return bytes added
767 qaddlist(Queue *q, Block *b)
771 QDEBUG checkb(b, "qaddlist 1");
773 /* queue the block */
781 while(b->next != nil){
783 QDEBUG checkb(b, "qaddlist 2");
795 * called with q ilocked
805 QDEBUG checkb(b, "qremove");
814 * copy the contents of a string of blocks into
815 * memory from an offset. blocklist kept unchanged.
816 * return number of copied bytes.
819 readblist(Block *b, uchar *p, long n, ulong o)
824 while(n > 0 && b != nil){
832 memmove(p, b->rp + o, m);
844 * put a block back to the front of the queue
845 * called with q ilocked
848 qputback(Queue *q, Block *b)
859 * cut off n bytes from the end of *h. return a new
860 * block with the tail and change *h to refer to the
864 splitblock(Block **h, int n)
873 memmove(b->wp, a->rp, m);
881 memmove(b->wp, a->wp, n);
888 * flow control, get producer going again
889 * called with q ilocked
892 qwakeup_iunlock(Queue *q)
896 /* if writer flow controlled, restart */
897 if((q->state & Qflow) && q->len < q->limit/2){
904 /* wake up flow-controlled writers */
913 * get next block from a queue (up to a limit)
916 qbread(Queue *q, int len)
936 /* multiple reads on a closed queue */
941 /* if we get here, there's at least one block in the queue */
945 /* split block if it's too big and this is not a message queue */
948 if((q->state & Qmsg) == 0)
949 qputback(q, splitblock(&b, n));
954 /* restart producer */
964 * read a queue. if no data is queued, post a Block
965 * and wait on its Rendez.
968 qread(Queue *q, void *vp, int len)
970 Block *b, *first, **last;
989 /* multiple reads on a closed queue */
994 /* if we get here, there's at least one block in the queue */
996 if(q->state & Qcoalesce){
997 /* when coalescing, 0 length blocks just go away */
1005 /* grab the first block plus as many
1006 * following blocks as will partially
1013 if(n >= len || q->bfirst == nil)
1024 /* split last block if it's too big and this is not a message queue */
1025 if(n > len && (q->state & Qmsg) == 0)
1026 qputback(q, splitblock(last, n - len));
1028 /* restart producer */
1038 n = readblist(first, vp, len, 0);
1050 return q->len < q->limit || (q->state & Qclosed);
1054 * flow control, wait for queue to get below the limit
1060 if(q->noblock || qnotfull(q))
1072 sleep(&q->wr, qnotfull, q);
1079 * add a block to a queue obeying flow control
1082 qbwrite(Queue *q, Block *b)
1087 if(q->bypass != nil){
1089 (*q->bypass)(q->arg, b);
1100 /* give up if the queue is closed */
1101 if(q->state & Qclosed){
1106 /* don't queue over the limit */
1107 if(q->len >= q->limit && q->noblock){
1115 len = qaddlist(q, b);
1117 /* make sure other end gets awakened */
1118 if(q->state & Qstarve){
1119 q->state &= ~Qstarve;
1125 /* get output going again */
1126 if(q->kick != nil && (dowakeup || (q->state&Qkick)))
1129 /* wake up anyone consuming at the other end */
1133 /* if we just woke up a higher-priority process, let it run */
1134 if(p != nil && p->priority > up->priority)
1139 * flow control, before allowing the process to continue and
1140 * queue more. We do this here so that postnote can only
1141 * interrupt us after the data has been queued. This means that
1142 * things like 9p flushes and ssl messages will not be disrupted
1143 * by software interrupts.
1151 * write to a queue. only Maxatomic bytes at a time are atomic.
1154 qwrite(Queue *q, void *vp, int len)
1161 print("qwrite hi %#p\n", getcallerpc(&q));
1163 /* stop queue bloat before allocating blocks */
1164 if(q->len/2 >= q->limit && q->noblock == 0 && q->bypass == nil){
1166 if(up->procctl == Proc_exitme || up->procctl == Proc_exitbig)
1184 memmove(b->wp, p+sofar, n);
1188 sofar += qbwrite(q, b);
1189 } while(sofar < len && (q->state & Qmsg) == 0);
1195 * used by print() to write to a queue. Since we may be splhi or not in
1196 * a process, don't qlock.
1199 qiwrite(Queue *q, void *vp, int len)
1201 int n, sofar, dowakeup;
1216 memmove(b->wp, p+sofar, n);
1221 /* we use an artificially high limit for kernel prints since anything
1222 * over the limit gets dropped
1224 if((q->state & Qclosed) != 0 || q->len/2 >= q->limit){
1232 if(q->state & Qstarve){
1233 q->state &= ~Qstarve;
1246 } while(sofar < len && (q->state & Qmsg) == 0);
1252 * be extremely careful when calling this,
1253 * as there is no reference accounting
1263 * Mark a queue as closed. No further IO is permitted.
1264 * All blocks are released.
1276 q->state |= Qclosed;
1277 q->state &= ~(Qflow|Qstarve);
1278 kstrcpy(q->err, Ehungup, ERRMAX);
1286 /* free queued blocks */
1289 /* wake up readers/writers */
1295 * Mark a queue as closed. Wake up any readers. Don't remove queued
1299 qhangup(Queue *q, char *msg)
1303 q->state |= Qclosed;
1304 if(msg == nil || *msg == '\0')
1306 kstrcpy(q->err, msg, ERRMAX);
1309 /* wake up readers/writers */
1315 * return non-zero if the q is hungup
1320 return q->state & Qclosed;
1324 * mark a queue as no longer hung up
1330 q->state &= ~Qclosed;
1331 q->state |= Qstarve;
1333 q->limit = q->inilim;
1338 * return bytes queued
1347 * return space remaining before flow control
1354 l = q->limit - q->len;
1361 * return true if we can read without blocking
1366 return q->bfirst != nil;
1370 * change queue limit
1373 qsetlimit(Queue *q, int limit)
1379 * set blocking/nonblocking
1382 qnoblock(Queue *q, int onoff)
1388 * flush the output queue
1403 /* free queued blocks */
1406 /* wake up writers */
1413 return q->state & Qflow;