]> git.lizzy.rs Git - plan9front.git/blob - sys/src/ape/lib/ap/plan9/_buf.c
merge
[plan9front.git] / sys / src / ape / lib / ap / plan9 / _buf.c
1 #define  _BSDTIME_EXTENSION
2 #define _LOCK_EXTENSION
3 #include "lib.h"
4 #include <stdlib.h>
5 #include <errno.h>
6 #include <unistd.h>
7 #include <signal.h>
8 #include <string.h>
9 #include <stdio.h>
10 #include <lock.h>
11 #include <sys/time.h>
12 #include <sys/select.h>
13 #include <unistd.h>
14 #include "sys9.h"
15
16 typedef struct Muxseg {
17         Lock    lock;                   /* for mutual exclusion access to buffer variables */
18         int     curfds;                 /* number of fds currently buffered */
19         int     selwait;                /* true if selecting process is waiting */
20         int     waittime;               /* time for timer process to wait */
21         fd_set  rwant;                  /* fd's that select wants to read */
22         fd_set  ewant;                  /* fd's that select wants to know eof info on */
23         Muxbuf  bufs[INITBUFS];         /* can grow, via segbrk() */
24 } Muxseg;
25
26 #define MUXADDR ((void*)0x6000000)
27 static Muxseg *mux = 0;                 /* shared memory segment */
28
29 /* _muxsid and _killmuxsid are known in libbsd's listen.c */
30 int _muxsid = -1;                       /* group id of copy processes */
31 static int _mainpid = -1;
32 static int timerpid = -1;               /* pid of a timer process */
33
34 void _killmuxsid(void);
35 static void _copyproc(int, Muxbuf*);
36 static void _timerproc(void);
37 static void _resettimer(void);
38
39 static int copynotehandler(void *, char *);
40
41 /* assume FD_SETSIZE is 96 */
42 #define FD_ANYSET(p)    ((p)->fds_bits[0] || (p)->fds_bits[1] || (p)->fds_bits[2])
43
44 /*
45  * Start making fd read-buffered: make the shared segment, if necessary,
46  * allocate a slot (index into mux->bufs), and fork a child to read the fd
47  * and write into the slot-indexed buffer.
48  * Return -1 if we can't do it.
49  */
50 int
51 _startbuf(int fd)
52 {
53         long i, n, slot;
54         int pid, sid;
55         Fdinfo *f;
56         Muxbuf *b;
57
58         if(mux == 0){
59                 _RFORK(RFREND);
60                 mux = (Muxseg*)_SEGATTACH(0, "shared", MUXADDR, sizeof(Muxseg));
61                 if((long)mux == -1){
62                         _syserrno();
63                         return -1;
64                 }
65                 /* segattach has returned zeroed memory */
66                 atexit(_killmuxsid);
67         }
68
69         if(fd == -1)
70                 return 0;
71
72         lock(&mux->lock);
73         slot = mux->curfds++;
74         if(mux->curfds > INITBUFS) {
75                 if(_SEGBRK(mux, mux->bufs+mux->curfds) < 0){
76                         _syserrno();
77                         unlock(&mux->lock);
78                         return -1;
79                 }
80         }
81
82         f = &_fdinfo[fd];
83         b = &mux->bufs[slot];
84         b->n = 0;
85         b->putnext = b->data;
86         b->getnext = b->data;
87         b->eof = 0;
88         b->fd = fd;
89         if(_mainpid == -1)
90                 _mainpid = getpid();
91         if((pid = _RFORK(RFFDG|RFPROC|RFNOWAIT)) == 0){
92                 /* copy process ... */
93                 if(_muxsid == -1) {
94                         _RFORK(RFNOTEG);
95                         _muxsid = getpgrp();
96                 } else
97                         setpgid(getpid(), _muxsid);
98                 _NOTIFY(copynotehandler);
99                 for(i=0; i<OPEN_MAX; i++)
100                         if(i!=fd && (_fdinfo[i].flags&FD_ISOPEN))
101                                 _CLOSE(i);
102                 _RENDEZVOUS(0, _muxsid);
103                 _copyproc(fd, b);
104         }
105
106         /* parent process continues ... */
107         b->copypid = pid;
108         f->buf = b;
109         f->flags |= FD_BUFFERED;
110         unlock(&mux->lock);
111         _muxsid = _RENDEZVOUS(0, 0);
112         /* leave fd open in parent so system doesn't reuse it */
113         return 0;
114 }
115
116 /*
117  * The given buffered fd is being closed.
118  * Set the fd field in the shared buffer to -1 to tell copyproc
119  * to exit, and kill the copyproc.
120  */
121 void
122 _closebuf(int fd)
123 {
124         Muxbuf *b;
125
126         b = _fdinfo[fd].buf;
127         if(!b)
128                 return;
129         lock(&mux->lock);
130         b->fd = -1;
131         unlock(&mux->lock);
132         kill(b->copypid, SIGKILL);
133 }
134
135 /* child copy procs execute this until eof */
136 static void
137 _copyproc(int fd, Muxbuf *b)
138 {
139         unsigned char *e;
140         int n;
141         int nzeros;
142
143         e = &b->data[PERFDMAX];
144         for(;;) {
145                 /* make sure there's room */
146                 lock(&mux->lock);
147                 if(e - b->putnext < READMAX) {
148                         if(b->getnext == b->putnext) {
149                                 b->getnext = b->putnext = b->data;
150                                 unlock(&mux->lock);
151                         } else {
152                                 /* sleep until there's room */
153                                 b->roomwait = 1;
154                                 unlock(&mux->lock);
155                                 _RENDEZVOUS((unsigned long)&b->roomwait, 0);
156                         }
157                 } else
158                         unlock(&mux->lock);
159                 /*
160                  * A Zero-length _READ might mean a zero-length write
161                  * happened, or it might mean eof; try several times to
162                  * disambiguate (posix read() discards 0-length messages)
163                  */
164                 nzeros = 0;
165                 do {
166                         n = _READ(fd, b->putnext, READMAX);
167                         if(b->fd == -1) {
168                                 _exit(0);               /* we've been closed */
169                         }
170                 } while(n == 0 && ++nzeros < 3);
171                 lock(&mux->lock);
172                 if(n <= 0) {
173                         b->eof = 1;
174                         if(mux->selwait && FD_ISSET(fd, &mux->ewant)) {
175                                 mux->selwait = 0;
176                                 unlock(&mux->lock);
177                                 _RENDEZVOUS((unsigned long)&mux->selwait, fd);
178                         } else if(b->datawait) {
179                                 b->datawait = 0;
180                                 unlock(&mux->lock);
181                                 _RENDEZVOUS((unsigned long)&b->datawait, 0);
182                         } else if(mux->selwait && FD_ISSET(fd, &mux->rwant)) {
183                                 mux->selwait = 0;
184                                 unlock(&mux->lock);
185                                 _RENDEZVOUS((unsigned long)&mux->selwait, fd);
186                         } else
187                                 unlock(&mux->lock);
188                         _exit(0);
189                 } else {
190                         b->putnext += n;
191                         b->n += n;
192                         if(b->n > 0) {
193                                 /* parent process cannot be both in datawait and selwait */
194                                 if(b->datawait) {
195                                         b->datawait = 0;
196                                         unlock(&mux->lock);
197                                         /* wake up _bufreading process */
198                                         _RENDEZVOUS((unsigned long)&b->datawait, 0);
199                                 } else if(mux->selwait && FD_ISSET(fd, &mux->rwant)) {
200                                         mux->selwait = 0;
201                                         unlock(&mux->lock);
202                                         /* wake up selecting process */
203                                         _RENDEZVOUS((unsigned long)&mux->selwait, fd);
204                                 } else
205                                         unlock(&mux->lock);
206                         } else
207                                 unlock(&mux->lock);
208                 }
209         }
210 }
211
212 /* like read(), for a buffered fd; extra arg noblock says don't wait for data if true */
213 int
214 _readbuf(int fd, void *addr, int nwant, int noblock)
215 {
216         Muxbuf *b;
217         int ngot;
218
219         b = _fdinfo[fd].buf;
220         if(b->eof && b->n == 0) {
221 goteof:
222                 return 0;
223         }
224         if(b->n == 0 && noblock) {
225                 errno = EAGAIN;
226                 return -1;
227         }
228         /* make sure there's data */
229         lock(&mux->lock);
230         ngot = b->putnext - b->getnext;
231         if(ngot == 0) {
232                 /* maybe EOF just happened */
233                 if(b->eof) {
234                         unlock(&mux->lock);
235                         goto goteof;
236                 }
237                 /* sleep until there's data */
238                 b->datawait = 1;
239                 unlock(&mux->lock);
240                 _RENDEZVOUS((unsigned long)&b->datawait, 0);
241                 lock(&mux->lock);
242                 ngot = b->putnext - b->getnext;
243         }
244         if(ngot == 0) {
245                 unlock(&mux->lock);
246                 goto goteof;
247         }
248         if(ngot > nwant)
249                 ngot = nwant;
250         memcpy(addr, b->getnext, ngot);
251         b->getnext += ngot;
252         b->n -= ngot;
253         if(b->getnext == b->putnext && b->roomwait) {
254                 b->getnext = b->putnext = b->data;
255                 b->roomwait = 0;
256                 unlock(&mux->lock);
257                 /* wake up copy process */
258                 _RENDEZVOUS((unsigned long)&b->roomwait, 0);
259         } else
260                 unlock(&mux->lock);
261         return ngot;
262 }
263
264 int
265 select(int nfds, fd_set *rfds, fd_set *wfds, fd_set *efds, struct timeval *timeout)
266 {
267         int n, i, tmp, t, slots, fd, err;
268         Fdinfo *f;
269         Muxbuf *b;
270
271         if(timeout)
272                 t = timeout->tv_sec*1000 + (timeout->tv_usec+999)/1000;
273         else
274                 t = -1;
275         if(!((rfds && FD_ANYSET(rfds)) || (wfds && FD_ANYSET(wfds))
276                         || (efds && FD_ANYSET(efds)))) {
277                 /* no requested fds */
278                 if(t > 0)
279                         _SLEEP(t);
280                 return 0;
281         }
282
283         _startbuf(-1);
284
285         /* make sure all requested rfds and efds are buffered */
286         if(nfds >= OPEN_MAX)
287                 nfds = OPEN_MAX;
288         for(i = 0; i < nfds; i++)
289                 if((rfds && FD_ISSET(i, rfds)) || (efds && FD_ISSET(i, efds))){
290                         f = &_fdinfo[i];
291                         if(!(f->flags&FD_BUFFERED))
292                                 if(_startbuf(i) != 0) {
293                                         return -1;
294                                 }
295                         b = f->buf;
296                         if(rfds && FD_ISSET(i,rfds) && b->eof && b->n == 0)
297                         if(efds == 0 || !FD_ISSET(i,efds)) {
298                                 errno = EBADF;          /* how X tells a client is gone */
299                                 return -1;
300                         }
301                 }
302
303         /* check wfds;  for now, we'll say they are all ready */
304         n = 0;
305         if(wfds && FD_ANYSET(wfds)){
306                 for(i = 0; i<nfds; i++)
307                         if(FD_ISSET(i, wfds)) {
308                                 n++;
309                         }
310         }
311
312         lock(&mux->lock);
313
314         slots = mux->curfds;
315         FD_ZERO(&mux->rwant);
316         FD_ZERO(&mux->ewant);
317
318         for(i = 0; i<slots; i++) {
319                 b = &mux->bufs[i];
320                 fd = b->fd;
321                 if(fd == -1)
322                         continue;
323                 err = 0;
324                 if(efds && FD_ISSET(fd, efds)) {
325                         if(b->eof && b->n == 0){
326                                 err = 1;
327                                 n++;
328                         }else{
329                                 FD_CLR(fd, efds);
330                                 FD_SET(fd, &mux->ewant);
331                         }
332                 }
333                 if(rfds && FD_ISSET(fd, rfds)) {
334                         if(!err && (b->n > 0 || b->eof))
335                                 n++;
336                         else{
337                                 FD_CLR(fd, rfds);
338                                 FD_SET(fd, &mux->rwant);
339                         }
340                 }
341         }
342         if(n || !(FD_ANYSET(&mux->rwant) || FD_ANYSET(&mux->ewant)) || t == 0) {
343                 FD_ZERO(&mux->rwant);
344                 FD_ZERO(&mux->ewant);
345                 unlock(&mux->lock);
346                 return n;
347         }
348
349         if(timeout) {
350                 mux->waittime = t;
351                 if(timerpid == -1)
352                         _timerproc();
353                 else
354                         _resettimer();
355         }
356         mux->selwait = 1;
357         unlock(&mux->lock);
358         fd = _RENDEZVOUS((unsigned long)&mux->selwait, 0);
359         if(fd >= 0) {
360                 b = _fdinfo[fd].buf;
361                 if(FD_ISSET(fd, &mux->rwant)) {
362                         FD_SET(fd, rfds);
363                         n = 1;
364                 } else if(FD_ISSET(fd, &mux->ewant) && b->eof && b->n == 0) {
365                         FD_SET(fd, efds);
366                         n = 1;
367                 }
368         }
369         FD_ZERO(&mux->rwant);
370         FD_ZERO(&mux->ewant);
371         return n;
372 }
373
374 static int timerreset;
375 static int timerpid;
376
377 static void
378 alarmed(int v)
379 {
380         timerreset = 1;
381 }
382
383 /* a little over an hour */
384 #define LONGWAIT 4000001
385
386 static void
387 _killtimerproc(void)
388 {
389         if(timerpid > 0)
390                 kill(timerpid, SIGKILL);
391 }
392
393 static void
394 _timerproc(void)
395 {
396         int i;
397
398         if((timerpid = _RFORK(RFFDG|RFPROC|RFNOWAIT)) == 0){
399                 /* timer process */
400                 setpgid(getpid(), _muxsid);
401                 signal(SIGALRM, alarmed);
402                 for(i=0; i<OPEN_MAX; i++)
403                                 _CLOSE(i);
404                 _RENDEZVOUS(1, 0);
405                 for(;;) {
406                         _SLEEP(mux->waittime);
407                         if(timerreset) {
408                                 timerreset = 0;
409                         } else {
410                                 lock(&mux->lock);
411                                 if(mux->selwait && mux->waittime != LONGWAIT) {
412                                         mux->selwait = 0;
413                                         mux->waittime = LONGWAIT;
414                                         unlock(&mux->lock);
415                                         _RENDEZVOUS((unsigned long)&mux->selwait, -2);
416                                 } else {
417                                         mux->waittime = LONGWAIT;
418                                         unlock(&mux->lock);
419                                 }
420                         }
421                 }
422         }
423         atexit(_killtimerproc);
424         /* parent process continues */
425         _RENDEZVOUS(1, 0);
426 }
427
428 static void
429 _resettimer(void)
430 {
431         kill(timerpid, SIGALRM);
432 }
433
434 void
435 _killmuxsid(void)
436 {
437         if(_muxsid != -1 && (_mainpid == getpid() || _mainpid == -1))
438                 kill(-_muxsid,SIGTERM);
439 }
440
441 /* call this on fork(), because reading a BUFFERED fd won't work in child */
442 void
443 _detachbuf(void)
444 {
445         int i;
446         Fdinfo *f;
447
448         if(mux == 0)
449                 return;
450         _SEGDETACH(mux);
451         for(i = 0; i < OPEN_MAX; i++){
452                 f = &_fdinfo[i];
453                 if(f->flags&FD_BUFFERED)
454                         f->flags = (f->flags&~FD_BUFFERED) | FD_BUFFEREDX;
455                                 /* mark 'poisoned' */
456         }
457         mux = 0;
458         _muxsid = -1;
459         _mainpid = -1;
460         timerpid = -1;
461 }
462
463 static int
464 copynotehandler(void *u, char *msg)
465 {
466         int i;
467         void(*f)(int);
468
469         if(_finishing)
470                 _finish(0, 0);
471         _NOTED(1);
472         return 0;
473 }