]> git.lizzy.rs Git - plan9front.git/commitdiff
torrent: listen support
authorcinap_lenrek <cinap_lenrek@centraldogma>
Sun, 23 Oct 2011 22:53:27 +0000 (00:53 +0200)
committercinap_lenrek <cinap_lenrek@centraldogma>
Sun, 23 Oct 2011 22:53:27 +0000 (00:53 +0200)
sys/src/cmd/ip/torrent.c

index cf5304274f872f1e35fdabd0f8b6cec12d6b3d48..2c37e7b93d42b235f066e66e8ad03039a67622ac 100644 (file)
@@ -6,6 +6,7 @@
 typedef struct Dict Dict;
 typedef struct Piece Piece;
 typedef struct File File;
+typedef struct Stats Stats;
 
 struct Dict
 {
@@ -33,13 +34,21 @@ struct File
        vlong   len;
 };
 
+struct Stats
+{
+       Lock;
+       vlong   up;
+       vlong   down;
+       vlong   left;
+};
+
 enum {
        MAXIO = 16*1024,
 };
 
 int debug, sflag, pflag, vflag;
-int pidgroup = -1;
-int port = 48123;
+int killgroup = -1;
+int port = 6881;
 char *mntweb = "/mnt/web";
 uchar infohash[20];
 uchar peerid[20];
@@ -50,8 +59,10 @@ Piece *pieces;
 
 int nhavemap;
 uchar *havemap;
+int nhavepieces;
 
 File *files;
+Stats stats;
 
 void
 freedict(Dict *d)
@@ -208,7 +219,13 @@ havepiece(int x)
        free(p);
        if(memcmp(hash, pieces[x].hash, 20))
                return 0;
-       havemap[x>>3] |= m;
+       lock(&stats);
+       if((havemap[x>>3] & m) == 0){
+               havemap[x>>3] |= m;
+               nhavepieces++;
+               stats.left -= pieces[x].len;
+       }
+       unlock(&stats);
        return 1;
 }
 
@@ -309,86 +326,60 @@ Err:
        return -1;
 }
 
