]> git.lizzy.rs Git - plan9front.git/blob - sys/src/cmd/aan.c
abaco: cleanup, handle image/x-icon, don't use backspace as a hotkey, and remove...
[plan9front.git] / sys / src / cmd / aan.c
1 #include <u.h>
2 #include <libc.h>
3 #include <auth.h>
4 #include <fcall.h>
5 #include <thread.h>
6
/*
 * Time unit conversions.  Each macro turns a count of the named unit
 * into a vlong number of nanoseconds.  The argument is parenthesized
 * so the cast covers the whole expression, not just its first operand.
 */
#define NS(x)   ((vlong)(x))
#define US(x)   (NS(x) * 1000LL)
#define MS(x)   (US(x) * 1000LL)
#define S(x)    (MS(x) * 1000LL)
11
12 enum {
13         Synctime = S(8),
14         Nbuf = 10,
15         K = 1024,
16         Bufsize = 8 * K,
17         Stacksize = 8 * K,
18         Timer = 0,                              // Alt channels.
19         Unsent = 1,
20         Maxto = 24 * 3600,                      // A full day to reconnect.
21         Hdrsz = 3*4,
22 };
23
/*
 * Four strings naming the ends of a connection.
 * NOTE(review): presumably local/remote system and service names;
 * nothing in the code visible here reads or writes this type.
 */
