]> git.lizzy.rs Git - plan9front.git/blob - sys/src/libventi/send.c
upas/Mail: avoid showing empty To: and CC: lines in compose windows
[plan9front.git] / sys / src / libventi / send.c
1 #include <u.h>
2 #include <libc.h>
3 #include <venti.h>
4 #include "queue.h"
5
/*
 * Traffic counters, updated by _vtsend and _vtrecv.
 * The send byte count includes the two-byte length header of every
 * packet; the receive byte count covers packet payload only.
 */
long ventisendbytes;
long ventisendpackets;
long ventirecvbytes;
long ventirecvpackets;
8
9 static int
10 _vtsend(VtConn *z, Packet *p)
11 {
12         IOchunk ioc;
13         int n, tot;
14         uchar buf[2];
15
16         if(z->state != VtStateConnected) {
17                 werrstr("session not connected");
18                 return -1;
19         }
20
21         /* add framing */
22         n = packetsize(p);
23         if(n >= (1<<16)) {
24                 werrstr("packet too large");
25                 packetfree(p);
26                 return -1;
27         }
28         buf[0] = n>>8;
29         buf[1] = n;
30         packetprefix(p, buf, 2);
31         ventisendbytes += n+2;
32         ventisendpackets++;
33
34         tot = 0;
35         for(;;){
36                 n = packetfragments(p, &ioc, 1, 0);
37                 if(n == 0)
38                         break;
39                 if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){
40                         vtlog(VtServerLog, "<font size=-1>%T %s:</font> sending packet %p: %r<br>\n", z->addr, p);
41                         packetfree(p);
42                         return -1;
43                 }
44                 packetconsume(p, nil, ioc.len);
45                 tot += ioc.len;
46         }
47         vtlog(VtServerLog, "<font size=-1>%T %s:</font> sent packet %p (%d bytes)<br>\n", z->addr, p, tot);
48         packetfree(p);
49         return 1;
50 }
51
52 static int
53 interrupted(void)
54 {
55         char e[ERRMAX];
56
57         rerrstr(e, sizeof e);
58         return strstr(e, "interrupted") != nil;
59 }
60
61
62 static Packet*
63 _vtrecv(VtConn *z)
64 {
65         uchar buf[10], *b;
66         int n;
67         Packet *p;
68         int size, len;
69
70         if(z->state != VtStateConnected) {
71                 werrstr("session not connected");
72                 return nil;
73         }
74
75         p = z->part;
76         /* get enough for head size */
77         size = packetsize(p);
78         while(size < 2) {
79                 b = packettrailer(p, 2);
80                 assert(b != nil);
81                 if(0) fprint(2, "%d read hdr\n", getpid());
82                 n = read(z->infd, b, 2);
83                 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
84                 if(n==0 || (n<0 && !interrupted()))
85                         goto Err;
86                 size += n;
87                 packettrim(p, 0, size);
88         }
89
90         if(packetconsume(p, buf, 2) < 0)
91                 goto Err;
92         len = (buf[0] << 8) | buf[1];
93         size -= 2;
94
95         while(size < len) {
96                 n = len - size;
97                 if(n > MaxFragSize)
98                         n = MaxFragSize;
99                 b = packettrailer(p, n);
100                 if(0) fprint(2, "%d read body %d\n", getpid(), n);
101                 n = read(z->infd, b, n);
102                 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
103                 if(n > 0)
104                         size += n;
105                 packettrim(p, 0, size);
106                 if(n==0 || (n<0 && !interrupted()))
107                         goto Err;
108         }
109         ventirecvbytes += len;
110         ventirecvpackets++;
111         p = packetsplit(p, len);
112         vtlog(VtServerLog, "<font size=-1>%T %s:</font> read packet %p len %d<br>\n", z->addr, p, len);
113         return p;
114 Err:    
115         vtlog(VtServerLog, "<font size=-1>%T %s:</font> error reading packet: %r<br>\n", z->addr);
116         return nil;     
117 }
118
/*
 * If you fork off two procs running vtrecvproc and vtsendproc,
 * then vtrecv/vtsend (and thus vtrpc) will never block except on
 * rendezvous, which is nice when it's running in one thread of many.
 */
124 void
125 vtrecvproc(void *v)
126 {
127         Packet *p;
128         VtConn *z;
129         Queue *q;
130
131         z = v;
132         q = _vtqalloc();
133
134         qlock(&z->lk);
135         z->readq = q;
136         qlock(&z->inlk);
137         rwakeup(&z->rpcfork);
138         qunlock(&z->lk);
139
140         while((p = _vtrecv(z)) != nil)
141                 if(_vtqsend(q, p) < 0){
142                         packetfree(p);
143                         break;
144                 }
145         qunlock(&z->inlk);
146         qlock(&z->lk);
147         _vtqhangup(q);
148         while((p = _vtnbqrecv(q)) != nil)
149                 packetfree(p);
150         _vtqdecref(q);
151         z->readq = nil;
152         rwakeup(&z->rpcfork);
153         qunlock(&z->lk);
154         vthangup(z);
155 }
156
157 void
158 vtsendproc(void *v)
159 {
160         Queue *q;
161         Packet *p;
162         VtConn *z;
163
164         z = v;
165         q = _vtqalloc();
166
167         qlock(&z->lk);
168         z->writeq = q;
169         qlock(&z->outlk);
170         rwakeup(&z->rpcfork);
171         qunlock(&z->lk);
172
173         while((p = _vtqrecv(q)) != nil)
174                 if(_vtsend(z, p) < 0)
175                         break;
176         qunlock(&z->outlk);
177         qlock(&z->lk);
178         _vtqhangup(q);
179         while((p = _vtnbqrecv(q)) != nil)
180                 packetfree(p);
181         _vtqdecref(q);
182         z->writeq = nil;
183         rwakeup(&z->rpcfork);
184         qunlock(&z->lk);
185         return;
186 }
187
188 Packet*
189 vtrecv(VtConn *z)
190 {
191         Packet *p;
192         Queue *q;
193
194         qlock(&z->lk);
195         if(z->state != VtStateConnected){
196                 werrstr("not connected");
197                 qunlock(&z->lk);
198                 return nil;
199         }
200         if(z->readq){
201                 q = _vtqincref(z->readq);
202                 qunlock(&z->lk);
203                 p = _vtqrecv(q);
204                 _vtqdecref(q);
205                 return p;
206         }
207
208         qlock(&z->inlk);
209         qunlock(&z->lk);
210         p = _vtrecv(z);
211         qunlock(&z->inlk);
212         if(!p)
213                 vthangup(z);
214         return p;
215 }
216
217 int
218 vtsend(VtConn *z, Packet *p)
219 {
220         Queue *q;
221
222         qlock(&z->lk);
223         if(z->state != VtStateConnected){
224                 packetfree(p);
225                 werrstr("not connected");
226                 qunlock(&z->lk);
227                 return -1;
228         }
229         if(z->writeq){
230                 q = _vtqincref(z->writeq);
231                 qunlock(&z->lk);
232                 if(_vtqsend(q, p) < 0){
233                         _vtqdecref(q);
234                         packetfree(p);
235                         return -1;
236                 }
237                 _vtqdecref(q);
238                 return 0;
239         }
240
241         qlock(&z->outlk);
242         qunlock(&z->lk);
243         if(_vtsend(z, p) < 0){
244                 qunlock(&z->outlk);
245                 vthangup(z);
246                 return -1;      
247         }
248         qunlock(&z->outlk);
249         return 0;
250 }
251