]> git.lizzy.rs Git - plan9front.git/blob - sys/src/cmd/aan.c
Add exponential function.
[plan9front.git] / sys / src / cmd / aan.c
1 #include <u.h>
2 #include <libc.h>
3 #include <auth.h>
4 #include <fcall.h>
5 #include <thread.h>
6
/*
 * Time unit helpers.  Each macro scales a count of the named unit
 * (nanoseconds, microseconds, milliseconds, seconds) to a vlong
 * number of nanoseconds, e.g. S(8) is eight seconds in nanoseconds.
 *
 * The argument is parenthesized before the cast so that an
 * expression argument such as NS(a + b) is converted as a whole
 * instead of only its first operand.
 */
#define NS(x)	((vlong)(x))
#define US(x)	(NS(x) * 1000LL)
#define MS(x)	(US(x) * 1000LL)
#define S(x)	(MS(x) * 1000LL)
11
12 enum {
13         Synctime = S(8),
14         Nbuf = 10,
15         K = 1024,
16         Bufsize = 8 * K,
17         Stacksize = 8 * K,
18         Timer = 0,                              // Alt channels.
19         Unsent = 1,
20         Maxto = 24 * 3600,                      // A full day to reconnect.
21 };
22
/*
 * Four strings naming the ends of a connection; presumably the
 * local and remote system and service names.  Nothing in the
 * visible code uses this type.
 */
