]> git.lizzy.rs Git - plan9front.git/blob - sys/src/9/ip/rudp.c
kernel: dont use atomic increment for Proc.nlocks, maintain Lock.m for lock(), use...
[plan9front.git] / sys / src / 9 / ip / rudp.c
1 /*
2  *  Reliable User Datagram Protocol, currently only for IPv4.
3  *  This protocol is compatible with UDP's packet format.
4  *  It could be done over UDP if need be.
5  */
6 #include        "u.h"
7 #include        "../port/lib.h"
8 #include        "mem.h"
9 #include        "dat.h"
10 #include        "fns.h"
11 #include        "../port/error.h"
12
13 #include        "ip.h"
14
15 #define DEBUG   0
16 #define DPRINT if(DEBUG)print
17
18 #define SEQDIFF(a,b) ( (a)>=(b)?\
19                         (a)-(b):\
20                         0xffffffffUL-((b)-(a)) )
21 #define INSEQ(a,start,end) ( (start)<=(end)?\
22                                 ((a)>(start)&&(a)<=(end)):\
23                                 ((a)>(start)||(a)<=(end)) )
24 #define UNACKED(r) SEQDIFF(r->sndseq, r->ackrcvd)
25 #define NEXTSEQ(a) ( (a)+1 == 0 ? 1 : (a)+1 )
26
27 enum
28 {
29         UDP_PHDRSIZE    = 12,   /* pseudo header */
30 //      UDP_HDRSIZE     = 20,   /* pseudo header + udp header */
31         UDP_RHDRSIZE    = 36,   /* pseudo header + udp header + rudp header */
32         UDP_IPHDR       = 8,    /* ip header */
33         IP_UDPPROTO     = 254,
34         UDP_USEAD7      = 52,   /* size of new ipv6 headers struct */
35
36         Rudprxms        = 200,
37         Rudptickms      = 50,
38         Rudpmaxxmit     = 10,
39         Maxunacked      = 100,
40 };
41
42 #define Hangupgen       0xffffffff      /* used only in hangup messages */
43
44 typedef struct Udphdr Udphdr;
45 struct Udphdr
46 {
47         /* ip header */
48         uchar   vihl;           /* Version and header length */
49         uchar   tos;            /* Type of service */
50         uchar   length[2];      /* packet length */
51         uchar   id[2];          /* Identification */
52         uchar   frag[2];        /* Fragment information */
53
54         /* pseudo header starts here */
55         uchar   Unused;
56         uchar   udpproto;       /* Protocol */
57         uchar   udpplen[2];     /* Header plus data length */
58         uchar   udpsrc[4];      /* Ip source */
59         uchar   udpdst[4];      /* Ip destination */
60
61         /* udp header */
62         uchar   udpsport[2];    /* Source port */
63         uchar   udpdport[2];    /* Destination port */
64         uchar   udplen[2];      /* data length */
65         uchar   udpcksum[2];    /* Checksum */
66 };
67
68 typedef struct Rudphdr Rudphdr;
69 struct Rudphdr
70 {
71         /* ip header */
72         uchar   vihl;           /* Version and header length */
73         uchar   tos;            /* Type of service */
74         uchar   length[2];      /* packet length */
75         uchar   id[2];          /* Identification */
76         uchar   frag[2];        /* Fragment information */
77
78         /* pseudo header starts here */
79         uchar   Unused;
80         uchar   udpproto;       /* Protocol */
81         uchar   udpplen[2];     /* Header plus data length */
82         uchar   udpsrc[4];      /* Ip source */
83         uchar   udpdst[4];      /* Ip destination */
84
85         /* udp header */
86         uchar   udpsport[2];    /* Source port */
87         uchar   udpdport[2];    /* Destination port */
88         uchar   udplen[2];      /* data length (includes rudp header) */
89         uchar   udpcksum[2];    /* Checksum */
90
91         /* rudp header */
92         uchar   relseq[4];      /* id of this packet (or 0) */
93         uchar   relsgen[4];     /* generation/time stamp */
94         uchar   relack[4];      /* packet being acked (or 0) */
95         uchar   relagen[4];     /* generation/time stamp */
96 };
97
98
99 /*
100  *  one state structure per destination
101  */
102 typedef struct Reliable Reliable;
103 struct Reliable
104 {
105         Ref;
106
107         Reliable *next;
108
109         uchar   addr[IPaddrlen];        /* always V6 when put here */
110         ushort  port;
111
112         Block   *unacked;       /* unacked msg list */
113         Block   *unackedtail;   /*  and its tail */
114
115         int     timeout;        /* time since first unacked msg sent */
116         int     xmits;          /* number of times first unacked msg sent */
117
118         ulong   sndseq;         /* next packet to be sent */
119         ulong   sndgen;         /*  and its generation */
120
121         ulong   rcvseq;         /* last packet received */
122         ulong   rcvgen;         /*  and its generation */
123
124         ulong   acksent;        /* last ack sent */
125         ulong   ackrcvd;        /* last msg for which ack was rcvd */
126
127         /* flow control */
128         QLock   lock;
129         Rendez  vous;
130         int     blocked;
131 };
132
133
134
135 /* MIB II counters */
136 typedef struct Rudpstats Rudpstats;
137 struct Rudpstats
138 {
139         ulong   rudpInDatagrams;
140         ulong   rudpNoPorts;
141         ulong   rudpInErrors;
142         ulong   rudpOutDatagrams;
143 };
144
145 typedef struct Rudppriv Rudppriv;
146 struct Rudppriv
147 {
148         Ipht    ht;
149
150         /* MIB counters */
151         Rudpstats       ustats;
152
153         /* non-MIB stats */
154         ulong   csumerr;                /* checksum errors */
155         ulong   lenerr;                 /* short packet */
156         ulong   rxmits;                 /* # of retransmissions */
157         ulong   orders;                 /* # of out of order pkts */
158
159         /* keeping track of the ack kproc */
160         int     ackprocstarted;
161         QLock   apl;
162 };
163
164
165 static ulong generation = 0;
166 static Rendez rend;
167
168 /*
169  *  protocol specific part of Conv
170  */
171 typedef struct Rudpcb Rudpcb;
172 struct Rudpcb
173 {
174         QLock;
175         uchar   headers;
176         uchar   randdrop;
177         Reliable *r;
178 };
179
180 /*
181  * local functions 
182  */
183 void    relsendack(Conv*, Reliable*, int);
184 int     reliput(Conv*, Block*, uchar*, ushort);
185 Reliable *relstate(Rudpcb*, uchar*, ushort, char*);
186 void    relput(Reliable*);
187 void    relforget(Conv *, uchar*, int, int);
188 void    relackproc(void *);
189 void    relackq(Reliable *, Block*);
190 void    relhangup(Conv *, Reliable*);
191 void    relrexmit(Conv *, Reliable*);
192 void    relput(Reliable*);
193 void    rudpkick(void *x);
194
195 static void
196 rudpstartackproc(Proto *rudp)
197 {
198         Rudppriv *rpriv;
199         char kpname[KNAMELEN];
200
201         rpriv = rudp->priv;
202         if(rpriv->ackprocstarted == 0){
203                 qlock(&rpriv->apl);
204                 if(rpriv->ackprocstarted == 0){
205                         sprint(kpname, "#I%drudpack", rudp->f->dev);
206                         kproc(kpname, relackproc, rudp);
207                         rpriv->ackprocstarted = 1;
208                 }
209                 qunlock(&rpriv->apl);
210         }
211 }
212
213 static char*
214 rudpconnect(Conv *c, char **argv, int argc)
215 {
216         char *e;
217         Rudppriv *upriv;
218
219         upriv = c->p->priv;
220         rudpstartackproc(c->p);
221         e = Fsstdconnect(c, argv, argc);
222         Fsconnected(c, e);
223         iphtadd(&upriv->ht, c);
224
225         return e;
226 }
227
228
229 static int
230 rudpstate(Conv *c, char *state, int n)
231 {
232         Rudpcb *ucb;
233         Reliable *r;
234         int m;
235
236         m = snprint(state, n, "%s", c->inuse?"Open":"Closed");
237         ucb = (Rudpcb*)c->ptcl;
238         qlock(ucb);
239         for(r = ucb->r; r; r = r->next)
240                 m += snprint(state+m, n-m, " %I/%ld", r->addr, UNACKED(r));
241         m += snprint(state+m, n-m, "\n");
242         qunlock(ucb);
243         return m;
244 }
245
246 static char*
247 rudpannounce(Conv *c, char** argv, int argc)
248 {
249         char *e;
250         Rudppriv *upriv;
251
252         upriv = c->p->priv;
253         rudpstartackproc(c->p);
254         e = Fsstdannounce(c, argv, argc);
255         if(e != nil)
256                 return e;
257         Fsconnected(c, nil);
258         iphtadd(&upriv->ht, c);
259
260         return nil;
261 }
262
263 static void
264 rudpcreate(Conv *c)
265 {
266         c->rq = qopen(64*1024, Qmsg, 0, 0);
267         c->wq = qopen(64*1024, Qkick, rudpkick, c);
268 }
269
270 static void
271 rudpclose(Conv *c)
272 {
273         Rudpcb *ucb;
274         Reliable *r, *nr;
275         Rudppriv *upriv;
276
277         upriv = c->p->priv;
278         iphtrem(&upriv->ht, c);
279
280         /* force out any delayed acks */
281         ucb = (Rudpcb*)c->ptcl;
282         qlock(ucb);
283         for(r = ucb->r; r; r = r->next){
284                 if(r->acksent != r->rcvseq)
285                         relsendack(c, r, 0);
286         }
287         qunlock(ucb);
288
289         qclose(c->rq);
290         qclose(c->wq);
291         qclose(c->eq);
292         ipmove(c->laddr, IPnoaddr);
293         ipmove(c->raddr, IPnoaddr);
294         c->lport = 0;
295         c->rport = 0;
296
297         ucb->headers = 0;
298         ucb->randdrop = 0;
299         qlock(ucb);
300         for(r = ucb->r; r; r = nr){
301                 if(r->acksent != r->rcvseq)
302                         relsendack(c, r, 0);
303                 nr = r->next;
304                 relhangup(c, r);
305                 relput(r);
306         }
307         ucb->r = 0;
308
309         qunlock(ucb);
310 }
311
312 /*
313  *  randomly don't send packets
314  */
315 static void
316 doipoput(Conv *c, Fs *f, Block *bp, int x, int ttl, int tos)
317 {
318         Rudpcb *ucb;
319
320         ucb = (Rudpcb*)c->ptcl;
321         if(ucb->randdrop && nrand(100) < ucb->randdrop)
322                 freeblist(bp);
323         else
324                 ipoput4(f, bp, x, ttl, tos, nil);
325 }
326
327 int
328 flow(void *v)
329 {
330         Reliable *r = v;
331
332         return UNACKED(r) <= Maxunacked;
333 }
334
335 void
336 rudpkick(void *x)
337 {
338         Conv *c = x;
339         Udphdr *uh;
340         ushort rport;
341         uchar laddr[IPaddrlen], raddr[IPaddrlen];
342         Block *bp;
343         Rudpcb *ucb;
344         Rudphdr *rh;
345         Reliable *r;
346         int dlen, ptcllen;
347         Rudppriv *upriv;
348         Fs *f;
349
350         upriv = c->p->priv;
351         f = c->p->f;
352
353         netlog(c->p->f, Logrudp, "rudp: kick\n");
354         bp = qget(c->wq);
355         if(bp == nil)
356                 return;
357
358         ucb = (Rudpcb*)c->ptcl;
359         switch(ucb->headers) {
360         case 7:
361                 /* get user specified addresses */
362                 bp = pullupblock(bp, UDP_USEAD7);
363                 if(bp == nil)
364                         return;
365                 ipmove(raddr, bp->rp);
366                 bp->rp += IPaddrlen;
367                 ipmove(laddr, bp->rp);
368                 bp->rp += IPaddrlen;
369                 /* pick interface closest to dest */
370                 if(ipforme(f, laddr) != Runi)
371                         findlocalip(f, laddr, raddr);
372                 bp->rp += IPaddrlen;            /* Ignore ifc address */
373                 rport = nhgets(bp->rp);
374                 bp->rp += 2+2;                  /* Ignore local port */
375                 break;
376         default:
377                 ipmove(raddr, c->raddr);
378                 ipmove(laddr, c->laddr);
379                 rport = c->rport;
380                 break;
381         }
382
383         dlen = blocklen(bp);
384
385         /* Make space to fit rudp & ip header */
386         bp = padblock(bp, UDP_IPHDR+UDP_RHDRSIZE);
387         if(bp == nil)
388                 return;
389
390         uh = (Udphdr *)(bp->rp);
391         uh->vihl = IP_VER4;
392
393         rh = (Rudphdr*)uh;
394
395         ptcllen = dlen + (UDP_RHDRSIZE-UDP_PHDRSIZE);
396         uh->Unused = 0;
397         uh->udpproto = IP_UDPPROTO;
398         uh->frag[0] = 0;
399         uh->frag[1] = 0;
400         hnputs(uh->udpplen, ptcllen);
401         switch(ucb->headers){
402         case 7:
403                 v6tov4(uh->udpdst, raddr);
404                 hnputs(uh->udpdport, rport);
405                 v6tov4(uh->udpsrc, laddr);
406                 break;
407         default:
408                 v6tov4(uh->udpdst, c->raddr);
409                 hnputs(uh->udpdport, c->rport);
410                 if(ipcmp(c->laddr, IPnoaddr) == 0)
411                         findlocalip(f, c->laddr, c->raddr);
412                 v6tov4(uh->udpsrc, c->laddr);
413                 break;
414         }
415         hnputs(uh->udpsport, c->lport);
416         hnputs(uh->udplen, ptcllen);
417         uh->udpcksum[0] = 0;
418         uh->udpcksum[1] = 0;
419
420         qlock(ucb);
421         r = relstate(ucb, raddr, rport, "kick");
422         r->sndseq = NEXTSEQ(r->sndseq);
423         hnputl(rh->relseq, r->sndseq);
424         hnputl(rh->relsgen, r->sndgen);
425
426         hnputl(rh->relack, r->rcvseq);  /* ACK last rcvd packet */
427         hnputl(rh->relagen, r->rcvgen);
428
429         if(r->rcvseq != r->acksent)
430                 r->acksent = r->rcvseq;
431
432         hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, dlen+UDP_RHDRSIZE));
433
434         relackq(r, bp);
435         qunlock(ucb);
436
437         upriv->ustats.rudpOutDatagrams++;
438
439         DPRINT("sent: %lud/%lud, %lud/%lud\n", 
440                 r->sndseq, r->sndgen, r->rcvseq, r->rcvgen);
441
442         doipoput(c, f, bp, 0, c->ttl, c->tos);
443
444         if(waserror()) {
445                 relput(r);
446                 qunlock(&r->lock);
447                 nexterror();
448         }
449
450         /* flow control of sorts */
451         qlock(&r->lock);
452         if(UNACKED(r) > Maxunacked){
453                 r->blocked = 1;
454                 sleep(&r->vous, flow, r);
455                 r->blocked = 0;
456         }
457
458         qunlock(&r->lock);
459         relput(r);
460         poperror();
461 }
462
463 void
464 rudpiput(Proto *rudp, Ipifc *ifc, Block *bp)
465 {
466         int len, olen, ottl;
467         Udphdr *uh;
468         Conv *c;
469         Rudpcb *ucb;
470         uchar raddr[IPaddrlen], laddr[IPaddrlen];
471         ushort rport, lport;
472         Rudppriv *upriv;
473         Fs *f;
474         uchar *p;
475
476         upriv = rudp->priv;
477         f = rudp->f;
478
479         upriv->ustats.rudpInDatagrams++;
480
481         uh = (Udphdr*)(bp->rp);
482
483         /* Put back pseudo header for checksum 
484          * (remember old values for icmpnoconv()) 
485          */
486         ottl = uh->Unused;
487         uh->Unused = 0;
488         len = nhgets(uh->udplen);
489         olen = nhgets(uh->udpplen);
490         hnputs(uh->udpplen, len);
491
492         v4tov6(raddr, uh->udpsrc);
493         v4tov6(laddr, uh->udpdst);
494         lport = nhgets(uh->udpdport);
495         rport = nhgets(uh->udpsport);
496
497         if(nhgets(uh->udpcksum)) {
498                 if(ptclcsum(bp, UDP_IPHDR, len+UDP_PHDRSIZE)) {
499                         upriv->ustats.rudpInErrors++;
500                         upriv->csumerr++;
501                         netlog(f, Logrudp, "rudp: checksum error %I\n", raddr);
502                         DPRINT("rudp: checksum error %I\n", raddr);
503                         freeblist(bp);
504                         return;
505                 }
506         }
507
508         qlock(rudp);
509
510         c = iphtlook(&upriv->ht, raddr, rport, laddr, lport);
511         if(c == nil){
512                 /* no conversation found */
513                 upriv->ustats.rudpNoPorts++;
514                 qunlock(rudp);
515                 netlog(f, Logudp, "udp: no conv %I!%d -> %I!%d\n", raddr, rport,
516                         laddr, lport);
517                 uh->Unused = ottl;
518                 hnputs(uh->udpplen, olen);
519                 icmpnoconv(f, bp);
520                 freeblist(bp);
521                 return;
522         }
523         ucb = (Rudpcb*)c->ptcl;
524         qlock(ucb);
525         qunlock(rudp);
526
527         if(reliput(c, bp, raddr, rport) < 0){
528                 qunlock(ucb);
529                 freeb(bp);
530                 return;
531         }
532
533         /*
534          * Trim the packet down to data size
535          */
536
537         len -= (UDP_RHDRSIZE-UDP_PHDRSIZE);
538         bp = trimblock(bp, UDP_IPHDR+UDP_RHDRSIZE, len);
539         if(bp == nil) {
540                 netlog(f, Logrudp, "rudp: len err %I.%d -> %I.%d\n", 
541                         raddr, rport, laddr, lport);
542                 DPRINT("rudp: len err %I.%d -> %I.%d\n", 
543                         raddr, rport, laddr, lport);
544                 upriv->lenerr++;
545                 return;
546         }
547
548         netlog(f, Logrudpmsg, "rudp: %I.%d -> %I.%d l %d\n", 
549                 raddr, rport, laddr, lport, len);
550
551         switch(ucb->headers){
552         case 7:
553                 /* pass the src address */
554                 bp = padblock(bp, UDP_USEAD7);
555                 p = bp->rp;
556                 ipmove(p, raddr); p += IPaddrlen;
557                 ipmove(p, laddr); p += IPaddrlen;
558                 ipmove(p, ifc->lifc->local); p += IPaddrlen;
559                 hnputs(p, rport); p += 2;
560                 hnputs(p, lport);
561                 break;
562         default:
563                 /* connection oriented rudp */
564                 if(ipcmp(c->raddr, IPnoaddr) == 0){
565                         /* save the src address in the conversation */
566                         ipmove(c->raddr, raddr);
567                         c->rport = rport;
568
569                         /* reply with the same ip address (if not broadcast) */
570                         if(ipforme(f, laddr) == Runi)
571                                 ipmove(c->laddr, laddr);
572                         else
573                                 v4tov6(c->laddr, ifc->lifc->local);
574                 }
575                 break;
576         }
577         if(bp->next)
578                 bp = concatblock(bp);
579
580         if(qfull(c->rq)) {
581                 netlog(f, Logrudp, "rudp: qfull %I.%d -> %I.%d\n", raddr, rport,
582                         laddr, lport);
583                 freeblist(bp);
584         }
585         else
586                 qpass(c->rq, bp);
587         
588         qunlock(ucb);
589 }
590
591 static char *rudpunknown = "unknown rudp ctl request";
592
593 char*
594 rudpctl(Conv *c, char **f, int n)
595 {
596         Rudpcb *ucb;
597         uchar ip[IPaddrlen];
598         int x;
599
600         ucb = (Rudpcb*)c->ptcl;
601         if(n < 1)
602                 return rudpunknown;
603
604         if(strcmp(f[0], "headers") == 0){
605                 ucb->headers = 7;               /* new headers format */
606                 return nil;
607         } else if(strcmp(f[0], "hangup") == 0){
608                 if(n < 3)
609                         return "bad syntax";
610                 if (parseip(ip, f[1]) == -1)
611                         return Ebadip;
612                 x = atoi(f[2]);
613                 qlock(ucb);
614                 relforget(c, ip, x, 1);
615                 qunlock(ucb);
616                 return nil;
617         } else if(strcmp(f[0], "randdrop") == 0){
618                 x = 10;                 /* default is 10% */
619                 if(n > 1)
620                         x = atoi(f[1]);
621                 if(x > 100 || x < 0)
622                         return "illegal rudp drop rate";
623                 ucb->randdrop = x;
624                 return nil;
625         }
626         return rudpunknown;
627 }
628
629 void
630 rudpadvise(Proto *rudp, Block *bp, char *msg)
631 {
632         Udphdr *h;
633         uchar source[IPaddrlen], dest[IPaddrlen];
634         ushort psource, pdest;
635         Conv *s, **p;
636
637         h = (Udphdr*)(bp->rp);
638
639         v4tov6(dest, h->udpdst);
640         v4tov6(source, h->udpsrc);
641         psource = nhgets(h->udpsport);
642         pdest = nhgets(h->udpdport);
643
644         /* Look for a connection */
645         for(p = rudp->conv; *p; p++) {
646                 s = *p;
647                 if(s->rport == pdest)
648                 if(s->lport == psource)
649                 if(ipcmp(s->raddr, dest) == 0)
650                 if(ipcmp(s->laddr, source) == 0){
651                         qhangup(s->rq, msg);
652                         qhangup(s->wq, msg);
653                         break;
654                 }
655         }
656         freeblist(bp);
657 }
658
659 int
660 rudpstats(Proto *rudp, char *buf, int len)
661 {
662         Rudppriv *upriv;
663
664         upriv = rudp->priv;
665         return snprint(buf, len, "%lud %lud %lud %lud %lud %lud\n",
666                 upriv->ustats.rudpInDatagrams,
667                 upriv->ustats.rudpNoPorts,
668                 upriv->ustats.rudpInErrors,
669                 upriv->ustats.rudpOutDatagrams,
670                 upriv->rxmits,
671                 upriv->orders);
672 }
673
674 void
675 rudpinit(Fs *fs)
676 {
677
678         Proto *rudp;
679
680         rudp = smalloc(sizeof(Proto));
681         rudp->priv = smalloc(sizeof(Rudppriv));
682         rudp->name = "rudp";
683         rudp->connect = rudpconnect;
684         rudp->announce = rudpannounce;
685         rudp->ctl = rudpctl;
686         rudp->state = rudpstate;
687         rudp->create = rudpcreate;
688         rudp->close = rudpclose;
689         rudp->rcv = rudpiput;
690         rudp->advise = rudpadvise;
691         rudp->stats = rudpstats;
692         rudp->ipproto = IP_UDPPROTO;
693         rudp->nc = 32;
694         rudp->ptclsize = sizeof(Rudpcb);
695
696         Fsproto(fs, rudp);
697 }
698
699 /*********************************************/
700 /* Here starts the reliable helper functions */
701 /*********************************************/
702 /*
703  *  Enqueue a copy of an unacked block for possible retransmissions
704  */
705 void
706 relackq(Reliable *r, Block *bp)
707 {
708         Block *np;
709
710         np = copyblock(bp, blocklen(bp));
711         if(r->unacked)
712                 r->unackedtail->list = np;
713         else {
714                 /* restart timer */
715                 r->timeout = 0;
716                 r->xmits = 1;
717                 r->unacked = np;
718         }
719         r->unackedtail = np;
720         np->list = nil;
721 }
722
723 /*
724  *  retransmit unacked blocks
725  */
726 void
727 relackproc(void *a)
728 {
729         Rudpcb *ucb;
730         Proto *rudp;
731         Reliable *r;
732         Conv **s, *c;
733
734         rudp = (Proto *)a;
735
736         while(waserror())
737                 ;
738 loop:
739         tsleep(&up->sleep, return0, 0, Rudptickms);
740
741         for(s = rudp->conv; *s; s++) {
742                 c = *s;
743                 ucb = (Rudpcb*)c->ptcl;
744                 qlock(ucb);
745
746                 for(r = ucb->r; r; r = r->next) {
747                         if(r->unacked != nil){
748                                 r->timeout += Rudptickms;
749                                 if(r->timeout > Rudprxms*r->xmits)
750                                         relrexmit(c, r);
751                         }
752                         if(r->acksent != r->rcvseq)
753                                 relsendack(c, r, 0);
754                 }
755                 qunlock(ucb);
756         }
757         goto loop;
758 }
759
760 /*
761  *  get the state record for a conversation
762  */
763 Reliable*
764 relstate(Rudpcb *ucb, uchar *addr, ushort port, char *from)
765 {
766         Reliable *r, **l;
767
768         l = &ucb->r;
769         for(r = *l; r; r = *l){
770                 if(memcmp(addr, r->addr, IPaddrlen) == 0 && 
771                     port == r->port)
772                         break;
773                 l = &r->next;
774         }
775
776         /* no state for this addr/port, create some */
777         if(r == nil){
778                 while(generation == 0)
779                         generation = rand();
780
781                 DPRINT("from %s new state %lud for %I!%ud\n", 
782                         from, generation, addr, port);
783
784                 r = smalloc(sizeof(Reliable));
785                 memmove(r->addr, addr, IPaddrlen);
786                 r->port = port;
787                 r->unacked = 0;
788                 if(generation == Hangupgen)
789                         generation++;
790                 r->sndgen = generation++;
791                 r->sndseq = 0;
792                 r->ackrcvd = 0;
793                 r->rcvgen = 0;
794                 r->rcvseq = 0;
795                 r->acksent = 0;
796                 r->xmits = 0;
797                 r->timeout = 0;
798                 r->ref = 0;
799                 incref(r);      /* one reference for being in the list */
800
801                 *l = r;
802         }
803
804         incref(r);
805         return r;
806 }
807
808 void
809 relput(Reliable *r)
810 {
811         if(decref(r) == 0)
812                 free(r);
813 }
814
815 /*
816  *  forget a Reliable state
817  */
818 void
819 relforget(Conv *c, uchar *ip, int port, int originator)
820 {
821         Rudpcb *ucb;
822         Reliable *r, **l;
823
824         ucb = (Rudpcb*)c->ptcl;
825
826         l = &ucb->r;
827         for(r = *l; r; r = *l){
828                 if(ipcmp(ip, r->addr) == 0 && port == r->port){
829                         *l = r->next;
830                         if(originator)
831                                 relsendack(c, r, 1);
832                         relhangup(c, r);
833                         relput(r);      /* remove from the list */
834                         break;
835                 }
836                 l = &r->next;
837         }
838 }
839
840 /* 
841  *  process a rcvd reliable packet. return -1 if not to be passed to user process,
842  *  0 therwise.
843  *
844  *  called with ucb locked.
845  */
846 int
847 reliput(Conv *c, Block *bp, uchar *addr, ushort port)
848 {
849         Block *nbp;
850         Rudpcb *ucb;
851         Rudppriv *upriv;
852         Udphdr *uh;
853         Reliable *r;
854         Rudphdr *rh;
855         ulong seq, ack, sgen, agen, ackreal;
856         int rv = -1;
857
858         /* get fields */
859         uh = (Udphdr*)(bp->rp);
860         rh = (Rudphdr*)uh;
861         seq = nhgetl(rh->relseq);
862         sgen = nhgetl(rh->relsgen);
863         ack = nhgetl(rh->relack);
864         agen = nhgetl(rh->relagen);
865
866         upriv = c->p->priv;
867         ucb = (Rudpcb*)c->ptcl;
868         r = relstate(ucb, addr, port, "input");
869
870         DPRINT("rcvd %lud/%lud, %lud/%lud, r->sndgen = %lud\n", 
871                 seq, sgen, ack, agen, r->sndgen);
872
873         /* if acking an incorrect generation, ignore */
874         if(ack && agen != r->sndgen)
875                 goto out;
876
877         /* Look for a hangup */
878         if(sgen == Hangupgen) {
879                 if(agen == r->sndgen)
880                         relforget(c, addr, port, 0);
881                 goto out;
882         }
883
884         /* make sure we're not talking to a new remote side */
885         if(r->rcvgen != sgen){
886                 if(seq != 0 && seq != 1)
887                         goto out;
888
889                 /* new connection */
890                 if(r->rcvgen != 0){
891                         DPRINT("new con r->rcvgen = %lud, sgen = %lud\n", r->rcvgen, sgen);
892                         relhangup(c, r);
893                 }
894                 r->rcvgen = sgen;
895         }
896
897         /* dequeue acked packets */
898         if(ack && agen == r->sndgen){
899                 ackreal = 0;
900                 while(r->unacked != nil && INSEQ(ack, r->ackrcvd, r->sndseq)){
901                         nbp = r->unacked;
902                         r->unacked = nbp->list;
903                         DPRINT("%lud/%lud acked, r->sndgen = %lud\n", 
904                                ack, agen, r->sndgen);
905                         freeb(nbp);
906                         r->ackrcvd = NEXTSEQ(r->ackrcvd);
907                         ackreal = 1;
908                 }
909
910                 /* flow control */
911                 if(UNACKED(r) < Maxunacked/8 && r->blocked)
912                         wakeup(&r->vous);
913
914                 /*
915                  *  retransmit next packet if the acked packet
916                  *  was transmitted more than once
917                  */
918                 if(ackreal && r->unacked != nil){
919                         r->timeout = 0;
920                         if(r->xmits > 1){
921                                 r->xmits = 1;
922                                 relrexmit(c, r);
923                         }
924                 }
925                 
926         }
927
928         /* no message or input queue full */
929         if(seq == 0 || qfull(c->rq))
930                 goto out;
931
932         /* refuse out of order delivery */
933         if(seq != NEXTSEQ(r->rcvseq)){
934                 relsendack(c, r, 0);    /* tell him we got it already */
935                 upriv->orders++;
936                 DPRINT("out of sequence %lud not %lud\n", seq, NEXTSEQ(r->rcvseq));
937                 goto out;
938         }
939         r->rcvseq = seq;
940
941         rv = 0;
942 out:
943         relput(r);
944         return rv;
945 }
946
947 void
948 relsendack(Conv *c, Reliable *r, int hangup)
949 {
950         Udphdr *uh;
951         Block *bp;
952         Rudphdr *rh;
953         int ptcllen;
954         Fs *f;
955
956         bp = allocb(UDP_IPHDR + UDP_RHDRSIZE);
957         if(bp == nil)
958                 return;
959         bp->wp += UDP_IPHDR + UDP_RHDRSIZE;
960         f = c->p->f;
961         uh = (Udphdr *)(bp->rp);
962         uh->vihl = IP_VER4;
963         rh = (Rudphdr*)uh;
964
965         ptcllen = (UDP_RHDRSIZE-UDP_PHDRSIZE);
966         uh->Unused = 0;
967         uh->udpproto = IP_UDPPROTO;
968         uh->frag[0] = 0;
969         uh->frag[1] = 0;
970         hnputs(uh->udpplen, ptcllen);
971
972         v6tov4(uh->udpdst, r->addr);
973         hnputs(uh->udpdport, r->port);
974         hnputs(uh->udpsport, c->lport);
975         if(ipcmp(c->laddr, IPnoaddr) == 0)
976                 findlocalip(f, c->laddr, c->raddr);
977         v6tov4(uh->udpsrc, c->laddr);
978         hnputs(uh->udplen, ptcllen);
979
980         if(hangup)
981                 hnputl(rh->relsgen, Hangupgen);
982         else
983                 hnputl(rh->relsgen, r->sndgen);
984         hnputl(rh->relseq, 0);
985         hnputl(rh->relagen, r->rcvgen);
986         hnputl(rh->relack, r->rcvseq);
987
988         if(r->acksent < r->rcvseq)
989                 r->acksent = r->rcvseq;
990
991         uh->udpcksum[0] = 0;
992         uh->udpcksum[1] = 0;
993         hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, UDP_RHDRSIZE));
994
995         DPRINT("sendack: %lud/%lud, %lud/%lud\n", 0L, r->sndgen, r->rcvseq, r->rcvgen);
996         doipoput(c, f, bp, 0, c->ttl, c->tos);
997 }
998
999
1000 /*
1001  *  called with ucb locked (and c locked if user initiated close)
1002  */
1003 void
1004 relhangup(Conv *c, Reliable *r)
1005 {
1006         int n;
1007         Block *bp;
1008         char hup[ERRMAX];
1009
1010         n = snprint(hup, sizeof(hup), "hangup %I!%d", r->addr, r->port);
1011         qproduce(c->eq, hup, n);
1012
1013         /*
1014          *  dump any unacked outgoing messages
1015          */
1016         for(bp = r->unacked; bp != nil; bp = r->unacked){
1017                 r->unacked = bp->list;
1018                 bp->list = nil;
1019                 freeb(bp);
1020         }
1021
1022         r->rcvgen = 0;
1023         r->rcvseq = 0;
1024         r->acksent = 0;
1025         if(generation == Hangupgen)
1026                 generation++;
1027         r->sndgen = generation++;
1028         r->sndseq = 0;
1029         r->ackrcvd = 0;
1030         r->xmits = 0;
1031         r->timeout = 0;
1032         wakeup(&r->vous);
1033 }
1034
1035 /*
1036  *  called with ucb locked
1037  */
1038 void
1039 relrexmit(Conv *c, Reliable *r)
1040 {
1041         Rudppriv *upriv;
1042         Block *np;
1043         Fs *f;
1044
1045         upriv = c->p->priv;
1046         f = c->p->f;
1047         r->timeout = 0;
1048         if(r->xmits++ > Rudpmaxxmit){
1049                 relhangup(c, r);
1050                 return;
1051         }
1052
1053         upriv->rxmits++;
1054         np = copyblock(r->unacked, blocklen(r->unacked));
1055         DPRINT("rxmit r->ackrvcd+1 = %lud\n", r->ackrcvd+1);
1056         doipoput(c, f, np, 0, c->ttl, c->tos);
1057 }