/*
 * Out describes one outstanding request.  Only the typedef and the member
 * lines are visible in this excerpt; the struct header and braces sit on
 * lines that are not shown.
 */
9 typedef struct Out Out;
/* filled by snprint, strcpy or rerrstr in the mux loop; sunClientRpc passes it to werrstr */
12 char err[ERRMAX]; /* error string */
/* sunClientRpc creates this with chancreate(sizeof(void*), 0) */
13 Channel *creply; /* send to finish rpc */
/* p and n are the buffer and length given to write() on send and resend */
14 uchar *p; /* pending request packet */
15 int n; /* size of request */
16 ulong tag; /* flush tag of pending request */
/* compared with rpc.xid to pair a reply with its request */
17 ulong xid; /* xid of pending request */
/* t - st is compared with cli->maxwait to decide when to give up */
18 ulong st; /* first send time */
/* deadline tested with (int)(t - o->t) > 0 when the timer channel fires */
19 ulong t; /* resend time */
/* passed to twait() to pick the next resend delay */
20 int nresend; /* number of resends */
21 SunRpc rpc; /* response rpc */
/*
 * Reader loop fragment: each ioread takes at most BufSize bytes from
 * cli->fd into buf, and a packet pointer p is handed to cli->readchan.
 * NOTE(review): the function header is not visible in this excerpt;
 * presumably this is udpThread, which sunDial starts for "udp!"
 * addresses - confirm.  How p is built from buf is also not shown.
 */
31 enum { BufSize = 65536 };
34 buf = emalloc(BufSize);
38 n = ioread(io, cli->fd, buf, BufSize);
/* a zero result from sendp is tested here; the action taken is on a line not shown */
47 if(sendp(cli->readchan, p) == 0)
/* keep retrying the send on cli->dying for as long as it returns -1 */
53 while(send(cli->dying, nil) == -1)
/*
 * Reader loop fragment for a length-prefixed stream: read a 4-byte
 * header, decode it as a big-endian 32-bit value, then read that many
 * bytes onto the end of p.
 * NOTE(review): the function header is not visible in this excerpt;
 * presumably this is netThread, which sunDial starts for non-udp
 * addresses - confirm.  Any masking of the header value between the
 * decode and the realloc is on lines not shown.
 */
