]> git.lizzy.rs Git - plan9front.git/blob - sys/src/libsunrpc/client.c
archacpi: make *acpi=1 the default
[plan9front.git] / sys / src / libsunrpc / client.c
1 /*
2  * Sun RPC client.
3  */
4 #include <u.h>
5 #include <libc.h>
6 #include <thread.h>
7 #include <sunrpc.h>
8
9 typedef struct Out Out;
10 struct Out
11 {
12         char err[ERRMAX];       /* error string */
13         Channel *creply;        /* send to finish rpc */
14         uchar *p;                       /* pending request packet */
15         int n;                          /* size of request */
16         ulong tag;                      /* flush tag of pending request */
17         ulong xid;                      /* xid of pending request */
18         ulong st;                       /* first send time */
19         ulong t;                        /* resend time */
20         int nresend;            /* number of resends */
21         SunRpc rpc;             /* response rpc */
22 };
23
24 static void
25 udpThread(void *v)
26 {
27         uchar *p, *buf;
28         Ioproc *io;
29         int n;
30         SunClient *cli;
31         enum { BufSize = 65536 };
32
33         cli = v;
34         buf = emalloc(BufSize);
35         io = ioproc();
36         p = nil;
37         for(;;){
38                 n = ioread(io, cli->fd, buf, BufSize);
39                 if(n <= 0)
40                         break;
41                 p = emalloc(4+n);
42                 memmove(p+4, buf, n);
43                 p[0] = n>>24;
44                 p[1] = n>>16;
45                 p[2] = n>>8;
46                 p[3] = n;
47                 if(sendp(cli->readchan, p) == 0)
48                         break;
49                 p = nil;
50         }
51         free(p);
52         closeioproc(io);
53         while(send(cli->dying, nil) == -1)
54                 ;
55 }
56
57 static void
58 netThread(void *v)
59 {
60         uchar *p, buf[4];
61         Ioproc *io;
62         uint n, tot;
63         int done;
64         SunClient *cli;
65
66         cli = v;
67         io = ioproc();
68         tot = 0;
69         p = nil;
70         for(;;){
71                 n = ioreadn(io, cli->fd, buf, 4);
72                 if(n != 4)
73                         break;
74                 n = (buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|buf[3];
75                 if(cli->chatty)
76                         fprint(2, "%.8ux...", n);
77                 done = n&0x80000000;
78                 n &= ~0x80000000;
79                 if(tot == 0){
80                         p = emalloc(4+n);
81                         tot = 4;
82                 }else
83                         p = erealloc(p, tot+n);
84                 if(ioreadn(io, cli->fd, p+tot, n) != n)
85                         break;
86                 tot += n;
87                 if(done){
88                         p[0] = tot>>24;
89                         p[1] = tot>>16;
90                         p[2] = tot>>8;
91                         p[3] = tot;
92                         if(sendp(cli->readchan, p) == 0)
93                                 break;
94                         p = nil;
95                         tot = 0;
96                 }
97         }
98         free(p);
99         closeioproc(io);
100         while(send(cli->dying, 0) == -1)
101                 ;
102 }
103
104 static void
105 timerThread(void *v)
106 {
107         Ioproc *io;
108         SunClient *cli;
109
110         cli = v;
111         io = ioproc();
112         for(;;){
113                 if(iosleep(io, 200) < 0)
114                         break;
115                 if(sendul(cli->timerchan, 0) == 0)
116                         break;
117         }
118         closeioproc(io);
119         while(send(cli->dying, 0) == -1)
120                 ;
121 }
122
123 static ulong
124 msec(void)
125 {
126         return nsec()/1000000;
127 }
128
129 static ulong
130 twait(ulong rtt, int nresend)
131 {
132         ulong t;
133
134         t = rtt;
135         if(nresend <= 1)
136                 {}
137         else if(nresend <= 3)
138                 t *= 2;
139         else if(nresend <= 18)
140                 t <<= nresend-2;
141         else
142                 t = 60*1000;
143         if(t > 60*1000)
144                 t = 60*1000;
145
146         return t;
147 }
148
149 static void
150 rpcMuxThread(void *v)
151 {
152         uchar *buf, *p, *ep;
153         int i, n, nout, mout;
154         ulong t, xidgen, tag;
155         Alt a[5];
156         Out *o, **out;
157         SunRpc rpc;
158         SunClient *cli;
159
160         cli = v;
161         mout = 16;
162         nout = 0;
163         out = emalloc(mout*sizeof(out[0]));
164         xidgen = truerand();
165
166         a[0].op = CHANRCV;
167         a[0].c = cli->rpcchan;
168         a[0].v = &o;
169         a[1].op = CHANNOP;
170         a[1].c = cli->timerchan;
171         a[1].v = nil;
172         a[2].op = CHANRCV;
173         a[2].c = cli->flushchan;
174         a[2].v = &tag;
175         a[3].op = CHANRCV;
176         a[3].c = cli->readchan;
177         a[3].v = &buf;
178         a[4].op = CHANEND;
179
180         for(;;){
181                 switch(alt(a)){
182                 case 0: /* o = <-rpcchan */
183                         if(o == nil)
184                                 goto Done;
185                         cli->nsend++;
186                         /* set xid */
187                         o->xid = ++xidgen;
188                         if(cli->needcount)
189                                 p = o->p+4;
190                         else
191                                 p = o->p;
192                         p[0] = xidgen>>24;
193                         p[1] = xidgen>>16;
194                         p[2] = xidgen>>8;
195                         p[3] = xidgen;
196                         if(write(cli->fd, o->p, o->n) != o->n){
197                                 free(o->p);
198                                 o->p = nil;
199                                 snprint(o->err, sizeof o->err, "write: %r");
200                                 sendp(o->creply, 0);
201                                 break;
202                         }
203                         if(nout >= mout){
204                                 mout *= 2;
205                                 out = erealloc(out, mout*sizeof(out[0]));
206                         }
207                         o->st = msec();
208                         o->nresend = 0;
209                         o->t = o->st + twait(cli->rtt.avg, 0);
210 if(cli->chatty) fprint(2, "send %lux %lud %lud\n", o->xid, o->st, o->t);
211                         out[nout++] = o;
212                         a[1].op = CHANRCV;
213                         break;
214
215                 case 1: /* <-timerchan */
216                         t = msec();
217                         for(i=0; i<nout; i++){
218                                 o = out[i];
219                                 if((int)(t - o->t) > 0){
220 if(cli->chatty) fprint(2, "resend %lux %lud %lud\n", o->xid, t, o->t);
221                                         if(cli->maxwait && t - o->st >= cli->maxwait){
222                                                 free(o->p);
223                                                 o->p = nil;
224                                                 strcpy(o->err, "timeout");
225                                                 sendp(o->creply, 0);
226                                                 out[i--] = out[--nout];
227                                                 continue;
228                                         }
229                                         cli->nresend++;
230                                         o->nresend++;
231                                         o->t = t + twait(cli->rtt.avg, o->nresend);
232                                         if(write(cli->fd, o->p, o->n) != o->n){
233                                                 free(o->p);
234                                                 o->p = nil;
235                                                 snprint(o->err, sizeof o->err, "rewrite: %r");
236                                                 sendp(o->creply, 0);
237                                                 out[i--] = out[--nout];
238                                                 continue;
239                                         }
240                                 }
241                         }
242                         /* stop ticking if no work; rpcchan will turn it back on */
243                         if(nout == 0)
244                                 a[1].op = CHANNOP;
245                         break;
246                         
247                 case 2: /* tag = <-flushchan */
248                         for(i=0; i<nout; i++){
249                                 o = out[i];
250                                 if(o->tag == tag){
251                                         out[i--] = out[--nout];
252                                         strcpy(o->err, "flushed");
253                                         free(o->p);
254                                         o->p = nil;
255                                         sendp(o->creply, 0);
256                                 }
257                         }
258                         break;
259
260                 case 3: /* buf = <-readchan */
261                         p = buf;
262                         n = (p[0]<<24)|(p[1]<<16)|(p[2]<<8)|p[3];
263                         p += 4;
264                         ep = p+n;
265                         if(sunRpcUnpack(p, ep, &p, &rpc) < 0){
266                                 fprint(2, "in: %.*H unpack failed\n", n, buf+4);
267                                 free(buf);
268                                 break;
269                         }
270                         if(cli->chatty)
271                                 fprint(2, "in: %B\n", &rpc);
272                         if(rpc.iscall){
273                                 fprint(2, "did not get reply\n");
274                                 free(buf);
275                                 break;
276                         }
277                         o = nil;
278                         for(i=0; i<nout; i++){
279                                 o = out[i];
280                                 if(o->xid == rpc.xid)
281                                         break;
282                         }
283                         if(i==nout){
284                                 if(cli->chatty) fprint(2, "did not find waiting request\n");
285                                 free(buf);
286                                 break;
287                         }
288                         out[i] = out[--nout];
289                         free(o->p);
290                         o->p = nil;
291                         if(rpc.status == SunSuccess){
292                                 o->p = buf;
293                                 o->rpc = rpc;
294                         }else{
295                                 o->p = nil;
296                                 free(buf);
297                                 sunErrstr(rpc.status);
298                                 rerrstr(o->err, sizeof o->err);
299                         }
300                         sendp(o->creply, 0);
301                         break;
302                 }
303         }
304 Done:
305         free(out);
306         sendp(cli->dying, 0);
307 }
308
309 SunClient*
310 sunDial(char *address)
311 {
312         int fd;
313         SunClient *cli;
314
315         if((fd = dial(address, 0, 0, 0)) < 0)
316                 return nil;
317
318         cli = emalloc(sizeof(SunClient));
319         cli->fd = fd;
320         cli->maxwait = 15000;
321         cli->rtt.avg = 1000;
322         cli->dying = chancreate(sizeof(void*), 0);
323         cli->rpcchan = chancreate(sizeof(Out*), 0);
324         cli->timerchan = chancreate(sizeof(ulong), 0);
325         cli->flushchan = chancreate(sizeof(ulong), 0);
326         cli->readchan = chancreate(sizeof(uchar*), 0);
327         if(strstr(address, "udp!")){
328                 cli->needcount = 0;
329                 cli->nettid = threadcreate(udpThread, cli, SunStackSize);
330                 cli->timertid = threadcreate(timerThread, cli, SunStackSize);
331         }else{
332                 cli->needcount = 1;
333                 cli->nettid = threadcreate(netThread, cli, SunStackSize);
334                 /* assume reliable: don't need timer */
335                 /* BUG: netThread should know how to redial */
336         }
337         threadcreate(rpcMuxThread, cli, SunStackSize);
338
339         return cli;
340 }
341
342 void
343 sunClientClose(SunClient *cli)
344 {
345         int n;
346
347         /*
348          * Threadints get you out of any stuck system calls
349          * or thread rendezvouses, but do nothing if the thread
350          * is in the ready state.  Keep interrupting until it takes.
351          */
352         n = 0;
353         if(!cli->timertid)
354                 n++;
355         while(n < 2){
356                 threadint(cli->nettid);
357                 if(cli->timertid)
358                         threadint(cli->timertid);
359                 yield();
360                 while(nbrecv(cli->dying, nil) == 1)
361                         n++;
362         }
363
364         sendp(cli->rpcchan, 0);
365         recvp(cli->dying);
366
367         /* everyone's gone: clean up */
368         close(cli->fd);
369         chanfree(cli->flushchan);
370         chanfree(cli->readchan);
371         chanfree(cli->timerchan);
372         free(cli);
373 }
374         
375 void
376 sunClientFlushRpc(SunClient *cli, ulong tag)
377 {
378         sendul(cli->flushchan, tag);
379 }
380
381 void
382 sunClientProg(SunClient *cli, SunProg *p)
383 {
384         if(cli->nprog%16 == 0)
385                 cli->prog = erealloc(cli->prog, (cli->nprog+16)*sizeof(cli->prog[0]));
386         cli->prog[cli->nprog++] = p;
387 }
388
389 int
390 sunClientRpc(SunClient *cli, ulong tag, SunCall *tx, SunCall *rx, uchar **tofree)
391 {
392         uchar *bp, *p, *ep;
393         int i, n1, n2, n, nn;
394         Out o;
395         SunProg *prog;
396         SunStatus ok;
397
398         for(i=0; i<cli->nprog; i++)
399                 if(cli->prog[i]->prog == tx->rpc.prog && cli->prog[i]->vers == tx->rpc.vers)
400                         break;
401         if(i==cli->nprog){
402                 werrstr("unknown sun rpc program %d version %d", tx->rpc.prog, tx->rpc.vers);
403                 return -1;
404         }
405         prog = cli->prog[i];
406
407         if(cli->chatty){
408                 fprint(2, "out: %B\n", &tx->rpc);
409                 fprint(2, "\t%C\n", tx);
410         }
411
412         n1 = sunRpcSize(&tx->rpc);
413         n2 = sunCallSize(prog, tx);
414
415         n = n1+n2;
416         if(cli->needcount)
417                 n += 4;
418
419         bp = emalloc(n);
420         ep = bp+n;
421         p = bp;
422         if(cli->needcount){
423                 nn = n-4;
424                 p[0] = (nn>>24)|0x80;
425                 p[1] = nn>>16;
426                 p[2] = nn>>8;
427                 p[3] = nn;
428                 p += 4;
429         }
430         if((ok = sunRpcPack(p, ep, &p, &tx->rpc)) != SunSuccess
431         || (ok = sunCallPack(prog, p, ep, &p, tx)) != SunSuccess){
432                 sunErrstr(ok);
433                 free(bp);
434                 return -1;
435         }
436         if(p != ep){
437                 werrstr("rpc: packet size mismatch");
438                 free(bp);
439                 return -1;
440         }
441
442         memset(&o, 0, sizeof o);
443         o.creply = chancreate(sizeof(void*), 0);
444         o.tag = tag;
445         o.p = bp;
446         o.n = n;
447
448         sendp(cli->rpcchan, &o);
449         recvp(o.creply);
450         chanfree(o.creply);
451
452         if(o.p == nil){
453                 werrstr("%s", o.err);
454                 return -1;
455         }
456
457         p = o.rpc.data;
458         ep = p+o.rpc.ndata;
459         rx->rpc = o.rpc;
460         rx->rpc.proc = tx->rpc.proc;
461         rx->rpc.prog = tx->rpc.prog;
462         rx->rpc.vers = tx->rpc.vers;
463         rx->type = (rx->rpc.proc<<1)|1;
464         if((ok = sunCallUnpack(prog, p, ep, &p, rx)) != SunSuccess){
465                 sunErrstr(ok);
466                 werrstr("unpack: %r");
467                 free(o.p);
468                 return -1;
469         }
470
471         if(cli->chatty){
472                 fprint(2, "in: %B\n", &rx->rpc);
473                 fprint(2, "in:\t%C\n", rx);
474         }
475
476         if(tofree)
477                 *tofree = o.p;
478         else
479                 free(o.p);
480
481         return 0;
482 }