]> git.lizzy.rs Git - plan9front.git/blob - sys/src/cmd/execnet/client.c
ndb/dns: remove single-ip-address assuptions
[plan9front.git] / sys / src / cmd / execnet / client.c
1 #include <u.h>
2 #include <libc.h>
3 #include <fcall.h>
4 #include <thread.h>
5 #include <9p.h>
6 #include "dat.h"
7
8 int nclient;
9 Client **client;
10 #define Zmsg ((Msg*)~0)
11 char nocmd[] = "";
12
13 static void readthread(void*);
14 static void writethread(void*);
15 static void kickwriter(Client*);
16
17 int
18 newclient(void)
19 {
20         int i;
21         Client *c;
22
23         for(i=0; i<nclient; i++)
24                 if(client[i]->ref==0 && !client[i]->moribund)
25                         return i;
26
27         c = emalloc(sizeof(Client));
28         c->writerkick = chancreate(sizeof(void*), 1);
29         c->execpid = chancreate(sizeof(ulong), 0);
30         c->cmd = nocmd;
31
32         c->readerproc = ioproc();
33         c->writerproc = ioproc();
34         c->num = nclient;
35         if(nclient%16 == 0)
36                 client = erealloc(client, (nclient+16)*sizeof(client[0]));
37         client[nclient++] = c;
38         return nclient-1;
39 }
40
41 void
42 die(Client *c)
43 {
44         Msg *m, *next;
45         Req *r, *rnext;
46
47         c->moribund = 1;
48         kickwriter(c);
49         iointerrupt(c->readerproc);
50         iointerrupt(c->writerproc);
51         if(--c->activethread == 0){
52                 if(c->cmd != nocmd){
53                         free(c->cmd);
54                         c->cmd = nocmd;
55                 }
56                 c->pid = 0;
57                 c->moribund = 0;
58                 c->status = Closed;
59                 for(m=c->mq; m && m != Zmsg; m=next){
60                         next = m->link;
61                         free(m);
62                 }
63                 c->mq = nil;
64                 if(c->rq != nil){
65                         for(r=c->rq; r; r=rnext){
66                                 rnext = r->aux;
67                                 respond(r, "hangup");
68                         }
69                         c->rq = nil;
70                 }
71                 if(c->wq != nil){
72                         for(r=c->wq; r; r=rnext){
73                                 rnext = r->aux;
74                                 respond(r, "hangup");
75                         }
76                         c->wq = nil;
77                 }
78                 c->rq = nil;
79                 c->wq = nil;
80                 c->emq = nil;
81                 c->erq = nil;
82                 c->ewq = nil;
83         }
84 }
85
86 void
87 closeclient(Client *c)
88 {
89         if(--c->ref == 0){
90                 if(c->pid > 0)
91                         postnote(PNPROC, c->pid, "kill");
92                 c->status = Hangup;
93                 close(c->fd[0]);
94                 c->fd[0] = c->fd[1] = -1;
95                 c->moribund = 1;
96                 kickwriter(c);
97                 iointerrupt(c->readerproc);
98                 iointerrupt(c->writerproc);             
99                 c->activethread++;
100                 die(c);
101         }
102 }
103
104 void
105 queuerdreq(Client *c, Req *r)
106 {
107         if(c->rq==nil)
108                 c->erq = &c->rq;
109         *c->erq = r;
110         r->aux = nil;
111         c->erq = (Req**)&r->aux;
112 }
113
114 void
115 queuewrreq(Client *c, Req *r)
116 {
117         if(c->wq==nil)
118                 c->ewq = &c->wq;
119         *c->ewq = r;
120         r->aux = nil;
121         c->ewq = (Req**)&r->aux;
122 }
123
124 void
125 queuemsg(Client *c, Msg *m)
126 {
127         if(c->mq==nil)
128                 c->emq = &c->mq;
129         *c->emq = m;
130         if(m != Zmsg){
131                 m->link = nil;
132                 c->emq = (Msg**)&m->link;
133         }else
134                 c->emq = nil;
135 }
136
137 void
138 matchmsgs(Client *c)
139 {
140         Req *r;
141         Msg *m;
142         int n, rm;
143
144         while(c->rq && c->mq){
145                 r = c->rq;
146                 c->rq = r->aux;
147
148                 rm = 0;
149                 m = c->mq;
150                 if(m == Zmsg){
151                         respond(r, "execnet: no more data");
152                         break;
153                 }
154                 n = r->ifcall.count;
155                 if(n >= m->ep - m->rp){
156                         n = m->ep - m->rp;
157                         c->mq = m->link;
158                         rm = 1;
159                 }
160                 if(n)
161                         memmove(r->ofcall.data, m->rp, n);
162                 if(rm)
163                         free(m);
164                 else
165                         m->rp += n;
166                 r->ofcall.count = n;
167                 respond(r, nil);
168         }
169 }
170
171 void
172 findrdreq(Client *c, Req *r)
173 {
174         Req **l;
175
176         for(l=&c->rq; *l; l=(Req**)&(*l)->aux){
177                 if(*l == r){
178                         *l = r->aux;
179                         if(*l == nil)
180                                 c->erq = l;
181                         respond(r, "interrupted");
182                         break;
183                 }
184         }
185 }
186
187 void
188 findwrreq(Client *c, Req *r)
189 {
190         Req **l;
191
192         for(l=&c->wq; *l; l=(Req**)&(*l)->aux){
193                 if(*l == r){
194                         *l = r->aux;
195                         if(*l == nil)
196                                 c->ewq = l;
197                         respond(r, "interrupted");
198                         return;
199                 }
200         }
201 }
202
203 void
204 dataread(Req *r, Client *c)
205 {
206         queuerdreq(c, r);
207         matchmsgs(c);
208 }
209
210 static void
211 readthread(void *a)
212 {
213         uchar *buf;
214         int n;
215         Client *c;
216         Ioproc *io;
217         Msg *m;
218
219         c = a;
220         threadsetname("read%d", c->num);
221
222         buf = emalloc(8192);
223         io = c->readerproc;
224         while((n = ioread(io, c->fd[0], buf, 8192)) >= 0){
225                 m = emalloc(sizeof(Msg)+n);
226                 m->rp = (uchar*)&m[1];
227                 m->ep = m->rp + n;
228                 if(n)
229                         memmove(m->rp, buf, n);
230                 queuemsg(c, m);
231                 matchmsgs(c);
232         }
233         queuemsg(c, Zmsg);
234         free(buf);
235         die(c);
236 }
237
238 static void
239 kickwriter(Client *c)
240 {
241         nbsendp(c->writerkick, nil);
242 }
243
244 void
245 clientflush(Req *or, Client *c)
246 {
247         if(or->ifcall.type == Tread)
248                 findrdreq(c, or);
249         else{
250                 if(c->execreq == or){
251                         c->execreq = nil;
252                         iointerrupt(c->writerproc);
253                         ioflush(c->writerproc);
254                 }
255                 findwrreq(c, or);
256                 if(c->curw == or){
257                         c->curw = nil;
258                         iointerrupt(c->writerproc);
259                         kickwriter(c);
260                 }
261         }
262 }
263
264 void
265 datawrite(Req *r, Client *c)
266 {
267         queuewrreq(c, r);
268         kickwriter(c);
269 }
270
271 static void
272 writethread(void *a)
273 {
274         char e[ERRMAX];
275         uchar *buf;
276         int n;
277         Ioproc *io;
278         Req *r;
279         Client *c;
280
281         c = a;
282         threadsetname("write%d", c->num);
283
284         buf = emalloc(8192);
285         io = c->writerproc;
286         for(;;){
287                 while(c->wq == nil){
288                         if(c->moribund)
289                                 goto Out;
290                         recvp(c->writerkick);
291                         if(c->moribund)
292                                 goto Out;
293                 }
294                 r = c->wq;
295                 c->wq = r->aux;
296                 c->curw = r;
297                 n = iowrite(io, c->fd[1], r->ifcall.data, r->ifcall.count);
298                 c->curw = nil;
299                 if(chatty9p)
300                         fprint(2, "io->write returns %d\n", n);
301                 if(n >= 0){
302                         r->ofcall.count = n;
303                         respond(r, nil);
304                 }else{
305                         rerrstr(e, sizeof e);
306                         respond(r, e);
307                 }
308         }
309 Out:
310         free(buf);
311         die(c);
312 }
313
314 static void
315 execproc(void *a)
316 {
317         int i, fd;
318         Client *c;
319
320         c = a;
321         threadsetname("execproc%d", c->num);
322         if(pipe(c->fd) < 0){
323                 rerrstr(c->err, sizeof c->err);
324                 sendul(c->execpid, -1);
325                 return;
326         }
327         rfork(RFFDG);
328         fd = c->fd[1];
329         close(c->fd[0]);
330         dup(fd, 0);
331         dup(fd, 1);
332         for(i=3; i<100; i++)    /* should do better */
333                 close(i);
334         strcpy(c->err, "exec failed");
335         procexecl(c->execpid, "/bin/rc", "rc", "-c", c->cmd, nil);
336 }
337
338 static void
339 execthread(void *a)
340 {
341         Client *c;
342         int p;
343
344         c = a;
345         threadsetname("exec%d", c->num);
346         c->execpid = chancreate(sizeof(ulong), 0);
347         proccreate(execproc, c, STACK);
348         p = recvul(c->execpid);
349         chanfree(c->execpid);
350         c->execpid = nil;
351         close(c->fd[1]);
352         c->fd[1] = c->fd[0];
353         if(p != -1){
354                 c->pid = p;
355                 c->activethread = 2;
356                 threadcreate(readthread, c, STACK);
357                 threadcreate(writethread, c, STACK);
358                 if(c->execreq)
359                         respond(c->execreq, nil);
360         }else{
361                 if(c->execreq)
362                         respond(c->execreq, c->err);
363         }
364 }
365
366 void
367 ctlwrite(Req *r, Client *c)
368 {
369         char *f[3], *s, *p;
370         int nf;
371
372         s = emalloc(r->ifcall.count+1);
373         memmove(s, r->ifcall.data, r->ifcall.count);
374         s[r->ifcall.count] = '\0';
375
376         f[0] = s;
377         p = strchr(s, ' ');
378         if(p == nil)
379                 nf = 1;
380         else{
381                 *p++ = '\0';
382                 f[1] = p;
383                 nf = 2;
384         }
385
386         if(f[0][0] == '\0'){
387                 free(s);
388                 respond(r, nil);
389                 return;
390         }
391
392         r->ofcall.count = r->ifcall.count;
393         if(strcmp(f[0], "hangup") == 0){
394                 if(c->pid == 0){
395                         respond(r, "connection already hung up");
396                         goto Out;
397                 }
398                 postnote(PNPROC, c->pid, "kill");
399                 respond(r, nil);
400                 goto Out;
401         }
402
403         if(strcmp(f[0], "connect") == 0){
404                 if(c->cmd != nocmd){
405                         respond(r, "already have connection");
406                         goto Out;
407                 }
408                 if(nf == 1){
409                         respond(r, "need argument to connect");
410                         goto Out;
411                 }
412                 c->status = Exec;
413                 if(p = strrchr(f[1], '!'))
414                         *p = '\0';
415                 c->cmd = emalloc(4+1+strlen(f[1])+1);
416                 strcpy(c->cmd, "exec ");
417                 strcat(c->cmd, f[1]);
418                 c->execreq = r;
419                 threadcreate(execthread, c, STACK);
420                 goto Out;
421         }
422
423         respond(r, "bad or inappropriate control message");
424 Out:
425         free(s);
426 }