#include <fcall.h>
#include <thread.h>
-#define NS(x) ((vlong)x)
-#define US(x) (NS(x) * 1000LL)
-#define MS(x) (US(x) * 1000LL)
-#define S(x) (MS(x) * 1000LL)
-
-#define LOGNAME "aan"
+#define NS(x) ((vlong)x)
+#define US(x) (NS(x) * 1000LL)
+#define MS(x) (US(x) * 1000LL)
+#define S(x) (MS(x) * 1000LL)
enum {
Synctime = S(8),
K = 1024,
Bufsize = 8 * K,
Stacksize = 8 * K,
- Timer = 0, // Alt channels.
+ Timer = 0, // Alt channels.
Unsent = 1,
Maxto = 24 * 3600, // A full day to reconnect.
-};
-
-typedef struct Endpoints Endpoints;
-struct Endpoints {
- char *lsys;
- char *lserv;
- char *rsys;
- char *rserv;
+ Hdrsz = 3*4,
};
typedef struct {
- ulong nb; // Number of data bytes in this message
- ulong msg; // Message number
- ulong acked; // Number of messages acked
+ uchar nb[4]; // Number of data bytes in this message
+ uchar msg[4]; // Message number
+ uchar acked[4]; // Number of messages acked
} Hdr;
-typedef struct t_Buf {
- Hdr hdr;
- uchar buf[Bufsize];
+typedef struct {
+ Hdr hdr;
+ uchar buf[Bufsize];
} Buf;
-static char *progname;
static Channel *unsent;
static Channel *unacked;
static Channel *empty;
-static int netfd;
-static int inmsg;
+static int netfd;
+static ulong inmsg;
+static ulong outmsg;
static char *devdir;
-static int debug;
-static int done;
+static int debug;
+static int done;
static char *dialstring;
-static int maxto = Maxto;
-static char *Logname = LOGNAME;
-static int client;
+static int maxto = Maxto;
+static char *Logname = "aan";
+static int client;
+static int reader = -1;
+static int lostsync;
static Alt a[] = {
/* c v op */
static void fromnet(void*);
static void fromclient(void*);
-static void reconnect(void);
+static int reconnect(int);
static void synchronize(void);
-static int sendcommand(ulong, ulong);
-static void showmsg(int, char *, Buf *);
static int writen(int, uchar *, int);
-static int getport(char *);
-static void dmessage(int, char *, ...);
static void timerproc(void *);
-static Endpoints *getendpoints(char *);
-static void freeendpoints(Endpoints *);
static void
usage(void)
{
- fprint(2, "Usage: %s [-cd] [-m maxto] dialstring|netdir\n", progname);
+ fprint(2, "Usage: %s [-cd] [-m maxto] dialstring|netdir\n", argv0);
exits("usage");
}
+
static int
catch(void *, char *s)
{
if (!strcmp(s, "alarm")) {
- syslog(0, Logname, "Timed out while waiting for client on %s, exiting...",
- devdir);
+ syslog(0, Logname, "Timed out while waiting for reconnect, exiting...");
threadexitsall(nil);
}
return 0;
}
+static void*
+emalloc(int n)
+{
+ uintptr pc;
+ void *v;
+
+ pc = getcallerpc(&n);
+ v = malloc(n);
+ if(v == nil)
+ sysfatal("Cannot allocate memory; pc=%#p", pc);
+ setmalloctag(v, pc);
+ return v;
+}
+
void
threadmain(int argc, char **argv)
{
- int i, failed;
- Buf *b;
- Channel *timer;
vlong synctime;
+ int i, n, failed;
+ Channel *timer;
+ Hdr hdr;
+ Buf *b;
- progname = argv[0];
ARGBEGIN {
case 'c':
client++;
debug++;
break;
case 'm':
- maxto = (int)strtol(EARGF(usage()), (char **)nil, 0);
+ maxto = (int)strtol(EARGF(usage()), nil, 0);
break;
default:
usage();
dup(fd, 2);
}
- fmtinstall('F', fcallfmt);
-
atnotify(catch, 1);
+ /*
+ * Set up initial connection. use short timeout
+ * of 60 seconds so we wont hang arround for too
+ * long if there is some general connection problem
+ * (like NAT).
+ */
+ netfd = reconnect(60);
+
unsent = chancreate(sizeof(Buf *), Nbuf);
unacked = chancreate(sizeof(Buf *), Nbuf);
empty = chancreate(sizeof(Buf *), Nbuf);
timer = chancreate(sizeof(uchar *), 1);
+ if(unsent == nil || unacked == nil || empty == nil || timer == nil)
+ sysfatal("Cannot allocate channels");
- for (i = 0; i != Nbuf; i++) {
- Buf *b = malloc(sizeof(Buf));
- sendp(empty, b);
- }
-
- netfd = -1;
+ for (i = 0; i < Nbuf; i++)
+ sendp(empty, emalloc(sizeof(Buf)));
- if (proccreate(fromnet, nil, Stacksize) < 0)
- sysfatal("%s; Cannot start fromnet; %r", progname);
-
- reconnect(); // Set up the initial connection.
- synchronize();
+ reader = proccreate(fromnet, nil, Stacksize);
+ if (reader < 0)
+ sysfatal("Cannot start fromnet; %r");
if (proccreate(fromclient, nil, Stacksize) < 0)
- sysfatal("%s; Cannot start fromclient; %r", progname);
+ sysfatal("Cannot start fromclient; %r");
if (proccreate(timerproc, timer, Stacksize) < 0)
- sysfatal("%s; Cannot start timerproc; %r", progname);
+ sysfatal("Cannot start timerproc; %r");
a[Timer].c = timer;
a[Unsent].c = unsent;
a[Unsent].v = &b;
+Restart:
synctime = nsec() + Synctime;
failed = 0;
+ lostsync = 0;
while (!done) {
- vlong now;
- int delta;
-
- if (failed) {
+ if (netfd < 0 || failed) {
// Wait for the netreader to die.
while (netfd >= 0) {
- dmessage(1, "main; waiting for netreader to die\n");
+ if(debug) fprint(2, "main; waiting for netreader to die\n");
+ threadint(reader);
sleep(1000);
}
// the reader died; reestablish the world.
- reconnect();
+ netfd = reconnect(maxto);
synchronize();
- failed = 0;
+ goto Restart;
}
- now = nsec();
- delta = (synctime - nsec()) / MS(1);
+ switch (alt(a)) {
+ case Timer:
+ if (netfd < 0 || nsec() < synctime)
+ break;
- if (delta <= 0) {
- Hdr hdr;
+ PBIT32(hdr.nb, 0);
+ PBIT32(hdr.acked, inmsg);
+ PBIT32(hdr.msg, -1);
- hdr.nb = 0;
- hdr.acked = inmsg;
- hdr.msg = -1;
+ if (writen(netfd, (uchar *)&hdr, Hdrsz) < 0) {
+ failed = 1;
+ continue;
+ }
- if (writen(netfd, (uchar *)&hdr, sizeof(Hdr)) < 0) {
- dmessage(2, "main; writen failed; %r\n");
+ if(++lostsync > 2){
+ syslog(0, Logname, "connection seems hung up...");
failed = 1;
continue;
}
synctime = nsec() + Synctime;
- assert(synctime > now);
- }
-
- switch (alt(a)) {
- case Timer:
break;
case Unsent:
sendp(unacked, b);
- b->hdr.acked = inmsg;
+ if (netfd < 0)
+ break;
- if (writen(netfd, (uchar *)&b->hdr, sizeof(Hdr)) < 0) {
- dmessage(2, "main; writen failed; %r\n");
- failed = 1;
- }
+ PBIT32(b->hdr.acked, inmsg);
- if (writen(netfd, b->buf, b->hdr.nb) < 0) {
- dmessage(2, "main; writen failed; %r\n");
+ if (writen(netfd, (uchar *)&b->hdr, Hdrsz) < 0)
failed = 1;
+ else {
+ n = GBIT32(b->hdr.nb);
+ if (writen(netfd, b->buf, n) < 0)
+ failed = 1;
+ if (n == 0)
+ done = 1;
}
-
- if (b->hdr.nb == 0)
- done = 1;
break;
}
}
static void
fromclient(void*)
{
- static int outmsg;
-
- for (;;) {
- Buf *b;
+ int n;
+ Buf *b;
- b = recvp(empty);
- if ((int)(b->hdr.nb = read(0, b->buf, Bufsize)) <= 0) {
- if ((int)b->hdr.nb < 0)
- dmessage(2, "fromclient; Cannot read 9P message; %r\n");
- else
- dmessage(2, "fromclient; Client terminated\n");
- b->hdr.nb = 0;
- }
- b->hdr.msg = outmsg++;
+ threadsetname("fromclient");
- showmsg(1, "fromclient", b);
+ do {
+ b = recvp(empty);
+ n = read(0, b->buf, Bufsize);
+ if (n < 0)
+ n = 0;
+ PBIT32(b->hdr.nb, n);
+ PBIT32(b->hdr.msg, outmsg);
sendp(unsent, b);
-
- if (b->hdr.nb == 0)
- break;
- }
+ outmsg++;
+ } while(n > 0);
}
static void
fromnet(void*)
{
- static int lastacked;
+ extern void _threadnote(void *, char *);
+ ulong m, acked, lastacked = 0;
+ int n, len;
Buf *b;
- b = (Buf *)malloc(sizeof(Buf));
- assert(b);
+ notify(_threadnote);
- while (!done) {
- int len, acked, i;
+ threadsetname("fromnet");
+ b = emalloc(sizeof(Buf));
+ while (!done) {
while (netfd < 0) {
- dmessage(1, "fromnet; waiting for connection... (inmsg %d)\n",
- inmsg);
+ if(done)
+ return;
+ if(debug) fprint(2, "fromnet; waiting for connection... (inmsg %lud)\n", inmsg);
sleep(1000);
}
// Read the header.
- if ((len = readn(netfd, &b->hdr, sizeof(Hdr))) <= 0) {
- if (len < 0)
- dmessage(1, "fromnet; (hdr) network failure; %r\n");
- else
- dmessage(1, "fromnet; (hdr) network closed\n");
+ len = readn(netfd, (uchar *)&b->hdr, Hdrsz);
+ if (len <= 0) {
+ if (debug) {
+ if (len < 0)
+ fprint(2, "fromnet; (hdr) network failure; %r\n");
+ else
+ fprint(2, "fromnet; (hdr) network closed\n");
+ }
close(netfd);
netfd = -1;
continue;
}
- dmessage(2, "fromnet: Got message, size %d, nb %d, msg %d\n", len,
- b->hdr.nb, b->hdr.msg);
-
- if (b->hdr.nb == 0) {
- if ((long)b->hdr.msg >= 0) {
- dmessage(1, "fromnet; network closed\n");
- break;
- }
- continue;
+ lostsync = 0; // reset timeout
+ n = GBIT32(b->hdr.nb);
+ m = GBIT32(b->hdr.msg);
+ acked = GBIT32(b->hdr.acked);
+ if (n == 0) {
+ if (m == (ulong)-1)
+ continue;
+ if(debug) fprint(2, "fromnet; network closed\n");
+ break;
+ } else if (n < 0 || n > Bufsize) {
+ if(debug) fprint(2, "fromnet; message too big %d > %d\n", n, Bufsize);
+ break;
}
-
- if ((len = readn(netfd, b->buf, b->hdr.nb)) <= 0 || len != b->hdr.nb) {
+
+ len = readn(netfd, b->buf, n);
+ if (len <= 0 || len != n) {
if (len == 0)
- dmessage(1, "fromnet; network closed\n");
+ if(debug) fprint(2, "fromnet; network closed\n");
else
- dmessage(1, "fromnet; network failure; %r\n");
+ if(debug) fprint(2, "fromnet; network failure; %r\n");
close(netfd);
netfd = -1;
continue;
}
- if (b->hdr.msg < inmsg) {
- dmessage(1, "fromnet; skipping message %d, currently at %d\n",
- b->hdr.msg, inmsg);
+ if (m != inmsg) {
+ if(debug) fprint(2, "fromnet; skipping message %lud, currently at %lud\n", m, inmsg);
continue;
}
+ inmsg++;
// Process the acked list.
- acked = b->hdr.acked - lastacked;
- for (i = 0; i != acked; i++) {
+ while((long)(acked - lastacked) > 0) {
Buf *rb;
- rb = recvp(unacked);
- if (rb->hdr.msg != lastacked + i) {
- dmessage(1, "rb %p, msg %d, lastacked %d, i %d\n",
- rb, rb? rb->hdr.msg: -2, lastacked, i);
- assert(0);
+ if((rb = recvp(unacked)) == nil)
+ break;
+ m = GBIT32(rb->hdr.msg);
+ if (m != lastacked) {
+ if(debug) fprint(2, "fromnet; rb %p, msg %lud, lastacked %lud\n", rb, m, lastacked);
+ sysfatal("fromnet; bug");
}
- rb->hdr.msg = -1;
+ PBIT32(rb->hdr.msg, -1);
sendp(empty, rb);
+ lastacked++;
}
- lastacked = b->hdr.acked;
-
- inmsg++;
-
- showmsg(1, "fromnet", b);
if (writen(1, b->buf, len) < 0)
sysfatal("fromnet; cannot write to client; %r");
done = 1;
}
-static void
-reconnect(void)
+static int
+reconnect(int secs)
{
+ NetConnInfo *nci;
char ldir[40];
int lcfd, fd;
if (dialstring) {
syslog(0, Logname, "dialing %s", dialstring);
- while ((fd = dial(dialstring, nil, nil, nil)) < 0) {
+ alarm(secs*1000);
+ while ((fd = dial(dialstring, nil, ldir, nil)) < 0) {
char err[32];
err[0] = '\0';
errstr(err, sizeof err);
if (strstr(err, "connection refused")) {
- dmessage(1, "reconnect; server died...\n");
+ if(debug) fprint(2, "reconnect; server died...\n");
threadexitsall("server died...");
}
- dmessage(1, "reconnect: dialed %s; %s\n", dialstring, err);
+ if(debug) fprint(2, "reconnect: dialed %s; %s\n", dialstring, err);
sleep(1000);
}
+ alarm(0);
syslog(0, Logname, "reconnected to %s", dialstring);
}
else {
- Endpoints *ep;
-
syslog(0, Logname, "waiting for connection on %s", devdir);
- alarm(maxto * 1000);
+ alarm(secs*1000);
if ((lcfd = listen(devdir, ldir)) < 0)
sysfatal("reconnect; cannot listen; %r");
-
if ((fd = accept(lcfd, ldir)) < 0)
sysfatal("reconnect; cannot accept; %r");
alarm(0);
close(lcfd);
-
- ep = getendpoints(ldir);
- dmessage(1, "rsys '%s'\n", ep->rsys);
- syslog(0, Logname, "connected from %s", ep->rsys);
- freeendpoints(ep);
}
-
- netfd = fd; // Wakes up the netreader.
+
+ if(nci = getnetconninfo(ldir, fd)){
+ syslog(0, Logname, "connected from %s", nci->rsys);
+ threadsetname(client? "client %s %s" : "server %s %s", ldir, nci->rsys);
+ freenetconninfo(nci);
+ } else
+ syslog(0, Logname, "connected");
+
+ return fd;
}
static void
{
Channel *tmp;
Buf *b;
+ int n;
// Ignore network errors here. If we fail during
// synchronization, the next alarm will pick up
tmp = chancreate(sizeof(Buf *), Nbuf);
while ((b = nbrecvp(unacked)) != nil) {
- writen(netfd, (uchar *)b, sizeof(Hdr) + b->hdr.nb);
+ n = GBIT32(b->hdr.nb);
+ writen(netfd, (uchar *)&b->hdr, Hdrsz);
+ writen(netfd, b->buf, n);
sendp(tmp, b);
}
chanfree(unacked);
unacked = tmp;
}
-static void
-showmsg(int level, char *s, Buf *b)
-{
- if (b == nil) {
- dmessage(level, "%s; b == nil\n", s);
- return;
- }
-
- dmessage(level,
- "%s; (len %d) %X %X %X %X %X %X %X %X %X (%p)\n", s,
- b->hdr.nb,
- b->buf[0], b->buf[1], b->buf[2],
- b->buf[3], b->buf[4], b->buf[5],
- b->buf[6], b->buf[7], b->buf[8], b);
-}
-
static int
writen(int fd, uchar *buf, int nb)
{
return -1;
if ((n = write(fd, buf, nb)) < 0) {
- dmessage(1, "writen; Write failed; %r\n");
+ if(debug) fprint(2, "writen; Write failed; %r\n");
return -1;
}
- dmessage(2, "writen: wrote %d bytes\n", n);
buf += n;
nb -= n;
timerproc(void *x)
{
Channel *timer = x;
+
+ threadsetname("timer");
+
while (!done) {
sleep((Synctime / MS(1)) >> 1);
sendp(timer, "timer");
}
}
-
-static void
-dmessage(int level, char *fmt, ...)
-{
- va_list arg;
-
- if (level > debug)
- return;
-
- va_start(arg, fmt);
- vfprint(2, fmt, arg);
- va_end(arg);
-}
-
-static void
-getendpoint(char *dir, char *file, char **sysp, char **servp)
-{
- int fd, n;
- char buf[128];
- char *sys, *serv;
-
- sys = serv = 0;
-
- snprint(buf, sizeof buf, "%s/%s", dir, file);
- fd = open(buf, OREAD);
- if(fd >= 0){
- n = read(fd, buf, sizeof(buf)-1);
- if(n>0){
- buf[n-1] = 0;
- serv = strchr(buf, '!');
- if(serv){
- *serv++ = 0;
- serv = strdup(serv);
- }
- sys = strdup(buf);
- }
- close(fd);
- }
- if(serv == 0)
- serv = strdup("unknown");
- if(sys == 0)
- sys = strdup("unknown");
- *servp = serv;
- *sysp = sys;
-}
-
-static Endpoints *
-getendpoints(char *dir)
-{
- Endpoints *ep;
-
- ep = malloc(sizeof(*ep));
- getendpoint(dir, "local", &ep->lsys, &ep->lserv);
- getendpoint(dir, "remote", &ep->rsys, &ep->rserv);
- return ep;
-}
-
-static void
-freeendpoints(Endpoints *ep)
-{
- free(ep->lsys);
- free(ep->rsys);
- free(ep->lserv);
- free(ep->rserv);
- free(ep);
-}
-