]> git.lizzy.rs Git - plan9front.git/blob - sys/src/9/port/qio.c
qio: raise critical queue bloat threshold from 2 to 10 times to queue limit
[plan9front.git] / sys / src / 9 / port / qio.c
1 #include        "u.h"
2 #include        "../port/lib.h"
3 #include        "mem.h"
4 #include        "dat.h"
5 #include        "fns.h"
6 #include        "../port/error.h"
7
8 static ulong padblockcnt;       /* blocks reallocated by padblock() */
9 static ulong concatblockcnt;    /* bytes copied by concatblock() */
10 static ulong pullupblockcnt;   /* copies performed by pullupblock() */
11 static ulong copyblockcnt;     /* calls to copyblock() */
12 static ulong consumecnt;       /* sizes of the blocks read by qconsume() */
13 static ulong producecnt;       /* bytes queued by qproduce() */
14 static ulong qcopycnt;         /* bytes copied by qcopy() */
15
16 static int debugging;           /* toggled by ixsummary() */
17
18 #define QDEBUG  if(0)           /* prefix for debug-only statements; if(0) disables them */
19
20 /*
21  *  IO queues: a linked list of Blocks plus byte accounting,
 *  flow-control state and the locks/rendezvous used by readers
 *  and writers.
22  */
23 typedef struct Queue    Queue;
24
25 struct Queue
26 {
27         Lock;                   /* taken with ilock(q)/iunlock(q) */
28
29         Block*  bfirst;         /* first block in the queue */
30         Block*  blast;          /* last block in the queue */
31
32         int     len;            /* bytes allocated to queue (sum of BALLOC) */
33         int     dlen;           /* data bytes in queue (sum of BLEN) */
34         int     limit;          /* max bytes in queue; compared against len */
35         int     inilim;         /* initial limit */
36         int     state;          /* flag bits: Qstarve, Qflow, Qclosed, Qmsg, Qcoalesce, Qkick */
37         int     noblock;        /* true if writes return immediately when q full */
38         int     eof;            /* number of eofs read by user */
39
40         void    (*kick)(void*); /* restart output */
41         void    (*bypass)(void*, Block*);       /* bypass queue altogether */
42         void*   arg;            /* argument to kick and to bypass */
43
44         QLock   rlock;          /* mutex for reading processes */
45         Rendez  rr;             /* process waiting to read */
46         QLock   wlock;          /* mutex for writing processes */
47         Rendez  wr;             /* process waiting to write */
48
49         char    err[ERRMAX];    /* string passed to error() for a closed queue */
50 };
51
52 enum
53 {
54         Maxatomic       = 64*1024,      /* largest single block mem2bl() allocates, in bytes */
55 };
56
57 uint    qiomaxatomic = Maxatomic;       /* non-static copy of Maxatomic; NOTE(review): users are outside this view */
58
59 void
60 ixsummary(void)
61 {
62         debugging ^= 1;
63         iallocsummary();
64         print("pad %lud, concat %lud, pullup %lud, copy %lud\n",
65                 padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
66         print("consume %lud, produce %lud, qcopy %lud\n",
67                 consumecnt, producecnt, qcopycnt);
68 }
69
70 /*
71  *  free a list of blocks
72  */
73 void
74 freeblist(Block *b)
75 {
76         Block *next;
77
78         for(; b != 0; b = next){
79                 next = b->next;
80                 if(b->ref == 1)
81                         b->next = nil;
82                 freeb(b);
83         }
84 }
85
86 /*
87  *  pad a block to the front (or the back if size is negative)
88  */
89 Block*
90 padblock(Block *bp, int size)
91 {
92         int n;
93         Block *nbp;
94
95         QDEBUG checkb(bp, "padblock 1");
96         if(size >= 0){
97                 if(bp->rp - bp->base >= size){
98                         bp->rp -= size;
99                         return bp;
100                 }
101
102                 if(bp->next)
103                         panic("padblock %#p", getcallerpc(&bp));
104                 n = BLEN(bp);
105                 padblockcnt++;
106                 nbp = allocb(size+n);
107                 nbp->rp += size;
108                 nbp->wp = nbp->rp;
109                 memmove(nbp->wp, bp->rp, n);
110                 nbp->wp += n;
111                 freeb(bp);
112                 nbp->rp -= size;
113         } else {
114                 size = -size;
115
116                 if(bp->next)
117                         panic("padblock %#p", getcallerpc(&bp));
118
119                 if(bp->lim - bp->wp >= size)
120                         return bp;
121
122                 n = BLEN(bp);
123                 padblockcnt++;
124                 nbp = allocb(size+n);
125                 memmove(nbp->wp, bp->rp, n);
126                 nbp->wp += n;
127                 freeb(bp);
128         }
129         QDEBUG checkb(nbp, "padblock 1");
130         return nbp;
131 }
132
133 /*
134  *  return count of bytes in a string of blocks
135  */
136 int
137 blocklen(Block *bp)
138 {
139         int len;
140
141         len = 0;
142         while(bp) {
143                 len += BLEN(bp);
144                 bp = bp->next;
145         }
146         return len;
147 }
148
149 /*
150  * return count of space in blocks
151  */
152 int
153 blockalloclen(Block *bp)
154 {
155         int len;
156
157         len = 0;
158         while(bp) {
159                 len += BALLOC(bp);
160                 bp = bp->next;
161         }
162         return len;
163 }
164
165 /*
166  *  copy the  string of blocks into
167  *  a single block and free the string
168  */
169 Block*
170 concatblock(Block *bp)
171 {
172         int len;
173         Block *nb, *f;
174
175         if(bp->next == 0)
176                 return bp;
177
178         nb = allocb(blocklen(bp));
179         for(f = bp; f; f = f->next) {
180                 len = BLEN(f);
181                 memmove(nb->wp, f->rp, len);
182                 nb->wp += len;
183         }
184         concatblockcnt += BLEN(nb);
185         freeblist(bp);
186         QDEBUG checkb(nb, "concatblock 1");
187         return nb;
188 }
189
190 /*
191  *  make sure the first block has at least n bytes
192  */
193 Block*
194 pullupblock(Block *bp, int n)
195 {
196         int i;
197         Block *nbp;
198
199         /*
200          *  this should almost always be true, it's
201          *  just to avoid every caller checking.
202          */
203         if(BLEN(bp) >= n)
204                 return bp;
205
206         /*
207          *  if not enough room in the first block,
208          *  add another to the front of the list.
209          */
210         if(bp->lim - bp->rp < n){
211                 nbp = allocb(n);
212                 nbp->next = bp;
213                 bp = nbp;
214         }
215
216         /*
217          *  copy bytes from the trailing blocks into the first
218          */
219         n -= BLEN(bp);
220         while(nbp = bp->next){
221                 i = BLEN(nbp);
222                 if(i > n) {
223                         memmove(bp->wp, nbp->rp, n);
224                         pullupblockcnt++;
225                         bp->wp += n;
226                         nbp->rp += n;
227                         QDEBUG checkb(bp, "pullupblock 1");
228                         return bp;
229                 } else {
230                         /* shouldn't happen but why crash if it does */
231                         if(i < 0){
232                                 print("pullup negative length packet, called from %#p\n",
233                                         getcallerpc(&bp));
234                                 i = 0;
235                         }
236                         memmove(bp->wp, nbp->rp, i);
237                         pullupblockcnt++;
238                         bp->wp += i;
239                         bp->next = nbp->next;
240                         nbp->next = 0;
241                         freeb(nbp);
242                         n -= i;
243                         if(n == 0){
244                                 QDEBUG checkb(bp, "pullupblock 2");
245                                 return bp;
246                         }
247                 }
248         }
249         freeb(bp);
250         return 0;
251 }
252
253 /*
254  *  make sure the first block has at least n bytes
255  */
256 Block*
257 pullupqueue(Queue *q, int n)
258 {
259         Block *b;
260
261         if(BLEN(q->bfirst) >= n)
262                 return q->bfirst;
263         q->bfirst = pullupblock(q->bfirst, n);
264         for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
265                 ;
266         q->blast = b;
267         return q->bfirst;
268 }
269
270 /*
271  *  trim to len bytes starting at offset
272  */
273 Block *
274 trimblock(Block *bp, int offset, int len)
275 {
276         ulong l;
277         Block *nb, *startb;
278
279         QDEBUG checkb(bp, "trimblock 1");
280         if(blocklen(bp) < offset+len) {
281                 freeblist(bp);
282                 return nil;
283         }
284
285         while((l = BLEN(bp)) < offset) {
286                 offset -= l;
287                 nb = bp->next;
288                 bp->next = nil;
289                 freeb(bp);
290                 bp = nb;
291         }
292
293         startb = bp;
294         bp->rp += offset;
295
296         while((l = BLEN(bp)) < len) {
297                 len -= l;
298                 bp = bp->next;
299         }
300
301         bp->wp -= (BLEN(bp) - len);
302
303         if(bp->next) {
304                 freeblist(bp->next);
305                 bp->next = nil;
306         }
307
308         return startb;
309 }
310
311 /*
312  *  copy 'count' bytes into a new block
313  */
314 Block*
315 copyblock(Block *bp, int count)
316 {
317         int l;
318         Block *nbp;
319
320         QDEBUG checkb(bp, "copyblock 0");
321         nbp = allocb(count);
322         for(; count > 0 && bp != 0; bp = bp->next){
323                 l = BLEN(bp);
324                 if(l > count)
325                         l = count;
326                 memmove(nbp->wp, bp->rp, l);
327                 nbp->wp += l;
328                 count -= l;
329         }
330         if(count > 0){
331                 memset(nbp->wp, 0, count);
332                 nbp->wp += count;
333         }
334         copyblockcnt++;
335         QDEBUG checkb(nbp, "copyblock 1");
336
337         return nbp;
338 }
339
340 Block*
341 adjustblock(Block* bp, int len)
342 {
343         int n;
344         Block *nbp;
345
346         if(len < 0){
347                 freeb(bp);
348                 return nil;
349         }
350
351         if(bp->rp+len > bp->lim){
352                 nbp = copyblock(bp, len);
353                 freeblist(bp);
354                 QDEBUG checkb(nbp, "adjustblock 1");
355
356                 return nbp;
357         }
358
359         n = BLEN(bp);
360         if(len > n)
361                 memset(bp->wp, 0, len-n);
362         bp->wp = bp->rp+len;
363         QDEBUG checkb(bp, "adjustblock 2");
364
365         return bp;
366 }
367
368
369 /*
370  *  throw away up to count bytes from a
371  *  list of blocks.  Return count of bytes
372  *  thrown away.
373  */
374 int
375 pullblock(Block **bph, int count)
376 {
377         Block *bp;
378         int n, bytes;
379
380         bytes = 0;
381         if(bph == nil)
382                 return 0;
383
384         while(*bph != nil && count != 0) {
385                 bp = *bph;
386                 n = BLEN(bp);
387                 if(count < n)
388                         n = count;
389                 bytes += n;
390                 count -= n;
391                 bp->rp += n;
392                 QDEBUG checkb(bp, "pullblock ");
393                 if(BLEN(bp) == 0) {
394                         *bph = bp->next;
395                         bp->next = nil;
396                         freeb(bp);
397                 }
398         }
399         return bytes;
400 }
401
402 /*
403  *  get next block from a queue, return null if nothing there
404  */
405 Block*
406 qget(Queue *q)
407 {
408         int dowakeup;
409         Block *b;
410
411         /* sync with qwrite */
412         ilock(q);
413
414         b = q->bfirst;
415         if(b == nil){
416                 q->state |= Qstarve;
417                 iunlock(q);
418                 return nil;
419         }
420         q->bfirst = b->next;
421         b->next = 0;
422         q->len -= BALLOC(b);
423         q->dlen -= BLEN(b);
424         QDEBUG checkb(b, "qget");
425
426         /* if writer flow controlled, restart */
427         if((q->state & Qflow) && q->len < q->limit/2){
428                 q->state &= ~Qflow;
429                 dowakeup = 1;
430         } else
431                 dowakeup = 0;
432
433         iunlock(q);
434
435         if(dowakeup)
436                 wakeup(&q->wr);
437
438         return b;
439 }
440
441 /*
442  *  throw away the next 'len' bytes in the queue
443  */
444 int
445 qdiscard(Queue *q, int len)
446 {
447         Block *b;
448         int dowakeup, n, sofar;
449
450         ilock(q);
451         for(sofar = 0; sofar < len; sofar += n){
452                 b = q->bfirst;
453                 if(b == nil)
454                         break;
455                 QDEBUG checkb(b, "qdiscard");
456                 n = BLEN(b);
457                 if(n <= len - sofar){
458                         q->bfirst = b->next;
459                         b->next = 0;
460                         q->len -= BALLOC(b);
461                         q->dlen -= BLEN(b);
462                         freeb(b);
463                 } else {
464                         n = len - sofar;
465                         b->rp += n;
466                         q->dlen -= n;
467                 }
468         }
469
470         /*
471          *  if writer flow controlled, restart
472          *
473          *  This used to be
474          *      q->len < q->limit/2
475          *  but it slows down tcp too much for certain write sizes.
476          *  I really don't understand it completely.  It may be
477          *  due to the queue draining so fast that the transmission
478          *  stalls waiting for the app to produce more data.  - presotto
479          */
480         if((q->state & Qflow) && q->len < q->limit){
481                 q->state &= ~Qflow;
482                 dowakeup = 1;
483         } else
484                 dowakeup = 0;
485
486         iunlock(q);
487
488         if(dowakeup)
489                 wakeup(&q->wr);
490
491         return sofar;
492 }
493
494 /*
495  *  Interrupt level copy out of a queue, return # bytes copied.
496  */
497 int
498 qconsume(Queue *q, void *vp, int len)
499 {
500         Block *b;
501         int n, dowakeup;
502         uchar *p = vp;
503         Block *tofree = nil;
504
505         /* sync with qwrite */
506         ilock(q);
507
508         for(;;) {
509                 b = q->bfirst;
510                 if(b == 0){
511                         q->state |= Qstarve;
512                         iunlock(q);
513                         return -1;
514                 }
515                 QDEBUG checkb(b, "qconsume 1");
516
517                 n = BLEN(b);
518                 if(n > 0)
519                         break;
520                 q->bfirst = b->next;
521                 q->len -= BALLOC(b);
522
523                 /* remember to free this */
524                 b->next = tofree;
525                 tofree = b;
526         };
527
528         if(n < len)
529                 len = n;
530         memmove(p, b->rp, len);
531         consumecnt += n;
532         b->rp += len;
533         q->dlen -= len;
534
535         /* discard the block if we're done with it */
536         if((q->state & Qmsg) || len == n){
537                 q->bfirst = b->next;
538                 b->next = 0;
539                 q->len -= BALLOC(b);
540                 q->dlen -= BLEN(b);
541
542                 /* remember to free this */
543                 b->next = tofree;
544                 tofree = b;
545         }
546
547         /* if writer flow controlled, restart */
548         if((q->state & Qflow) && q->len < q->limit/2){
549                 q->state &= ~Qflow;
550                 dowakeup = 1;
551         } else
552                 dowakeup = 0;
553
554         iunlock(q);
555
556         if(dowakeup)
557                 wakeup(&q->wr);
558
559         if(tofree != nil)
560                 freeblist(tofree);
561
562         return len;
563 }
564
565 int
566 qpass(Queue *q, Block *b)
567 {
568         int dlen, len, dowakeup;
569
570         /* sync with qread */
571         dowakeup = 0;
572         ilock(q);
573         if(q->len >= q->limit){
574                 freeblist(b);
575                 iunlock(q);
576                 return -1;
577         }
578         if(q->state & Qclosed){
579                 len = BALLOC(b);
580                 freeblist(b);
581                 iunlock(q);
582                 return len;
583         }
584
585         /* add buffer to queue */
586         if(q->bfirst)
587                 q->blast->next = b;
588         else
589                 q->bfirst = b;
590         len = BALLOC(b);
591         dlen = BLEN(b);
592         QDEBUG checkb(b, "qpass");
593         while(b->next){
594                 b = b->next;
595                 QDEBUG checkb(b, "qpass");
596                 len += BALLOC(b);
597                 dlen += BLEN(b);
598         }
599         q->blast = b;
600         q->len += len;
601         q->dlen += dlen;
602
603         if(q->len >= q->limit/2)
604                 q->state |= Qflow;
605
606         if(q->state & Qstarve){
607                 q->state &= ~Qstarve;
608                 dowakeup = 1;
609         }
610         iunlock(q);
611
612         if(dowakeup)
613                 wakeup(&q->rr);
614
615         return len;
616 }
617
618 int
619 qpassnolim(Queue *q, Block *b)
620 {
621         int dlen, len, dowakeup;
622
623         /* sync with qread */
624         dowakeup = 0;
625         ilock(q);
626
627         if(q->state & Qclosed){
628                 freeblist(b);
629                 iunlock(q);
630                 return BALLOC(b);
631         }
632
633         /* add buffer to queue */
634         if(q->bfirst)
635                 q->blast->next = b;
636         else
637                 q->bfirst = b;
638         len = BALLOC(b);
639         dlen = BLEN(b);
640         QDEBUG checkb(b, "qpass");
641         while(b->next){
642                 b = b->next;
643                 QDEBUG checkb(b, "qpass");
644                 len += BALLOC(b);
645                 dlen += BLEN(b);
646         }
647         q->blast = b;
648         q->len += len;
649         q->dlen += dlen;
650
651         if(q->len >= q->limit/2)
652                 q->state |= Qflow;
653
654         if(q->state & Qstarve){
655                 q->state &= ~Qstarve;
656                 dowakeup = 1;
657         }
658         iunlock(q);
659
660         if(dowakeup)
661                 wakeup(&q->rr);
662
663         return len;
664 }
665
666 /*
667  *  if the allocated space is way out of line with the used
668  *  space, reallocate to a smaller block
669  */
670 Block*
671 packblock(Block *bp)
672 {
673         Block **l, *nbp;
674         int n;
675
676         for(l = &bp; *l; l = &(*l)->next){
677                 nbp = *l;
678                 n = BLEN(nbp);
679                 if((n<<2) < BALLOC(nbp)){
680                         *l = allocb(n);
681                         memmove((*l)->wp, nbp->rp, n);
682                         (*l)->wp += n;
683                         (*l)->next = nbp->next;
684                         freeb(nbp);
685                 }
686         }
687
688         return bp;
689 }
690
691 int
692 qproduce(Queue *q, void *vp, int len)
693 {
694         Block *b;
695         int dowakeup;
696         uchar *p = vp;
697
698         /* sync with qread */
699         dowakeup = 0;
700         ilock(q);
701
702         /* no waiting receivers, room in buffer? */
703         if(q->len >= q->limit){
704                 q->state |= Qflow;
705                 iunlock(q);
706                 return -1;
707         }
708
709         /* save in buffer */
710         b = iallocb(len);
711         if(b == 0){
712                 iunlock(q);
713                 return 0;
714         }
715         memmove(b->wp, p, len);
716         producecnt += len;
717         b->wp += len;
718         if(q->bfirst)
719                 q->blast->next = b;
720         else
721                 q->bfirst = b;
722         q->blast = b;
723         /* b->next = 0; done by iallocb() */
724         q->len += BALLOC(b);
725         q->dlen += BLEN(b);
726         QDEBUG checkb(b, "qproduce");
727
728         if(q->state & Qstarve){
729                 q->state &= ~Qstarve;
730                 dowakeup = 1;
731         }
732
733         if(q->len >= q->limit)
734                 q->state |= Qflow;
735         iunlock(q);
736
737         if(dowakeup)
738                 wakeup(&q->rr);
739
740         return len;
741 }
742
743 /*
744  *  copy from offset in the queue
745  */
746 Block*
747 qcopy(Queue *q, int len, ulong offset)
748 {
749         int sofar;
750         int n;
751         Block *b, *nb;
752         uchar *p;
753
754         nb = allocb(len);
755
756         ilock(q);
757
758         /* go to offset */
759         b = q->bfirst;
760         for(sofar = 0; ; sofar += n){
761                 if(b == nil){
762                         iunlock(q);
763                         return nb;
764                 }
765                 n = BLEN(b);
766                 if(sofar + n > offset){
767                         p = b->rp + offset - sofar;
768                         n -= offset - sofar;
769                         break;
770                 }
771                 QDEBUG checkb(b, "qcopy");
772                 b = b->next;
773         }
774
775         /* copy bytes from there */
776         for(sofar = 0; sofar < len;){
777                 if(n > len - sofar)
778                         n = len - sofar;
779                 memmove(nb->wp, p, n);
780                 qcopycnt += n;
781                 sofar += n;
782                 nb->wp += n;
783                 b = b->next;
784                 if(b == nil)
785                         break;
786                 n = BLEN(b);
787                 p = b->rp;
788         }
789         iunlock(q);
790
791         return nb;
792 }
793
794 /*
795  *  called by non-interrupt code
796  */
797 Queue*
798 qopen(int limit, int msg, void (*kick)(void*), void *arg)
799 {
800         Queue *q;
801
802         q = malloc(sizeof(Queue));
803         if(q == 0)
804                 return 0;
805
806         q->limit = q->inilim = limit;
807         q->kick = kick;
808         q->arg = arg;
809         q->state = msg;
810         
811         q->state |= Qstarve;
812         q->eof = 0;
813         q->noblock = 0;
814
815         return q;
816 }
817
818 /* open a queue to be bypassed */
819 Queue*
820 qbypass(void (*bypass)(void*, Block*), void *arg)
821 {
822         Queue *q;
823
824         q = malloc(sizeof(Queue));
825         if(q == 0)
826                 return 0;
827
828         q->limit = 0;
829         q->arg = arg;
830         q->bypass = bypass;
831         q->state = 0;
832
833         return q;
834 }
835
836 static int
837 notempty(void *a)
838 {
839         Queue *q = a;
840
841         return (q->state & Qclosed) || q->bfirst != 0;
842 }
843
844 /*
845  *  wait for the queue to be non-empty or closed.
846  *  called with q ilocked.
847  */
848 static int
849 qwait(Queue *q)
850 {
851         /* wait for data */
852         for(;;){
853                 if(q->bfirst != nil)
854                         break;
855
856                 if(q->state & Qclosed){
857                         if(++q->eof > 3)
858                                 return -1;
859                         if(*q->err && strcmp(q->err, Ehungup) != 0)
860                                 return -1;
861                         return 0;
862                 }
863
864                 q->state |= Qstarve;    /* flag requesting producer to wake me */
865                 iunlock(q);
866                 sleep(&q->rr, notempty, q);
867                 ilock(q);
868         }
869         return 1;
870 }
871
872 /*
873  * add a block list to a queue
874  */
875 void
876 qaddlist(Queue *q, Block *b)
877 {
878         /* queue the block */
879         if(q->bfirst)
880                 q->blast->next = b;
881         else
882                 q->bfirst = b;
883         q->len += blockalloclen(b);
884         q->dlen += blocklen(b);
885         while(b->next)
886                 b = b->next;
887         q->blast = b;
888 }
889
890 /*
891  *  called with q ilocked
892  */
893 Block*
894 qremove(Queue *q)
895 {
896         Block *b;
897
898         b = q->bfirst;
899         if(b == nil)
900                 return nil;
901         q->bfirst = b->next;
902         b->next = nil;
903         q->dlen -= BLEN(b);
904         q->len -= BALLOC(b);
905         QDEBUG checkb(b, "qremove");
906         return b;
907 }
908
909 /*
910  *  copy the contents of a string of blocks into
911  *  memory.  emptied blocks are freed.  return
912  *  pointer to first unconsumed block.
913  */
914 Block*
915 bl2mem(uchar *p, Block *b, int n)
916 {
917         int i;
918         Block *next;
919
920         for(; b != nil; b = next){
921                 i = BLEN(b);
922                 if(i > n){
923                         memmove(p, b->rp, n);
924                         b->rp += n;
925                         return b;
926                 }
927                 memmove(p, b->rp, i);
928                 n -= i;
929                 p += i;
930                 b->rp += i;
931                 next = b->next;
932                 freeb(b);
933         }
934         return nil;
935 }
936
937 /*
938  *  copy the contents of memory into a string of blocks.
939  *  return nil on error.
940  */
941 Block*
942 mem2bl(uchar *p, int len)
943 {
944         int n;
945         Block *b, *first, **l;
946
947         first = nil;
948         l = &first;
949         if(waserror()){
950                 freeblist(first);
951                 nexterror();
952         }
953         do {
954                 n = len;
955                 if(n > Maxatomic)
956                         n = Maxatomic;
957
958                 *l = b = allocb(n);
959                 setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
960                 memmove(b->wp, p, n);
961                 b->wp += n;
962                 p += n;
963                 len -= n;
964                 l = &b->next;
965         } while(len > 0);
966         poperror();
967
968         return first;
969 }
970
971 /*
972  *  put a block back to the front of the queue
973  *  called with q ilocked
974  */
975 void
976 qputback(Queue *q, Block *b)
977 {
978         b->next = q->bfirst;
979         if(q->bfirst == nil)
980                 q->blast = b;
981         q->bfirst = b;
982         q->len += BALLOC(b);
983         q->dlen += BLEN(b);
984 }
985
986 /*
987  *  flow control, get producer going again
988  *  called with q ilocked
989  */
990 static void
991 qwakeup_iunlock(Queue *q)
992 {
993         int dowakeup = 0;
994
995         /* if writer flow controlled, restart */
996         if((q->state & Qflow) && q->len < q->limit/2){
997                 q->state &= ~Qflow;
998                 dowakeup = 1;
999         }
1000
1001         iunlock(q);
1002
1003         /* wakeup flow controlled writers */
1004         if(dowakeup){
1005                 if(q->kick)
1006                         q->kick(q->arg);
1007                 wakeup(&q->wr);
1008         }
1009 }
1010
1011 /*
1012  *  get next block from a queue (up to a limit)
1013  */
1014 Block*
1015 qbread(Queue *q, int len)
1016 {
1017         Block *b, *nb;
1018         int n;
1019
1020         eqlock(&q->rlock);
1021         if(waserror()){
1022                 qunlock(&q->rlock);
1023                 nexterror();
1024         }
1025
1026         ilock(q);
1027         switch(qwait(q)){
1028         case 0:
1029                 /* queue closed */
1030                 iunlock(q);
1031                 qunlock(&q->rlock);
1032                 poperror();
1033                 return nil;
1034         case -1:
1035                 /* multiple reads on a closed queue */
1036                 iunlock(q);
1037                 error(q->err);
1038         }
1039
1040         /* if we get here, there's at least one block in the queue */
1041         b = qremove(q);
1042         n = BLEN(b);
1043
1044         /* split block if it's too big and this is not a message queue */
1045         nb = b;
1046         if(n > len){
1047                 if((q->state&Qmsg) == 0){
1048                         n -= len;
1049                         b = allocb(n);
1050                         memmove(b->wp, nb->rp+len, n);
1051                         b->wp += n;
1052                         qputback(q, b);
1053                 }
1054                 nb->wp = nb->rp + len;
1055         }
1056
1057         /* restart producer */
1058         qwakeup_iunlock(q);
1059
1060         poperror();
1061         qunlock(&q->rlock);
1062         return nb;
1063 }
1064
1065 /*
1066  *  read a queue.  if no data is queued, post a Block
1067  *  and wait on its Rendez.
1068  */
1069 long
1070 qread(Queue *q, void *vp, int len)
1071 {
1072         Block *b, *first, **l;
1073         int m, n;
1074
1075         eqlock(&q->rlock);
1076         if(waserror()){
1077                 qunlock(&q->rlock);
1078                 nexterror();
1079         }
1080
1081         ilock(q);
1082 again:
1083         switch(qwait(q)){
1084         case 0:
1085                 /* queue closed */
1086                 iunlock(q);
1087                 qunlock(&q->rlock);
1088                 poperror();
1089                 return 0;
1090         case -1:
1091                 /* multiple reads on a closed queue */
1092                 iunlock(q);
1093                 error(q->err);
1094         }
1095
1096         /* if we get here, there's at least one block in the queue */
1097         if(q->state & Qcoalesce){
1098                 /* when coalescing, 0 length blocks just go away */
1099                 b = q->bfirst;
1100                 if(BLEN(b) <= 0){
1101                         freeb(qremove(q));
1102                         goto again;
1103                 }
1104
1105                 /*  grab the first block plus as many
1106                  *  following blocks as will completely
1107                  *  fit in the read.
1108                  */
1109                 n = 0;
1110                 l = &first;
1111                 m = BLEN(b);
1112                 for(;;) {
1113                         *l = qremove(q);
1114                         l = &b->next;
1115                         n += m;
1116
1117                         b = q->bfirst;
1118                         if(b == nil)
1119                                 break;
1120                         m = BLEN(b);
1121                         if(n+m > len)
1122                                 break;
1123                 }
1124         } else {
1125                 first = qremove(q);
1126                 n = BLEN(first);
1127         }
1128
1129         /* copy to user space outside of the ilock */
1130         iunlock(q);
1131         b = bl2mem(vp, first, len);
1132         ilock(q);
1133
1134         /* take care of any left over partial block */
1135         if(b != nil){
1136                 n -= BLEN(b);
1137                 if(q->state & Qmsg)
1138                         freeb(b);
1139                 else
1140                         qputback(q, b);
1141         }
1142
1143         /* restart producer */
1144         qwakeup_iunlock(q);
1145
1146         poperror();
1147         qunlock(&q->rlock);
1148         return n;
1149 }
1150
1151 static int
1152 qnotfull(void *a)
1153 {
1154         Queue *q = a;
1155
1156         return q->len < q->limit || (q->state & Qclosed);
1157 }
1158
1159 /*
1160  *  add a block to a queue obeying flow control
1161  */
1162 long
1163 qbwrite(Queue *q, Block *b)
1164 {
1165         int n, dowakeup;
1166         Proc *p;
1167
1168         n = BLEN(b);
1169
1170         if(q->bypass){
1171                 (*q->bypass)(q->arg, b);
1172                 return n;
1173         }
1174
1175         dowakeup = 0;
1176         if(waserror()){
1177                 freeb(b);
1178                 nexterror();
1179         }
1180         ilock(q);
1181
1182         /* give up if the queue is closed */
1183         if(q->state & Qclosed){
1184                 iunlock(q);
1185                 error(q->err);
1186         }
1187
1188         /* don't queue over the limit */
1189         if(q->len >= q->limit){
1190                 if(q->noblock){
1191                         iunlock(q);
1192                         freeb(b);
1193                         poperror();
1194                         return n;
1195                 }
1196                 if(q->len >= q->limit*10){
1197                         iunlock(q);
1198                         error(Egreg);
1199                 }
1200         }
1201
1202         /* queue the block */
1203         if(q->bfirst)
1204                 q->blast->next = b;
1205         else
1206                 q->bfirst = b;
1207         q->blast = b;
1208         b->next = 0;
1209         q->len += BALLOC(b);
1210         q->dlen += n;
1211         QDEBUG checkb(b, "qbwrite");
1212
1213         /* make sure other end gets awakened */
1214         if(q->state & Qstarve){
1215                 q->state &= ~Qstarve;
1216                 dowakeup = 1;
1217         }
1218         iunlock(q);
1219         poperror();
1220
1221         /*  get output going again */
1222         if(q->kick && (dowakeup || (q->state&Qkick)))
1223                 q->kick(q->arg);
1224
1225         /* wakeup anyone consuming at the other end */
1226         if(dowakeup){
1227                 p = wakeup(&q->rr);
1228
1229                 /* if we just wokeup a higher priority process, let it run */
1230                 if(p != nil && p->priority > up->priority)
1231                         sched();
1232         }
1233
1234         /*
1235          *  flow control, wait for queue to get below the limit
1236          *  before allowing the process to continue and queue
1237          *  more.  We do this here so that postnote can only
1238          *  interrupt us after the data has been queued.  This
1239          *  means that things like 9p flushes and ssl messages
1240          *  will not be disrupted by software interrupts.
1241          *
1242          *  Note - this is moderately dangerous since a process
1243          *  that keeps getting interrupted and rewriting will
1244          *  queue up to 10 times the queue limit before failing.
1245          */
1246         for(;;){
1247                 if(q->noblock || qnotfull(q))
1248                         break;
1249
1250                 ilock(q);
1251                 q->state |= Qflow;
1252                 iunlock(q);
1253
1254                 eqlock(&q->wlock);
1255                 if(waserror()){
1256                         qunlock(&q->wlock);
1257                         nexterror();
1258                 }
1259                 sleep(&q->wr, qnotfull, q);
1260                 qunlock(&q->wlock);
1261                 poperror();
1262         }
1263
1264         return n;
1265 }
1266
1267 /*
1268  *  write to a queue.  only Maxatomic bytes at a time is atomic.
1269  */
1270 int
1271 qwrite(Queue *q, void *vp, int len)
1272 {
1273         int n, sofar;
1274         Block *b;
1275         uchar *p = vp;
1276
1277         QDEBUG if(!islo())
1278                 print("qwrite hi %#p\n", getcallerpc(&q));
1279
1280         sofar = 0;
1281         do {
1282                 n = len-sofar;
1283                 if(n > Maxatomic)
1284                         n = Maxatomic;
1285
1286                 b = allocb(n);
1287                 setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
1288                 if(waserror()){
1289                         freeb(b);
1290                         nexterror();
1291                 }
1292                 memmove(b->wp, p+sofar, n);
1293                 poperror();
1294                 b->wp += n;
1295
1296                 qbwrite(q, b);
1297
1298                 sofar += n;
1299         } while(sofar < len && (q->state & Qmsg) == 0);
1300
1301         return len;
1302 }
1303
1304 /*
1305  *  used by print() to write to a queue.  Since we may be splhi or not in
1306  *  a process, don't qlock.
1307  *
1308  *  this routine merges adjacent blocks if block n+1 will fit into
1309  *  the free space of block n.
1310  */
1311 int
1312 qiwrite(Queue *q, void *vp, int len)
1313 {
1314         int n, sofar, dowakeup;
1315         Block *b;
1316         uchar *p = vp;
1317
1318         dowakeup = 0;
1319
1320         sofar = 0;
1321         do {
1322                 n = len-sofar;
1323                 if(n > Maxatomic)
1324                         n = Maxatomic;
1325
1326                 b = iallocb(n);
1327                 if(b == nil)
1328                         break;
1329                 memmove(b->wp, p+sofar, n);
1330                 b->wp += n;
1331
1332                 ilock(q);
1333
1334                 /* we use an artificially high limit for kernel prints since anything
1335                  * over the limit gets dropped
1336                  */
1337                 if(q->dlen >= 16*1024){
1338                         iunlock(q);
1339                         freeb(b);
1340                         break;
1341                 }
1342
1343                 QDEBUG checkb(b, "qiwrite");
1344                 if(q->bfirst)
1345                         q->blast->next = b;
1346                 else
1347                         q->bfirst = b;
1348                 q->blast = b;
1349                 q->len += BALLOC(b);
1350                 q->dlen += n;
1351
1352                 if(q->state & Qstarve){
1353                         q->state &= ~Qstarve;
1354                         dowakeup = 1;
1355                 }
1356
1357                 iunlock(q);
1358
1359                 if(dowakeup){
1360                         if(q->kick)
1361                                 q->kick(q->arg);
1362                         wakeup(&q->rr);
1363                 }
1364
1365                 sofar += n;
1366         } while(sofar < len && (q->state & Qmsg) == 0);
1367
1368         return sofar;
1369 }
1370
1371 /*
1372  *  be extremely careful when calling this,
1373  *  as there is no reference accounting
1374  */
1375 void
1376 qfree(Queue *q)
1377 {
1378         qclose(q);
1379         free(q);
1380 }
1381
1382 /*
1383  *  Mark a queue as closed.  No further IO is permitted.
1384  *  All blocks are released.
1385  */
1386 void
1387 qclose(Queue *q)
1388 {
1389         Block *bfirst;
1390
1391         if(q == nil)
1392                 return;
1393
1394         /* mark it */
1395         ilock(q);
1396         q->state |= Qclosed;
1397         q->state &= ~(Qflow|Qstarve);
1398         strcpy(q->err, Ehungup);
1399         bfirst = q->bfirst;
1400         q->bfirst = 0;
1401         q->len = 0;
1402         q->dlen = 0;
1403         q->noblock = 0;
1404         iunlock(q);
1405
1406         /* free queued blocks */
1407         freeblist(bfirst);
1408
1409         /* wake up readers/writers */
1410         wakeup(&q->rr);
1411         wakeup(&q->wr);
1412 }
1413
1414 /*
1415  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
1416  *  blocks.
1417  */
1418 void
1419 qhangup(Queue *q, char *msg)
1420 {
1421         /* mark it */
1422         ilock(q);
1423         q->state |= Qclosed;
1424         if(msg == 0 || *msg == 0)
1425                 strcpy(q->err, Ehungup);
1426         else
1427                 strncpy(q->err, msg, ERRMAX-1);
1428         iunlock(q);
1429
1430         /* wake up readers/writers */
1431         wakeup(&q->rr);
1432         wakeup(&q->wr);
1433 }
1434
1435 /*
1436  *  return non-zero if the q is hungup
1437  */
1438 int
1439 qisclosed(Queue *q)
1440 {
1441         return q->state & Qclosed;
1442 }
1443
1444 /*
1445  *  mark a queue as no longer hung up
1446  */
1447 void
1448 qreopen(Queue *q)
1449 {
1450         ilock(q);
1451         q->state &= ~Qclosed;
1452         q->state |= Qstarve;
1453         q->eof = 0;
1454         q->limit = q->inilim;
1455         iunlock(q);
1456 }
1457
1458 /*
1459  *  return bytes queued
1460  */
1461 int
1462 qlen(Queue *q)
1463 {
1464         return q->dlen;
1465 }
1466
1467 /*
1468  * return space remaining before flow control
1469  */
1470 int
1471 qwindow(Queue *q)
1472 {
1473         int l;
1474
1475         l = q->limit - q->len;
1476         if(l < 0)
1477                 l = 0;
1478         return l;
1479 }
1480
1481 /*
1482  *  return true if we can read without blocking
1483  */
1484 int
1485 qcanread(Queue *q)
1486 {
1487         return q->bfirst!=0;
1488 }
1489
1490 /*
1491  *  change queue limit
1492  */
1493 void
1494 qsetlimit(Queue *q, int limit)
1495 {
1496         q->limit = limit;
1497 }
1498
1499 /*
1500  *  set blocking/nonblocking
1501  */
1502 void
1503 qnoblock(Queue *q, int onoff)
1504 {
1505         q->noblock = onoff;
1506 }
1507
1508 /*
1509  *  flush the output queue
1510  */
1511 void
1512 qflush(Queue *q)
1513 {
1514         Block *bfirst;
1515
1516         /* mark it */
1517         ilock(q);
1518         bfirst = q->bfirst;
1519         q->bfirst = 0;
1520         q->len = 0;
1521         q->dlen = 0;
1522         iunlock(q);
1523
1524         /* free queued blocks */
1525         freeblist(bfirst);
1526
1527         /* wake up readers/writers */
1528         wakeup(&q->wr);
1529 }
1530
1531 int
1532 qfull(Queue *q)
1533 {
1534         return q->state & Qflow;
1535 }
1536
1537 int
1538 qstate(Queue *q)
1539 {
1540         return q->state;
1541 }