]> git.lizzy.rs Git - plan9front.git/blob - sys/src/libthread/channel.c
vmx: clean up mksegment, memset only if segment existed (devsegment clears new ones)
[plan9front.git] / sys / src / libthread / channel.c
1 #include <u.h>
2 #include <libc.h>
3 #include <thread.h>
4 #include "threadimpl.h"
5
6 /* Value to indicate the channel is closed */
7 enum {
8         CHANCLOSD = 0xc105ed,
9 };
10
11 static char errcl[] = "channel was closed";
12 static Lock chanlock;           /* central channel access lock */
13
14 static void enqueue(Alt*, Channel**);
15 static void dequeue(Alt*);
16 static int canexec(Alt*);
17 static int altexec(Alt*, int);
18
19 #define Closed  ((void*)CHANCLOSD)
20 #define Intred  ((void*)~0)             /* interrupted */
21
22 static void
23 _chanfree(Channel *c)
24 {
25         int i, inuse;
26
27         if(c->closed == 1)                      /* chanclose is ongoing */
28                 inuse = 1;
29         else{
30                 inuse = 0;
31                 for(i = 0; i < c->nentry; i++)  /* alt ongoing */
32                         if(c->qentry[i])
33                                 inuse = 1;
34         }
35         if(inuse)
36                 c->freed = 1;
37         else{
38                 if(c->qentry)
39                         free(c->qentry);
40                 free(c);
41         }
42 }
43
44 void
45 chanfree(Channel *c)
46 {
47         lock(&chanlock);
48         _chanfree(c);
49         unlock(&chanlock);
50 }
51
52 Channel*
53 chancreate(int elemsize, int elemcnt)
54 {
55         Channel *c;
56
57         if(elemcnt < 0 || elemsize <= 0)
58                 return nil;
59         c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1);
60         c->e = elemsize;
61         c->s = elemcnt;
62         _threaddebug(DBGCHAN, "chancreate %p", c);
63         return c;
64 }
65
66 static int
67 isopenfor(Channel *c, int op)
68 {
69         return c->closed == 0 || (op == CHANRCV && c->n > 0);
70 }
71
72 int
73 alt(Alt *alts)
74 {
75         Alt *a, *xa, *ca;
76         Channel volatile *c;
77         int n, s, waiting, allreadycl;
78         void* r;
79         Thread *t;
80
81         /*
82          * The point of going splhi here is that note handlers
83          * might reasonably want to use channel operations,
84          * but that will hang if the note comes while we hold the
85          * chanlock.  Instead, we delay the note until we've dropped
86          * the lock.
87          */
88         t = _threadgetproc()->thread;
89         if(t->moribund || _threadexitsallstatus)
90                 yield();        /* won't return */
91         s = _procsplhi();
92         lock(&chanlock);
93         t->alt = alts;
94         t->chan = Chanalt;
95
96         /* test whether any channels can proceed */
97         n = 0;
98         a = nil;
99
100         for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){
101                 xa->entryno = -1;
102                 if(xa->op == CHANNOP)
103                         continue;
104
105                 c = xa->c;
106                 if(c==nil){
107                         unlock(&chanlock);
108                         _procsplx(s);
109                         t->chan = Channone;
110                         return -1;
111                 }
112
113                 if(isopenfor(c, xa->op) && canexec(xa))
114                         if(nrand(++n) == 0)
115                                 a = xa;
116         }
117
118
119         if(a==nil){
120                 /* nothing can proceed */
121                 if(xa->op == CHANNOBLK){
122                         unlock(&chanlock);
123                         _procsplx(s);
124                         t->chan = Channone;
125                         if(xa->op == CHANNOBLK)
126                                 return xa - alts;
127                 }
128
129                 /* enqueue on all channels open for us. */
130                 c = nil;
131                 ca = nil;
132                 waiting = 0;
133                 allreadycl = 0;
134                 for(xa=alts; xa->op!=CHANEND; xa++)
135                         if(xa->op==CHANNOP)
136                                 continue;
137                         else if(isopenfor(xa->c, xa->op)){
138                                 waiting = 1;
139                                 enqueue(xa, &c);
140                         } else if(xa->err != errcl)
141                                 ca = xa;
142                         else
143                                 allreadycl = 1;
144
145                 if(waiting == 0)
146                         if(ca != nil){
147                                 /* everything was closed, select last channel */
148                                 ca->err = errcl;
149                                 unlock(&chanlock);
150                                 _procsplx(s);
151                                 t->chan = Channone;
152                                 return ca - alts;
153                         } else if(allreadycl){
154                                 /* everything was already closed */
155                                 unlock(&chanlock);
156                                 _procsplx(s);
157                                 t->chan = Channone;
158                                 return -1;
159                         }
160                 /*
161                  * wait for successful rendezvous.
162                  * we can't just give up if the rendezvous
163                  * is interrupted -- someone else might come
164                  * along and try to rendezvous with us, so
165                  * we need to be here.
166                  * if the channel was closed, the op is done
167                  * and we flag an error for the entry.
168                  */
169             Again:
170                 unlock(&chanlock);
171                 _procsplx(s);
172                 r = _threadrendezvous(&c, 0);
173                 s = _procsplhi();
174                 lock(&chanlock);
175
176                 if(r==Intred){          /* interrupted */
177                         if(c!=nil)      /* someone will meet us; go back */
178                                 goto Again;
179                         c = (Channel*)~0;       /* so no one tries to meet us */
180                 }
181
182                 /* dequeue from channels, find selected one */
183                 a = nil;
184                 for(xa=alts; xa->op!=CHANEND; xa++){
185                         if(xa->op==CHANNOP)
186                                 continue;
187                         if(xa->c == c){
188                                 a = xa;
189                                 a->err = nil;
190                                 if(r == Closed)
191                                         a->err = errcl;
192                         }
193                         dequeue(xa);
194                 }
195                 unlock(&chanlock);
196                 _procsplx(s);
197                 if(a == nil){   /* we were interrupted */
198                         assert(c==(Channel*)~0);
199                         return -1;
200                 }
201         }else
202                 altexec(a, s);  /* unlocks chanlock, does splx */
203         _sched();
204         t->chan = Channone;
205         return a - alts;
206 }
207
208 int
209 chanclose(Channel *c)
210 {
211         Alt *a;
212         int i, s;
213
214         s = _procsplhi();       /* note handlers; see :/^alt */
215         lock(&chanlock);
216         if(c->closed){
217                 /* Already close; we fail but it's ok. don't print */
218                 unlock(&chanlock);
219                 _procsplx(s);
220                 return -1;
221         }
222         c->closed = 1;          /* Being closed */
223         /*
224          * Locate entries that will fail due to close
225          * (send, and receive if nothing buffered) and wake them up.
226          * the situation cannot change because all queries
227          * should be committed by now and new ones will find the channel
228          * closed.  We still need to take the lock during the iteration
229          * because we can wake threads on qentrys we have not seen yet
230          * as in alt and there would be a race in the access to *a.
231          */
232         for(i = 0; i < c->nentry; i++){
233                 if((a = c->qentry[i]) == nil || *a->tag != nil)
234                         continue;
235
236                 if(a->op != CHANSND && (a->op != CHANRCV || c->n != 0))
237                         continue;
238                 *a->tag = c;
239                 unlock(&chanlock);
240                 _procsplx(s);
241                 while(_threadrendezvous(a->tag, Closed) == Intred)
242                         ;
243                 s = _procsplhi();
244                 lock(&chanlock);
245         }
246
247         c->closed = 2;          /* Fully closed */
248         if(c->freed)
249                 _chanfree(c);
250         unlock(&chanlock);
251         _procsplx(s);
252         return 0;
253 }
254
255 int
256 chanclosing(Channel *c)
257 {
258         int n, s;
259
260         s = _procsplhi();       /* note handlers; see :/^alt */
261         lock(&chanlock);
262         if(c->closed == 0)
263                 n = -1;
264         else
265                 n = c->n;
266         unlock(&chanlock);
267         _procsplx(s);
268         return n;
269 }
270
271 /*
272  * superseded by chanclosing
273 int
274 chanisclosed(Channel *c)
275 {
276         return chanclosing(c) >= 0;
277 }
278  */
279
280 static int
281 runop(int op, Channel *c, void *v, int nb)
282 {
283         int r;
284         Alt a[2];
285
286         /*
287          * we could do this without calling alt,
288          * but the only reason would be performance,
289          * and i'm not convinced it matters.
290          */
291         a[0].op = op;
292         a[0].c = c;
293         a[0].v = v;
294         a[0].err = nil;
295         a[1].op = CHANEND;
296         if(nb)
297                 a[1].op = CHANNOBLK;
298         switch(r=alt(a)){
299         case -1:        /* interrupted */
300                 return -1;
301         case 1: /* nonblocking, didn't accomplish anything */
302                 assert(nb);
303                 return 0;
304         case 0:
305                 /*
306                  * Okay, but return -1 if the op is done because of a close.
307                  */
308                 if(a[0].err != nil)
309                         return -1;
310                 return 1;
311         default:
312                 fprint(2, "ERROR: channel alt returned %d\n", r);
313                 abort();
314                 return -1;
315         }
316 }
317
318 int
319 recv(Channel *c, void *v)
320 {
321         return runop(CHANRCV, c, v, 0);
322 }
323
324 int
325 nbrecv(Channel *c, void *v)
326 {
327         return runop(CHANRCV, c, v, 1);
328 }
329
330 int
331 send(Channel *c, void *v)
332 {
333         return runop(CHANSND, c, v, 0);
334 }
335
336 int
337 nbsend(Channel *c, void *v)
338 {
339         return runop(CHANSND, c, v, 1);
340 }
341
342 static void
343 channelsize(Channel *c, int sz)
344 {
345         if(c->e != sz){
346                 fprint(2, "expected channel with elements of size %d, got size %d\n",
347                         sz, c->e);
348                 abort();
349         }
350 }
351
352 int
353 sendul(Channel *c, ulong v)
354 {
355         channelsize(c, sizeof(ulong));
356         return send(c, &v);
357 }
358
359 ulong
360 recvul(Channel *c)
361 {
362         ulong v;
363
364         channelsize(c, sizeof(ulong));
365         if(recv(c, &v) < 0)
366                 return ~0;
367         return v;
368 }
369
370 int
371 sendp(Channel *c, void *v)
372 {
373         channelsize(c, sizeof(void*));
374         return send(c, &v);
375 }
376
377 void*
378 recvp(Channel *c)
379 {
380         void *v;
381
382         channelsize(c, sizeof(void*));
383         if(recv(c, &v) < 0)
384                 return nil;
385         return v;
386 }
387
388 int
389 nbsendul(Channel *c, ulong v)
390 {
391         channelsize(c, sizeof(ulong));
392         return nbsend(c, &v);
393 }
394
395 ulong
396 nbrecvul(Channel *c)
397 {
398         ulong v;
399
400         channelsize(c, sizeof(ulong));
401         if(nbrecv(c, &v) == 0)
402                 return 0;
403         return v;
404 }
405
406 int
407 nbsendp(Channel *c, void *v)
408 {
409         channelsize(c, sizeof(void*));
410         return nbsend(c, &v);
411 }
412
413 void*
414 nbrecvp(Channel *c)
415 {
416         void *v;
417
418         channelsize(c, sizeof(void*));
419         if(nbrecv(c, &v) == 0)
420                 return nil;
421         return v;
422 }
423
424 static int
425 emptyentry(Channel *c)
426 {
427         int i, extra;
428
429         assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry));
430
431         for(i=0; i<c->nentry; i++)
432                 if(c->qentry[i]==nil)
433                         return i;
434
435         extra = 16;
436         c->nentry += extra;
437         c->qentry = realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0]));
438         if(c->qentry == nil)
439                 sysfatal("realloc channel entries: %r");
440         memset(&c->qentry[i], 0, extra*sizeof(c->qentry[0]));
441         return i;
442 }
443
444 static void
445 enqueue(Alt *a, Channel **c)
446 {
447         int i;
448
449         _threaddebug(DBGCHAN, "Queuing alt %p on channel %p", a, a->c);
450         a->tag = c;
451         i = emptyentry(a->c);
452         a->c->qentry[i] = a;
453 }
454
455 static void
456 dequeue(Alt *a)
457 {
458         int i;
459         Channel *c;
460
461         c = a->c;
462         for(i=0; i<c->nentry; i++)
463                 if(c->qentry[i]==a){
464                         _threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c);
465                         c->qentry[i] = nil;
466                         /* release if freed and not closing */
467                         if(c->freed && c->closed != 1)
468                                 _chanfree(c);
469                         return;
470                 }
471 }
472
473 static int
474 canexec(Alt *a)
475 {
476         int i, otherop;
477         Channel *c;
478
479         c = a->c;
480         /* are there senders or receivers blocked? */
481         otherop = (CHANSND+CHANRCV) - a->op;
482         for(i=0; i<c->nentry; i++)
483                 if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){
484                         _threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c);
485                         return 1;
486                 }
487
488         /* is there room in the channel? */
489         if((a->op==CHANSND && c->n < c->s)
490         || (a->op==CHANRCV && c->n > 0)){
491                 _threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c);
492                 return 1;
493         }
494
495         return 0;
496 }
497
498 static void*
499 altexecbuffered(Alt *a, int willreplace)
500 {
501         uchar *v;
502         Channel *c;
503
504         c = a->c;
505         /* use buffered channel queue */
506         if(a->op==CHANRCV && c->n > 0){
507                 _threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c);
508                 v = c->v + c->e*(c->f%c->s);
509                 if(!willreplace)
510                         c->n--;
511                 c->f++;
512                 return v;
513         }
514         if(a->op==CHANSND && c->n < c->s){
515                 _threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c);
516                 v = c->v + c->e*((c->f+c->n)%c->s);
517                 if(!willreplace)
518                         c->n++;
519                 return v;
520         }
521         abort();
522         return nil;
523 }
524
/*
 * Copy one sz-byte element from src to dst.
 * A null dst discards the element; a null src zeroes dst.
 */