71 n = ioreadn(io, cli->fd, buf, 4);
/* big-endian decode of the four header bytes */
74 n = (buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|buf[3];
/* diagnostic dump of the decoded header to fd 2 */
76 fprint(2, "%.8ux...", n);
/* grow p so that n more bytes fit after the tot bytes already held */
83 p = erealloc(p, tot+n);
/* anything other than a full read of n bytes is treated specially; the action is not shown */
84 if(ioreadn(io, cli->fd, p+tot, n) != n)
92 if(sendp(cli->readchan, p) == 0)
/* keep retrying the send on cli->dying for as long as it returns -1 */
100 while(send(cli->dying, 0) == -1)
/*
 * Timer loop fragment: sleep, then post a value on cli->timerchan so
 * that the mux loop runs its resend and timeout scan.
 * NOTE(review): the function header is not visible in this excerpt;
 * presumably this is timerThread, started by sunDial - confirm.
 */
/* iosleep argument is 200; presumably milliseconds - confirm against ioproc documentation */
113 if(iosleep(io, 200) < 0)
/* the value sent is always 0; only the arrival of the message is meaningful here */
115 if(sendul(cli->timerchan, 0) == 0)
/* keep retrying the send on cli->dying for as long as it returns -1 */
119 while(send(cli->dying, 0) == -1)
/*
 * Clock helper: scales the nsec() reading down by 1000000.
 * NOTE(review): assuming nsec() reports nanoseconds, the result is in
 * milliseconds - confirm.  The function header is not visible here.
 */
126 return nsec()/1000000;
/*
 * twait: delay to wait before the next (re)send, chosen from the round
 * trip estimate rtt and the number of resends so far.  Callers add the
 * result to a timestamp to obtain a deadline.
 * The delay values themselves are on lines not shown in this excerpt.
 */
130 twait(ulong rtt, int nresend)
/* resend counts are bucketed: up to 3, then up to 18, with at least one earlier branch not shown */
137 else if(nresend <= 3)
139 else if(nresend <= 18)
/*
 * rpcMuxThread: central loop that keeps the table of outstanding
 * requests (out, with nout entries in use) and services four channels,
 * a[0]..a[3]: rpcchan, timerchan, flushchan and readchan.
 * NOTE(review): this excerpt shows only selected lines; conditions and
 * statements between the numbered lines are not visible.
 */
150 rpcMuxThread(void *v)
153 int i, n, nout, mout;
154 ulong t, xidgen, tag;
163 out = emalloc(mout*sizeof(out[0]));
/* one entry per event source; the index matches the case labels below */
167 a[0].c = cli->rpcchan;
170 a[1].c = cli->timerchan;
173 a[2].c = cli->flushchan;
176 a[3].c = cli->readchan;
/* new request: transmit it and set its first resend deadline */
182 case 0: /* o = <-rpcchan */
/* a write that does not transfer exactly o->n bytes is reported through o->err */
196 if(write(cli->fd, o->p, o->n) != o->n){
199 snprint(o->err, sizeof o->err, "write: %r");
/* resize the table to mout entries; the growth condition is on a line not shown */
205 out = erealloc(out, mout*sizeof(out[0]));
/* first deadline is measured from the first send time, with a resend count of 0 */
209 o->t = o->st + twait(cli->rtt.avg, 0);
210 if(cli->chatty) fprint(2, "send %lux %lud %lud\n", o->xid, o->st, o->t);
/* timer event: scan every outstanding request for an expired deadline */
215 case 1: /* <-timerchan */
217 for(i=0; i<nout; i++){
/* signed difference, so the test keeps working if the ulong clock wraps */
219 if((int)(t - o->t) > 0){
220 if(cli->chatty) fprint(2, "resend %lux %lud %lud\n", o->xid, t, o->t);
/* give up once the request has been pending for maxwait; a maxwait of 0 disables the limit */
221 if(cli->maxwait && t - o->st >= cli->maxwait){
224 strcpy(o->err, "timeout");
/* delete slot i by moving the last entry into it; i-- makes the loop revisit the slot */
226 out[i--] = out[--nout];
/* next deadline is computed from the current time and the resend count */
231 o->t = t + twait(cli->rtt.avg, o->nresend);
232 if(write(cli->fd, o->p, o->n) != o->n){
235 snprint(o->err, sizeof o->err, "rewrite: %r");
/* same delete-by-swap as in the timeout path above */
237 out[i--] = out[--nout];
242 /* stop ticking if no work; rpcchan will turn it back on */
/* flush event: drop requests from the table and mark them flushed */
247 case 2: /* tag = <-flushchan */
248 for(i=0; i<nout; i++){
/* NOTE(review): presumably guarded by a tag comparison on a line not shown - confirm */
251 out[i--] = out[--nout];
252 strcpy(o->err, "flushed");
/* reply packet: the first four bytes are decoded as a big-endian length */
260 case 3: /* buf = <-readchan */
262 n = (p[0]<<24)|(p[1]<<16)|(p[2]<<8)|p[3];
/* a negative return from sunRpcUnpack is logged with a hex dump starting at buf+4 */
265 if(sunRpcUnpack(p, ep, &p, &rpc) < 0){
266 fprint(2, "in: %.*H unpack failed\n", n, buf+4);
271 fprint(2, "in: %B\n", &rpc);
273 fprint(2, "did not get reply\n");
/* find the outstanding request whose xid matches the reply */
278 for(i=0; i<nout; i++){
280 if(o->xid == rpc.xid)
284 if(cli->chatty) fprint(2, "did not find waiting request\n");
/* remove the matched request by overwriting its slot with the last entry */
288 out[i] = out[--nout];
291 if(rpc.status == SunSuccess){
/* for other statuses, set the error string from the status and copy it into o->err */
297 sunErrstr(rpc.status);
298 rerrstr(o->err, sizeof o->err);
/* post on cli->dying, the channel that sunClientClose polls with nbrecv */
306 sendp(cli->dying, 0);
/*
 * sunDial: dial address, allocate a SunClient, create its channels and
 * start the client threads.  The reader thread is chosen by whether
 * address contains "udp!" (strstr matches anywhere in the string);
 * timerThread is started only on the udp branch.  rpcMuxThread is
 * also started.
 */
310 sunDial(char *address)
/* NOTE(review): what happens after a negative dial() result is on lines not shown */
315 if((fd = dial(address, 0, 0, 0)) < 0)
318 cli = emalloc(sizeof(SunClient));
/* default limit used by the timeout test in the mux loop; presumably milliseconds - confirm */
320 cli->maxwait = 15000;
/* every channel is created with 0 as the second chancreate argument */
322 cli->dying = chancreate(sizeof(void*), 0);
323 cli->rpcchan = chancreate(sizeof(Out*), 0);
324 cli->timerchan = chancreate(sizeof(ulong), 0);
325 cli->flushchan = chancreate(sizeof(ulong), 0);
326 cli->readchan = chancreate(sizeof(uchar*), 0);
327 if(strstr(address, "udp!")){
/* thread ids are kept so that sunClientClose can call threadint on them */
329 cli->nettid = threadcreate(udpThread, cli, SunStackSize);
330 cli->timertid = threadcreate(timerThread, cli, SunStackSize);
333 cli->nettid = threadcreate(netThread, cli, SunStackSize);
334 /* assume reliable: don't need timer */
335 /* BUG: netThread should know how to redial */
/* the id of the mux thread is not stored */
337 threadcreate(rpcMuxThread, cli, SunStackSize);
/*
 * sunClientClose: shut down the client threads and free the channels.
 * Visible steps: interrupt the reader and timer threads, poll
 * cli->dying without blocking, send a nil pointer on rpcchan, then
 * free flushchan, readchan and timerchan.
 * NOTE(review): loop structure and any further cleanup are on lines
 * not shown in this excerpt.
 */
343 sunClientClose(SunClient *cli)
348 * Threadints get you out of any stuck system calls
349 * or thread rendezvouses, but do nothing if the thread
350 * is in the ready state. Keep interrupting until it takes.
356 threadint(cli->nettid);
358 threadint(cli->timertid);
/* loop while nbrecv returns 1 on cli->dying, the channel the worker threads send on */
360 while(nbrecv(cli->dying, nil) == 1)
/* a nil request pointer is sent to the mux thread; presumably its signal to exit - confirm */
364 sendp(cli->rpcchan, 0);
367 /* everyone's gone: clean up */
369 chanfree(cli->flushchan);
370 chanfree(cli->readchan);
371 chanfree(cli->timerchan);
/*
 * sunClientFlushRpc: ask the mux thread to flush by sending tag on
 * cli->flushchan, which rpcMuxThread services as its flush case.
 */
376 sunClientFlushRpc(SunClient *cli, ulong tag)
378 sendul(cli->flushchan, tag);
/*
 * sunClientProg: append program p to cli->prog, the list that
 * sunClientRpc searches by program and version number.
 */
382 sunClientProg(SunClient *cli, SunProg *p)
/* the array grows 16 slots at a time, each time the count reaches a multiple of 16 */
384 if(cli->nprog%16 == 0)
385 cli->prog = erealloc(cli->prog, (cli->nprog+16)*sizeof(cli->prog[0]));
386 cli->prog[cli->nprog++] = p;
/*
 * sunClientRpc: perform one call described by tx and decode the reply
 * into rx.  Visible steps: find the registered program matching tx,
 * size and pack the request, hand an Out record to the mux thread over
 * rpcchan, report o.err through werrstr, then unpack the reply.
 * NOTE(review): return values, buffer allocation and the use of tag and
 * tofree are on lines not shown in this excerpt.
 */
390 sunClientRpc(SunClient *cli, ulong tag, SunCall *tx, SunCall *rx, uchar **tofree)
393 int i, n1, n2, n, nn;
/* look for a registered program with the same program and version numbers as the call */
398 for(i=0; i<cli->nprog; i++)
399 if(cli->prog[i]->prog == tx->rpc.prog && cli->prog[i]->vers == tx->rpc.vers)
402 werrstr("unknown sun rpc program %d version %d", tx->rpc.prog, tx->rpc.vers);
408 fprint(2, "out: %B\n", &tx->rpc);
409 fprint(2, "\t%C\n", tx);
/* n1 is the size of the rpc header, n2 the size of the call body */
412 n1 = sunRpcSize(&tx->rpc);
413 n2 = sunCallSize(prog, tx);
/* top byte of nn with bit 0x80 set; presumably the last-fragment flag of RPC record marking - confirm */
424 p[0] = (nn>>24)|0x80;
/* pack the header, then the body; either one returning a non-success status enters the branch */
430 if((ok = sunRpcPack(p, ep, &p, &tx->rpc)) != SunSuccess
431 || (ok = sunCallPack(prog, p, ep, &p, tx)) != SunSuccess){
437 werrstr("rpc: packet size mismatch");
/* o is zeroed, given a reply channel, and its address is passed to the mux thread */
442 memset(&o, 0, sizeof o);
443 o.creply = chancreate(sizeof(void*), 0);
448 sendp(cli->rpcchan, &o);
/* publish the error text recorded in o.err as the error string */
453 werrstr("%s", o.err);
/* the reply is labelled with the same proc, prog and vers as the request */
460 rx->rpc.proc = tx->rpc.proc;
461 rx->rpc.prog = tx->rpc.prog;
462 rx->rpc.vers = tx->rpc.vers;
/* reply type is the proc number shifted left by one with the low bit set */
463 rx->type = (rx->rpc.proc<<1)|1;
464 if((ok = sunCallUnpack(prog, p, ep, &p, rx)) != SunSuccess){
466 werrstr("unpack: %r");
472 fprint(2, "in: %B\n", &rx->rpc);
473 fprint(2, "in:\t%C\n", rx);