]> git.lizzy.rs Git - plan9front.git/blob - sys/src/9/port/qio.c
pc kernel: fix wrong simd exception mask (fixes go bootstrap)
[plan9front.git] / sys / src / 9 / port / qio.c
1 #include        "u.h"
2 #include        "../port/lib.h"
3 #include        "mem.h"
4 #include        "dat.h"
5 #include        "fns.h"
6 #include        "../port/error.h"
7
8 static ulong padblockcnt;
9 static ulong concatblockcnt;
10 static ulong pullupblockcnt;
11 static ulong copyblockcnt;
12 static ulong consumecnt;
13 static ulong producecnt;
14
15 #define QDEBUG  if(0)
16
17 /*
18  *  IO queues
19  */
20 typedef struct Queue    Queue;
21
22 struct Queue
23 {
24         Lock;
25
26         Block*  bfirst;         /* buffer */
27         Block*  blast;
28
29         int     len;            /* bytes allocated to queue */
30         int     dlen;           /* data bytes in queue */
31         int     limit;          /* max bytes in queue */
32         int     inilim;         /* initial limit */
33         int     state;
34         int     noblock;        /* true if writes return immediately when q full */
35         int     eof;            /* number of eofs read by user */
36
37         void    (*kick)(void*); /* restart output */
38         void    (*bypass)(void*, Block*);       /* bypass queue altogether */
39         void*   arg;            /* argument to kick */
40
41         QLock   rlock;          /* mutex for reading processes */
42         Rendez  rr;             /* process waiting to read */
43         QLock   wlock;          /* mutex for writing processes */
44         Rendez  wr;             /* process waiting to write */
45
46         char    err[ERRMAX];
47 };
48
49 enum
50 {
51         Maxatomic       = 64*1024,
52 };
53
54 uint    qiomaxatomic = Maxatomic;
55
56 /*
57  *  free a list of blocks
58  */
59 void
60 freeblist(Block *b)
61 {
62         Block *next;
63
64         for(; b != nil; b = next){
65                 next = b->next;
66                 b->next = nil;
67                 freeb(b);
68         }
69 }
70
71 /*
72  *  pad a block to the front (or the back if size is negative)
73  */
74 Block*
75 padblock(Block *bp, int size)
76 {
77         int n;
78         Block *nbp;
79
80         QDEBUG checkb(bp, "padblock 0");
81         if(size >= 0){
82                 if(bp->rp - bp->base >= size){
83                         bp->rp -= size;
84                         return bp;
85                 }
86                 n = BLEN(bp);
87                 nbp = allocb(size+n);
88                 nbp->rp += size;
89                 nbp->wp = nbp->rp;
90                 memmove(nbp->wp, bp->rp, n);
91                 nbp->wp += n;
92                 nbp->rp -= size;
93         } else {
94                 size = -size;
95                 if(bp->lim - bp->wp >= size)
96                         return bp;
97                 n = BLEN(bp);
98                 nbp = allocb(n+size);
99                 memmove(nbp->wp, bp->rp, n);
100                 nbp->wp += n;
101         }
102         nbp->next = bp->next;
103         freeb(bp);
104         padblockcnt++;
105         QDEBUG checkb(nbp, "padblock 1");
106         return nbp;
107 }
108
109 /*
110  *  return count of bytes in a string of blocks
111  */
112 int
113 blocklen(Block *bp)
114 {
115         int len;
116
117         len = 0;
118         while(bp != nil) {
119                 len += BLEN(bp);
120                 bp = bp->next;
121         }
122         return len;
123 }
124
125 /*
126  * return count of space in blocks
127  */
128 int
129 blockalloclen(Block *bp)
130 {
131         int len;
132
133         len = 0;
134         while(bp != nil) {
135                 len += BALLOC(bp);
136                 bp = bp->next;
137         }
138         return len;
139 }
140
141 /*
142  *  copy the string of blocks into
143  *  a single block and free the string
144  */
145 Block*
146 concatblock(Block *bp)
147 {
148         int len;
149
150         if(bp->next == nil)
151                 return bp;
152         len = blocklen(bp);
153         concatblockcnt += len;
154         return pullupblock(bp, len);
155 }
156
157 /*
158  *  make sure the first block has at least n bytes
159  */
160 Block*
161 pullupblock(Block *bp, int n)
162 {
163         Block *nbp;
164         int i;
165
166         /*
167          *  this should almost always be true, it's
168          *  just to avoid every caller checking.
169          */
170         if(BLEN(bp) >= n)
171                 return bp;
172
173         /*
174          *  if not enough room in the first block,
175          *  add another to the front of the list.
176          */
177         if(bp->lim - bp->rp < n){
178                 nbp = allocb(n);
179                 nbp->next = bp;
180                 bp = nbp;
181         }
182
183         /*
184          *  copy bytes from the trailing blocks into the first
185          */
186         n -= BLEN(bp);
187         while((nbp = bp->next) != nil){
188                 pullupblockcnt++;
189                 i = BLEN(nbp);
190                 if(i > n) {
191                         memmove(bp->wp, nbp->rp, n);
192                         bp->wp += n;
193                         nbp->rp += n;
194                         QDEBUG checkb(bp, "pullupblock 1");
195                         return bp;
196                 } else {
197                         /* shouldn't happen but why crash if it does */
198                         if(i < 0){
199                                 print("pullup negative length packet, called from %#p\n",
200                                         getcallerpc(&bp));
201                                 i = 0;
202                         }
203                         memmove(bp->wp, nbp->rp, i);
204                         bp->wp += i;
205                         bp->next = nbp->next;
206                         nbp->next = nil;
207                         freeb(nbp);
208                         n -= i;
209                         if(n == 0){
210                                 QDEBUG checkb(bp, "pullupblock 2");
211                                 return bp;
212                         }
213                 }
214         }
215         freeb(bp);
216         return nil;
217 }
218
219 /*
220  *  make sure the first block has at least n bytes
221  */
222 Block*
223 pullupqueue(Queue *q, int n)
224 {
225         Block *b;
226
227         if(BLEN(q->bfirst) >= n)
228                 return q->bfirst;
229         q->bfirst = pullupblock(q->bfirst, n);
230         for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
231                 ;
232         q->blast = b;
233         return q->bfirst;
234 }
235
236 /*
237  *  trim to len bytes starting at offset
238  */
239 Block *
240 trimblock(Block *bp, int offset, int len)
241 {
242         ulong l;
243         Block *nb, *startb;
244
245         QDEBUG checkb(bp, "trimblock 1");
246         if(blocklen(bp) < offset+len) {
247                 freeblist(bp);
248                 return nil;
249         }
250
251         while((l = BLEN(bp)) < offset) {
252                 offset -= l;
253                 nb = bp->next;
254                 bp->next = nil;
255                 freeb(bp);
256                 bp = nb;
257         }
258
259         startb = bp;
260         bp->rp += offset;
261
262         while((l = BLEN(bp)) < len) {
263                 len -= l;
264                 bp = bp->next;
265         }
266
267         bp->wp -= (BLEN(bp) - len);
268
269         if(bp->next != nil) {
270                 freeblist(bp->next);
271                 bp->next = nil;
272         }
273
274         return startb;
275 }
276
277 /*
278  *  copy 'count' bytes into a new block
279  */
280 Block*
281 copyblock(Block *bp, int count)
282 {
283         int l;
284         Block *nbp;
285
286         QDEBUG checkb(bp, "copyblock 0");
287         nbp = allocb(count);
288         for(; count > 0 && bp != nil; bp = bp->next){
289                 l = BLEN(bp);
290                 if(l > count)
291                         l = count;
292                 memmove(nbp->wp, bp->rp, l);
293                 nbp->wp += l;
294                 count -= l;
295         }
296         if(count > 0){
297                 memset(nbp->wp, 0, count);
298                 nbp->wp += count;
299         }
300         copyblockcnt++;
301         QDEBUG checkb(nbp, "copyblock 1");
302
303         return nbp;
304 }
305
306 Block*
307 adjustblock(Block* bp, int len)
308 {
309         int n;
310         Block *nbp;
311
312         if(len < 0){
313                 freeb(bp);
314                 return nil;
315         }
316
317         if(bp->rp+len > bp->lim){
318                 nbp = copyblock(bp, len);
319                 freeblist(bp);
320                 QDEBUG checkb(nbp, "adjustblock 1");
321
322                 return nbp;
323         }
324
325         n = BLEN(bp);
326         if(len > n)
327                 memset(bp->wp, 0, len-n);
328         bp->wp = bp->rp+len;
329         QDEBUG checkb(bp, "adjustblock 2");
330
331         return bp;
332 }
333
334
335 /*
336  *  throw away up to count bytes from a
337  *  list of blocks.  Return count of bytes
338  *  thrown away.
339  */
340 int
341 pullblock(Block **bph, int count)
342 {
343         Block *bp;
344         int n, bytes;
345
346         bytes = 0;
347         if(bph == nil)
348                 return 0;
349
350         while(*bph != nil && count != 0) {
351                 bp = *bph;
352                 n = BLEN(bp);
353                 if(count < n)
354                         n = count;
355                 bytes += n;
356                 count -= n;
357                 bp->rp += n;
358                 QDEBUG checkb(bp, "pullblock ");
359                 if(BLEN(bp) == 0) {
360                         *bph = bp->next;
361                         bp->next = nil;
362                         freeb(bp);
363                 }
364         }
365         return bytes;
366 }
367
368 /*
369  *  get next block from a queue, return null if nothing there
370  */
371 Block*
372 qget(Queue *q)
373 {
374         int dowakeup;
375         Block *b;
376
377         /* sync with qwrite */
378         ilock(q);
379
380         b = q->bfirst;
381         if(b == nil){
382                 q->state |= Qstarve;
383                 iunlock(q);
384                 return nil;
385         }
386         QDEBUG checkb(b, "qget");
387         q->bfirst = b->next;
388         b->next = nil;
389         q->len -= BALLOC(b);
390         q->dlen -= BLEN(b);
391
392         /* if writer flow controlled, restart */
393         if((q->state & Qflow) && q->len < q->limit/2){
394                 q->state &= ~Qflow;
395                 dowakeup = 1;
396         } else
397                 dowakeup = 0;
398
399         iunlock(q);
400
401         if(dowakeup)
402                 wakeup(&q->wr);
403
404         return b;
405 }
406
407 /*
408  *  throw away the next 'len' bytes in the queue
409  */
410 int
411 qdiscard(Queue *q, int len)
412 {
413         Block *b, *tofree = nil;
414         int dowakeup, n, sofar;
415
416         ilock(q);
417         for(sofar = 0; sofar < len; sofar += n){
418                 b = q->bfirst;
419                 if(b == nil)
420                         break;
421                 QDEBUG checkb(b, "qdiscard");
422                 n = BLEN(b);
423                 if(n <= len - sofar){
424                         q->bfirst = b->next;
425                         q->len -= BALLOC(b);
426                         q->dlen -= BLEN(b);
427
428                         /* remember to free this */
429                         b->next = tofree;
430                         tofree = b;
431                 } else {
432                         n = len - sofar;
433                         b->rp += n;
434                         q->dlen -= n;
435                 }
436         }
437
438         /*
439          *  if writer flow controlled, restart
440          *
441          *  This used to be
442          *      q->len < q->limit/2
443          *  but it slows down tcp too much for certain write sizes.
444          *  I really don't understand it completely.  It may be
445          *  due to the queue draining so fast that the transmission
446          *  stalls waiting for the app to produce more data.  - presotto
447          */
448         if((q->state & Qflow) && q->len < q->limit){
449                 q->state &= ~Qflow;
450                 dowakeup = 1;
451         } else
452                 dowakeup = 0;
453
454         iunlock(q);
455
456         if(dowakeup)
457                 wakeup(&q->wr);
458
459         if(tofree != nil)
460                 freeblist(tofree);
461
462         return sofar;
463 }
464
465 /*
466  *  Interrupt level copy out of a queue, return # bytes copied.
467  */
468 int
469 qconsume(Queue *q, void *vp, int len)
470 {
471         Block *b, *tofree = nil;
472         int n, dowakeup;
473         uchar *p = vp;
474
475         /* sync with qwrite */
476         ilock(q);
477
478         for(;;) {
479                 b = q->bfirst;
480                 if(b == nil){
481                         q->state |= Qstarve;
482                         len = -1;
483                         goto out;
484                 }
485                 QDEBUG checkb(b, "qconsume 1");
486
487                 n = BLEN(b);
488                 if(n > 0)
489                         break;
490                 q->bfirst = b->next;
491                 q->len -= BALLOC(b);
492
493                 /* remember to free this */
494                 b->next = tofree;
495                 tofree = b;
496         };
497
498         consumecnt += n;
499         if(n < len)
500                 len = n;
501         memmove(p, b->rp, len);
502         b->rp += len;
503         q->dlen -= len;
504
505         /* discard the block if we're done with it */
506         if((q->state & Qmsg) || len == n){
507                 q->bfirst = b->next;
508                 q->len -= BALLOC(b);
509                 q->dlen -= BLEN(b);
510
511                 /* remember to free this */
512                 b->next = tofree;
513                 tofree = b;
514         }
515
516 out:
517         /* if writer flow controlled, restart */
518         if((q->state & Qflow) && q->len < q->limit/2){
519                 q->state &= ~Qflow;
520                 dowakeup = 1;
521         } else
522                 dowakeup = 0;
523
524         iunlock(q);
525
526         if(dowakeup)
527                 wakeup(&q->wr);
528
529         if(tofree != nil)
530                 freeblist(tofree);
531
532         return len;
533 }
534
535 int
536 qpass(Queue *q, Block *b)
537 {
538         int len, dowakeup;
539
540         /* sync with qread */
541         dowakeup = 0;
542         ilock(q);
543         if(q->len >= q->limit){
544                 iunlock(q);
545                 freeblist(b);
546                 return -1;
547         }
548         if(q->state & Qclosed){
549                 iunlock(q);
550                 freeblist(b);
551                 return 0;
552         }
553
554         len = qaddlist(q, b);
555
556         if(q->len >= q->limit/2)
557                 q->state |= Qflow;
558
559         if(q->state & Qstarve){
560                 q->state &= ~Qstarve;
561                 dowakeup = 1;
562         }
563         iunlock(q);
564
565         if(dowakeup)
566                 wakeup(&q->rr);
567
568         return len;
569 }
570
571 int
572 qpassnolim(Queue *q, Block *b)
573 {
574         int len, dowakeup;
575
576         /* sync with qread */
577         dowakeup = 0;
578         ilock(q);
579
580         if(q->state & Qclosed){
581                 iunlock(q);
582                 freeblist(b);
583                 return 0;
584         }
585
586         len = qaddlist(q, b);
587
588         if(q->len >= q->limit/2)
589                 q->state |= Qflow;
590
591         if(q->state & Qstarve){
592                 q->state &= ~Qstarve;
593                 dowakeup = 1;
594         }
595         iunlock(q);
596
597         if(dowakeup)
598                 wakeup(&q->rr);
599
600         return len;
601 }
602
603 /*
604  *  if the allocated space is way out of line with the used
605  *  space, reallocate to a smaller block
606  */
607 Block*
608 packblock(Block *bp)
609 {
610         Block **l, *nbp;
611         int n;
612
613         for(l = &bp; (nbp = *l) != nil; l = &(*l)->next){
614                 n = BLEN(nbp);
615                 if((n<<2) < BALLOC(nbp)){
616                         *l = allocb(n);
617                         memmove((*l)->wp, nbp->rp, n);
618                         (*l)->wp += n;
619                         (*l)->next = nbp->next;
620                         freeb(nbp);
621                 }
622         }
623
624         return bp;
625 }
626
627 int
628 qproduce(Queue *q, void *vp, int len)
629 {
630         Block *b;
631         int dowakeup;
632         uchar *p = vp;
633
634         b = iallocb(len);
635         if(b == nil)
636                 return 0;
637
638         /* sync with qread */
639         dowakeup = 0;
640         ilock(q);
641
642         /* no waiting receivers, room in buffer? */
643         if(q->len >= q->limit){
644                 q->state |= Qflow;
645                 iunlock(q);
646                 return -1;
647         }
648         producecnt += len;
649
650         /* save in buffer */
651         memmove(b->wp, p, len);
652         b->wp += len;
653         qaddlist(q, b);
654
655         if(q->state & Qstarve){
656                 q->state &= ~Qstarve;
657                 dowakeup = 1;
658         }
659
660         if(q->len >= q->limit)
661                 q->state |= Qflow;
662         iunlock(q);
663
664         if(dowakeup)
665                 wakeup(&q->rr);
666
667         return len;
668 }
669
670 /*
671  *  copy from offset in the queue
672  */
673 Block*
674 qcopy(Queue *q, int len, ulong offset)
675 {
676         Block *b;
677
678         b = allocb(len);
679         ilock(q);
680         b->wp += readblist(q->bfirst, b->wp, len, offset);
681         iunlock(q);
682         return b;
683 }
684
685 /*
686  *  called by non-interrupt code
687  */
688 Queue*
689 qopen(int limit, int msg, void (*kick)(void*), void *arg)
690 {
691         Queue *q;
692
693         q = malloc(sizeof(Queue));
694         if(q == nil)
695                 return nil;
696
697         q->limit = q->inilim = limit;
698         q->kick = kick;
699         q->arg = arg;
700         q->state = msg;
701         
702         q->state |= Qstarve;
703         q->eof = 0;
704         q->noblock = 0;
705
706         return q;
707 }
708
709 /* open a queue to be bypassed */
710 Queue*
711 qbypass(void (*bypass)(void*, Block*), void *arg)
712 {
713         Queue *q;
714
715         q = malloc(sizeof(Queue));
716         if(q == nil)
717                 return nil;
718
719         q->limit = 0;
720         q->arg = arg;
721         q->bypass = bypass;
722         q->state = 0;
723
724         return q;
725 }
726
727 static int
728 notempty(void *a)
729 {
730         Queue *q = a;
731
732         return (q->state & Qclosed) || q->bfirst != nil;
733 }
734
735 /*
736  *  wait for the queue to be non-empty or closed.
737  *  called with q ilocked.
738  */
739 static int
740 qwait(Queue *q)
741 {
742         /* wait for data */
743         for(;;){
744                 if(q->bfirst != nil)
745                         break;
746
747                 if(q->state & Qclosed){
748                         if(++q->eof > 3)
749                                 return -1;
750                         if(*q->err && strcmp(q->err, Ehungup) != 0)
751                                 return -1;
752                         return 0;
753                 }
754
755                 q->state |= Qstarve;    /* flag requesting producer to wake me */
756                 iunlock(q);
757                 sleep(&q->rr, notempty, q);
758                 ilock(q);
759         }
760         return 1;
761 }
762
763 /*
764  * add a block list to a queue, return bytes added
765  */
766 int
767 qaddlist(Queue *q, Block *b)
768 {
769         int len, dlen;
770
771         QDEBUG checkb(b, "qaddlist 1");
772
773         /* queue the block */
774         if(q->bfirst != nil)
775                 q->blast->next = b;
776         else
777                 q->bfirst = b;
778
779         len = BALLOC(b);
780         dlen = BLEN(b);
781         while(b->next != nil){
782                 b = b->next;
783                 QDEBUG checkb(b, "qaddlist 2");
784
785                 len += BALLOC(b);
786                 dlen += BLEN(b);
787         }
788         q->blast = b;
789         q->len += len;
790         q->dlen += dlen;
791         return dlen;
792 }
793
794 /*
795  *  called with q ilocked
796  */
797 Block*
798 qremove(Queue *q)
799 {
800         Block *b;
801
802         b = q->bfirst;
803         if(b == nil)
804                 return nil;
805         QDEBUG checkb(b, "qremove");
806         q->bfirst = b->next;
807         b->next = nil;
808         q->dlen -= BLEN(b);
809         q->len -= BALLOC(b);
810         return b;
811 }
812
813 /*
814  *  copy the contents of a string of blocks into
815  *  memory from an offset. blocklist kept unchanged.
816  *  return number of copied bytes.
817  */
818 long
819 readblist(Block *b, uchar *p, long n, ulong o)
820 {
821         ulong m, r;
822
823         r = 0;
824         while(n > 0 && b != nil){
825                 m = BLEN(b);
826                 if(o >= m)
827                         o -= m;
828                 else {
829                         m -= o;
830                         if(n < m)
831                                 m = n;
832                         memmove(p, b->rp + o, m);
833                         p += m;
834                         r += m;
835                         n -= m;
836                         o = 0;
837                 }
838                 b = b->next;
839         }
840         return r;
841 }
842
843 /*
844  *  put a block back to the front of the queue
845  *  called with q ilocked
846  */
847 void
848 qputback(Queue *q, Block *b)
849 {
850         b->next = q->bfirst;
851         if(q->bfirst == nil)
852                 q->blast = b;
853         q->bfirst = b;
854         q->len += BALLOC(b);
855         q->dlen += BLEN(b);
856 }
857
858 /*
859  *  cut off n bytes from the end of *h. return a new
860  *  block with the tail and change *h to refer to the
861  *  head.
862  */
863 static Block*
864 splitblock(Block **h, int n)
865 {
866         Block *a, *b;
867         int m;
868
869         a = *h;
870         m = BLEN(a) - n;
871         if(m < n){
872                 b = allocb(m);
873                 memmove(b->wp, a->rp, m);
874                 b->wp += m;
875                 a->rp += m;
876                 *h = b;
877                 return a;
878         } else {
879                 b = allocb(n);
880                 a->wp -= n;
881                 memmove(b->wp, a->wp, n);
882                 b->wp += n;
883                 return b;
884         }
885 }
886
887 /*
888  *  flow control, get producer going again
889  *  called with q ilocked
890  */
891 static void
892 qwakeup_iunlock(Queue *q)
893 {
894         int dowakeup = 0;
895
896         /* if writer flow controlled, restart */
897         if((q->state & Qflow) && q->len < q->limit/2){
898                 q->state &= ~Qflow;
899                 dowakeup = 1;
900         }
901
902         iunlock(q);
903
904         /* wakeup flow controlled writers */
905         if(dowakeup){
906                 if(q->kick != nil)
907                         q->kick(q->arg);
908                 wakeup(&q->wr);
909         }
910 }
911
912 /*
913  *  get next block from a queue (up to a limit)
914  */
915 Block*
916 qbread(Queue *q, int len)
917 {
918         Block *b;
919         int n;
920
921         eqlock(&q->rlock);
922         if(waserror()){
923                 qunlock(&q->rlock);
924                 nexterror();
925         }
926
927         ilock(q);
928         switch(qwait(q)){
929         case 0:
930                 /* queue closed */
931                 iunlock(q);
932                 qunlock(&q->rlock);
933                 poperror();
934                 return nil;
935         case -1:
936                 /* multiple reads on a closed queue */
937                 iunlock(q);
938                 error(q->err);
939         }
940
941         /* if we get here, there's at least one block in the queue */
942         b = qremove(q);
943         n = BLEN(b);
944
945         /* split block if it's too big and this is not a message queue */
946         if(n > len){
947                 n -= len;
948                 if((q->state & Qmsg) == 0)
949                         qputback(q, splitblock(&b, n));
950                 else
951                         b->wp -= n;
952         }
953
954         /* restart producer */
955         qwakeup_iunlock(q);
956
957         qunlock(&q->rlock);
958         poperror();
959
960         return b;
961 }
962
963 /*
964  *  read a queue.  if no data is queued, post a Block
965  *  and wait on its Rendez.
966  */
967 long
968 qread(Queue *q, void *vp, int len)
969 {
970         Block *b, *first, **last;
971         int m, n;
972
973         eqlock(&q->rlock);
974         if(waserror()){
975                 qunlock(&q->rlock);
976                 nexterror();
977         }
978
979         ilock(q);
980 again:
981         switch(qwait(q)){
982         case 0:
983                 /* queue closed */
984                 iunlock(q);
985                 qunlock(&q->rlock);
986                 poperror();
987                 return 0;
988         case -1:
989                 /* multiple reads on a closed queue */
990                 iunlock(q);
991                 error(q->err);
992         }
993
994         /* if we get here, there's at least one block in the queue */
995         last = &first;
996         if(q->state & Qcoalesce){
997                 /* when coalescing, 0 length blocks just go away */
998                 b = q->bfirst;
999                 m = BLEN(b);
1000                 if(m <= 0){
1001                         freeb(qremove(q));
1002                         goto again;
1003                 }
1004
1005                 /*  grab the first block plus as many
1006                  *  following blocks as will partially
1007                  *  fit in the read.
1008                  */
1009                 n = 0;
1010                 for(;;) {
1011                         *last = qremove(q);
1012                         n += m;
1013                         if(n >= len || q->bfirst == nil)
1014                                 break;
1015                         last = &b->next;
1016                         b = q->bfirst;
1017                         m = BLEN(b);
1018                 }
1019         } else {
1020                 first = qremove(q);
1021                 n = BLEN(first);
1022         }
1023
1024         /* split last block if it's too big and this is not a message queue */
1025         if(n > len && (q->state & Qmsg) == 0)
1026                 qputback(q, splitblock(last, n - len));
1027
1028         /* restart producer */
1029         qwakeup_iunlock(q);
1030
1031         qunlock(&q->rlock);
1032         poperror();
1033
1034         if(waserror()){
1035                 freeblist(first);
1036                 nexterror();
1037         }
1038         n = readblist(first, vp, len, 0);
1039         freeblist(first);
1040         poperror();
1041
1042         return n;
1043 }
1044
1045 static int
1046 qnotfull(void *a)
1047 {
1048         Queue *q = a;
1049
1050         return q->len < q->limit || (q->state & Qclosed);
1051 }
1052
1053 /*
1054  *  flow control, wait for queue to get below the limit
1055  */
1056 static void
1057 qflow(Queue *q)
1058 {
1059         for(;;){
1060                 if(q->noblock || qnotfull(q))
1061                         break;
1062
1063                 ilock(q);
1064                 q->state |= Qflow;
1065                 iunlock(q);
1066
1067                 eqlock(&q->wlock);
1068                 if(waserror()){
1069                         qunlock(&q->wlock);
1070                         nexterror();
1071                 }
1072                 sleep(&q->wr, qnotfull, q);
1073                 qunlock(&q->wlock);
1074                 poperror();
1075         }
1076 }
1077
1078 /*
1079  *  add a block to a queue obeying flow control
1080  */
1081 long
1082 qbwrite(Queue *q, Block *b)
1083 {
1084         int len, dowakeup;
1085         Proc *p;
1086
1087         if(q->bypass != nil){
1088                 len = blocklen(b);
1089                 (*q->bypass)(q->arg, b);
1090                 return len;
1091         }
1092
1093         dowakeup = 0;
1094         if(waserror()){
1095                 freeblist(b);
1096                 nexterror();
1097         }
1098         ilock(q);
1099
1100         /* give up if the queue is closed */
1101         if(q->state & Qclosed){
1102                 iunlock(q);
1103                 error(q->err);
1104         }
1105
1106         /* don't queue over the limit */
1107         if(q->len >= q->limit && q->noblock){
1108                 iunlock(q);
1109                 poperror();
1110                 len = blocklen(b);
1111                 freeblist(b);
1112                 return len;
1113         }
1114
1115         len = qaddlist(q, b);
1116
1117         /* make sure other end gets awakened */
1118         if(q->state & Qstarve){
1119                 q->state &= ~Qstarve;
1120                 dowakeup = 1;
1121         }
1122         iunlock(q);
1123         poperror();
1124
1125         /*  get output going again */
1126         if(q->kick != nil && (dowakeup || (q->state&Qkick)))
1127                 q->kick(q->arg);
1128
1129         /* wakeup anyone consuming at the other end */
1130         if(dowakeup){
1131                 p = wakeup(&q->rr);
1132
1133                 /* if we just wokeup a higher priority process, let it run */
1134                 if(p != nil && p->priority > up->priority)
1135                         sched();
1136         }
1137
1138         /*
1139          *  flow control, before allowing the process to continue and
1140          *  queue more. We do this here so that postnote can only
1141          *  interrupt us after the data has been queued.  This means that
1142          *  things like 9p flushes and ssl messages will not be disrupted
1143          *  by software interrupts.
1144          */
1145         qflow(q);
1146
1147         return len;
1148 }
1149
1150 /*
1151  *  write to a queue.  only Maxatomic bytes at a time is atomic.
1152  */
1153 int
1154 qwrite(Queue *q, void *vp, int len)
1155 {
1156         int n, sofar;
1157         Block *b;
1158         uchar *p = vp;
1159
1160         QDEBUG if(!islo())
1161                 print("qwrite hi %#p\n", getcallerpc(&q));
1162
1163         /* stop queue bloat before allocating blocks */
1164         if(q->len/2 >= q->limit && q->noblock == 0 && q->bypass == nil){
1165                 while(waserror()){
1166                         if(up->procctl == Proc_exitme || up->procctl == Proc_exitbig)
1167                                 error(Egreg);
1168                 }
1169                 qflow(q);
1170                 poperror();
1171         }
1172
1173         sofar = 0;
1174         do {
1175                 n = len-sofar;
1176                 if(n > Maxatomic)
1177                         n = Maxatomic;
1178
1179                 b = allocb(n);
1180                 if(waserror()){
1181                         freeb(b);
1182                         nexterror();
1183                 }
1184                 memmove(b->wp, p+sofar, n);
1185                 poperror();
1186                 b->wp += n;
1187
1188                 sofar += qbwrite(q, b);
1189         } while(sofar < len && (q->state & Qmsg) == 0);
1190
1191         return len;
1192 }
1193
1194 /*
1195  *  used by print() to write to a queue.  Since we may be splhi or not in
1196  *  a process, don't qlock.
1197  */
1198 int
1199 qiwrite(Queue *q, void *vp, int len)
1200 {
1201         int n, sofar, dowakeup;
1202         Block *b;
1203         uchar *p = vp;
1204
1205         dowakeup = 0;
1206
1207         sofar = 0;
1208         do {
1209                 n = len-sofar;
1210                 if(n > Maxatomic)
1211                         n = Maxatomic;
1212
1213                 b = iallocb(n);
1214                 if(b == nil)
1215                         break;
1216                 memmove(b->wp, p+sofar, n);
1217                 b->wp += n;
1218
1219                 ilock(q);
1220
1221                 /* we use an artificially high limit for kernel prints since anything
1222                  * over the limit gets dropped
1223                  */
1224                 if((q->state & Qclosed) != 0 || q->len/2 >= q->limit){
1225                         iunlock(q);
1226                         freeb(b);
1227                         break;
1228                 }
1229
1230                 qaddlist(q, b);
1231
1232                 if(q->state & Qstarve){
1233                         q->state &= ~Qstarve;
1234                         dowakeup = 1;
1235                 }
1236
1237                 iunlock(q);
1238
1239                 if(dowakeup){
1240                         if(q->kick != nil)
1241                                 q->kick(q->arg);
1242                         wakeup(&q->rr);
1243                 }
1244
1245                 sofar += n;
1246         } while(sofar < len && (q->state & Qmsg) == 0);
1247
1248         return sofar;
1249 }
1250
1251 /*
1252  *  be extremely careful when calling this,
1253  *  as there is no reference accounting
1254  */
1255 void
1256 qfree(Queue *q)
1257 {
1258         qclose(q);
1259         free(q);
1260 }
1261
1262 /*
1263  *  Mark a queue as closed.  No further IO is permitted.
1264  *  All blocks are released.
1265  */
1266 void
1267 qclose(Queue *q)
1268 {
1269         Block *bfirst;
1270
1271         if(q == nil)
1272                 return;
1273
1274         /* mark it */
1275         ilock(q);
1276         q->state |= Qclosed;
1277         q->state &= ~(Qflow|Qstarve);
1278         kstrcpy(q->err, Ehungup, ERRMAX);
1279         bfirst = q->bfirst;
1280         q->bfirst = nil;
1281         q->len = 0;
1282         q->dlen = 0;
1283         q->noblock = 0;
1284         iunlock(q);
1285
1286         /* free queued blocks */
1287         freeblist(bfirst);
1288
1289         /* wake up readers/writers */
1290         wakeup(&q->rr);
1291         wakeup(&q->wr);
1292 }
1293
1294 /*
1295  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
1296  *  blocks.
1297  */
1298 void
1299 qhangup(Queue *q, char *msg)
1300 {
1301         /* mark it */
1302         ilock(q);
1303         q->state |= Qclosed;
1304         if(msg == nil || *msg == '\0')
1305                 msg = Ehungup;
1306         kstrcpy(q->err, msg, ERRMAX);
1307         iunlock(q);
1308
1309         /* wake up readers/writers */
1310         wakeup(&q->rr);
1311         wakeup(&q->wr);
1312 }
1313
1314 /*
1315  *  return non-zero if the q is hungup
1316  */
1317 int
1318 qisclosed(Queue *q)
1319 {
1320         return q->state & Qclosed;
1321 }
1322
1323 /*
1324  *  mark a queue as no longer hung up
1325  */
1326 void
1327 qreopen(Queue *q)
1328 {
1329         ilock(q);
1330         q->state &= ~Qclosed;
1331         q->state |= Qstarve;
1332         q->eof = 0;
1333         q->limit = q->inilim;
1334         iunlock(q);
1335 }
1336
1337 /*
1338  *  return bytes queued
1339  */
1340 int
1341 qlen(Queue *q)
1342 {
1343         return q->dlen;
1344 }
1345
1346 /*
1347  * return space remaining before flow control
1348  */
1349 int
1350 qwindow(Queue *q)
1351 {
1352         int l;
1353
1354         l = q->limit - q->len;
1355         if(l < 0)
1356                 l = 0;
1357         return l;
1358 }
1359
1360 /*
1361  *  return true if we can read without blocking
1362  */
1363 int
1364 qcanread(Queue *q)
1365 {
1366         return q->bfirst != nil;
1367 }
1368
1369 /*
1370  *  change queue limit
1371  */
1372 void
1373 qsetlimit(Queue *q, int limit)
1374 {
1375         q->limit = limit;
1376 }
1377
1378 /*
1379  *  set blocking/nonblocking
1380  */
1381 void
1382 qnoblock(Queue *q, int onoff)
1383 {
1384         q->noblock = onoff;
1385 }
1386
1387 /*
1388  *  flush the output queue
1389  */
1390 void
1391 qflush(Queue *q)
1392 {
1393         Block *bfirst;
1394
1395         /* mark it */
1396         ilock(q);
1397         bfirst = q->bfirst;
1398         q->bfirst = nil;
1399         q->len = 0;
1400         q->dlen = 0;
1401         iunlock(q);
1402
1403         /* free queued blocks */
1404         freeblist(bfirst);
1405
1406         /* wake up writers */
1407         wakeup(&q->wr);
1408 }
1409
1410 int
1411 qfull(Queue *q)
1412 {
1413         return q->state & Qflow;
1414 }