static ulong copyblockcnt;
static ulong consumecnt;
static ulong producecnt;
-static ulong qcopycnt;
-
-static int debugging;
#define QDEBUG if(0)
uint qiomaxatomic = Maxatomic;
-void
-ixsummary(void)
-{
- debugging ^= 1;
- iallocsummary();
- print("pad %lud, concat %lud, pullup %lud, copy %lud\n",
- padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
- print("consume %lud, produce %lud, qcopy %lud\n",
- consumecnt, producecnt, qcopycnt);
-}
-
/*
* free a list of blocks
*/
{
Block *next;
- for(; b != 0; b = next){
+ for(; b != nil; b = next){
next = b->next;
- if(b->ref == 1)
- b->next = nil;
+ b->next = nil;
freeb(b);
}
}
int n;
Block *nbp;
- QDEBUG checkb(bp, "padblock 1");
+ QDEBUG checkb(bp, "padblock 0");
if(size >= 0){
if(bp->rp - bp->base >= size){
bp->rp -= size;
return bp;
}
-
- if(bp->next)
- panic("padblock %#p", getcallerpc(&bp));
n = BLEN(bp);
- padblockcnt++;
nbp = allocb(size+n);
nbp->rp += size;
nbp->wp = nbp->rp;
memmove(nbp->wp, bp->rp, n);
nbp->wp += n;
- freeb(bp);
nbp->rp -= size;
} else {
size = -size;
-
- if(bp->next)
- panic("padblock %#p", getcallerpc(&bp));
-
if(bp->lim - bp->wp >= size)
return bp;
-
n = BLEN(bp);
- padblockcnt++;
- nbp = allocb(size+n);
+ nbp = allocb(n+size);
memmove(nbp->wp, bp->rp, n);
nbp->wp += n;
- freeb(bp);
}
+ nbp->next = bp->next;
+ freeb(bp);
+ padblockcnt++;
QDEBUG checkb(nbp, "padblock 1");
return nbp;
}
int len;
len = 0;
- while(bp) {
+ while(bp != nil) {
len += BLEN(bp);
bp = bp->next;
}
int len;
len = 0;
- while(bp) {
+ while(bp != nil) {
len += BALLOC(bp);
bp = bp->next;
}
}
/*
- * copy the string of blocks into
+ * copy the string of blocks into
* a single block and free the string
*/
Block*
concatblock(Block *bp)
{
int len;
- Block *nb, *f;
- if(bp->next == 0)
+ if(bp->next == nil)
return bp;
-
- nb = allocb(blocklen(bp));
- for(f = bp; f; f = f->next) {
- len = BLEN(f);
- memmove(nb->wp, f->rp, len);
- nb->wp += len;
- }
- concatblockcnt += BLEN(nb);
- freeblist(bp);
- QDEBUG checkb(nb, "concatblock 1");
- return nb;
+ len = blocklen(bp);
+ concatblockcnt += len;
+ return pullupblock(bp, len);
}
/*
Block*
pullupblock(Block *bp, int n)
{
- int i;
Block *nbp;
+ int i;
/*
* this should almost always be true, it's
* copy bytes from the trailing blocks into the first
*/
n -= BLEN(bp);
- while(nbp = bp->next){
+ while((nbp = bp->next) != nil){
+ pullupblockcnt++;
i = BLEN(nbp);
if(i > n) {
memmove(bp->wp, nbp->rp, n);
- pullupblockcnt++;
bp->wp += n;
nbp->rp += n;
QDEBUG checkb(bp, "pullupblock 1");
i = 0;
}
memmove(bp->wp, nbp->rp, i);
- pullupblockcnt++;
bp->wp += i;
bp->next = nbp->next;
- nbp->next = 0;
+ nbp->next = nil;
freeb(nbp);
n -= i;
if(n == 0){
}
}
freeb(bp);
- return 0;
+ return nil;
}
/*
Block *nb, *startb;
QDEBUG checkb(bp, "trimblock 1");
- if(blocklen(bp) < offset+len) {
+ l = blocklen(bp);
+ if(offset == 0 && len == l)
+ return bp;
+ if(l < offset+len) {
freeblist(bp);
return nil;
}
bp->wp -= (BLEN(bp) - len);
- if(bp->next) {
+ if(bp->next != nil) {
freeblist(bp->next);
bp->next = nil;
}
QDEBUG checkb(bp, "copyblock 0");
nbp = allocb(count);
- for(; count > 0 && bp != 0; bp = bp->next){
+ for(; count > 0 && bp != nil; bp = bp->next){
l = BLEN(bp);
if(l > count)
l = count;
iunlock(q);
return nil;
}
+ QDEBUG checkb(b, "qget");
q->bfirst = b->next;
- b->next = 0;
+ b->next = nil;
q->len -= BALLOC(b);
q->dlen -= BLEN(b);
- QDEBUG checkb(b, "qget");
/* if writer flow controlled, restart */
if((q->state & Qflow) && q->len < q->limit/2){
int
qdiscard(Queue *q, int len)
{
- Block *b;
+ Block *b, *tofree = nil;
int dowakeup, n, sofar;
ilock(q);
n = BLEN(b);
if(n <= len - sofar){
q->bfirst = b->next;
- b->next = 0;
q->len -= BALLOC(b);
q->dlen -= BLEN(b);
- freeb(b);
+
+ /* remember to free this */
+ b->next = tofree;
+ tofree = b;
} else {
n = len - sofar;
b->rp += n;
if(dowakeup)
wakeup(&q->wr);
+ if(tofree != nil)
+ freeblist(tofree);
+
return sofar;
}
int
qconsume(Queue *q, void *vp, int len)
{
- Block *b;
+ Block *b, *tofree = nil;
int n, dowakeup;
uchar *p = vp;
- Block *tofree = nil;
/* sync with qwrite */
ilock(q);
for(;;) {
b = q->bfirst;
- if(b == 0){
+ if(b == nil){
q->state |= Qstarve;
- iunlock(q);
- return -1;
+ len = -1;
+ goto out;
}
QDEBUG checkb(b, "qconsume 1");
tofree = b;
};
+ consumecnt += n;
if(n < len)
len = n;
memmove(p, b->rp, len);
- consumecnt += n;
b->rp += len;
q->dlen -= len;
/* discard the block if we're done with it */
if((q->state & Qmsg) || len == n){
q->bfirst = b->next;
- b->next = 0;
q->len -= BALLOC(b);
q->dlen -= BLEN(b);
tofree = b;
}
+out:
/* if writer flow controlled, restart */
if((q->state & Qflow) && q->len < q->limit/2){
q->state &= ~Qflow;
int
qpass(Queue *q, Block *b)
{
- int dlen, len, dowakeup;
+ int len, dowakeup;
/* sync with qread */
dowakeup = 0;
ilock(q);
if(q->len >= q->limit){
- freeblist(b);
iunlock(q);
+ freeblist(b);
return -1;
}
if(q->state & Qclosed){
- len = BALLOC(b);
- freeblist(b);
iunlock(q);
- return len;
+ freeblist(b);
+ return 0;
}
- /* add buffer to queue */
- if(q->bfirst)
- q->blast->next = b;
- else
- q->bfirst = b;
- len = BALLOC(b);
- dlen = BLEN(b);
- QDEBUG checkb(b, "qpass");
- while(b->next){
- b = b->next;
- QDEBUG checkb(b, "qpass");
- len += BALLOC(b);
- dlen += BLEN(b);
- }
- q->blast = b;
- q->len += len;
- q->dlen += dlen;
+ len = qaddlist(q, b);
if(q->len >= q->limit/2)
q->state |= Qflow;
int
qpassnolim(Queue *q, Block *b)
{
- int dlen, len, dowakeup;
+ int len, dowakeup;
/* sync with qread */
dowakeup = 0;
ilock(q);
if(q->state & Qclosed){
- freeblist(b);
iunlock(q);
- return BALLOC(b);
+ freeblist(b);
+ return 0;
}
- /* add buffer to queue */
- if(q->bfirst)
- q->blast->next = b;
- else
- q->bfirst = b;
- len = BALLOC(b);
- dlen = BLEN(b);
- QDEBUG checkb(b, "qpass");
- while(b->next){
- b = b->next;
- QDEBUG checkb(b, "qpass");
- len += BALLOC(b);
- dlen += BLEN(b);
- }
- q->blast = b;
- q->len += len;
- q->dlen += dlen;
+ len = qaddlist(q, b);
if(q->len >= q->limit/2)
q->state |= Qflow;
Block **l, *nbp;
int n;
- for(l = &bp; *l; l = &(*l)->next){
- nbp = *l;
+ for(l = &bp; (nbp = *l) != nil; l = &(*l)->next){
n = BLEN(nbp);
if((n<<2) < BALLOC(nbp)){
*l = allocb(n);
int dowakeup;
uchar *p = vp;
+ b = iallocb(len);
+ if(b == nil)
+ return 0;
+
/* sync with qread */
dowakeup = 0;
ilock(q);
iunlock(q);
return -1;
}
+ producecnt += len;
/* save in buffer */
- b = iallocb(len);
- if(b == 0){
- iunlock(q);
- return 0;
- }
memmove(b->wp, p, len);
- producecnt += len;
b->wp += len;
- if(q->bfirst)
- q->blast->next = b;
- else
- q->bfirst = b;
- q->blast = b;
- /* b->next = 0; done by iallocb() */
- q->len += BALLOC(b);
- q->dlen += BLEN(b);
- QDEBUG checkb(b, "qproduce");
+ qaddlist(q, b);
if(q->state & Qstarve){
q->state &= ~Qstarve;
Block*
qcopy(Queue *q, int len, ulong offset)
{
- int sofar;
- int n;
- Block *b, *nb;
- uchar *p;
-
- nb = allocb(len);
+ Block *b;
+ b = allocb(len);
ilock(q);
-
- /* go to offset */
- b = q->bfirst;
- for(sofar = 0; ; sofar += n){
- if(b == nil){
- iunlock(q);
- return nb;
- }
- n = BLEN(b);
- if(sofar + n > offset){
- p = b->rp + offset - sofar;
- n -= offset - sofar;
- break;
- }
- QDEBUG checkb(b, "qcopy");
- b = b->next;
- }
-
- /* copy bytes from there */
- for(sofar = 0; sofar < len;){
- if(n > len - sofar)
- n = len - sofar;
- memmove(nb->wp, p, n);
- qcopycnt += n;
- sofar += n;
- nb->wp += n;
- b = b->next;
- if(b == nil)
- break;
- n = BLEN(b);
- p = b->rp;
- }
+ b->wp += readblist(q->bfirst, b->wp, len, offset);
iunlock(q);
-
- return nb;
+ return b;
}
/*
Queue *q;
q = malloc(sizeof(Queue));
- if(q == 0)
- return 0;
+ if(q == nil)
+ return nil;
q->limit = q->inilim = limit;
q->kick = kick;
Queue *q;
q = malloc(sizeof(Queue));
- if(q == 0)
- return 0;
+ if(q == nil)
+ return nil;
q->limit = 0;
q->arg = arg;
{
Queue *q = a;
- return (q->state & Qclosed) || q->bfirst != 0;
+ return (q->state & Qclosed) || q->bfirst != nil;
}
/*
}
/*
- * add a block list to a queue
+ * add a block list to a queue, return bytes added
*/
-void
+int
qaddlist(Queue *q, Block *b)
{
+ int len, dlen;
+
+ QDEBUG checkb(b, "qaddlist 1");
+
/* queue the block */
- if(q->bfirst)
+ if(q->bfirst != nil)
q->blast->next = b;
else
q->bfirst = b;
- q->len += blockalloclen(b);
- q->dlen += blocklen(b);
- while(b->next)
+
+ len = BALLOC(b);
+ dlen = BLEN(b);
+ while(b->next != nil){
b = b->next;
+ QDEBUG checkb(b, "qaddlist 2");
+
+ len += BALLOC(b);
+ dlen += BLEN(b);
+ }
q->blast = b;
+ q->len += len;
+ q->dlen += dlen;
+ return dlen;
}
/*
b = q->bfirst;
if(b == nil)
return nil;
+ QDEBUG checkb(b, "qremove");
q->bfirst = b->next;
b->next = nil;
q->dlen -= BLEN(b);
q->len -= BALLOC(b);
- QDEBUG checkb(b, "qremove");
return b;
}
/*
* copy the contents of a string of blocks into
- * memory. emptied blocks are freed. return
- * pointer to first unconsumed block.
+ * memory from an offset. blocklist kept unchanged.
+ * return number of copied bytes.
*/
-Block*
-bl2mem(uchar *p, Block *b, int n)
+long
+readblist(Block *b, uchar *p, long n, ulong o)
{
- int i;
- Block *next;
+ ulong m, r;
- for(; b != nil; b = next){
- i = BLEN(b);
- if(i > n){
- memmove(p, b->rp, n);
- b->rp += n;
- return b;
+ r = 0;
+ while(n > 0 && b != nil){
+ m = BLEN(b);
+ if(o >= m)
+ o -= m;
+ else {
+ m -= o;
+ if(n < m)
+ m = n;
+ memmove(p, b->rp + o, m);
+ p += m;
+ r += m;
+ n -= m;
+ o = 0;
}
- memmove(p, b->rp, i);
- n -= i;
- p += i;
- b->rp += i;
- next = b->next;
- freeb(b);
- }
- return nil;
-}
-
-/*
- * copy the contents of memory into a string of blocks.
- * return nil on error.
- */
-Block*
-mem2bl(uchar *p, int len)
-{
- int n;
- Block *b, *first, **l;
-
- first = nil;
- l = &first;
- if(waserror()){
- freeblist(first);
- nexterror();
+ b = b->next;
}
- do {
- n = len;
- if(n > Maxatomic)
- n = Maxatomic;
-
- *l = b = allocb(n);
- setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
- memmove(b->wp, p, n);
- b->wp += n;
- p += n;
- len -= n;
- l = &b->next;
- } while(len > 0);
- poperror();
-
- return first;
+ return r;
}
/*
q->dlen += BLEN(b);
}
+/*
+ * cut off n bytes from the end of *h. return a new
+ * block with the tail and change *h to refer to the
+ * head.
+ */
+static Block*
+splitblock(Block **h, int n)
+{
+ Block *a, *b;
+ int m;
+
+ a = *h;
+ m = BLEN(a) - n;
+ if(m < n){
+ b = allocb(m);
+ memmove(b->wp, a->rp, m);
+ b->wp += m;
+ a->rp += m;
+ *h = b;
+ return a;
+ } else {
+ b = allocb(n);
+ a->wp -= n;
+ memmove(b->wp, a->wp, n);
+ b->wp += n;
+ return b;
+ }
+}
+
/*
* flow control, get producer going again
* called with q ilocked
/* wakeup flow controlled writers */
if(dowakeup){
- if(q->kick)
+ if(q->kick != nil)
q->kick(q->arg);
wakeup(&q->wr);
}
Block*
qbread(Queue *q, int len)
{
- Block *b, *nb;
+ Block *b;
int n;
eqlock(&q->rlock);
n = BLEN(b);
/* split block if it's too big and this is not a message queue */
- nb = b;
if(n > len){
- if((q->state&Qmsg) == 0){
- n -= len;
- b = allocb(n);
- memmove(b->wp, nb->rp+len, n);
- b->wp += n;
- qputback(q, b);
- }
- nb->wp = nb->rp + len;
+ n -= len;
+ if((q->state & Qmsg) == 0)
+ qputback(q, splitblock(&b, n));
+ else
+ b->wp -= n;
}
/* restart producer */
qwakeup_iunlock(q);
- poperror();
qunlock(&q->rlock);
- return nb;
+ poperror();
+
+ return b;
}
/*
long
qread(Queue *q, void *vp, int len)
{
- Block *b, *first, **l;
+ Block *b, *first, **last;
int m, n;
eqlock(&q->rlock);
}
/* if we get here, there's at least one block in the queue */
+ last = &first;
if(q->state & Qcoalesce){
/* when coalescing, 0 length blocks just go away */
b = q->bfirst;
- if(BLEN(b) <= 0){
+ m = BLEN(b);
+ if(m <= 0){
freeb(qremove(q));
goto again;
}
/* grab the first block plus as many
- * following blocks as will completely
+ * following blocks as will partially
* fit in the read.
*/
n = 0;
- l = &first;
- m = BLEN(b);
for(;;) {
- *l = qremove(q);
- l = &b->next;
+ *last = qremove(q);
n += m;
-
- b = q->bfirst;
- if(b == nil)
+ if(n >= len || q->bfirst == nil)
break;
+ last = &b->next;
+ b = q->bfirst;
m = BLEN(b);
- if(n+m > len)
- break;
}
} else {
first = qremove(q);
n = BLEN(first);
}
- /* copy to user space outside of the ilock */
- iunlock(q);
- b = bl2mem(vp, first, len);
- ilock(q);
-
- /* take care of any left over partial block */
- if(b != nil){
- n -= BLEN(b);
- if(q->state & Qmsg)
- freeb(b);
- else
- qputback(q, b);
- }
+ /* split last block if it's too big and this is not a message queue */
+ if(n > len && (q->state & Qmsg) == 0)
+ qputback(q, splitblock(last, n - len));
/* restart producer */
qwakeup_iunlock(q);
- poperror();
qunlock(&q->rlock);
+ poperror();
+
+ if(waserror()){
+ freeblist(first);
+ nexterror();
+ }
+ n = readblist(first, vp, len, 0);
+ freeblist(first);
+ poperror();
+
return n;
}
return q->len < q->limit || (q->state & Qclosed);
}
+/*
+ * flow control, wait for queue to get below the limit
+ */
+static void
+qflow(Queue *q)
+{
+ for(;;){
+ if(q->noblock || qnotfull(q))
+ break;
+
+ ilock(q);
+ q->state |= Qflow;
+ iunlock(q);
+
+ eqlock(&q->wlock);
+ if(waserror()){
+ qunlock(&q->wlock);
+ nexterror();
+ }
+ sleep(&q->wr, qnotfull, q);
+ qunlock(&q->wlock);
+ poperror();
+ }
+}
+
/*
* add a block to a queue obeying flow control
*/
long
qbwrite(Queue *q, Block *b)
{
- int n, dowakeup;
+ int len, dowakeup;
Proc *p;
- n = BLEN(b);
-
- if(q->bypass){
+ if(q->bypass != nil){
+ len = blocklen(b);
(*q->bypass)(q->arg, b);
- return n;
+ return len;
}
dowakeup = 0;
if(waserror()){
- freeb(b);
+ freeblist(b);
nexterror();
}
ilock(q);
}
/* don't queue over the limit */
- if(q->len >= q->limit){
- if(q->noblock){
- iunlock(q);
- freeb(b);
- poperror();
- return n;
- }
- if(q->len >= q->limit*2){
- iunlock(q);
- error(Egreg);
- }
+ if(q->len >= q->limit && q->noblock){
+ iunlock(q);
+ poperror();
+ len = blocklen(b);
+ freeblist(b);
+ return len;
}
- /* queue the block */
- if(q->bfirst)
- q->blast->next = b;
- else
- q->bfirst = b;
- q->blast = b;
- b->next = 0;
- q->len += BALLOC(b);
- q->dlen += n;
- QDEBUG checkb(b, "qbwrite");
+ len = qaddlist(q, b);
/* make sure other end gets awakened */
if(q->state & Qstarve){
poperror();
/* get output going again */
- if(q->kick && (dowakeup || (q->state&Qkick)))
+ if(q->kick != nil && (dowakeup || (q->state&Qkick)))
q->kick(q->arg);
/* wakeup anyone consuming at the other end */
}
/*
- * flow control, wait for queue to get below the limit
- * before allowing the process to continue and queue
- * more. We do this here so that postnote can only
- * interrupt us after the data has been queued. This
- * means that things like 9p flushes and ssl messages
- * will not be disrupted by software interrupts.
- *
- * Note - this is moderately dangerous since a process
- * that keeps getting interrupted and rewriting will
- * queue infinite crud.
+ * flow control, before allowing the process to continue and
+ * queue more. We do this here so that postnote can only
+ * interrupt us after the data has been queued. This means that
+ * things like 9p flushes and ssl messages will not be disrupted
+ * by software interrupts.
*/
- for(;;){
- if(q->noblock || qnotfull(q))
- break;
-
- ilock(q);
- q->state |= Qflow;
- iunlock(q);
-
- eqlock(&q->wlock);
- if(waserror()){
- qunlock(&q->wlock);
- nexterror();
- }
- sleep(&q->wr, qnotfull, q);
- qunlock(&q->wlock);
- poperror();
- }
+ qflow(q);
- return n;
+ return len;
}
/*
QDEBUG if(!islo())
print("qwrite hi %#p\n", getcallerpc(&q));
+ /* stop queue bloat before allocating blocks */
+ if(q->len/2 >= q->limit && q->noblock == 0 && q->bypass == nil){
+ while(waserror()){
+ if(up->procctl == Proc_exitme || up->procctl == Proc_exitbig)
+ error(Egreg);
+ }
+ qflow(q);
+ poperror();
+ }
+
sofar = 0;
do {
n = len-sofar;
n = Maxatomic;
b = allocb(n);
- setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
if(waserror()){
freeb(b);
nexterror();
poperror();
b->wp += n;
- qbwrite(q, b);
-
- sofar += n;
+ sofar += qbwrite(q, b);
} while(sofar < len && (q->state & Qmsg) == 0);
return len;
/*
* used by print() to write to a queue. Since we may be splhi or not in
* a process, don't qlock.
- *
- * this routine merges adjacent blocks if block n+1 will fit into
- * the free space of block n.
*/
int
qiwrite(Queue *q, void *vp, int len)
/* we use an artificially high limit for kernel prints since anything
* over the limit gets dropped
*/
- if(q->dlen >= 16*1024){
+ if((q->state & Qclosed) != 0 || q->len/2 >= q->limit){
iunlock(q);
freeb(b);
break;
}
- QDEBUG checkb(b, "qiwrite");
- if(q->bfirst)
- q->blast->next = b;
- else
- q->bfirst = b;
- q->blast = b;
- q->len += BALLOC(b);
- q->dlen += n;
+ qaddlist(q, b);
if(q->state & Qstarve){
q->state &= ~Qstarve;
iunlock(q);
if(dowakeup){
- if(q->kick)
+ if(q->kick != nil)
q->kick(q->arg);
wakeup(&q->rr);
}
ilock(q);
q->state |= Qclosed;
q->state &= ~(Qflow|Qstarve);
- strcpy(q->err, Ehungup);
+ kstrcpy(q->err, Ehungup, ERRMAX);
bfirst = q->bfirst;
- q->bfirst = 0;
+ q->bfirst = nil;
q->len = 0;
q->dlen = 0;
q->noblock = 0;
/* mark it */
ilock(q);
q->state |= Qclosed;
- if(msg == 0 || *msg == 0)
- strcpy(q->err, Ehungup);
- else
- strncpy(q->err, msg, ERRMAX-1);
+ if(msg == nil || *msg == '\0')
+ msg = Ehungup;
+ kstrcpy(q->err, msg, ERRMAX);
iunlock(q);
/* wake up readers/writers */
int
qcanread(Queue *q)
{
- return q->bfirst!=0;
+ return q->bfirst != nil;
}
/*
/* mark it */
ilock(q);
bfirst = q->bfirst;
- q->bfirst = 0;
+ q->bfirst = nil;
q->len = 0;
q->dlen = 0;
iunlock(q);
/* free queued blocks */
freeblist(bfirst);
- /* wake up readers/writers */
+ /* wake up writers */
wakeup(&q->wr);
}
{
return q->state & Qflow;
}
-
-int
-qstate(Queue *q)
-{
- return q->state;
-}