-void
-peer(char *ip, char *port)
-{
-       static Dict *peers;
-       static QLock peerslk;
 
+
+int
+peer(int fd, int incoming, char *addr)
+{
        uchar buf[64+MAXIO], *map, *told, *p, m;
-       char *addr;
-       int retry, i, o, l, x, n, fd;
        int mechoking, hechoking;
        int mewant, hewant;
        int workpiece;
-       Dict *d;
+       int i, o, l, x, n;
 
-       if(ip == nil || port == nil)
-               return;
+       if(debug) fprint(2, "peer %s: %s connected\n", addr, incoming ? "incoming" : "outgoing");
 
-       d = mallocz(sizeof(*d) + 64, 1);
-       snprint(addr = d->str, 64, "tcp!%s!%s", ip, port);
-       qlock(&peerslk);
-       if(dlook(peers, addr)){
-               qunlock(&peerslk);
-               free(d);
-               return;
+       for(i=0; i<2; i++){
+               if((incoming && i) || (!incoming && !i)){
+                       if(debug) fprint(2, "peer %s: -> handshake\n", addr);
+                       n = pack(buf, sizeof(buf), "*________**", 
+                               20, "\x13BitTorrent protocol",
+                               sizeof(infohash), infohash,
+                               sizeof(peerid), peerid);
+                       if(write(fd, buf, n) != n)
+                               return 1;
+               }
+               if((incoming && !i) || (!incoming && i)){
+                       n = 20 + 8 + sizeof(infohash);
+                       if((n = readn(fd, buf, n)) != n)
+                               return 1;
+                       if(memcmp(buf, "\x13BitTorrent protocol", 20))
+                               return 0;
+                       if(debug) fprint(2, "peer %s: <- handshake\n", addr);
+                       if(memcmp(infohash, buf + 20 + 8, sizeof(infohash)))
+                               return 0;
+               }
        }
-       d->len = strlen(addr);
-       d->typ = 'd';
-       d->val = d;
-       d->next = peers;
-       peers = d;
-       qunlock(&peerslk);
-
-       if(rfork(RFFDG|RFPROC|RFMEM) <= 0)
-               return;
+       if(readn(fd, buf, sizeof(peerid)) != sizeof(peerid))
+               return 1;
+       if(memcmp(peerid, buf, sizeof(peerid)) == 0)
+               return 0;
+       if(debug) fprint(2, "peer %s: peerid %.*s\n", addr, sizeof(peerid), (char*)buf);
 
-       fd = -1;
-       retry = 0;
-       map = malloc(nhavemap);
+       mechoking = 1;
+       hechoking = 1;
+       mewant = 0;
+       hewant = 0;
+       workpiece = -1;
+       map = mallocz(nhavemap, 1);
        told = malloc(nhavemap);
-Retry:
-       if(fd >= 0){
-               close(fd);
-               sleep(10000 + nrand(5000));
-       }
-       if(++retry >= 10)
-               goto Exit;
-
-       if(debug) fprint(2, "dial %s\n", addr);
-       if((fd = dial(addr, nil, nil, nil)) < 0)
-               goto Retry;
-
-       if(debug) fprint(2, "peer %s: -> handshake\n", addr);
-       n = pack(buf, sizeof(buf), "*________**", 
-               20, "\x13BitTorrent protocol",
-               sizeof(infohash), infohash,
-               sizeof(peerid), peerid);
-       if(write(fd, buf, n) != n)
-               goto Retry;
-
-       if(read(fd, buf, 1) != 1)
-               goto Retry;
-       n = buf[0] + 8 + sizeof(infohash) + sizeof(peerid);
-       if((n = readn(fd, buf+1, n)) != n)
-               goto Retry;
-       if(debug) fprint(2, "peer %s: <- handshake %.*s\n", addr, buf[0], (char*)buf+1);
-       if(memcmp(infohash, buf + 1 + buf[0] + 8, sizeof(infohash)))
-               goto Exit;
 
        if(debug) fprint(2, "peer %s: -> bitfield %d\n", addr, nhavemap);
        memmove(told, havemap, nhavemap);
        n = pack(buf, sizeof(buf), "lb*", nhavemap+1, 0x05, nhavemap, havemap);
        if(write(fd, buf, n) != n)
-               goto Retry;
+               goto Out;
 
-       mechoking = 1;
-       hechoking = 1;
-       mewant = 0;
-       hewant = 0;
-       workpiece = -1;
-       memset(map, 0, nhavemap);
        for(;;){
                for(i=0; i<nhavemap; i++){
                        if(told[i] != havemap[i]){
@@ -399,7 +390,7 @@ Retry:
                                        if(debug) fprint(2, "peer %s: -> have %d\n", addr, x);
                                        n = pack(buf, sizeof(buf), "lbl", 1+4, 0x04, x);
                                        if(write(fd, buf, n) != n)
-                                               goto Retry;
+                                               goto Out;
                                }
                        }
                        if(!mewant && (map[i] & ~havemap[i])){
@@ -407,7 +398,7 @@ Retry:
                                if(debug) fprint(2, "peer %s: -> interested\n", addr);
                                n = pack(buf, sizeof(buf), "lb", 1, 0x02);
                                if(write(fd, buf, n) != n)
-                                       goto Retry;
+                                       goto Out;
                        }
                }
                if(!hechoking && mewant){
@@ -423,7 +414,7 @@ Retry:
                                if(debug) fprint(2, "peer %s: -> request %d %d %d\n", addr, x, o, l);
                                n = pack(buf, sizeof(buf), "lblll", 1+4+4+4, 0x06, x, o, l);
                                if(write(fd, buf, n) != n)
-                                       goto Retry;
+                                       goto Out;
                                workpiece = x;
                        }
                }
@@ -432,21 +423,21 @@ Retry:
                        if(debug) fprint(2, "peer %s: -> unchoke\n", addr);
                        n = pack(buf, sizeof(buf), "lb", 1, 0x01);
                        if(write(fd, buf, n) != n)
-                               goto Retry;
+                               goto Out;
                }
 
                if(readn(fd, buf, 4) != 4)
-                       goto Retry;
+                       break;
                unpack(buf, 4, "l", &n);
+               if(n < 0 || n > sizeof(buf))
+                       break;
                if(n == 0)
                        continue;
-               if(n < 0 || n > sizeof(buf))
-                       goto Retry;
                if(readn(fd, buf, n) != n)
-                       goto Retry;
-               retry = 0;
-               p = buf+1;
+                       break;
+
                n--;
+               p = buf+1;
                switch(*buf){
                case 0x00:      // Choke
                        hechoking = 1;
@@ -467,7 +458,7 @@ Retry:
                        break;
                case 0x04:      // Have <piceindex>
                        if(unpack(p, n, "l", &x) < 0)
-                               goto Retry;
+                               goto Out;
                        if(debug) fprint(2, "peer %s: <- have %d\n", addr, x);
                        if(x < 0 || x >= npieces)
                                continue;
@@ -481,7 +472,7 @@ Retry:
                        break;
                case 0x06:      // Request <index> <begin> <length>
                        if(unpack(p, n, "lll", &x, &o, &l) < 0)
-                               goto Retry;
+                               goto Out;
                        if(debug) fprint(2, "peer %s: <- request %d %d %d\n", addr, x, o, l);
                        if(x < 0 || x >= npieces)
                                continue;
@@ -496,13 +487,19 @@ Retry:
                        n = pack(buf, sizeof(buf), "lbll", 1+4+4+l, 0x07, x, o);
                        n += l;
                        if(write(fd, buf, n) != n)
-                               goto Retry;
+                               goto Out;
+                       lock(&stats);
+                       stats.up += n;
+                       unlock(&stats);
                        break;
                case 0x07:      // Piece <index> <begin> <block>
                        if(unpack(p, n, "ll", &x, &o) != 8)
-                               goto Retry;
+                               goto Out;
                        p += 8;
                        n -= 8;
+                       lock(&stats);
+                       stats.down += n;
+                       unlock(&stats);
                        if(debug) fprint(2, "peer %s: <- piece %d %d %d\n", addr, x, o, n);
                        if(x < 0 || x >= npieces)
                                continue;
@@ -517,19 +514,102 @@ Retry:
                        break;
                case 0x08:      // Cancel <index> <begin> <length>
                        if(unpack(p, n, "lll", &x, &o, &l) < 0)
-                               goto Retry;
+                               goto Out;
                        if(debug) fprint(2, "peer %s: <- cancel %d %d %d\n", addr, x, o, l);
                        break;
                case 0x09:      // Port <port>
                        if(unpack(p, n, "l", &x) < 0)
-                               goto Retry;
+                               goto Out;
                        if(debug) fprint(2, "peer %s: <- port %d\n", addr, x);
                        break;
                }
        }
