]> git.lizzy.rs Git - plan9front.git/blob - sys/src/libventi/rpc.c
ndb/dns: lookup *all* entries in dblookup(), v4 and v6 queries in parallel, remove...
[plan9front.git] / sys / src / libventi / rpc.c
1 /*
2  * Multiplexed Venti client.  It would be nice if we 
3  * could turn this into a generic library routine rather
4  * than keep it Venti specific.  A user-level 9P client
5  * could use something like this too.
6  * 
7  * (Actually it does - this should be replaced with libmux,
8  * which should be renamed librpcmux.)
9  *
10  * This is a little more complicated than it might be
11  * because we want it to work well within and without libthread.
12  *
13  * The mux code is inspired by tra's, which is inspired by the Plan 9 kernel.
14  */
15
16 #include <u.h>
17 #include <libc.h>
18 #include <venti.h>
19
20 typedef struct Rwait Rwait;
21 struct Rwait
22 {
23         Rendez r;
24         Packet *p;
25         int done;
26         int sleeping;
27 };
28
29 static int gettag(VtConn*, Rwait*);
30 static void puttag(VtConn*, Rwait*, int);
31 static void muxrpc(VtConn*, Packet*);
32
33 Packet*
34 _vtrpc(VtConn *z, Packet *p, VtFcall *tx)
35 {
36         int i;
37         uchar tag, buf[2], *top;
38         Rwait *r, *rr;
39
40  
41         if(z == nil){
42                 werrstr("not connected");
43                 packetfree(p);
44                 return nil;
45         }
46
47         /* must malloc because stack could be private */
48         r = vtmallocz(sizeof(Rwait));
49
50         qlock(&z->lk);
51         r->r.l = &z->lk;
52         tag = gettag(z, r);
53         if(tx){
54                 /* vtfcallrpc can't print packet because it doesn't have tag */
55                 tx->tag = tag;
56                 if(chattyventi)
57                         fprint(2, "%s -> %F\n", argv0, tx);
58         }
59
60         /* slam tag into packet */
61         top = packetpeek(p, buf, 0, 2);
62         if(top == nil){
63                 packetfree(p);
64                 return nil;
65         }
66         if(top == buf){
67                 werrstr("first two bytes must be in same packet fragment");
68                 packetfree(p);
69                 vtfree(r);
70                 return nil;
71         }
72         top[1] = tag;
73         qunlock(&z->lk);
74         if(vtsend(z, p) < 0){
75                 vtfree(r);
76                 return nil;
77         }
78
79         qlock(&z->lk);
80         /* wait for the muxer to give us our packet */
81         r->sleeping = 1;
82         z->nsleep++;
83         while(z->muxer && !r->done)
84                 rsleep(&r->r);
85         z->nsleep--;
86         r->sleeping = 0;
87
88         /* if not done, there's no muxer: start muxing */
89         if(!r->done){
90                 if(z->muxer)
91                         abort();
92                 z->muxer = 1;
93                 while(!r->done){
94                         qunlock(&z->lk);
95                         if((p = vtrecv(z)) == nil){
96                                 werrstr("unexpected eof on venti connection");
97                                 z->muxer = 0;
98                                 vtfree(r);
99                                 return nil;
100                         }
101                         qlock(&z->lk);
102                         muxrpc(z, p);
103                 }
104                 z->muxer = 0;
105                 /* if there is anyone else sleeping, wake first unfinished to mux */
106                 if(z->nsleep)
107                 for(i=0; i<256; i++){
108                         rr = z->wait[i];
109                         if(rr && rr->sleeping && !rr->done){
110                                 rwakeup(&rr->r);
111                                 break;
112                         }
113                 }
114         }
115
116         p = r->p;
117         puttag(z, r, tag);
118         vtfree(r);
119         qunlock(&z->lk);
120         return p;
121 }
122
123 Packet*
124 vtrpc(VtConn *z, Packet *p)
125 {
126         return _vtrpc(z, p, nil);
127 }
128
129 static int 
130 gettag(VtConn *z, Rwait *r)
131 {
132         int i;
133
134 Again:
135         while(z->ntag == 256)
136                 rsleep(&z->tagrend);
137         for(i=0; i<256; i++)
138                 if(z->wait[i] == 0){
139                         z->ntag++;
140                         z->wait[i] = r;
141                         return i;
142                 }
143         fprint(2, "libventi: ntag botch\n");
144         goto Again;
145 }
146
147 static void
148 puttag(VtConn *z, Rwait *r, int tag)
149 {
150         assert(z->wait[tag] == r);
151         z->wait[tag] = nil;
152         z->ntag--;
153         rwakeup(&z->tagrend);
154 }
155
156 static void
157 muxrpc(VtConn *z, Packet *p)
158 {
159         uchar tag, buf[2], *top;
160         Rwait *r;
161
162         if((top = packetpeek(p, buf, 0, 2)) == nil){
163                 fprint(2, "libventi: short packet in vtrpc\n");
164                 packetfree(p);
165                 return;
166         }
167
168         tag = top[1];
169         if((r = z->wait[tag]) == nil){
170                 fprint(2, "libventi: unexpected packet tag %d in vtrpc\n", tag);
171 abort();
172                 packetfree(p);
173                 return;
174         }
175
176         r->p = p;
177         r->done = 1;
178         rwakeup(&r->r);
179 }
180