]> git.lizzy.rs Git - plan9front.git/blob - sys/src/9/port/qio.c
merge
[plan9front.git] / sys / src / 9 / port / qio.c
1 #include        "u.h"
2 #include        "../port/lib.h"
3 #include        "mem.h"
4 #include        "dat.h"
5 #include        "fns.h"
6 #include        "../port/error.h"
7
8 static ulong padblockcnt;
9 static ulong concatblockcnt;
10 static ulong pullupblockcnt;
11 static ulong copyblockcnt;
12 static ulong consumecnt;
13 static ulong producecnt;
14
15 #define QDEBUG  if(0)
16
17 /*
18  *  IO queues
19  */
20 typedef struct Queue    Queue;
21
22 struct Queue
23 {
24         Lock;
25
26         Block*  bfirst;         /* buffer */
27         Block*  blast;
28
29         int     len;            /* bytes allocated to queue */
30         int     dlen;           /* data bytes in queue */
31         int     limit;          /* max bytes in queue */
32         int     inilim;         /* initial limit */
33         int     state;
34         int     noblock;        /* true if writes return immediately when q full */
35         int     eof;            /* number of eofs read by user */
36
37         void    (*kick)(void*); /* restart output */
38         void    (*bypass)(void*, Block*);       /* bypass queue altogether */
39         void*   arg;            /* argument to kick */
40
41         QLock   rlock;          /* mutex for reading processes */
42         Rendez  rr;             /* process waiting to read */
43         QLock   wlock;          /* mutex for writing processes */
44         Rendez  wr;             /* process waiting to write */
45
46         char    err[ERRMAX];
47 };
48
49 enum
50 {
51         Maxatomic       = 64*1024,
52 };
53
54 uint    qiomaxatomic = Maxatomic;
55
56 /*
57  *  free a list of blocks
58  */
59 void
60 freeblist(Block *b)
61 {
62         Block *next;
63
64         for(; b != nil; b = next){
65                 next = b->next;
66                 b->next = nil;
67                 freeb(b);
68         }
69 }
70
71 /*
72  *  pad a block to the front (or the back if size is negative)
73  */
74 Block*
75 padblock(Block *bp, int size)
76 {
77         int n;
78         Block *nbp;
79
80         QDEBUG checkb(bp, "padblock 0");
81         if(size >= 0){
82                 if(bp->rp - bp->base >= size){
83                         bp->rp -= size;
84                         return bp;
85                 }
86                 n = BLEN(bp);
87                 nbp = allocb(size+n);
88                 nbp->rp += size;
89                 nbp->wp = nbp->rp;
90                 memmove(nbp->wp, bp->rp, n);
91                 nbp->wp += n;
92                 nbp->rp -= size;
93         } else {
94                 size = -size;
95                 if(bp->lim - bp->wp >= size)
96                         return bp;
97                 n = BLEN(bp);
98                 nbp = allocb(n+size);
99                 memmove(nbp->wp, bp->rp, n);
100                 nbp->wp += n;
101         }
102         nbp->next = bp->next;
103         freeb(bp);
104         padblockcnt++;
105         QDEBUG checkb(nbp, "padblock 1");
106         return nbp;
107 }
108
109 /*
110  *  return count of bytes in a string of blocks
111  */
112 int
113 blocklen(Block *bp)
114 {
115         int len;
116
117         len = 0;
118         while(bp != nil) {
119                 len += BLEN(bp);
120                 bp = bp->next;
121         }
122         return len;
123 }
124
125 /*
126  * return count of space in blocks
127  */
128 int
129 blockalloclen(Block *bp)
130 {
131         int len;
132
133         len = 0;
134         while(bp != nil) {
135                 len += BALLOC(bp);
136                 bp = bp->next;
137         }
138         return len;
139 }
140
141 /*
142  *  copy the string of blocks into
143  *  a single block and free the string
144  */
145 Block*
146 concatblock(Block *bp)
147 {
148         int len;
149
150         if(bp->next == nil)
151                 return bp;
152         len = blocklen(bp);
153         concatblockcnt += len;
154         return pullupblock(bp, len);
155 }
156
157 /*
158  *  make sure the first block has at least n bytes
159  */
160 Block*
161 pullupblock(Block *bp, int n)
162 {
163         Block *nbp;
164         int i;
165
166         /*
167          *  this should almost always be true, it's
168          *  just to avoid every caller checking.
169          */
170         if(BLEN(bp) >= n)
171                 return bp;
172
173         /*
174          *  if not enough room in the first block,
175          *  add another to the front of the list.
176          */
177         if(bp->lim - bp->rp < n){
178                 nbp = allocb(n);
179                 nbp->next = bp;
180                 bp = nbp;
181         }
182
183         /*
184          *  copy bytes from the trailing blocks into the first
185          */
186         n -= BLEN(bp);
187         while((nbp = bp->next) != nil){
188                 pullupblockcnt++;
189                 i = BLEN(nbp);
190                 if(i > n) {
191                         memmove(bp->wp, nbp->rp, n);
192                         bp->wp += n;
193                         nbp->rp += n;
194                         QDEBUG checkb(bp, "pullupblock 1");
195                         return bp;
196                 } else {
197                         /* shouldn't happen but why crash if it does */
198                         if(i < 0){
199                                 print("pullup negative length packet, called from %#p\n",
200                                         getcallerpc(&bp));
201                                 i = 0;
202                         }
203                         memmove(bp->wp, nbp->rp, i);
204                         bp->wp += i;
205                         bp->next = nbp->next;
206                         nbp->next = nil;
207                         freeb(nbp);
208                         n -= i;
209                         if(n == 0){
210                                 QDEBUG checkb(bp, "pullupblock 2");
211                                 return bp;
212                         }
213                 }
214         }
215         freeb(bp);
216         return nil;
217 }
218
219 /*
220  *  make sure the first block has at least n bytes
221  */
222 Block*
223 pullupqueue(Queue *q, int n)
224 {
225         Block *b;
226
227         if(BLEN(q->bfirst) >= n)
228                 return q->bfirst;
229         q->bfirst = pullupblock(q->bfirst, n);
230         for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
231                 ;
232         q->blast = b;
233         return q->bfirst;
234 }
235
236 /*
237  *  trim to len bytes starting at offset
238  */
239 Block *
240 trimblock(Block *bp, int offset, int len)
241 {
242         ulong l;
243         Block *nb, *startb;
244
245         QDEBUG checkb(bp, "trimblock 1");
246         l = blocklen(bp);
247         if(offset == 0 && len == l)
248                 return bp;
249         if(l < offset+len) {
250                 freeblist(bp);
251                 return nil;
252         }
253
254         while((l = BLEN(bp)) < offset) {
255                 offset -= l;
256                 nb = bp->next;
257                 bp->next = nil;
258                 freeb(bp);
259                 bp = nb;
260         }
261
262         startb = bp;
263         bp->rp += offset;
264
265         while((l = BLEN(bp)) < len) {
266                 len -= l;
267                 bp = bp->next;
268         }
269
270         bp->wp -= (BLEN(bp) - len);
271
272         if(bp->next != nil) {
273                 freeblist(bp->next);
274                 bp->next = nil;
275         }
276
277         return startb;
278 }
279
280 /*
281  *  copy 'count' bytes into a new block
282  */
283 Block*
284 copyblock(Block *bp, int count)
285 {
286         int l;
287         Block *nbp;
288
289         QDEBUG checkb(bp, "copyblock 0");
290         nbp = allocb(count);
291         for(; count > 0 && bp != nil; bp = bp->next){
292                 l = BLEN(bp);
293                 if(l > count)
294                         l = count;
295                 memmove(nbp->wp, bp->rp, l);
296                 nbp->wp += l;
297                 count -= l;
298         }
299         if(count > 0){
300                 memset(nbp->wp, 0, count);
301                 nbp->wp += count;
302         }
303         copyblockcnt++;
304         QDEBUG checkb(nbp, "copyblock 1");
305
306         return nbp;
307 }
308
309 Block*
310 adjustblock(Block* bp, int len)
311 {
312         int n;
313         Block *nbp;
314
315         if(len < 0){
316                 freeb(bp);
317                 return nil;
318         }
319
320         if(bp->rp+len > bp->lim){
321                 nbp = copyblock(bp, len);
322                 freeblist(bp);
323                 QDEBUG checkb(nbp, "adjustblock 1");
324
325                 return nbp;
326         }
327
328         n = BLEN(bp);
329         if(len > n)
330                 memset(bp->wp, 0, len-n);
331         bp->wp = bp->rp+len;
332         QDEBUG checkb(bp, "adjustblock 2");
333
334         return bp;
335 }
336
337
338 /*
339  *  throw away up to count bytes from a
340  *  list of blocks.  Return count of bytes
341  *  thrown away.
342  */
343 int
344 pullblock(Block **bph, int count)
345 {
346         Block *bp;
347         int n, bytes;
348
349         bytes = 0;
350         if(bph == nil)
351                 return 0;
352
353         while(*bph != nil && count != 0) {
354                 bp = *bph;
355                 n = BLEN(bp);
356                 if(count < n)
357                         n = count;
358                 bytes += n;
359                 count -= n;
360                 bp->rp += n;
361                 QDEBUG checkb(bp, "pullblock ");
362                 if(BLEN(bp) == 0) {
363                         *bph = bp->next;
364                         bp->next = nil;
365                         freeb(bp);
366                 }
367         }
368         return bytes;
369 }
370
371 /*
372  *  get next block from a queue, return null if nothing there
373  */
374 Block*
375 qget(Queue *q)
376 {
377         int dowakeup;
378         Block *b;
379
380         /* sync with qwrite */
381         ilock(q);
382
383         b = q->bfirst;
384         if(b == nil){
385                 q->state |= Qstarve;
386                 iunlock(q);
387                 return nil;
388         }
389         QDEBUG checkb(b, "qget");
390         q->bfirst = b->next;
391         b->next = nil;
392         q->len -= BALLOC(b);
393         q->dlen -= BLEN(b);
394
395         /* if writer flow controlled, restart */
396         if((q->state & Qflow) && q->len < q->limit/2){
397                 q->state &= ~Qflow;
398                 dowakeup = 1;
399         } else
400                 dowakeup = 0;
401
402         iunlock(q);
403
404         if(dowakeup)
405                 wakeup(&q->wr);
406
407         return b;
408 }
409
410 /*
411  *  throw away the next 'len' bytes in the queue
412  */
413 int
414 qdiscard(Queue *q, int len)
415 {
416         Block *b, *tofree = nil;
417         int dowakeup, n, sofar;
418
419         ilock(q);
420         for(sofar = 0; sofar < len; sofar += n){
421                 b = q->bfirst;
422                 if(b == nil)
423                         break;
424                 QDEBUG checkb(b, "qdiscard");
425                 n = BLEN(b);
426                 if(n <= len - sofar){
427                         q->bfirst = b->next;
428                         q->len -= BALLOC(b);
429                         q->dlen -= BLEN(b);
430
431                         /* remember to free this */
432                         b->next = tofree;
433                         tofree = b;
434                 } else {
435                         n = len - sofar;
436                         b->rp += n;
437                         q->dlen -= n;
438                 }
439         }
440
441         /*
442          *  if writer flow controlled, restart
443          *
444          *  This used to be
445          *      q->len < q->limit/2
446          *  but it slows down tcp too much for certain write sizes.
447          *  I really don't understand it completely.  It may be
448          *  due to the queue draining so fast that the transmission
449          *  stalls waiting for the app to produce more data.  - presotto
450          */
451         if((q->state & Qflow) && q->len < q->limit){
452                 q->state &= ~Qflow;
453                 dowakeup = 1;
454         } else
455                 dowakeup = 0;
456
457         iunlock(q);
458
459         if(dowakeup)
460                 wakeup(&q->wr);
461
462         if(tofree != nil)
463                 freeblist(tofree);
464
465         return sofar;
466 }
467
468 /*
469  *  Interrupt level copy out of a queue, return # bytes copied.
470  */
471 int
472 qconsume(Queue *q, void *vp, int len)
473 {
474         Block *b, *tofree = nil;
475         int n, dowakeup;
476         uchar *p = vp;
477
478         /* sync with qwrite */
479         ilock(q);
480
481         for(;;) {
482                 b = q->bfirst;
483                 if(b == nil){
484                         q->state |= Qstarve;
485                         len = -1;
486                         goto out;
487                 }
488                 QDEBUG checkb(b, "qconsume 1");
489
490                 n = BLEN(b);
491                 if(n > 0)
492                         break;
493                 q->bfirst = b->next;
494                 q->len -= BALLOC(b);
495
496                 /* remember to free this */
497                 b->next = tofree;
498                 tofree = b;
499         };
500
501         consumecnt += n;
502         if(n < len)
503                 len = n;
504         memmove(p, b->rp, len);
505         b->rp += len;
506         q->dlen -= len;
507
508         /* discard the block if we're done with it */
509         if((q->state & Qmsg) || len == n){
510                 q->bfirst = b->next;
511                 q->len -= BALLOC(b);
512                 q->dlen -= BLEN(b);
513
514                 /* remember to free this */
515                 b->next = tofree;
516                 tofree = b;
517         }
518
519 out:
520         /* if writer flow controlled, restart */
521         if((q->state & Qflow) && q->len < q->limit/2){
522                 q->state &= ~Qflow;
523                 dowakeup = 1;
524         } else
525                 dowakeup = 0;
526
527         iunlock(q);
528
529         if(dowakeup)
530                 wakeup(&q->wr);
531
532         if(tofree != nil)
533                 freeblist(tofree);
534
535         return len;
536 }
537
538 int
539 qpass(Queue *q, Block *b)
540 {
541         int len, dowakeup;
542
543         /* sync with qread */
544         dowakeup = 0;
545         ilock(q);
546         if(q->len >= q->limit){
547                 iunlock(q);
548                 freeblist(b);
549                 return -1;
550         }
551         if(q->state & Qclosed){
552                 iunlock(q);
553                 freeblist(b);
554                 return 0;
555         }
556
557         len = qaddlist(q, b);
558
559         if(q->len >= q->limit/2)
560                 q->state |= Qflow;
561
562         if(q->state & Qstarve){
563                 q->state &= ~Qstarve;
564                 dowakeup = 1;
565         }
566         iunlock(q);
567
568         if(dowakeup)
569                 wakeup(&q->rr);
570
571         return len;
572 }
573
574 int
575 qpassnolim(Queue *q, Block *b)
576 {
577         int len, dowakeup;
578
579         /* sync with qread */
580         dowakeup = 0;
581         ilock(q);
582
583         if(q->state & Qclosed){
584                 iunlock(q);
585                 freeblist(b);
586                 return 0;
587         }
588
589         len = qaddlist(q, b);
590
591         if(q->len >= q->limit/2)
592                 q->state |= Qflow;
593
594         if(q->state & Qstarve){
595                 q->state &= ~Qstarve;
596                 dowakeup = 1;
597         }
598         iunlock(q);
599
600         if(dowakeup)
601                 wakeup(&q->rr);
602
603         return len;
604 }
605
606 /*
607  *  if the allocated space is way out of line with the used
608  *  space, reallocate to a smaller block
609  */
610 Block*
611 packblock(Block *bp)
612 {
613         Block **l, *nbp;
614         int n;
615
616         for(l = &bp; (nbp = *l) != nil; l = &(*l)->next){
617                 n = BLEN(nbp);
618                 if((n<<2) < BALLOC(nbp)){
619                         *l = allocb(n);
620                         memmove((*l)->wp, nbp->rp, n);
621                         (*l)->wp += n;
622                         (*l)->next = nbp->next;
623                         freeb(nbp);
624                 }
625         }
626
627         return bp;
628 }
629
630 int
631 qproduce(Queue *q, void *vp, int len)
632 {
633         Block *b;
634         int dowakeup;
635         uchar *p = vp;
636
637         b = iallocb(len);
638         if(b == nil)
639                 return 0;
640
641         /* sync with qread */
642         dowakeup = 0;
643         ilock(q);
644
645         /* no waiting receivers, room in buffer? */
646         if(q->len >= q->limit){
647                 q->state |= Qflow;
648                 iunlock(q);
649                 return -1;
650         }
651         producecnt += len;
652
653         /* save in buffer */
654         memmove(b->wp, p, len);
655         b->wp += len;
656         qaddlist(q, b);
657
658         if(q->state & Qstarve){
659                 q->state &= ~Qstarve;
660                 dowakeup = 1;
661         }
662
663         if(q->len >= q->limit)
664                 q->state |= Qflow;
665         iunlock(q);
666
667         if(dowakeup)
668                 wakeup(&q->rr);
669
670         return len;
671 }
672
673 /*
674  *  copy from offset in the queue
675  */
676 Block*
677 qcopy(Queue *q, int len, ulong offset)
678 {
679         Block *b;
680
681         b = allocb(len);
682         ilock(q);
683         b->wp += readblist(q->bfirst, b->wp, len, offset);
684         iunlock(q);
685         return b;
686 }
687
688 /*
689  *  called by non-interrupt code
690  */
691 Queue*
692 qopen(int limit, int msg, void (*kick)(void*), void *arg)
693 {
694         Queue *q;
695
696         q = malloc(sizeof(Queue));
697         if(q == nil)
698                 return nil;
699
700         q->limit = q->inilim = limit;
701         q->kick = kick;
702         q->arg = arg;
703         q->state = msg;
704         
705         q->state |= Qstarve;
706         q->eof = 0;
707         q->noblock = 0;
708
709         return q;
710 }
711
712 /* open a queue to be bypassed */
713 Queue*
714 qbypass(void (*bypass)(void*, Block*), void *arg)
715 {
716         Queue *q;
717
718         q = malloc(sizeof(Queue));
719         if(q == nil)
720                 return nil;
721
722         q->limit = 0;
723         q->arg = arg;
724         q->bypass = bypass;
725         q->state = 0;
726
727         return q;
728 }
729
730 static int
731 notempty(void *a)
732 {
733         Queue *q = a;
734
735         return (q->state & Qclosed) || q->bfirst != nil;
736 }
737
738 /*
739  *  wait for the queue to be non-empty or closed.
740  *  called with q ilocked.
741  */
742 static int
743 qwait(Queue *q)
744 {
745         /* wait for data */
746         for(;;){
747                 if(q->bfirst != nil)
748                         break;
749
750                 if(q->state & Qclosed){
751                         if(++q->eof > 3)
752                                 return -1;
753                         if(*q->err && strcmp(q->err, Ehungup) != 0)
754                                 return -1;
755                         return 0;
756                 }
757
758                 q->state |= Qstarve;    /* flag requesting producer to wake me */
759                 iunlock(q);
760                 sleep(&q->rr, notempty, q);
761                 ilock(q);
762         }
763         return 1;
764 }
765
766 /*
767  * add a block list to a queue, return bytes added
768  */
769 int
770 qaddlist(Queue *q, Block *b)
771 {
772         int len, dlen;
773
774         QDEBUG checkb(b, "qaddlist 1");
775
776         /* queue the block */
777         if(q->bfirst != nil)
778                 q->blast->next = b;
779         else
780                 q->bfirst = b;
781
782         len = BALLOC(b);
783         dlen = BLEN(b);
784         while(b->next != nil){
785                 b = b->next;
786                 QDEBUG checkb(b, "qaddlist 2");
787
788                 len += BALLOC(b);
789                 dlen += BLEN(b);
790         }
791         q->blast = b;
792         q->len += len;
793         q->dlen += dlen;
794         return dlen;
795 }
796
797 /*
798  *  called with q ilocked
799  */
800 Block*
801 qremove(Queue *q)
802 {
803         Block *b;
804
805         b = q->bfirst;
806         if(b == nil)
807                 return nil;
808         QDEBUG checkb(b, "qremove");
809         q->bfirst = b->next;
810         b->next = nil;
811         q->dlen -= BLEN(b);
812         q->len -= BALLOC(b);
813         return b;
814 }
815
816 /*
817  *  copy the contents of a string of blocks into
818  *  memory from an offset. blocklist kept unchanged.
819  *  return number of copied bytes.
820  */
821 long
822 readblist(Block *b, uchar *p, long n, ulong o)
823 {
824         ulong m, r;
825
826         r = 0;
827         while(n > 0 && b != nil){
828                 m = BLEN(b);
829                 if(o >= m)
830                         o -= m;
831                 else {
832                         m -= o;
833                         if(n < m)
834                                 m = n;
835                         memmove(p, b->rp + o, m);
836                         p += m;
837                         r += m;
838                         n -= m;
839                         o = 0;
840                 }
841                 b = b->next;
842         }
843         return r;
844 }
845
846 /*
847  *  put a block back to the front of the queue
848  *  called with q ilocked
849  */
850 void
851 qputback(Queue *q, Block *b)
852 {
853         b->next = q->bfirst;
854         if(q->bfirst == nil)
855                 q->blast = b;
856         q->bfirst = b;
857         q->len += BALLOC(b);
858         q->dlen += BLEN(b);
859 }
860
861 /*
862  *  cut off n bytes from the end of *h. return a new
863  *  block with the tail and change *h to refer to the
864  *  head.
865  */
866 static Block*
867 splitblock(Block **h, int n)
868 {
869         Block *a, *b;
870         int m;
871
872         a = *h;
873         m = BLEN(a) - n;
874         if(m < n){
875                 b = allocb(m);
876                 memmove(b->wp, a->rp, m);
877                 b->wp += m;
878                 a->rp += m;
879                 *h = b;
880                 return a;
881         } else {
882                 b = allocb(n);
883                 a->wp -= n;
884                 memmove(b->wp, a->wp, n);
885                 b->wp += n;
886                 return b;
887         }
888 }
889
890 /*
891  *  flow control, get producer going again
892  *  called with q ilocked
893  */
894 static void
895 qwakeup_iunlock(Queue *q)
896 {
897         int dowakeup = 0;
898
899         /* if writer flow controlled, restart */
900         if((q->state & Qflow) && q->len < q->limit/2){
901                 q->state &= ~Qflow;
902                 dowakeup = 1;
903         }
904
905         iunlock(q);
906
907         /* wakeup flow controlled writers */
908         if(dowakeup){
909                 if(q->kick != nil)
910                         q->kick(q->arg);
911                 wakeup(&q->wr);
912         }
913 }
914
915 /*
916  *  get next block from a queue (up to a limit)
917  */
918 Block*
919 qbread(Queue *q, int len)
920 {
921         Block *b;
922         int n;
923
924         eqlock(&q->rlock);
925         if(waserror()){
926                 qunlock(&q->rlock);
927                 nexterror();
928         }
929
930         ilock(q);
931         switch(qwait(q)){
932         case 0:
933                 /* queue closed */
934                 iunlock(q);
935                 qunlock(&q->rlock);
936                 poperror();
937                 return nil;
938         case -1:
939                 /* multiple reads on a closed queue */
940                 iunlock(q);
941                 error(q->err);
942         }
943
944         /* if we get here, there's at least one block in the queue */
945         b = qremove(q);
946         n = BLEN(b);
947
948         /* split block if it's too big and this is not a message queue */
949         if(n > len){
950                 n -= len;
951                 if((q->state & Qmsg) == 0)
952                         qputback(q, splitblock(&b, n));
953                 else
954                         b->wp -= n;
955         }
956
957         /* restart producer */
958         qwakeup_iunlock(q);
959
960         qunlock(&q->rlock);
961         poperror();
962
963         return b;
964 }
965
966 /*
967  *  read a queue.  if no data is queued, post a Block
968  *  and wait on its Rendez.
969  */
970 long
971 qread(Queue *q, void *vp, int len)
972 {
973         Block *b, *first, **last;
974         int m, n;
975
976         eqlock(&q->rlock);
977         if(waserror()){
978                 qunlock(&q->rlock);
979                 nexterror();
980         }
981
982         ilock(q);
983 again:
984         switch(qwait(q)){
985         case 0:
986                 /* queue closed */
987                 iunlock(q);
988                 qunlock(&q->rlock);
989                 poperror();
990                 return 0;
991         case -1:
992                 /* multiple reads on a closed queue */
993                 iunlock(q);
994                 error(q->err);
995         }
996
997         /* if we get here, there's at least one block in the queue */
998         last = &first;
999         if(q->state & Qcoalesce){
1000                 /* when coalescing, 0 length blocks just go away */
1001                 b = q->bfirst;
1002                 m = BLEN(b);
1003                 if(m <= 0){
1004                         freeb(qremove(q));
1005                         goto again;
1006                 }
1007
1008                 /*  grab the first block plus as many
1009                  *  following blocks as will partially
1010                  *  fit in the read.
1011                  */
1012                 n = 0;
1013                 for(;;) {
1014                         *last = qremove(q);
1015                         n += m;
1016                         if(n >= len || q->bfirst == nil)
1017                                 break;
1018                         last = &b->next;
1019                         b = q->bfirst;
1020                         m = BLEN(b);
1021                 }
1022         } else {
1023                 first = qremove(q);
1024                 n = BLEN(first);
1025         }
1026
1027         /* split last block if it's too big and this is not a message queue */
1028         if(n > len && (q->state & Qmsg) == 0)
1029                 qputback(q, splitblock(last, n - len));
1030
1031         /* restart producer */
1032         qwakeup_iunlock(q);
1033
1034         qunlock(&q->rlock);
1035         poperror();
1036
1037         if(waserror()){
1038                 freeblist(first);
1039                 nexterror();
1040         }
1041         n = readblist(first, vp, len, 0);
1042         freeblist(first);
1043         poperror();
1044
1045         return n;
1046 }
1047
1048 static int
1049 qnotfull(void *a)
1050 {
1051         Queue *q = a;
1052
1053         return q->len < q->limit || (q->state & Qclosed);
1054 }
1055
1056 /*
1057  *  flow control, wait for queue to get below the limit
1058  */
1059 static void
1060 qflow(Queue *q)
1061 {
1062         for(;;){
1063                 if(q->noblock || qnotfull(q))
1064                         break;
1065
1066                 ilock(q);
1067                 q->state |= Qflow;
1068                 iunlock(q);
1069
1070                 eqlock(&q->wlock);
1071                 if(waserror()){
1072                         qunlock(&q->wlock);
1073                         nexterror();
1074                 }
1075                 sleep(&q->wr, qnotfull, q);
1076                 qunlock(&q->wlock);
1077                 poperror();
1078         }
1079 }
1080
1081 /*
1082  *  add a block to a queue obeying flow control
1083  */
1084 long
1085 qbwrite(Queue *q, Block *b)
1086 {
1087         int len, dowakeup;
1088         Proc *p;
1089
1090         if(q->bypass != nil){
1091                 len = blocklen(b);
1092                 (*q->bypass)(q->arg, b);
1093                 return len;
1094         }
1095
1096         dowakeup = 0;
1097         if(waserror()){
1098                 freeblist(b);
1099                 nexterror();
1100         }
1101         ilock(q);
1102
1103         /* give up if the queue is closed */
1104         if(q->state & Qclosed){
1105                 iunlock(q);
1106                 error(q->err);
1107         }
1108
1109         /* don't queue over the limit */
1110         if(q->len >= q->limit && q->noblock){
1111                 iunlock(q);
1112                 poperror();
1113                 len = blocklen(b);
1114                 freeblist(b);
1115                 return len;
1116         }
1117
1118         len = qaddlist(q, b);
1119
1120         /* make sure other end gets awakened */
1121         if(q->state & Qstarve){
1122                 q->state &= ~Qstarve;
1123                 dowakeup = 1;
1124         }
1125         iunlock(q);
1126         poperror();
1127
1128         /*  get output going again */
1129         if(q->kick != nil && (dowakeup || (q->state&Qkick)))
1130                 q->kick(q->arg);
1131
1132         /* wakeup anyone consuming at the other end */
1133         if(dowakeup){
1134                 p = wakeup(&q->rr);
1135
1136                 /* if we just wokeup a higher priority process, let it run */
1137                 if(p != nil && p->priority > up->priority)
1138                         sched();
1139         }
1140
1141         /*
1142          *  flow control, before allowing the process to continue and
1143          *  queue more. We do this here so that postnote can only
1144          *  interrupt us after the data has been queued.  This means that
1145          *  things like 9p flushes and ssl messages will not be disrupted
1146          *  by software interrupts.
1147          */
1148         qflow(q);
1149
1150         return len;
1151 }
1152
1153 /*
1154  *  write to a queue.  only Maxatomic bytes at a time is atomic.
1155  */
1156 int
1157 qwrite(Queue *q, void *vp, int len)
1158 {
1159         int n, sofar;
1160         Block *b;
1161         uchar *p = vp;
1162
1163         QDEBUG if(!islo())
1164                 print("qwrite hi %#p\n", getcallerpc(&q));
1165
1166         /* stop queue bloat before allocating blocks */
1167         if(q->len/2 >= q->limit && q->noblock == 0 && q->bypass == nil){
1168                 while(waserror()){
1169                         if(up->procctl == Proc_exitme || up->procctl == Proc_exitbig)
1170                                 error(Egreg);
1171                 }
1172                 qflow(q);
1173                 poperror();
1174         }
1175
1176         sofar = 0;
1177         do {
1178                 n = len-sofar;
1179                 if(n > Maxatomic)
1180                         n = Maxatomic;
1181
1182                 b = allocb(n);
1183                 if(waserror()){
1184                         freeb(b);
1185                         nexterror();
1186                 }
1187                 memmove(b->wp, p+sofar, n);
1188                 poperror();
1189                 b->wp += n;
1190
1191                 sofar += qbwrite(q, b);
1192         } while(sofar < len && (q->state & Qmsg) == 0);
1193
1194         return len;
1195 }
1196
1197 /*
1198  *  used by print() to write to a queue.  Since we may be splhi or not in
1199  *  a process, don't qlock.
1200  */
1201 int
1202 qiwrite(Queue *q, void *vp, int len)
1203 {
1204         int n, sofar, dowakeup;
1205         Block *b;
1206         uchar *p = vp;
1207
1208         dowakeup = 0;
1209
1210         sofar = 0;
1211         do {
1212                 n = len-sofar;
1213                 if(n > Maxatomic)
1214                         n = Maxatomic;
1215
1216                 b = iallocb(n);
1217                 if(b == nil)
1218                         break;
1219                 memmove(b->wp, p+sofar, n);
1220                 b->wp += n;
1221
1222                 ilock(q);
1223
1224                 /* we use an artificially high limit for kernel prints since anything
1225                  * over the limit gets dropped
1226                  */
1227                 if((q->state & Qclosed) != 0 || q->len/2 >= q->limit){
1228                         iunlock(q);
1229                         freeb(b);
1230                         break;
1231                 }
1232
1233                 qaddlist(q, b);
1234
1235                 if(q->state & Qstarve){
1236                         q->state &= ~Qstarve;
1237                         dowakeup = 1;
1238                 }
1239
1240                 iunlock(q);
1241
1242                 if(dowakeup){
1243                         if(q->kick != nil)
1244                                 q->kick(q->arg);
1245                         wakeup(&q->rr);
1246                 }
1247
1248                 sofar += n;
1249         } while(sofar < len && (q->state & Qmsg) == 0);
1250
1251         return sofar;
1252 }
1253
1254 /*
1255  *  be extremely careful when calling this,
1256  *  as there is no reference accounting
1257  */
1258 void
1259 qfree(Queue *q)
1260 {
1261         qclose(q);
1262         free(q);
1263 }
1264
1265 /*
1266  *  Mark a queue as closed.  No further IO is permitted.
1267  *  All blocks are released.
1268  */
1269 void
1270 qclose(Queue *q)
1271 {
1272         Block *bfirst;
1273
1274         if(q == nil)
1275                 return;
1276
1277         /* mark it */
1278         ilock(q);
1279         q->state |= Qclosed;
1280         q->state &= ~(Qflow|Qstarve);
1281         kstrcpy(q->err, Ehungup, ERRMAX);
1282         bfirst = q->bfirst;
1283         q->bfirst = nil;
1284         q->len = 0;
1285         q->dlen = 0;
1286         q->noblock = 0;
1287         iunlock(q);
1288
1289         /* free queued blocks */
1290         freeblist(bfirst);
1291
1292         /* wake up readers/writers */
1293         wakeup(&q->rr);
1294         wakeup(&q->wr);
1295 }
1296
1297 /*
1298  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
1299  *  blocks.
1300  */
1301 void
1302 qhangup(Queue *q, char *msg)
1303 {
1304         /* mark it */
1305         ilock(q);
1306         q->state |= Qclosed;
1307         if(msg == nil || *msg == '\0')
1308                 msg = Ehungup;
1309         kstrcpy(q->err, msg, ERRMAX);
1310         iunlock(q);
1311
1312         /* wake up readers/writers */
1313         wakeup(&q->rr);
1314         wakeup(&q->wr);
1315 }
1316
1317 /*
1318  *  return non-zero if the q is hungup
1319  */
1320 int
1321 qisclosed(Queue *q)
1322 {
1323         return q->state & Qclosed;
1324 }
1325
1326 /*
1327  *  mark a queue as no longer hung up
1328  */
1329 void
1330 qreopen(Queue *q)
1331 {
1332         ilock(q);
1333         q->state &= ~Qclosed;
1334         q->state |= Qstarve;
1335         q->eof = 0;
1336         q->limit = q->inilim;
1337         iunlock(q);
1338 }
1339
1340 /*
1341  *  return bytes queued
1342  */
1343 int
1344 qlen(Queue *q)
1345 {
1346         return q->dlen;
1347 }
1348
1349 /*
1350  * return space remaining before flow control
1351  */
1352 int
1353 qwindow(Queue *q)
1354 {
1355         int l;
1356
1357         l = q->limit - q->len;
1358         if(l < 0)
1359                 l = 0;
1360         return l;
1361 }
1362
1363 /*
1364  *  return true if we can read without blocking
1365  */
1366 int
1367 qcanread(Queue *q)
1368 {
1369         return q->bfirst != nil;
1370 }
1371
1372 /*
1373  *  change queue limit
1374  */
1375 void
1376 qsetlimit(Queue *q, int limit)
1377 {
1378         q->limit = limit;
1379 }
1380
1381 /*
1382  *  set blocking/nonblocking
1383  */
1384 void
1385 qnoblock(Queue *q, int onoff)
1386 {
1387         q->noblock = onoff;
1388 }
1389
1390 /*
1391  *  flush the output queue
1392  */
1393 void
1394 qflush(Queue *q)
1395 {
1396         Block *bfirst;
1397
1398         /* mark it */
1399         ilock(q);
1400         bfirst = q->bfirst;
1401         q->bfirst = nil;
1402         q->len = 0;
1403         q->dlen = 0;
1404         iunlock(q);
1405
1406         /* free queued blocks */
1407         freeblist(bfirst);
1408
1409         /* wake up writers */
1410         wakeup(&q->wr);
1411 }
1412
1413 int
1414 qfull(Queue *q)
1415 {
1416         return q->state & Qflow;
1417 }