4 #include "threadimpl.h"
6 /* Value to indicate the channel is closed */
11 static char errcl[] = "channel was closed";
12 static Lock chanlock; /* central channel access lock */
14 static void enqueue(Alt*, Channel**);
15 static void dequeue(Alt*);
16 static int canexec(Alt*);
17 static int altexec(Alt*, int);
19 #define Closed ((void*)CHANCLOSD)
20 #define Intred ((void*)~0) /* interrupted */
27 if(c->closed == 1) /* chanclose is ongoing */
31 for(i = 0; i < c->nentry; i++) /* alt ongoing */
53 chancreate(int elemsize, int elemcnt)
57 if(elemcnt < 0 || elemsize <= 0)
59 c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1);
62 _threaddebug(DBGCHAN, "chancreate %p", c);
67 isopenfor(Channel *c, int op)
69 return c->closed == 0 || (op == CHANRCV && c->n > 0);
77 int n, s, waiting, allreadycl;
82 * The point of going splhi here is that note handlers
83 * might reasonably want to use channel operations,
84 * but that will hang if the note comes while we hold the
85 * chanlock. Instead, we delay the note until we've dropped
88 t = _threadgetproc()->thread;
89 if(t->moribund || _threadexitsallstatus)
90 yield(); /* won't return */
96 /* test whether any channels can proceed */
100 for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){
102 if(xa->op == CHANNOP)
113 if(isopenfor(c, xa->op) && canexec(xa))
120 /* nothing can proceed */
121 if(xa->op == CHANNOBLK){
125 if(xa->op == CHANNOBLK)
129 /* enqueue on all channels open for us. */
134 for(xa=alts; xa->op!=CHANEND; xa++)
137 else if(isopenfor(xa->c, xa->op)){
140 } else if(xa->err != errcl)
147 /* everything was closed, select last channel */
153 } else if(allreadycl){
154 /* everything was already closed */
161 * wait for successful rendezvous.
162 * we can't just give up if the rendezvous
163 * is interrupted -- someone else might come
164 * along and try to rendezvous with us, so
165 * we need to be here.
166 * if the channel was closed, the op is done
167 * and we flag an error for the entry.
172 r = _threadrendezvous(&c, 0);
176 if(r==Intred){ /* interrupted */
177 if(c!=nil) /* someone will meet us; go back */
179 c = (Channel*)~0; /* so no one tries to meet us */
182 /* dequeue from channels, find selected one */
184 for(xa=alts; xa->op!=CHANEND; xa++){
197 if(a == nil){ /* we were interrupted */
198 assert(c==(Channel*)~0);
202 altexec(a, s); /* unlocks chanlock, does splx */
209 chanclose(Channel *c)
214 s = _procsplhi(); /* note handlers; see :/^alt */
217 /* Already close; we fail but it's ok. don't print */
222 c->closed = 1; /* Being closed */
224 * Locate entries that will fail due to close
225 * (send, and receive if nothing buffered) and wake them up.
226 * the situation cannot change because all queries
227 * should be committed by now and new ones will find the channel
228 * closed. We still need to take the lock during the iteration
229 * because we can wake threads on qentrys we have not seen yet
230 * as in alt and there would be a race in the access to *a.
232 for(i = 0; i < c->nentry; i++){
233 if((a = c->qentry[i]) == nil || *a->tag != nil)
236 if(a->op != CHANSND && (a->op != CHANRCV || c->n != 0))
241 while(_threadrendezvous(a->tag, Closed) == Intred)
247 c->closed = 2; /* Fully closed */
256 chanclosing(Channel *c)
260 s = _procsplhi(); /* note handlers; see :/^alt */
272 * superseded by chanclosing
274 chanisclosed(Channel *c)
276 return chanisclosing(c) >= 0;
281 runop(int op, Channel *c, void *v, int nb)
287 * we could do this without calling alt,
288 * but the only reason would be performance,
289 * and i'm not convinced it matters.
299 case -1: /* interrupted */
301 case 1: /* nonblocking, didn't accomplish anything */
306 * Okay, but return -1 if the op is done because of a close.
312 fprint(2, "ERROR: channel alt returned %d\n", r);
319 recv(Channel *c, void *v)
321 return runop(CHANRCV, c, v, 0);
325 nbrecv(Channel *c, void *v)
327 return runop(CHANRCV, c, v, 1);
331 send(Channel *c, void *v)
333 return runop(CHANSND, c, v, 0);
337 nbsend(Channel *c, void *v)
339 return runop(CHANSND, c, v, 1);
343 channelsize(Channel *c, int sz)
346 fprint(2, "expected channel with elements of size %d, got size %d\n",
353 sendul(Channel *c, ulong v)
355 channelsize(c, sizeof(ulong));
364 channelsize(c, sizeof(ulong));
371 sendp(Channel *c, void *v)
373 channelsize(c, sizeof(void*));
382 channelsize(c, sizeof(void*));
389 nbsendul(Channel *c, ulong v)
391 channelsize(c, sizeof(ulong));
392 return nbsend(c, &v);
400 channelsize(c, sizeof(ulong));
401 if(nbrecv(c, &v) == 0)
407 nbsendp(Channel *c, void *v)
409 channelsize(c, sizeof(void*));
410 return nbsend(c, &v);
418 channelsize(c, sizeof(void*));
419 if(nbrecv(c, &v) == 0)
425 emptyentry(Channel *c)
429 assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry));
431 for(i=0; i<c->nentry; i++)
432 if(c->qentry[i]==nil)
437 c->qentry = realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0]));
439 sysfatal("realloc channel entries: %r");
440 memset(&c->qentry[i], 0, extra*sizeof(c->qentry[0]));
445 enqueue(Alt *a, Channel **c)
449 _threaddebug(DBGCHAN, "Queuing alt %p on channel %p", a, a->c);
451 i = emptyentry(a->c);
462 for(i=0; i<c->nentry; i++)
464 _threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c);
466 /* release if freed and not closing */
467 if(c->freed && c->closed != 1)
480 /* are there senders or receivers blocked? */
481 otherop = (CHANSND+CHANRCV) - a->op;
482 for(i=0; i<c->nentry; i++)
483 if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){
484 _threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c);
488 /* is there room in the channel? */
489 if((a->op==CHANSND && c->n < c->s)
490 || (a->op==CHANRCV && c->n > 0)){
491 _threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c);
499 altexecbuffered(Alt *a, int willreplace)
505 /* use buffered channel queue */
506 if(a->op==CHANRCV && c->n > 0){
507 _threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c);
508 v = c->v + c->e*(c->f%c->s);
514 if(a->op==CHANSND && c->n < c->s){
515 _threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c);
516 v = c->v + c->e*((c->f+c->n)%c->s);
526 altcopy(void *dst, void *src, int sz)
530 memmove(dst, src, sz);
537 altexec(Alt *a, int spl)
542 void *me, *waiter, *buf;
546 /* rendezvous with others */
547 otherop = (CHANSND+CHANRCV) - a->op;
551 for(i=0; i<c->nentry; i++)
552 if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil)
556 _threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b);
560 * if buffer is full and there are waiters
561 * and we're meeting a waiter,
562 * we must be receiving.
564 * we use the value in the channel buffer,
565 * copy the waiter's value into the channel buffer
566 * on behalf of the waiter, and then wake the waiter.
570 buf = altexecbuffered(a, 1);
571 altcopy(me, buf, c->e);
572 altcopy(buf, waiter, c->e);
575 altcopy(me, waiter, c->e);
577 altcopy(waiter, me, c->e);
579 *b->tag = c; /* commits us to rendezvous */
580 _threaddebug(DBGCHAN, "unlocking the chanlock");
583 _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)&chanlock);
584 while(_threadrendezvous(b->tag, 0) == Intred)
589 buf = altexecbuffered(a, 0);
591 altcopy(me, buf, c->e);
593 altcopy(buf, me, c->e);