typedef struct Endpoints {
        char    *lsys;
        char    *lserv;
        char    *rsys;
        char    *rserv;
} Endpoints;
31
32 typedef struct {
33         uchar   nb[4];          // Number of data bytes in this message
34         uchar   msg[4];         // Message number
35         uchar   acked[4];       // Number of messages acked
36 } Hdr;
37
38 typedef struct t_Buf {
39         Hdr     hdr;
40         uchar   buf[Bufsize];
41 } Buf;
42
43 static Channel  *unsent;
44 static Channel  *unacked;
45 static Channel  *empty;
46 static int      netfd;
47 static int      inmsg;
48 static char     *devdir;
49 static int      debug;
50 static int      done;
51 static char     *dialstring;
52 static int      maxto = Maxto;
53 static char     *Logname = "aan";
54 static int      client;
55
56 static Alt a[] = {
57         /*      c       v        op   */
58         {       nil,    nil,    CHANRCV                 },      // timer
59         {       nil,    nil,    CHANRCV                 },      // unsent
60         {       nil,    nil,    CHANEND         },
61 };
62
63 static void             fromnet(void*);
64 static void             fromclient(void*);
65 static void             reconnect(int);
66 static void             synchronize(void);
67 static int              sendcommand(ulong, ulong);
68 static void             showmsg(int, char *, Buf *);
69 static int              writen(int, uchar *, int);
70 static void             dmessage(int, char *, ...);
71 static void             timerproc(void *);
72
73 static void
74 usage(void)
75 {
76         fprint(2, "Usage: %s [-cd] [-m maxto] dialstring|netdir\n", argv0);
77         exits("usage");
78 }
79
80 static int
81 catch(void *, char *s)
82 {
83         if (!strcmp(s, "alarm")) {
84                 syslog(0, Logname, "Timed out while waiting for reconnect, exiting...");
85                 threadexitsall(nil);
86         }
87         return 0;
88 }
89
90 static void*
91 emalloc(int n)
92 {
93         ulong pc;
94         void *v;
95
96         pc = getcallerpc(&n);
97         v = malloc(n);
98         if(v == nil)
99                 sysfatal("Cannot allocate memory; pc=%lux", pc);
100         setmalloctag(v, pc);
101         return v;
102 }
103
104 void
105 threadmain(int argc, char **argv)
106 {
107         vlong synctime;
108         int i, n, failed;
109         Channel *timer;
110         Buf *b;
111
112         ARGBEGIN {
113         case 'c':
114                 client++;
115                 break;
116         case 'd':
117                 debug++;
118                 break;
119         case 'm':
120                 maxto = (int)strtol(EARGF(usage()), nil, 0);
121                 break;
122         default:
123                 usage();
124         } ARGEND;
125
126         if (argc != 1)
127                 usage();
128
129         if (!client) {
130                 char *p;
131
132                 devdir = argv[0];
133                 if ((p = strstr(devdir, "/local")) != nil)
134                         *p = '\0';
135         }
136         else
137                 dialstring = argv[0];
138
139         if (debug > 0) {
140                 int fd = open("#c/cons", OWRITE|OCEXEC);        
141                 dup(fd, 2);
142         }
143
144         fmtinstall('F', fcallfmt);
145
146         atnotify(catch, 1);
147
148         /*
149          * Set up initial connection. use short timeout
150          * of 60 seconds so we wont hang arround for too
151          * long if there is some general connection problem
152          * (like NAT).
153          */
154         netfd = -1;
155         reconnect(60);
156
157         unsent = chancreate(sizeof(Buf *), Nbuf);
158         unacked = chancreate(sizeof(Buf *), Nbuf);
159         empty = chancreate(sizeof(Buf *), Nbuf);
160         timer = chancreate(sizeof(uchar *), 1);
161         if(unsent == nil || unacked == nil || empty == nil || timer == nil)
162                 sysfatal("Cannot allocate channels");
163
164         for (i = 0; i < Nbuf; i++)
165                 sendp(empty, emalloc(sizeof(Buf)));
166
167         if (proccreate(fromnet, nil, Stacksize) < 0)
168                 sysfatal("Cannot start fromnet; %r");
169
170         if (proccreate(fromclient, nil, Stacksize) < 0)
171                 sysfatal("Cannot start fromclient; %r");
172
173         if (proccreate(timerproc, timer, Stacksize) < 0)
174                 sysfatal("Cannot start timerproc; %r");
175
176         a[Timer].c = timer;
177         a[Unsent].c = unsent;
178         a[Unsent].v = &b;
179
180         synctime = nsec() + Synctime;
181         failed = 0;
182         while (!done) {
183                 if (failed) {
184                         // Wait for the netreader to die.
185                         while (netfd >= 0) {
186                                 dmessage(1, "main; waiting for netreader to die\n");
187                                 sleep(1000);
188                         }
189
190                         // the reader died; reestablish the world.
191                         reconnect(maxto);
192                         synchronize();
193                         failed = 0;
194                 }
195
196                 if (nsec() >= synctime) {
197                         Hdr hdr;
198
199                         PBIT32(hdr.nb, 0);
200                         PBIT32(hdr.acked, inmsg);
201                         PBIT32(hdr.msg, -1);
202
203                         if (writen(netfd, (uchar *)&hdr, Hdrsz) < 0) {
204                                 dmessage(2, "main; writen failed; %r\n");
205                                 failed = 1;
206                                 continue;
207                         }
208                         synctime = nsec() + Synctime;
209                 }
210
211                 switch (alt(a)) {
212                 case Timer:
213                         break;
214
215                 case Unsent:
216                         sendp(unacked, b);
217
218                         PBIT32(b->hdr.acked, inmsg);
219
220                         if (writen(netfd, (uchar *)&b->hdr, Hdrsz) < 0) {
221                                 dmessage(2, "main; writen failed; %r\n");
222                                 failed = 1;
223                         }
224
225                         n = GBIT32(b->hdr.nb);
226                         if (writen(netfd, b->buf, n) < 0) {
227                                 dmessage(2, "main; writen failed; %r\n");
228                                 failed = 1;
229                         }
230
231                         if (n == 0)
232                                 done = 1;
233                         break;
234                 }
235         }
236         syslog(0, Logname, "exiting...");
237         threadexitsall(nil);
238 }
239
240
241 static void
242 fromclient(void*)
243 {
244         static int outmsg;
245         int n;
246         Buf *b;
247
248         threadsetname("fromclient");
249
250         do {
251                 b = recvp(empty);
252                 n = read(0, b->buf, Bufsize);
253                 if (n <= 0) {
254                         if (n < 0)
255                                 dmessage(2, "fromclient; Cannot read 9P message; %r\n");
256                         else
257                                 dmessage(2, "fromclient; Client terminated\n");
258                         n = 0;
259                 }
260                 PBIT32(b->hdr.nb, n);
261                 PBIT32(b->hdr.msg, outmsg);
262                 showmsg(1, "fromclient", b);
263                 sendp(unsent, b);
264                 outmsg++;
265         } while(n > 0);
266 }
267
268 static void
269 fromnet(void*)
270 {
271         static int lastacked;
272         int n, m, len, acked;
273         Buf *b;
274
275         threadsetname("fromnet");
276
277         b = emalloc(sizeof(Buf));
278         while (!done) {
279                 while (netfd < 0) {
280                         if(done)
281                                 return;
282                         dmessage(1, "fromnet; waiting for connection... (inmsg %d)\n", inmsg);
283                         sleep(1000);
284                 }
285
286                 // Read the header.
287                 len = readn(netfd, (uchar *)&b->hdr, Hdrsz);
288                 if (len <= 0) {
289                         if (len < 0)
290                                 dmessage(1, "fromnet; (hdr) network failure; %r\n");
291                         else
292                                 dmessage(1, "fromnet; (hdr) network closed\n");
293                         close(netfd);
294                         netfd = -1;
295                         continue;
296                 }
297                 n = GBIT32(b->hdr.nb);
298                 m = GBIT32(b->hdr.msg);
299                 acked = GBIT32(b->hdr.acked);
300                 dmessage(2, "fromnet: Got message, size %d, nb %d, msg %d, acked %d, lastacked %d\n",
301                         len, n, m, acked, lastacked);
302
303                 if (n == 0) {
304                         if (m >= 0) {
305                                 dmessage(1, "fromnet; network closed\n");
306                                 break;
307                         }
308                         continue;
309                 }
310
311                 if (n > Bufsize) {
312                         dmessage(1, "fromnet; message too big %d > %d\n", n, Bufsize);
313                         break;
314                 }
315
316                 len = readn(netfd, b->buf, n);
317                 if (len <= 0 || len != n) {
318                         if (len == 0)
319                                 dmessage(1, "fromnet; network closed\n");
320                         else
321                                 dmessage(1, "fromnet; network failure; %r\n");
322                         close(netfd);
323                         netfd = -1;
324                         continue;
325                 }
326
327                 if (m < inmsg) {
328                         dmessage(1, "fromnet; skipping message %d, currently at %d\n", m, inmsg);
329                         continue;
330                 }                       
331
332                 // Process the acked list.
333                 while(lastacked != acked) {
334                         Buf *rb;
335
336                         rb = recvp(unacked);
337                         m = GBIT32(rb->hdr.msg);
338                         if (m != lastacked) {
339                                 dmessage(1, "fromnet; rb %p, msg %d, lastacked %d\n", rb, m, lastacked);
340                                 sysfatal("fromnet; bug");
341                         }
342                         PBIT32(rb->hdr.msg, -1);
343                         sendp(empty, rb);
344                         lastacked++;
345                 } 
346                 inmsg++;
347
348                 showmsg(1, "fromnet", b);
349
350                 if (writen(1, b->buf, len) < 0) 
351                         sysfatal("fromnet; cannot write to client; %r");
352         }
353         done = 1;
354 }
355
356 static void
357 reconnect(int secs)
358 {
359         NetConnInfo *nci;
360         char ldir[40];
361         int lcfd, fd;
362
363         if (dialstring) {
364                 syslog(0, Logname, "dialing %s", dialstring);
365                 alarm(secs*1000);
366                 while ((fd = dial(dialstring, nil, ldir, nil)) < 0) {
367                         char err[32];
368
369                         err[0] = '\0';
370                         errstr(err, sizeof err);
371                         if (strstr(err, "connection refused")) {
372                                 dmessage(1, "reconnect; server died...\n");
373                                 threadexitsall("server died...");
374                         }
375                         dmessage(1, "reconnect: dialed %s; %s\n", dialstring, err);
376                         sleep(1000);
377                 }
378                 alarm(0);
379                 syslog(0, Logname, "reconnected to %s", dialstring);
380         } 
381         else {
382                 syslog(0, Logname, "waiting for connection on %s", devdir);
383                 alarm(secs*1000);
384                 if ((lcfd = listen(devdir, ldir)) < 0) 
385                         sysfatal("reconnect; cannot listen; %r");
386                 if ((fd = accept(lcfd, ldir)) < 0)
387                         sysfatal("reconnect; cannot accept; %r");
388                 alarm(0);
389                 close(lcfd);
390         }
391
392         if(nci = getnetconninfo(ldir, fd)){
393                 syslog(0, Logname, "connected from %s", nci->rsys);
394                 threadsetname(client? "client %s %s" : "server %s %s", ldir, nci->rsys);
395                 freenetconninfo(nci);
396         } else
397                 syslog(0, Logname, "connected");
398         
399         // Wakes up the netreader.
400         netfd = fd;
401 }
402
403 static void
404 synchronize(void)
405 {
406         Channel *tmp;
407         Buf *b;
408         int n;
409
410         // Ignore network errors here.  If we fail during 
411         // synchronization, the next alarm will pick up 
412         // the error.
413
414         tmp = chancreate(sizeof(Buf *), Nbuf);
415         while ((b = nbrecvp(unacked)) != nil) {
416                 n = GBIT32(b->hdr.nb);
417                 writen(netfd, (uchar *)&b->hdr, Hdrsz);
418                 writen(netfd, b->buf, n);
419                 sendp(tmp, b);
420         }
421         chanfree(unacked);
422         unacked = tmp;
423 }
424
425 static void
426 showmsg(int level, char *s, Buf *b)
427 {
428         int n;
429
430         if (b == nil) {
431                 dmessage(level, "%s; b == nil\n", s);
432                 return;
433         }
434         n = GBIT32(b->hdr.nb);
435         dmessage(level, "%s;  (len %d) %X %X %X %X %X %X %X %X %X (%p)\n", s, n, 
436                         b->buf[0], b->buf[1], b->buf[2],
437                         b->buf[3], b->buf[4], b->buf[5],
438                         b->buf[6], b->buf[7], b->buf[8], b);
439 }
440
441 static int
442 writen(int fd, uchar *buf, int nb)
443 {
444         int len = nb;
445
446         while (nb > 0) {
447                 int n;
448
449                 if (fd < 0) 
450                         return -1;
451
452                 if ((n = write(fd, buf, nb)) < 0) {
453                         dmessage(1, "writen; Write failed; %r\n");
454                         return -1;
455                 }
456                 dmessage(2, "writen: wrote %d bytes\n", n);
457
458                 buf += n;
459                 nb -= n;
460         }
461         return len;
462 }
463
464 static void
465 timerproc(void *x)
466 {
467         Channel *timer = x;
468
469         threadsetname("timer");
470
471         while (!done) {
472                 sleep((Synctime / MS(1)) >> 1);
473                 sendp(timer, "timer");
474         }
475 }
476
477 static void
478 dmessage(int level, char *fmt, ...)
479 {
480         va_list arg; 
481
482         if (level > debug) 
483                 return;
484
485         va_start(arg, fmt);
486         vfprint(2, fmt, arg);
487         va_end(arg);
488 }