]> git.lizzy.rs Git - plan9front.git/blob - sys/src/9/port/qio.c
kernel/qio: fix comments, fix qiwrite() on close queue, remove debug setmalloctag...
[plan9front.git] / sys / src / 9 / port / qio.c
1 #include        "u.h"
2 #include        "../port/lib.h"
3 #include        "mem.h"
4 #include        "dat.h"
5 #include        "fns.h"
6 #include        "../port/error.h"
7
8 static ulong padblockcnt;
9 static ulong concatblockcnt;
10 static ulong pullupblockcnt;
11 static ulong copyblockcnt;
12 static ulong consumecnt;
13 static ulong producecnt;
14 static ulong qcopycnt;
15
16 #define QDEBUG  if(0)
17
18 /*
19  *  IO queues
20  */
21 typedef struct Queue    Queue;
22
23 struct Queue
24 {
25         Lock;
26
27         Block*  bfirst;         /* buffer */
28         Block*  blast;
29
30         int     len;            /* bytes allocated to queue */
31         int     dlen;           /* data bytes in queue */
32         int     limit;          /* max bytes in queue */
33         int     inilim;         /* initial limit */
34         int     state;
35         int     noblock;        /* true if writes return immediately when q full */
36         int     eof;            /* number of eofs read by user */
37
38         void    (*kick)(void*); /* restart output */
39         void    (*bypass)(void*, Block*);       /* bypass queue altogether */
40         void*   arg;            /* argument to kick */
41
42         QLock   rlock;          /* mutex for reading processes */
43         Rendez  rr;             /* process waiting to read */
44         QLock   wlock;          /* mutex for writing processes */
45         Rendez  wr;             /* process waiting to write */
46
47         char    err[ERRMAX];
48 };
49
50 enum
51 {
52         Maxatomic       = 64*1024,
53 };
54
55 uint    qiomaxatomic = Maxatomic;
56
57 /*
58  *  free a list of blocks
59  */
60 void
61 freeblist(Block *b)
62 {
63         Block *next;
64
65         for(; b != nil; b = next){
66                 next = b->next;
67                 b->next = nil;
68                 freeb(b);
69         }
70 }
71
72 /*
73  *  pad a block to the front (or the back if size is negative)
74  */
75 Block*
76 padblock(Block *bp, int size)
77 {
78         int n;
79         Block *nbp;
80
81         if(bp->next != nil)
82                 panic("padblock %#p", getcallerpc(&bp));
83
84         QDEBUG checkb(bp, "padblock 1");
85         if(size >= 0){
86                 if(bp->rp - bp->base >= size){
87                         bp->rp -= size;
88                         return bp;
89                 }
90
91                 n = BLEN(bp);
92                 nbp = allocb(size+n);
93                 nbp->rp += size;
94                 nbp->wp = nbp->rp;
95                 memmove(nbp->wp, bp->rp, n);
96                 nbp->wp += n;
97                 nbp->rp -= size;
98         } else {
99                 size = -size;
100                 if(bp->lim - bp->wp >= size)
101                         return bp;
102
103                 n = BLEN(bp);
104                 nbp = allocb(size+n);
105                 memmove(nbp->wp, bp->rp, n);
106                 nbp->wp += n;
107         }
108         freeb(bp);
109         padblockcnt++;
110         QDEBUG checkb(nbp, "padblock 1");
111         return nbp;
112 }
113
114 /*
115  *  return count of bytes in a string of blocks
116  */
117 int
118 blocklen(Block *bp)
119 {
120         int len;
121
122         len = 0;
123         while(bp != nil) {
124                 len += BLEN(bp);
125                 bp = bp->next;
126         }
127         return len;
128 }
129
130 /*
131  * return count of space in blocks
132  */
133 int
134 blockalloclen(Block *bp)
135 {
136         int len;
137
138         len = 0;
139         while(bp != nil) {
140                 len += BALLOC(bp);
141                 bp = bp->next;
142         }
143         return len;
144 }
145
146 /*
147  *  copy the  string of blocks into
148  *  a single block and free the string
149  */
150 Block*
151 concatblock(Block *bp)
152 {
153         Block *nb, *next;
154         int len;
155
156         if(bp->next == nil)
157                 return bp;
158
159         nb = allocb(blocklen(bp));
160         for(; bp != nil; bp = next) {
161                 next = bp->next;
162                 len = BLEN(bp);
163                 memmove(nb->wp, bp->rp, len);
164                 nb->wp += len;
165                 freeb(bp);
166         }
167         concatblockcnt += BLEN(nb);
168         QDEBUG checkb(nb, "concatblock 1");
169         return nb;
170 }
171
172 /*
173  *  make sure the first block has at least n bytes
174  */
175 Block*
176 pullupblock(Block *bp, int n)
177 {
178         Block *nbp;
179         int i;
180
181         /*
182          *  this should almost always be true, it's
183          *  just to avoid every caller checking.
184          */
185         if(BLEN(bp) >= n)
186                 return bp;
187
188         /*
189          *  if not enough room in the first block,
190          *  add another to the front of the list.
191          */
192         if(bp->lim - bp->rp < n){
193                 nbp = allocb(n);
194                 nbp->next = bp;
195                 bp = nbp;
196         }
197
198         /*
199          *  copy bytes from the trailing blocks into the first
200          */
201         n -= BLEN(bp);
202         while((nbp = bp->next) != nil){
203                 pullupblockcnt++;
204                 i = BLEN(nbp);
205                 if(i > n) {
206                         memmove(bp->wp, nbp->rp, n);
207                         bp->wp += n;
208                         nbp->rp += n;
209                         QDEBUG checkb(bp, "pullupblock 1");
210                         return bp;
211                 } else {
212                         /* shouldn't happen but why crash if it does */
213                         if(i < 0){
214                                 print("pullup negative length packet, called from %#p\n",
215                                         getcallerpc(&bp));
216                                 i = 0;
217                         }
218                         memmove(bp->wp, nbp->rp, i);
219                         bp->wp += i;
220                         bp->next = nbp->next;
221                         nbp->next = nil;
222                         freeb(nbp);
223                         n -= i;
224                         if(n == 0){
225                                 QDEBUG checkb(bp, "pullupblock 2");
226                                 return bp;
227                         }
228                 }
229         }
230         freeb(bp);
231         return nil;
232 }
233
234 /*
235  *  make sure the first block has at least n bytes
236  */
237 Block*
238 pullupqueue(Queue *q, int n)
239 {
240         Block *b;
241
242         if(BLEN(q->bfirst) >= n)
243                 return q->bfirst;
244         q->bfirst = pullupblock(q->bfirst, n);
245         for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
246                 ;
247         q->blast = b;
248         return q->bfirst;
249 }
250
251 /*
252  *  trim to len bytes starting at offset
253  */
254 Block *
255 trimblock(Block *bp, int offset, int len)
256 {
257         ulong l;
258         Block *nb, *startb;
259
260         QDEBUG checkb(bp, "trimblock 1");
261         if(blocklen(bp) < offset+len) {
262                 freeblist(bp);
263                 return nil;
264         }
265
266         while((l = BLEN(bp)) < offset) {
267                 offset -= l;
268                 nb = bp->next;
269                 bp->next = nil;
270                 freeb(bp);
271                 bp = nb;
272         }
273
274         startb = bp;
275         bp->rp += offset;
276
277         while((l = BLEN(bp)) < len) {
278                 len -= l;
279                 bp = bp->next;
280         }
281
282         bp->wp -= (BLEN(bp) - len);
283
284         if(bp->next != nil) {
285                 freeblist(bp->next);
286                 bp->next = nil;
287         }
288
289         return startb;
290 }
291
292 /*
293  *  copy 'count' bytes into a new block
294  */
295 Block*
296 copyblock(Block *bp, int count)
297 {
298         int l;
299         Block *nbp;
300
301         QDEBUG checkb(bp, "copyblock 0");
302         nbp = allocb(count);
303         for(; count > 0 && bp != nil; bp = bp->next){
304                 l = BLEN(bp);
305                 if(l > count)
306                         l = count;
307                 memmove(nbp->wp, bp->rp, l);
308                 nbp->wp += l;
309                 count -= l;
310         }
311         if(count > 0){
312                 memset(nbp->wp, 0, count);
313                 nbp->wp += count;
314         }
315         copyblockcnt++;
316         QDEBUG checkb(nbp, "copyblock 1");
317
318         return nbp;
319 }
320
321 Block*
322 adjustblock(Block* bp, int len)
323 {
324         int n;
325         Block *nbp;
326
327         if(len < 0){
328                 freeb(bp);
329                 return nil;
330         }
331
332         if(bp->rp+len > bp->lim){
333                 nbp = copyblock(bp, len);
334                 freeblist(bp);
335                 QDEBUG checkb(nbp, "adjustblock 1");
336
337                 return nbp;
338         }
339
340         n = BLEN(bp);
341         if(len > n)
342                 memset(bp->wp, 0, len-n);
343         bp->wp = bp->rp+len;
344         QDEBUG checkb(bp, "adjustblock 2");
345
346         return bp;
347 }
348
349
350 /*
351  *  throw away up to count bytes from a
352  *  list of blocks.  Return count of bytes
353  *  thrown away.
354  */
355 int
356 pullblock(Block **bph, int count)
357 {
358         Block *bp;
359         int n, bytes;
360
361         bytes = 0;
362         if(bph == nil)
363                 return 0;
364
365         while(*bph != nil && count != 0) {
366                 bp = *bph;
367                 n = BLEN(bp);
368                 if(count < n)
369                         n = count;
370                 bytes += n;
371                 count -= n;
372                 bp->rp += n;
373                 QDEBUG checkb(bp, "pullblock ");
374                 if(BLEN(bp) == 0) {
375                         *bph = bp->next;
376                         bp->next = nil;
377                         freeb(bp);
378                 }
379         }
380         return bytes;
381 }
382
383 /*
384  *  get next block from a queue, return null if nothing there
385  */
386 Block*
387 qget(Queue *q)
388 {
389         int dowakeup;
390         Block *b;
391
392         /* sync with qwrite */
393         ilock(q);
394
395         b = q->bfirst;
396         if(b == nil){
397                 q->state |= Qstarve;
398                 iunlock(q);
399                 return nil;
400         }
401         QDEBUG checkb(b, "qget");
402         q->bfirst = b->next;
403         b->next = nil;
404         q->len -= BALLOC(b);
405         q->dlen -= BLEN(b);
406
407         /* if writer flow controlled, restart */
408         if((q->state & Qflow) && q->len < q->limit/2){
409                 q->state &= ~Qflow;
410                 dowakeup = 1;
411         } else
412                 dowakeup = 0;
413
414         iunlock(q);
415
416         if(dowakeup)
417                 wakeup(&q->wr);
418
419         return b;
420 }
421
422 /*
423  *  throw away the next 'len' bytes in the queue
424  */
425 int
426 qdiscard(Queue *q, int len)
427 {
428         Block *b, *tofree = nil;
429         int dowakeup, n, sofar;
430
431         ilock(q);
432         for(sofar = 0; sofar < len; sofar += n){
433                 b = q->bfirst;
434                 if(b == nil)
435                         break;
436                 QDEBUG checkb(b, "qdiscard");
437                 n = BLEN(b);
438                 if(n <= len - sofar){
439                         q->bfirst = b->next;
440                         q->len -= BALLOC(b);
441                         q->dlen -= BLEN(b);
442
443                         /* remember to free this */
444                         b->next = tofree;
445                         tofree = b;
446                 } else {
447                         n = len - sofar;
448                         b->rp += n;
449                         q->dlen -= n;
450                 }
451         }
452
453         /*
454          *  if writer flow controlled, restart
455          *
456          *  This used to be
457          *      q->len < q->limit/2
458          *  but it slows down tcp too much for certain write sizes.
459          *  I really don't understand it completely.  It may be
460          *  due to the queue draining so fast that the transmission
461          *  stalls waiting for the app to produce more data.  - presotto
462          */
463         if((q->state & Qflow) && q->len < q->limit){
464                 q->state &= ~Qflow;
465                 dowakeup = 1;
466         } else
467                 dowakeup = 0;
468
469         iunlock(q);
470
471         if(dowakeup)
472                 wakeup(&q->wr);
473
474         if(tofree != nil)
475                 freeblist(tofree);
476
477         return sofar;
478 }
479
480 /*
481  *  Interrupt level copy out of a queue, return # bytes copied.
482  */
483 int
484 qconsume(Queue *q, void *vp, int len)
485 {
486         Block *b, *tofree = nil;
487         int n, dowakeup;
488         uchar *p = vp;
489
490         /* sync with qwrite */
491         ilock(q);
492
493         for(;;) {
494                 b = q->bfirst;
495                 if(b == nil){
496                         q->state |= Qstarve;
497                         len = -1;
498                         goto out;
499                 }
500                 QDEBUG checkb(b, "qconsume 1");
501
502                 n = BLEN(b);
503                 if(n > 0)
504                         break;
505                 q->bfirst = b->next;
506                 q->len -= BALLOC(b);
507
508                 /* remember to free this */
509                 b->next = tofree;
510                 tofree = b;
511         };
512
513         consumecnt += n;
514         if(n < len)
515                 len = n;
516         memmove(p, b->rp, len);
517         b->rp += len;
518         q->dlen -= len;
519
520         /* discard the block if we're done with it */
521         if((q->state & Qmsg) || len == n){
522                 q->bfirst = b->next;
523                 q->len -= BALLOC(b);
524                 q->dlen -= BLEN(b);
525
526                 /* remember to free this */
527                 b->next = tofree;
528                 tofree = b;
529         }
530
531 out:
532         /* if writer flow controlled, restart */
533         if((q->state & Qflow) && q->len < q->limit/2){
534                 q->state &= ~Qflow;
535                 dowakeup = 1;
536         } else
537                 dowakeup = 0;
538
539         iunlock(q);
540
541         if(dowakeup)
542                 wakeup(&q->wr);
543
544         if(tofree != nil)
545                 freeblist(tofree);
546
547         return len;
548 }
549
550 int
551 qpass(Queue *q, Block *b)
552 {
553         int len, dowakeup;
554
555         /* sync with qread */
556         dowakeup = 0;
557         ilock(q);
558         if(q->len >= q->limit){
559                 iunlock(q);
560                 freeblist(b);
561                 return -1;
562         }
563         if(q->state & Qclosed){
564                 iunlock(q);
565                 freeblist(b);
566                 return 0;
567         }
568
569         len = qaddlist(q, b);
570
571         if(q->len >= q->limit/2)
572                 q->state |= Qflow;
573
574         if(q->state & Qstarve){
575                 q->state &= ~Qstarve;
576                 dowakeup = 1;
577         }
578         iunlock(q);
579
580         if(dowakeup)
581                 wakeup(&q->rr);
582
583         return len;
584 }
585
586 int
587 qpassnolim(Queue *q, Block *b)
588 {
589         int len, dowakeup;
590
591         /* sync with qread */
592         dowakeup = 0;
593         ilock(q);
594
595         if(q->state & Qclosed){
596                 iunlock(q);
597                 freeblist(b);
598                 return 0;
599         }
600
601         len = qaddlist(q, b);
602
603         if(q->len >= q->limit/2)
604                 q->state |= Qflow;
605
606         if(q->state & Qstarve){
607                 q->state &= ~Qstarve;
608                 dowakeup = 1;
609         }
610         iunlock(q);
611
612         if(dowakeup)
613                 wakeup(&q->rr);
614
615         return len;
616 }
617
618 /*
619  *  if the allocated space is way out of line with the used
620  *  space, reallocate to a smaller block
621  */
622 Block*
623 packblock(Block *bp)
624 {
625         Block **l, *nbp;
626         int n;
627
628         for(l = &bp; (nbp = *l) != nil; l = &(*l)->next){
629                 n = BLEN(nbp);
630                 if((n<<2) < BALLOC(nbp)){
631                         *l = allocb(n);
632                         memmove((*l)->wp, nbp->rp, n);
633                         (*l)->wp += n;
634                         (*l)->next = nbp->next;
635                         freeb(nbp);
636                 }
637         }
638
639         return bp;
640 }
641
642 int
643 qproduce(Queue *q, void *vp, int len)
644 {
645         Block *b;
646         int dowakeup;
647         uchar *p = vp;
648
649         b = iallocb(len);
650         if(b == nil)
651                 return 0;
652
653         /* sync with qread */
654         dowakeup = 0;
655         ilock(q);
656
657         /* no waiting receivers, room in buffer? */
658         if(q->len >= q->limit){
659                 q->state |= Qflow;
660                 iunlock(q);
661                 return -1;
662         }
663         producecnt += len;
664
665         /* save in buffer */
666         memmove(b->wp, p, len);
667         b->wp += len;
668         qaddlist(q, b);
669
670         if(q->state & Qstarve){
671                 q->state &= ~Qstarve;
672                 dowakeup = 1;
673         }
674
675         if(q->len >= q->limit)
676                 q->state |= Qflow;
677         iunlock(q);
678
679         if(dowakeup)
680                 wakeup(&q->rr);
681
682         return len;
683 }
684
685 /*
686  *  copy from offset in the queue
687  */
688 Block*
689 qcopy(Queue *q, int len, ulong offset)
690 {
691         Block *b;
692
693         b = allocb(len);
694         ilock(q);
695         b->wp += readblist(q->bfirst, b->wp, len, offset);
696         iunlock(q);
697         return b;
698 }
699
700 /*
701  *  called by non-interrupt code
702  */
703 Queue*
704 qopen(int limit, int msg, void (*kick)(void*), void *arg)
705 {
706         Queue *q;
707
708         q = malloc(sizeof(Queue));
709         if(q == nil)
710                 return nil;
711
712         q->limit = q->inilim = limit;
713         q->kick = kick;
714         q->arg = arg;
715         q->state = msg;
716         
717         q->state |= Qstarve;
718         q->eof = 0;
719         q->noblock = 0;
720
721         return q;
722 }
723
724 /* open a queue to be bypassed */
725 Queue*
726 qbypass(void (*bypass)(void*, Block*), void *arg)
727 {
728         Queue *q;
729
730         q = malloc(sizeof(Queue));
731         if(q == nil)
732                 return nil;
733
734         q->limit = 0;
735         q->arg = arg;
736         q->bypass = bypass;
737         q->state = 0;
738
739         return q;
740 }
741
742 static int
743 notempty(void *a)
744 {
745         Queue *q = a;
746
747         return (q->state & Qclosed) || q->bfirst != nil;
748 }
749
750 /*
751  *  wait for the queue to be non-empty or closed.
752  *  called with q ilocked.
753  */
754 static int
755 qwait(Queue *q)
756 {
757         /* wait for data */
758         for(;;){
759                 if(q->bfirst != nil)
760                         break;
761
762                 if(q->state & Qclosed){
763                         if(++q->eof > 3)
764                                 return -1;
765                         if(*q->err && strcmp(q->err, Ehungup) != 0)
766                                 return -1;
767                         return 0;
768                 }
769
770                 q->state |= Qstarve;    /* flag requesting producer to wake me */
771                 iunlock(q);
772                 sleep(&q->rr, notempty, q);
773                 ilock(q);
774         }
775         return 1;
776 }
777
778 /*
779  * add a block list to a queue, return bytes added
780  */
781 int
782 qaddlist(Queue *q, Block *b)
783 {
784         int len, dlen;
785
786         QDEBUG checkb(b, "qaddlist 1");
787
788         /* queue the block */
789         if(q->bfirst != nil)
790                 q->blast->next = b;
791         else
792                 q->bfirst = b;
793
794         len = BALLOC(b);
795         dlen = BLEN(b);
796         while(b->next != nil){
797                 b = b->next;
798                 QDEBUG checkb(b, "qaddlist 2");
799
800                 len += BALLOC(b);
801                 dlen += BLEN(b);
802         }
803         q->blast = b;
804         q->len += len;
805         q->dlen += dlen;
806         return dlen;
807 }
808
809 /*
810  *  called with q ilocked
811  */
812 Block*
813 qremove(Queue *q)
814 {
815         Block *b;
816
817         b = q->bfirst;
818         if(b == nil)
819                 return nil;
820         QDEBUG checkb(b, "qremove");
821         q->bfirst = b->next;
822         b->next = nil;
823         q->dlen -= BLEN(b);
824         q->len -= BALLOC(b);
825         return b;
826 }
827
828 /*
829  *  copy the contents of a string of blocks into
830  *  memory from an offset. blocklist kept unchanged.
831  *  return number of copied bytes.
832  */
833 long
834 readblist(Block *b, uchar *p, long n, long o)
835 {
836         long m, r;
837
838         r = 0;
839         while(n > 0 && b != nil){
840                 m = BLEN(b);
841                 if(o >= m)
842                         o -= m;
843                 else {
844                         m -= o;
845                         if(n < m)
846                                 m = n;
847                         memmove(p, b->rp + o, m);
848                         p += m;
849                         r += m;
850                         n -= m;
851                         o = 0;
852                 }
853                 b = b->next;
854         }
855         return r;
856 }
857
858 /*
859  *  put a block back to the front of the queue
860  *  called with q ilocked
861  */
862 void
863 qputback(Queue *q, Block *b)
864 {
865         b->next = q->bfirst;
866         if(q->bfirst == nil)
867                 q->blast = b;
868         q->bfirst = b;
869         q->len += BALLOC(b);
870         q->dlen += BLEN(b);
871 }
872
873 /*
874  *  cut off n bytes from the end of *h. return a new
875  *  block with the tail and change *h to refer to the
876  *  head.
877  */
878 static Block*
879 splitblock(Block **h, int n)
880 {
881         Block *a, *b;
882         int m;
883
884         a = *h;
885         m = BLEN(a) - n;
886         if(m < n){
887                 b = allocb(m);
888                 memmove(b->wp, a->rp, m);
889                 b->wp += m;
890                 a->rp += m;
891                 *h = b;
892                 return a;
893         } else {
894                 b = allocb(n);
895                 a->wp -= n;
896                 memmove(b->wp, a->wp, n);
897                 b->wp += n;
898                 return b;
899         }
900 }
901
902 /*
903  *  flow control, get producer going again
904  *  called with q ilocked
905  */
906 static void
907 qwakeup_iunlock(Queue *q)
908 {
909         int dowakeup = 0;
910
911         /* if writer flow controlled, restart */
912         if((q->state & Qflow) && q->len < q->limit/2){
913                 q->state &= ~Qflow;
914                 dowakeup = 1;
915         }
916
917         iunlock(q);
918
919         /* wakeup flow controlled writers */
920         if(dowakeup){
921                 if(q->kick != nil)
922                         q->kick(q->arg);
923                 wakeup(&q->wr);
924         }
925 }
926
927 /*
928  *  get next block from a queue (up to a limit)
929  */
930 Block*
931 qbread(Queue *q, int len)
932 {
933         Block *b;
934         int n;
935
936         eqlock(&q->rlock);
937         if(waserror()){
938                 qunlock(&q->rlock);
939                 nexterror();
940         }
941
942         ilock(q);
943         switch(qwait(q)){
944         case 0:
945                 /* queue closed */
946                 iunlock(q);
947                 qunlock(&q->rlock);
948                 poperror();
949                 return nil;
950         case -1:
951                 /* multiple reads on a closed queue */
952                 iunlock(q);
953                 error(q->err);
954         }
955
956         /* if we get here, there's at least one block in the queue */
957         b = qremove(q);
958         n = BLEN(b);
959
960         /* split block if it's too big and this is not a message queue */
961         if(n > len){
962                 n -= len;
963                 if((q->state & Qmsg) == 0)
964                         qputback(q, splitblock(&b, n));
965                 else
966                         b->wp -= n;
967         }
968
969         /* restart producer */
970         qwakeup_iunlock(q);
971
972         qunlock(&q->rlock);
973         poperror();
974
975         return b;
976 }
977
978 /*
979  *  read a queue.  if no data is queued, post a Block
980  *  and wait on its Rendez.
981  */
982 long
983 qread(Queue *q, void *vp, int len)
984 {
985         Block *b, *first, **last;
986         int m, n;
987
988         eqlock(&q->rlock);
989         if(waserror()){
990                 qunlock(&q->rlock);
991                 nexterror();
992         }
993
994         ilock(q);
995 again:
996         switch(qwait(q)){
997         case 0:
998                 /* queue closed */
999                 iunlock(q);
1000                 qunlock(&q->rlock);
1001                 poperror();
1002                 return 0;
1003         case -1:
1004                 /* multiple reads on a closed queue */
1005                 iunlock(q);
1006                 error(q->err);
1007         }
1008
1009         /* if we get here, there's at least one block in the queue */
1010         last = &first;
1011         if(q->state & Qcoalesce){
1012                 /* when coalescing, 0 length blocks just go away */
1013                 b = q->bfirst;
1014                 m = BLEN(b);
1015                 if(m <= 0){
1016                         freeb(qremove(q));
1017                         goto again;
1018                 }
1019
1020                 /*  grab the first block plus as many
1021                  *  following blocks as will partially
1022                  *  fit in the read.
1023                  */
1024                 n = 0;
1025                 for(;;) {
1026                         *last = qremove(q);
1027                         n += m;
1028                         if(n >= len || q->bfirst == nil)
1029                                 break;
1030                         last = &b->next;
1031                         b = q->bfirst;
1032                         m = BLEN(b);
1033                 }
1034         } else {
1035                 first = qremove(q);
1036                 n = BLEN(first);
1037         }
1038
1039         /* split last block if it's too big and this is not a message queue */
1040         if(n > len && (q->state & Qmsg) == 0)
1041                 qputback(q, splitblock(last, n - len));
1042
1043         /* restart producer */
1044         qwakeup_iunlock(q);
1045
1046         qunlock(&q->rlock);
1047         poperror();
1048
1049         if(waserror()){
1050                 freeblist(first);
1051                 nexterror();
1052         }
1053         n = readblist(first, vp, len, 0);
1054         freeblist(first);
1055         poperror();
1056
1057         return n;
1058 }
1059
1060 static int
1061 qnotfull(void *a)
1062 {
1063         Queue *q = a;
1064
1065         return q->len < q->limit || (q->state & Qclosed);
1066 }
1067
1068 /*
1069  *  flow control, wait for queue to get below the limit
1070  */
1071 static void
1072 qflow(Queue *q)
1073 {
1074         for(;;){
1075                 if(q->noblock || qnotfull(q))
1076                         break;
1077
1078                 ilock(q);
1079                 q->state |= Qflow;
1080                 iunlock(q);
1081
1082                 eqlock(&q->wlock);
1083                 if(waserror()){
1084                         qunlock(&q->wlock);
1085                         nexterror();
1086                 }
1087                 sleep(&q->wr, qnotfull, q);
1088                 qunlock(&q->wlock);
1089                 poperror();
1090         }
1091 }
1092
1093 /*
1094  *  add a block to a queue obeying flow control
1095  */
1096 long
1097 qbwrite(Queue *q, Block *b)
1098 {
1099         int len, dowakeup;
1100         Proc *p;
1101
1102         if(q->bypass != nil){
1103                 len = blocklen(b);
1104                 (*q->bypass)(q->arg, b);
1105                 return len;
1106         }
1107
1108         dowakeup = 0;
1109         if(waserror()){
1110                 freeblist(b);
1111                 nexterror();
1112         }
1113         ilock(q);
1114
1115         /* give up if the queue is closed */
1116         if(q->state & Qclosed){
1117                 iunlock(q);
1118                 error(q->err);
1119         }
1120
1121         /* don't queue over the limit */
1122         if(q->len >= q->limit && q->noblock){
1123                 iunlock(q);
1124                 poperror();
1125                 len = blocklen(b);
1126                 freeblist(b);
1127                 return len;
1128         }
1129
1130         len = qaddlist(q, b);
1131
1132         /* make sure other end gets awakened */
1133         if(q->state & Qstarve){
1134                 q->state &= ~Qstarve;
1135                 dowakeup = 1;
1136         }
1137         iunlock(q);
1138         poperror();
1139
1140         /*  get output going again */
1141         if(q->kick != nil && (dowakeup || (q->state&Qkick)))
1142                 q->kick(q->arg);
1143
1144         /* wakeup anyone consuming at the other end */
1145         if(dowakeup){
1146                 p = wakeup(&q->rr);
1147
1148                 /* if we just wokeup a higher priority process, let it run */
1149                 if(p != nil && p->priority > up->priority)
1150                         sched();
1151         }
1152
1153         /*
1154          *  flow control, before allowing the process to continue and
1155          *  queue more. We do this here so that postnote can only
1156          *  interrupt us after the data has been queued.  This means that
1157          *  things like 9p flushes and ssl messages will not be disrupted
1158          *  by software interrupts.
1159          */
1160         qflow(q);
1161
1162         return len;
1163 }
1164
1165 /*
1166  *  write to a queue.  only Maxatomic bytes at a time is atomic.
1167  */
1168 int
1169 qwrite(Queue *q, void *vp, int len)
1170 {
1171         int n, sofar;
1172         Block *b;
1173         uchar *p = vp;
1174
1175         QDEBUG if(!islo())
1176                 print("qwrite hi %#p\n", getcallerpc(&q));
1177
1178         /* stop queue bloat before allocating blocks */
1179         if(q->len/2 >= q->limit && q->noblock == 0 && q->bypass == nil){
1180                 while(waserror()){
1181                         if(up->procctl == Proc_exitme || up->procctl == Proc_exitbig)
1182                                 error(Egreg);
1183                 }
1184                 qflow(q);
1185                 poperror();
1186         }
1187
1188         sofar = 0;
1189         do {
1190                 n = len-sofar;
1191                 if(n > Maxatomic)
1192                         n = Maxatomic;
1193
1194                 b = allocb(n);
1195                 if(waserror()){
1196                         freeb(b);
1197                         nexterror();
1198                 }
1199                 memmove(b->wp, p+sofar, n);
1200                 poperror();
1201                 b->wp += n;
1202
1203                 sofar += qbwrite(q, b);
1204         } while(sofar < len && (q->state & Qmsg) == 0);
1205
1206         return len;
1207 }
1208
1209 /*
1210  *  used by print() to write to a queue.  Since we may be splhi or not in
1211  *  a process, don't qlock.
1212  */
1213 int
1214 qiwrite(Queue *q, void *vp, int len)
1215 {
1216         int n, sofar, dowakeup;
1217         Block *b;
1218         uchar *p = vp;
1219
1220         dowakeup = 0;
1221
1222         sofar = 0;
1223         do {
1224                 n = len-sofar;
1225                 if(n > Maxatomic)
1226                         n = Maxatomic;
1227
1228                 b = iallocb(n);
1229                 if(b == nil)
1230                         break;
1231                 memmove(b->wp, p+sofar, n);
1232                 b->wp += n;
1233
1234                 ilock(q);
1235
1236                 /* we use an artificially high limit for kernel prints since anything
1237                  * over the limit gets dropped
1238                  */
1239                 if((q->state & Qclosed) != 0 || q->len/2 >= q->limit){
1240                         iunlock(q);
1241                         freeb(b);
1242                         break;
1243                 }
1244
1245                 qaddlist(q, b);
1246
1247                 if(q->state & Qstarve){
1248                         q->state &= ~Qstarve;
1249                         dowakeup = 1;
1250                 }
1251
1252                 iunlock(q);
1253
1254                 if(dowakeup){
1255                         if(q->kick != nil)
1256                                 q->kick(q->arg);
1257                         wakeup(&q->rr);
1258                 }
1259
1260                 sofar += n;
1261         } while(sofar < len && (q->state & Qmsg) == 0);
1262
1263         return sofar;
1264 }
1265
1266 /*
1267  *  be extremely careful when calling this,
1268  *  as there is no reference accounting
1269  */
1270 void
1271 qfree(Queue *q)
1272 {
1273         qclose(q);
1274         free(q);
1275 }
1276
1277 /*
1278  *  Mark a queue as closed.  No further IO is permitted.
1279  *  All blocks are released.
1280  */
1281 void
1282 qclose(Queue *q)
1283 {
1284         Block *bfirst;
1285
1286         if(q == nil)
1287                 return;
1288
1289         /* mark it */
1290         ilock(q);
1291         q->state |= Qclosed;
1292         q->state &= ~(Qflow|Qstarve);
1293         kstrcpy(q->err, Ehungup, ERRMAX);
1294         bfirst = q->bfirst;
1295         q->bfirst = nil;
1296         q->len = 0;
1297         q->dlen = 0;
1298         q->noblock = 0;
1299         iunlock(q);
1300
1301         /* free queued blocks */
1302         freeblist(bfirst);
1303
1304         /* wake up readers/writers */
1305         wakeup(&q->rr);
1306         wakeup(&q->wr);
1307 }
1308
1309 /*
1310  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
1311  *  blocks.
1312  */
1313 void
1314 qhangup(Queue *q, char *msg)
1315 {
1316         /* mark it */
1317         ilock(q);
1318         q->state |= Qclosed;
1319         if(msg == nil || *msg == '\0')
1320                 msg = Ehungup;
1321         kstrcpy(q->err, msg, ERRMAX);
1322         iunlock(q);
1323
1324         /* wake up readers/writers */
1325         wakeup(&q->rr);
1326         wakeup(&q->wr);
1327 }
1328
1329 /*
1330  *  return non-zero if the q is hungup
1331  */
1332 int
1333 qisclosed(Queue *q)
1334 {
1335         return q->state & Qclosed;
1336 }
1337
1338 /*
1339  *  mark a queue as no longer hung up
1340  */
1341 void
1342 qreopen(Queue *q)
1343 {
1344         ilock(q);
1345         q->state &= ~Qclosed;
1346         q->state |= Qstarve;
1347         q->eof = 0;
1348         q->limit = q->inilim;
1349         iunlock(q);
1350 }
1351
1352 /*
1353  *  return bytes queued
1354  */
1355 int
1356 qlen(Queue *q)
1357 {
1358         return q->dlen;
1359 }
1360
1361 /*
1362  * return space remaining before flow control
1363  */
1364 int
1365 qwindow(Queue *q)
1366 {
1367         int l;
1368
1369         l = q->limit - q->len;
1370         if(l < 0)
1371                 l = 0;
1372         return l;
1373 }
1374
1375 /*
1376  *  return true if we can read without blocking
1377  */
1378 int
1379 qcanread(Queue *q)
1380 {
1381         return q->bfirst != nil;
1382 }
1383
1384 /*
1385  *  change queue limit
1386  */
1387 void
1388 qsetlimit(Queue *q, int limit)
1389 {
1390         q->limit = limit;
1391 }
1392
1393 /*
1394  *  set blocking/nonblocking
1395  */
1396 void
1397 qnoblock(Queue *q, int onoff)
1398 {
1399         q->noblock = onoff;
1400 }
1401
1402 /*
1403  *  flush the output queue
1404  */
1405 void
1406 qflush(Queue *q)
1407 {
1408         Block *bfirst;
1409
1410         /* mark it */
1411         ilock(q);
1412         bfirst = q->bfirst;
1413         q->bfirst = nil;
1414         q->len = 0;
1415         q->dlen = 0;
1416         iunlock(q);
1417
1418         /* free queued blocks */
1419         freeblist(bfirst);
1420
1421         /* wake up writers */
1422         wakeup(&q->wr);
1423 }
1424
1425 int
1426 qfull(Queue *q)
1427 {
1428         return q->state & Qflow;
1429 }