typedef struct Endpoints {
	char	*lsys;
	char	*lserv;
	char	*rsys;
	char	*rserv;
} Endpoints;
30
31 typedef struct {
32         uchar   nb[4];          // Number of data bytes in this message
33         uchar   msg[4];         // Message number
34         uchar   acked[4];       // Number of messages acked
35 } Hdr;
36
37 typedef struct t_Buf {
38         Hdr     hdr;
39         uchar   buf[Bufsize];
40 } Buf;
41
42 static Channel  *unsent;
43 static Channel  *unacked;
44 static Channel  *empty;
45 static int      netfd;
46 static int      inmsg;
47 static char     *devdir;
48 static int      debug;
49 static int      done;
50 static char     *dialstring;
51 static int      maxto = Maxto;
52 static char     *Logname = "aan";
53 static int      client;
54
55 static Alt a[] = {
56         /*      c       v        op   */
57         {       nil,    nil,    CHANRCV                 },      // timer
58         {       nil,    nil,    CHANRCV                 },      // unsent
59         {       nil,    nil,    CHANEND         },
60 };
61
62 static void             fromnet(void*);
63 static void             fromclient(void*);
64 static void             reconnect(int);
65 static void             synchronize(void);
66 static int              sendcommand(ulong, ulong);
67 static void             showmsg(int, char *, Buf *);
68 static int              writen(int, uchar *, int);
69 static void             dmessage(int, char *, ...);
70 static void             timerproc(void *);
71
72 static void
73 usage(void)
74 {
75         fprint(2, "Usage: %s [-cd] [-m maxto] dialstring|netdir\n", argv0);
76         exits("usage");
77 }
78
79 static int
80 catch(void *, char *s)
81 {
82         if (!strcmp(s, "alarm")) {
83                 syslog(0, Logname, "Timed out while waiting for reconnect, exiting...");
84                 threadexitsall(nil);
85         }
86         return 0;
87 }
88
89 static void*
90 emalloc(int n)
91 {
92         ulong pc;
93         void *v;
94
95         pc = getcallerpc(&n);
96         v = malloc(n);
97         if(v == nil)
98                 sysfatal("Cannot allocate memory; pc=%lux", pc);
99         setmalloctag(v, pc);
100         return v;
101 }
102
103 void
104 threadmain(int argc, char **argv)
105 {
106         vlong synctime;
107         int i, n, failed;
108         Channel *timer;
109         Buf *b;
110
111         ARGBEGIN {
112         case 'c':
113                 client++;
114                 break;
115         case 'd':
116                 debug++;
117                 break;
118         case 'm':
119                 maxto = (int)strtol(EARGF(usage()), nil, 0);
120                 break;
121         default:
122                 usage();
123         } ARGEND;
124
125         if (argc != 1)
126                 usage();
127
128         if (!client) {
129                 char *p;
130
131                 devdir = argv[0];
132                 if ((p = strstr(devdir, "/local")) != nil)
133                         *p = '\0';
134         }
135         else
136                 dialstring = argv[0];
137
138         if (debug > 0) {
139                 int fd = open("#c/cons", OWRITE|OCEXEC);        
140                 dup(fd, 2);
141         }
142
143         fmtinstall('F', fcallfmt);
144
145         atnotify(catch, 1);
146
147         /*
148          * Set up initial connection. use short timeout
149          * of 60 seconds so we wont hang arround for too
150          * long if there is some general connection problem
151          * (like NAT).
152          */
153         netfd = -1;
154         reconnect(60);
155
156         unsent = chancreate(sizeof(Buf *), Nbuf);
157         unacked = chancreate(sizeof(Buf *), Nbuf);
158         empty = chancreate(sizeof(Buf *), Nbuf);
159         timer = chancreate(sizeof(uchar *), 1);
160         if(unsent == nil || unacked == nil || empty == nil || timer == nil)
161                 sysfatal("Cannot allocate channels");
162
163         for (i = 0; i < Nbuf; i++)
164                 sendp(empty, emalloc(sizeof(Buf)));
165
166         if (proccreate(fromnet, nil, Stacksize) < 0)
167                 sysfatal("Cannot start fromnet; %r");
168
169         if (proccreate(fromclient, nil, Stacksize) < 0)
170                 sysfatal("Cannot start fromclient; %r");
171
172         if (proccreate(timerproc, timer, Stacksize) < 0)
173                 sysfatal("Cannot start timerproc; %r");
174
175         a[Timer].c = timer;
176         a[Unsent].c = unsent;
177         a[Unsent].v = &b;
178
179         synctime = nsec() + Synctime;
180         failed = 0;
181         while (!done) {
182                 if (failed) {
183                         // Wait for the netreader to die.
184                         while (netfd >= 0) {
185                                 dmessage(1, "main; waiting for netreader to die\n");
186                                 sleep(1000);
187                         }
188
189                         // the reader died; reestablish the world.
190                         reconnect(maxto);
191                         synchronize();
192                         failed = 0;
193                 }
194
195                 if (nsec() >= synctime) {
196                         Hdr hdr;
197
198                         PBIT32(hdr.nb, 0);
199                         PBIT32(hdr.acked, inmsg);
200                         PBIT32(hdr.msg, -1);
201
202                         if (writen(netfd, (uchar *)&hdr, sizeof(Hdr)) < 0) {
203                                 dmessage(2, "main; writen failed; %r\n");
204                                 failed = 1;
205                                 continue;
206                         }
207                         synctime = nsec() + Synctime;
208                 }
209
210                 switch (alt(a)) {
211                 case Timer:
212                         break;
213
214                 case Unsent:
215                         sendp(unacked, b);
216
217                         PBIT32(b->hdr.acked, inmsg);
218
219                         if (writen(netfd, (uchar *)&b->hdr, sizeof(Hdr)) < 0) {
220                                 dmessage(2, "main; writen failed; %r\n");
221                                 failed = 1;
222                         }
223
224                         n = GBIT32(b->hdr.nb);
225                         if (writen(netfd, b->buf, n) < 0) {
226                                 dmessage(2, "main; writen failed; %r\n");
227                                 failed = 1;
228                         }
229
230                         if (n == 0)
231                                 done = 1;
232                         break;
233                 }
234         }
235         syslog(0, Logname, "exiting...");
236         threadexitsall(nil);
237 }
238
239
240 static void
241 fromclient(void*)
242 {
243         static int outmsg;
244         int n;
245         Buf *b;
246
247         threadsetname("fromclient");
248
249         do {
250                 b = recvp(empty);
251                 n = read(0, b->buf, Bufsize);
252                 if (n <= 0) {
253                         if (n < 0)
254                                 dmessage(2, "fromclient; Cannot read 9P message; %r\n");
255                         else
256                                 dmessage(2, "fromclient; Client terminated\n");
257                         n = 0;
258                 }
259                 PBIT32(b->hdr.nb, n);
260                 PBIT32(b->hdr.msg, outmsg);
261                 showmsg(1, "fromclient", b);
262                 sendp(unsent, b);
263                 outmsg++;
264         } while(n > 0);
265 }
266
267 static void
268 fromnet(void*)
269 {
270         static int lastacked;
271         int n, m, len, acked;
272         Buf *b;
273
274         threadsetname("fromnet");
275
276         b = emalloc(sizeof(Buf));
277         while (!done) {
278                 while (netfd < 0) {
279                         if(done)
280                                 return;
281                         dmessage(1, "fromnet; waiting for connection... (inmsg %d)\n", inmsg);
282                         sleep(1000);
283                 }
284
285                 // Read the header.
286                 len = readn(netfd, (uchar *)&b->hdr, sizeof(Hdr));
287                 if (len <= 0) {
288                         if (len < 0)
289                                 dmessage(1, "fromnet; (hdr) network failure; %r\n");
290                         else
291                                 dmessage(1, "fromnet; (hdr) network closed\n");
292                         close(netfd);
293                         netfd = -1;
294                         continue;
295                 }
296                 n = GBIT32(b->hdr.nb);
297                 m = GBIT32(b->hdr.msg);
298                 acked = GBIT32(b->hdr.acked);
299                 dmessage(2, "fromnet: Got message, size %d, nb %d, msg %d, acked %d, lastacked %d\n",
300                         len, n, m, acked, lastacked);
301
302                 if (n == 0) {
303                         if (m >= 0) {
304                                 dmessage(1, "fromnet; network closed\n");
305                                 break;
306                         }
307                         continue;
308                 }
309
310                 if (n > Bufsize) {
311                         dmessage(1, "fromnet; message too big %d > %d\n", n, Bufsize);
312                         break;
313                 }
314
315                 len = readn(netfd, b->buf, n);
316                 if (len <= 0 || len != n) {
317                         if (len == 0)
318                                 dmessage(1, "fromnet; network closed\n");
319                         else
320                                 dmessage(1, "fromnet; network failure; %r\n");
321                         close(netfd);
322                         netfd = -1;
323                         continue;
324                 }
325
326                 if (m < inmsg) {
327                         dmessage(1, "fromnet; skipping message %d, currently at %d\n", m, inmsg);
328                         continue;
329                 }                       
330
331                 // Process the acked list.
332                 while(lastacked != acked) {
333                         Buf *rb;
334
335                         rb = recvp(unacked);
336                         m = GBIT32(rb->hdr.msg);
337                         if (m != lastacked) {
338                                 dmessage(1, "fromnet; rb %p, msg %d, lastacked %d\n", rb, m, lastacked);
339                                 sysfatal("fromnet; bug");
340                         }
341                         PBIT32(rb->hdr.msg, -1);
342                         sendp(empty, rb);
343                         lastacked++;
344                 } 
345                 inmsg++;
346
347                 showmsg(1, "fromnet", b);
348
349                 if (writen(1, b->buf, len) < 0) 
350                         sysfatal("fromnet; cannot write to client; %r");
351         }
352         done = 1;
353 }
354
355 static void
356 reconnect(int secs)
357 {
358         NetConnInfo *nci;
359         char ldir[40];
360         int lcfd, fd;
361
362         if (dialstring) {
363                 syslog(0, Logname, "dialing %s", dialstring);
364                 alarm(secs*1000);
365                 while ((fd = dial(dialstring, nil, ldir, nil)) < 0) {
366                         char err[32];
367
368                         err[0] = '\0';
369                         errstr(err, sizeof err);
370                         if (strstr(err, "connection refused")) {
371                                 dmessage(1, "reconnect; server died...\n");
372                                 threadexitsall("server died...");
373                         }
374                         dmessage(1, "reconnect: dialed %s; %s\n", dialstring, err);
375                         sleep(1000);
376                 }
377                 alarm(0);
378                 syslog(0, Logname, "reconnected to %s", dialstring);
379         } 
380         else {
381                 syslog(0, Logname, "waiting for connection on %s", devdir);
382                 alarm(secs*1000);
383                 if ((lcfd = listen(devdir, ldir)) < 0) 
384                         sysfatal("reconnect; cannot listen; %r");
385                 if ((fd = accept(lcfd, ldir)) < 0)
386                         sysfatal("reconnect; cannot accept; %r");
387                 alarm(0);
388                 close(lcfd);
389         }
390
391         if(nci = getnetconninfo(ldir, fd)){
392                 syslog(0, Logname, "connected from %s", nci->rsys);
393                 threadsetname(client? "client %s %s" : "server %s %s", ldir, nci->rsys);
394                 freenetconninfo(nci);
395         } else
396                 syslog(0, Logname, "connected");
397         
398         // Wakes up the netreader.
399         netfd = fd;
400 }
401
402 static void
403 synchronize(void)
404 {
405         Channel *tmp;
406         Buf *b;
407         int n;
408
409         // Ignore network errors here.  If we fail during 
410         // synchronization, the next alarm will pick up 
411         // the error.
412
413         tmp = chancreate(sizeof(Buf *), Nbuf);
414         while ((b = nbrecvp(unacked)) != nil) {
415                 n = GBIT32(b->hdr.nb);
416                 writen(netfd, (uchar *)&b->hdr, sizeof(Hdr));
417                 writen(netfd, b->buf, n);
418                 sendp(tmp, b);
419         }
420         chanfree(unacked);
421         unacked = tmp;
422 }
423
424 static void
425 showmsg(int level, char *s, Buf *b)
426 {
427         int n;
428
429         if (b == nil) {
430                 dmessage(level, "%s; b == nil\n", s);
431                 return;
432         }
433         n = GBIT32(b->hdr.nb);
434         dmessage(level, "%s;  (len %d) %X %X %X %X %X %X %X %X %X (%p)\n", s, n, 
435                         b->buf[0], b->buf[1], b->buf[2],
436                         b->buf[3], b->buf[4], b->buf[5],
437                         b->buf[6], b->buf[7], b->buf[8], b);
438 }
439
440 static int
441 writen(int fd, uchar *buf, int nb)
442 {
443         int len = nb;
444
445         while (nb > 0) {
446                 int n;
447
448                 if (fd < 0) 
449                         return -1;
450
451                 if ((n = write(fd, buf, nb)) < 0) {
452                         dmessage(1, "writen; Write failed; %r\n");
453                         return -1;
454                 }
455                 dmessage(2, "writen: wrote %d bytes\n", n);
456
457                 buf += n;
458                 nb -= n;
459         }
460         return len;
461 }
462
463 static void
464 timerproc(void *x)
465 {
466         Channel *timer = x;
467
468         threadsetname("timer");
469
470         while (!done) {
471                 sleep((Synctime / MS(1)) >> 1);
472                 sendp(timer, "timer");
473         }
474 }
475
476 static void
477 dmessage(int level, char *fmt, ...)
478 {
479         va_list arg; 
480
481         if (level > debug) 
482                 return;
483
484         va_start(arg, fmt);
485         vfprint(2, fmt, arg);
486         va_end(arg);
487 }