-Exit:
+
+Out:
        free(told);
        free(map);
+       return 1;
+}
+
+void
+server(void)
+{
+       char addr[64], adir[40], ldir[40];
+       int afd, lfd, dfd;
+       NetConnInfo *ni;
+
+       afd = -1;
+       for(port=6881; port<6890; port++){
+               snprint(addr, sizeof(addr), "tcp!*!%d", port);
+               if((afd = announce(addr, adir)) >= 0)
+                       break;
+       }
+       if(afd < 0){
+               fprint(2, "announce: %r");
+               return;
+       }
+       if(rfork(RFFDG|RFPROC|RFMEM))
+               return;
+       for(;;){
+               if((lfd = listen(adir, ldir)) < 0){
+                       fprint(2, "listen: %r");
+                       break;
+               }
+               if(rfork(RFFDG|RFPROC|RFMEM)){
+                       close(lfd);
+                       continue;
+               }
+               if((dfd = accept(lfd, ldir)) < 0){
+                       fprint(2, "accept: %r");
+                       break;
+               }
+               ni = getnetconninfo(ldir, dfd);
+               peer(dfd, 1, ni ? ni->raddr : "???");
+               if(ni) freenetconninfo(ni);     
+               break;
+       }
+       exits(0);
+}
+
+void
+client(char *ip, char *port)
+{
+       static Dict *peers;
+       static QLock peerslk;
+       int try, fd;
+       char *addr;
+       Dict *d;
+
+       if(ip == nil || port == nil)
+               return;
+
+       d = mallocz(sizeof(*d) + 64, 1);
+       snprint(addr = d->str, 64, "tcp!%s!%s", ip, port);
+       qlock(&peerslk);
+       if(dlook(peers, addr)){
+               qunlock(&peerslk);
+               free(d);
+               return;
+       }
+       d->len = strlen(addr);
+       d->typ = 'd';
+       d->val = d;
+       d->next = peers;
+       peers = d;
+       qunlock(&peerslk);
+
+       if(debug) fprint(2, "client %s\n", addr);
+
+       if(rfork(RFFDG|RFPROC|RFMEM))
+               return;
+       for(try = 0; try < 10; try++){
+               if((fd = dial(addr, nil, nil, nil)) >= 0){
+                       if(!peer(fd, 0, addr))
+                               break;
+                       close(fd);
+               }
+               sleep((1000<<try)+nrand(5000));
+       }
        exits(0);
 }
 
@@ -571,9 +651,9 @@ tracker(char *url)
        static Dict *trackers;
        static QLock trackerslk;
 
+       Dict *d, *l;
        int n, fd;
        char *p;
-       Dict *d, *l;
 
        if(url == nil)
                return;
@@ -594,17 +674,32 @@ tracker(char *url)
        url = d->str;
        qunlock(&trackerslk);
 
