7 #define NS(x) ((vlong)x)
8 #define US(x) (NS(x) * 1000LL)
9 #define MS(x) (US(x) * 1000LL)
10 #define S(x) (MS(x) * 1000LL)
18 Timer = 0, // Alt channels.
20 Maxto = 24 * 3600, // A full day to reconnect.
25 uchar nb[4]; // Number of data bytes in this message
26 uchar msg[4]; // Message number
27 uchar acked[4]; // Number of messages acked
35 static Channel *unsent;
36 static Channel *unacked;
37 static Channel *empty;
44 static char *dialstring;
45 static int maxto = Maxto;
46 static char *Logname = "aan";
48 static int reader = -1;
53 { nil, nil, CHANRCV }, // timer
54 { nil, nil, CHANRCV }, // unsent
55 { nil, nil, CHANEND },
58 static void fromnet(void*);
59 static void fromclient(void*);
60 static int reconnect(int);
61 static void synchronize(void);
62 static int writen(int, uchar *, int);
63 static void timerproc(void *);
68 fprint(2, "Usage: %s [-cd] [-m maxto] dialstring|netdir\n", argv0);
74 catch(void *, char *s)
76 if (!strcmp(s, "alarm")) {
77 syslog(0, Logname, "Timed out while waiting for reconnect, exiting...");
92 sysfatal("Cannot allocate memory; pc=%#p", pc);
98 threadmain(int argc, char **argv)
114 maxto = (int)strtol(EARGF(usage()), nil, 0);
127 if ((p = strstr(devdir, "/local")) != nil)
131 dialstring = argv[0];
134 int fd = open("#c/cons", OWRITE|OCEXEC);
141 * Set up initial connection. use short timeout
142 * of 60 seconds so we wont hang arround for too
143 * long if there is some general connection problem
146 netfd = reconnect(60);
148 unsent = chancreate(sizeof(Buf *), Nbuf);
149 unacked = chancreate(sizeof(Buf *), Nbuf);
150 empty = chancreate(sizeof(Buf *), Nbuf);
151 timer = chancreate(sizeof(uchar *), 1);
152 if(unsent == nil || unacked == nil || empty == nil || timer == nil)
153 sysfatal("Cannot allocate channels");
155 for (i = 0; i < Nbuf; i++)
156 sendp(empty, emalloc(sizeof(Buf)));
158 reader = proccreate(fromnet, nil, Stacksize);
160 sysfatal("Cannot start fromnet; %r");
162 if (proccreate(fromclient, nil, Stacksize) < 0)
163 sysfatal("Cannot start fromclient; %r");
165 if (proccreate(timerproc, timer, Stacksize) < 0)
166 sysfatal("Cannot start timerproc; %r");
169 a[Unsent].c = unsent;
173 synctime = nsec() + Synctime;
177 if (netfd < 0 || failed) {
178 // Wait for the netreader to die.
180 if(debug) fprint(2, "main; waiting for netreader to die\n");
185 // the reader died; reestablish the world.
186 netfd = reconnect(maxto);
193 if (netfd < 0 || nsec() < synctime)
197 PBIT32(hdr.acked, inmsg);
200 if (writen(netfd, (uchar *)&hdr, Hdrsz) < 0) {
206 syslog(0, Logname, "connection seems hung up...");
210 synctime = nsec() + Synctime;
219 PBIT32(b->hdr.acked, inmsg);
221 if (writen(netfd, (uchar *)&b->hdr, Hdrsz) < 0)
224 n = GBIT32(b->hdr.nb);
225 if (writen(netfd, b->buf, n) < 0)
233 syslog(0, Logname, "exiting...");
244 threadsetname("fromclient");
248 n = read(0, b->buf, Bufsize);
251 PBIT32(b->hdr.nb, n);
252 PBIT32(b->hdr.msg, outmsg);
261 extern void _threadnote(void *, char *);
262 ulong m, acked, lastacked = 0;
268 threadsetname("fromnet");
270 b = emalloc(sizeof(Buf));
275 if(debug) fprint(2, "fromnet; waiting for connection... (inmsg %lud)\n", inmsg);
280 len = readn(netfd, (uchar *)&b->hdr, Hdrsz);
284 fprint(2, "fromnet; (hdr) network failure; %r\n");
286 fprint(2, "fromnet; (hdr) network closed\n");
292 lostsync = 0; // reset timeout
293 n = GBIT32(b->hdr.nb);
294 m = GBIT32(b->hdr.msg);
295 acked = GBIT32(b->hdr.acked);
299 if(debug) fprint(2, "fromnet; network closed\n");
301 } else if (n < 0 || n > Bufsize) {
302 if(debug) fprint(2, "fromnet; message too big %d > %d\n", n, Bufsize);
306 len = readn(netfd, b->buf, n);
307 if (len <= 0 || len != n) {
309 if(debug) fprint(2, "fromnet; network closed\n");
311 if(debug) fprint(2, "fromnet; network failure; %r\n");
318 if(debug) fprint(2, "fromnet; skipping message %lud, currently at %lud\n", m, inmsg);
323 // Process the acked list.
324 while((long)(acked - lastacked) > 0) {
327 if((rb = recvp(unacked)) == nil)
329 m = GBIT32(rb->hdr.msg);
330 if (m != lastacked) {
331 if(debug) fprint(2, "fromnet; rb %p, msg %lud, lastacked %lud\n", rb, m, lastacked);
332 sysfatal("fromnet; bug");
334 PBIT32(rb->hdr.msg, -1);
339 if (writen(1, b->buf, len) < 0)
340 sysfatal("fromnet; cannot write to client; %r");
353 syslog(0, Logname, "dialing %s", dialstring);
355 while ((fd = dial(dialstring, nil, ldir, nil)) < 0) {
359 errstr(err, sizeof err);
360 if (strstr(err, "connection refused")) {
361 if(debug) fprint(2, "reconnect; server died...\n");
362 threadexitsall("server died...");
364 if(debug) fprint(2, "reconnect: dialed %s; %s\n", dialstring, err);
368 syslog(0, Logname, "reconnected to %s", dialstring);
371 syslog(0, Logname, "waiting for connection on %s", devdir);
373 if ((lcfd = listen(devdir, ldir)) < 0)
374 sysfatal("reconnect; cannot listen; %r");
375 if ((fd = accept(lcfd, ldir)) < 0)
376 sysfatal("reconnect; cannot accept; %r");
381 if(nci = getnetconninfo(ldir, fd)){
382 syslog(0, Logname, "connected from %s", nci->rsys);
383 threadsetname(client? "client %s %s" : "server %s %s", ldir, nci->rsys);
384 freenetconninfo(nci);
386 syslog(0, Logname, "connected");
398 // Ignore network errors here. If we fail during
399 // synchronization, the next alarm will pick up
402 tmp = chancreate(sizeof(Buf *), Nbuf);
403 while ((b = nbrecvp(unacked)) != nil) {
404 n = GBIT32(b->hdr.nb);
405 writen(netfd, (uchar *)&b->hdr, Hdrsz);
406 writen(netfd, b->buf, n);
414 writen(int fd, uchar *buf, int nb)
424 if ((n = write(fd, buf, nb)) < 0) {
425 if(debug) fprint(2, "writen; Write failed; %r\n");
440 threadsetname("timer");
443 sleep((Synctime / MS(1)) >> 1);
444 sendp(timer, "timer");