static void
altcopy(void *dst, void *src, int sz)
{
	if(!dst)
		return;
	if(!src){
		memset(dst, 0, sz);
		return;
	}
	memmove(dst, src, sz);
}
535
536 static int
537 altexec(Alt *a, int spl)
538 {
539         volatile Alt *b;
540         int i, n, otherop;
541         Channel *c;
542         void *me, *waiter, *buf;
543
544         c = a->c;
545
546         /* rendezvous with others */
547         otherop = (CHANSND+CHANRCV) - a->op;
548         n = 0;
549         b = nil;
550         me = a->v;
551         for(i=0; i<c->nentry; i++)
552                 if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil)
553                         if(nrand(++n) == 0)
554                                 b = c->qentry[i];
555         if(b != nil){
556                 _threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b);
557                 waiter = b->v;
558                 if(c->s && c->n){
559                         /*
560                          * if buffer is full and there are waiters
561                          * and we're meeting a waiter,
562                          * we must be receiving.
563                          *
564                          * we use the value in the channel buffer,
565                          * copy the waiter's value into the channel buffer
566                          * on behalf of the waiter, and then wake the waiter.
567                          */
568                         if(a->op!=CHANRCV)
569                                 abort();
570                         buf = altexecbuffered(a, 1);
571                         altcopy(me, buf, c->e);
572                         altcopy(buf, waiter, c->e);
573                 }else{
574                         if(a->op==CHANRCV)
575                                 altcopy(me, waiter, c->e);
576                         else
577                                 altcopy(waiter, me, c->e);
578                 }
579                 *b->tag = c;    /* commits us to rendezvous */
580                 _threaddebug(DBGCHAN, "unlocking the chanlock");
581                 unlock(&chanlock);
582                 _procsplx(spl);
583                 _threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)&chanlock);
584                 while(_threadrendezvous(b->tag, 0) == Intred)
585                         ;
586                 return 1;
587         }
588
589         buf = altexecbuffered(a, 0);
590         if(a->op==CHANRCV)
591                 altcopy(me, buf, c->e);
592         else
593                 altcopy(buf, me, c->e);
594
595         unlock(&chanlock);
596         _procsplx(spl);
597         return 1;
598 }