-       if(rfork(RFFDG|RFPROC|RFMEM) <= 0)
+       if(debug) fprint(2, "tracker %s\n", url);
+
+       if(rfork(RFPROC|RFMEM))
                return;
 
        for(;;){
+               vlong up, down, left;
+
+               lock(&stats);
+               up = stats.up;
+               down = stats.down;
+               left = stats.left;
+               unlock(&stats);
+
                d = nil;
-               if((fd = hopen("%s?info_hash=%.*H&peer_id=%.*H&port=%d&compact=1",
-                       url, sizeof(infohash), infohash, sizeof(peerid), peerid, port)) >= 0){
+               if((fd = hopen("%s?info_hash=%.*H&peer_id=%.*H&port=%d&"
+                       "uploaded=%lld&downloaded=%lld&left=%lld&"
+                       "compact=1",
+                       url, sizeof(infohash), infohash, sizeof(peerid), peerid, port,
+                       up, down, left)) >= 0){
                        n = readall(fd, &p);
                        close(fd);
                        bparse(p, p+n, &d);
                        free(p);
+               } else {
+                       if(debug) fprint(2, "tracker %s: %r\n", url);
                }
                if(l = dlook(d, "peers")){
                        if(l->typ == 's'){
@@ -617,10 +712,10 @@ tracker(char *url)
 
                                        snprint(ip, sizeof(ip), "%d.%d.%d.%d", b[0], b[1], b[2], b[3]);
                                        snprint(port, sizeof(port), "%d", b[4]<<8 | b[5]);
-                                       peer(ip, port);
+                                       client(ip, port);
                                }
                        } else for(; l && l->typ == 'l'; l = l->next)
-                               peer(dstr(dlook(l->val, "ip")), dstr(dlook(l->val, "port")));
+                               client(dstr(dlook(l->val, "ip")), dstr(dlook(l->val, "port")));
                }
                n = 0;
                if(p = dstr(dlook(d, "interval")))
@@ -651,34 +746,9 @@ Hfmt(Fmt *f)
 }
 
 int
-progress(void)
-{
-       int i, c;
-       uchar m;
-       c = 0;
-       for(i=0; i<nhavemap; i++)
-               for(m = 0x80; m; m>>=1)
-                       if(havemap[i] & m)
-                               c++;
-       if(pflag)
-               print("%d %d\n", c, npieces);
-       return c == npieces;
-}
-
-void
-killcohort(void)
+killnote(void *, char *)
 {
-       int i;
-       for(i=0;i!=3;i++){      /* It's a long way to the kitchen */
-               postnote(PNGROUP, pidgroup, "kill");
-               sleep(1);
-       }
-}
-
-int
-catchnote(void *, char *msg)
-{
-       exits(msg);
+       postnote(PNGROUP, killgroup, "kill");
        return 0;
 }
 
@@ -788,6 +858,7 @@ main(int argc, char *argv[])
                else
                        pieces[i].len = blocksize;
                len -= pieces[i].len;
+               stats.left += pieces[i].len;
        }
        if(len)
                sysfatal("pieces do not match file length");
@@ -795,23 +866,31 @@ main(int argc, char *argv[])
        for(i = 0; i<npieces; i++)
                havepiece(i);
 
-       switch(i = rfork(RFPROC|RFMEM|RFNOTEG|RFNAMEG)){
+       srand(time(0));
+       atnotify(killnote, 1);
+       switch(i = rfork(RFPROC|RFMEM|RFNOTEG)){
        case -1:
                sysfatal("fork: %r");
        case 0:
                memmove(peerid, "-NF9001-", 8);
-               genrandom(peerid+8, sizeof(peerid)-8);
+               for(i=8; i<sizeof(peerid); i++)
+                       peerid[i] = nrand(10)+'0';
+               server();
                tracker(dstr(dlook(torrent, "announce")));
                for(d = dlook(torrent, "announce-list"); d && d->typ == 'l'; d = d->next)
                        if(d->val && d->val->typ == 'l')
                                tracker(dstr(d->val->val));
+               while(waitpid() != -1)
+                       ;
                break;
        default:
-               pidgroup = i;
-               atexit(killcohort);
-               atnotify(catchnote, 1);
-               while(!progress() || sflag)
+               killgroup = i;
+               while((nhavepieces < npieces) || sflag){
+                       if(pflag)
+                               print("%d %d\n", nhavepieces, npieces);
                        sleep(1000);
+               }
        }
+       postnote(PNGROUP, killgroup, "kill");
        exits(0);
 }