2 * Reliable User Datagram Protocol, currently only for IPv4.
3 * This protocol is compatible with UDP's packet format.
4 * It could be done over UDP if need be.
7 #include "../port/lib.h"
11 #include "../port/error.h"
/*
 * Debug print: the argument list that follows DPRINT is passed to print
 * only when DEBUG is nonzero.  The if/else form keeps a caller's own
 * `else' bound to the caller's `if' in code such as
 *	if(x) DPRINT(...); else ...
 */
#define DPRINT if(!DEBUG){}else print
/*
 * Forward distance from sequence number b to sequence number a.
 * If a >= b this is the plain difference; otherwise a is taken to have
 * wrapped around, and the distance is 0xffffffff minus the backwards gap.
 */
#define SEQDIFF(a,b) ( (a)>=(b)?\
			(a)-(b):\
			0xffffffffUL-((b)-(a)) )
/*
 * True when a lies in the half-open window (start, end].
 * If start > end the window is treated as wrapping around, so a is
 * inside when it is above start or at/below end.
 */
#define INSEQ(a,start,end) ( (start)>(end) ? \
	((a)>(start) || (a)<=(end)) : \
	((a)>(start) && (a)<=(end)) )
/*
 * Number of sequence numbers between the last ack received and the
 * last sequence number sent, for the Reliable pointed to by r.
 * The argument is parenthesized so any pointer expression may be passed.
 */
#define UNACKED(r) SEQDIFF((r)->sndseq, (r)->ackrcvd)
/*
 * Successor of sequence number a.  An increment that wraps to 0 yields 1
 * instead, so 0 is never produced.
 */
#define NEXTSEQ(a) ( (a)+1 != 0 ? (a)+1 : 1 )
29 UDP_PHDRSIZE = 12, /* pseudo header */
30 // UDP_HDRSIZE = 20, /* pseudo header + udp header */
31 UDP_RHDRSIZE = 36, /* pseudo header + udp header + rudp header */
32 UDP_IPHDR = 8, /* ip header */
34 UDP_USEAD7 = 52, /* size of new ipv6 headers struct */
42 #define Hangupgen 0xffffffff /* used only in hangup messages */
44 typedef struct Udphdr Udphdr;
48 uchar vihl; /* Version and header length */
49 uchar tos; /* Type of service */
50 uchar length[2]; /* packet length */
51 uchar id[2]; /* Identification */
52 uchar frag[2]; /* Fragment information */
54 /* pseudo header starts here */
56 uchar udpproto; /* Protocol */
57 uchar udpplen[2]; /* Header plus data length */
58 uchar udpsrc[4]; /* Ip source */
59 uchar udpdst[4]; /* Ip destination */
62 uchar udpsport[2]; /* Source port */
63 uchar udpdport[2]; /* Destination port */
64 uchar udplen[2]; /* data length */
65 uchar udpcksum[2]; /* Checksum */
68 typedef struct Rudphdr Rudphdr;
72 uchar vihl; /* Version and header length */
73 uchar tos; /* Type of service */
74 uchar length[2]; /* packet length */
75 uchar id[2]; /* Identification */
76 uchar frag[2]; /* Fragment information */
78 /* pseudo header starts here */
80 uchar udpproto; /* Protocol */
81 uchar udpplen[2]; /* Header plus data length */
82 uchar udpsrc[4]; /* Ip source */
83 uchar udpdst[4]; /* Ip destination */
86 uchar udpsport[2]; /* Source port */
87 uchar udpdport[2]; /* Destination port */
88 uchar udplen[2]; /* data length (includes rudp header) */
89 uchar udpcksum[2]; /* Checksum */
92 uchar relseq[4]; /* id of this packet (or 0) */
93 uchar relsgen[4]; /* generation/time stamp */
94 uchar relack[4]; /* packet being acked (or 0) */
95 uchar relagen[4]; /* generation/time stamp */
100 * one state structure per destination
102 typedef struct Reliable Reliable;
109 uchar addr[IPaddrlen]; /* always V6 when put here */
112 Block *unacked; /* unacked msg list */
113 Block *unackedtail; /* and its tail */
115 int timeout; /* time since first unacked msg sent */
116 int xmits; /* number of times first unacked msg sent */
118 ulong sndseq; /* next packet to be sent */
119 ulong sndgen; /* and its generation */
121 ulong rcvseq; /* last packet received */
122 ulong rcvgen; /* and its generation */
124 ulong acksent; /* last ack sent */
125 ulong ackrcvd; /* last msg for which ack was rcvd */
135 /* MIB II counters */
136 typedef struct Rudpstats Rudpstats;
139 ulong rudpInDatagrams;
142 ulong rudpOutDatagrams;
145 typedef struct Rudppriv Rudppriv;
154 ulong csumerr; /* checksum errors */
155 ulong lenerr; /* short packet */
156 ulong rxmits; /* # of retransmissions */
157 ulong orders; /* # of out of order pkts */
159 /* keeping track of the ack kproc */
165 static ulong generation = 0;
169 * protocol specific part of Conv
171 typedef struct Rudpcb Rudpcb;
183 void relsendack(Conv*, Reliable*, int);
184 int reliput(Conv*, Block*, uchar*, ushort);
185 Reliable *relstate(Rudpcb*, uchar*, ushort, char*);
186 void relput(Reliable*);
187 void relforget(Conv *, uchar*, int, int);
188 void relackproc(void *);
189 void relackq(Reliable *, Block*);
190 void relhangup(Conv *, Reliable*);
191 void relrexmit(Conv *, Reliable*);
192 void relput(Reliable*);
193 void rudpkick(void *x);
196 rudpstartackproc(Proto *rudp)
199 char kpname[KNAMELEN];
202 if(rpriv->ackprocstarted == 0){
204 if(rpriv->ackprocstarted == 0){
205 sprint(kpname, "#I%drudpack", rudp->f->dev);
206 kproc(kpname, relackproc, rudp);
207 rpriv->ackprocstarted = 1;
209 qunlock(&rpriv->apl);
214 rudpconnect(Conv *c, char **argv, int argc)
220 rudpstartackproc(c->p);
221 e = Fsstdconnect(c, argv, argc);
223 iphtadd(&upriv->ht, c);
230 rudpstate(Conv *c, char *state, int n)
236 m = snprint(state, n, "%s", c->inuse?"Open":"Closed");
237 ucb = (Rudpcb*)c->ptcl;
239 for(r = ucb->r; r; r = r->next)
240 m += snprint(state+m, n-m, " %I/%ld", r->addr, UNACKED(r));
241 m += snprint(state+m, n-m, "\n");
247 rudpannounce(Conv *c, char** argv, int argc)
253 rudpstartackproc(c->p);
254 e = Fsstdannounce(c, argv, argc);
258 iphtadd(&upriv->ht, c);
266 c->rq = qopen(64*1024, Qmsg, 0, 0);
267 c->wq = qopen(64*1024, Qkick, rudpkick, c);
278 iphtrem(&upriv->ht, c);
280 /* force out any delayed acks */
281 ucb = (Rudpcb*)c->ptcl;
283 for(r = ucb->r; r; r = r->next){
284 if(r->acksent != r->rcvseq)
292 ipmove(c->laddr, IPnoaddr);
293 ipmove(c->raddr, IPnoaddr);
300 for(r = ucb->r; r; r = nr){
301 if(r->acksent != r->rcvseq)
313 * randomly don't send packets
316 doipoput(Conv *c, Fs *f, Block *bp, int x, int ttl, int tos)
320 ucb = (Rudpcb*)c->ptcl;
321 if(ucb->randdrop && nrand(100) < ucb->randdrop)
324 ipoput4(f, bp, x, ttl, tos, nil);
332 return UNACKED(r) <= Maxunacked;
341 uchar laddr[IPaddrlen], raddr[IPaddrlen];
353 netlog(c->p->f, Logrudp, "rudp: kick\n");
358 ucb = (Rudpcb*)c->ptcl;
359 switch(ucb->headers) {
361 /* get user specified addresses */
362 bp = pullupblock(bp, UDP_USEAD7);
365 ipmove(raddr, bp->rp);
367 ipmove(laddr, bp->rp);
369 /* pick interface closest to dest */
370 if(ipforme(f, laddr) != Runi)
371 findlocalip(f, laddr, raddr);
372 bp->rp += IPaddrlen; /* Ignore ifc address */
373 rport = nhgets(bp->rp);
374 bp->rp += 2+2; /* Ignore local port */
377 ipmove(raddr, c->raddr);
378 ipmove(laddr, c->laddr);
385 /* Make space to fit rudp & ip header */
386 bp = padblock(bp, UDP_IPHDR+UDP_RHDRSIZE);
390 uh = (Udphdr *)(bp->rp);
395 ptcllen = dlen + (UDP_RHDRSIZE-UDP_PHDRSIZE);
397 uh->udpproto = IP_UDPPROTO;
400 hnputs(uh->udpplen, ptcllen);
401 switch(ucb->headers){
403 v6tov4(uh->udpdst, raddr);
404 hnputs(uh->udpdport, rport);
405 v6tov4(uh->udpsrc, laddr);
408 v6tov4(uh->udpdst, c->raddr);
409 hnputs(uh->udpdport, c->rport);
410 if(ipcmp(c->laddr, IPnoaddr) == 0)
411 findlocalip(f, c->laddr, c->raddr);
412 v6tov4(uh->udpsrc, c->laddr);
415 hnputs(uh->udpsport, c->lport);
416 hnputs(uh->udplen, ptcllen);
421 r = relstate(ucb, raddr, rport, "kick");
422 r->sndseq = NEXTSEQ(r->sndseq);
423 hnputl(rh->relseq, r->sndseq);
424 hnputl(rh->relsgen, r->sndgen);
426 hnputl(rh->relack, r->rcvseq); /* ACK last rcvd packet */
427 hnputl(rh->relagen, r->rcvgen);
429 if(r->rcvseq != r->acksent)
430 r->acksent = r->rcvseq;
432 hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, dlen+UDP_RHDRSIZE));
437 upriv->ustats.rudpOutDatagrams++;
439 DPRINT("sent: %lud/%lud, %lud/%lud\n",
440 r->sndseq, r->sndgen, r->rcvseq, r->rcvgen);
442 doipoput(c, f, bp, 0, c->ttl, c->tos);
450 /* flow control of sorts */
452 if(UNACKED(r) > Maxunacked){
454 sleep(&r->vous, flow, r);
464 rudpiput(Proto *rudp, Ipifc *ifc, Block *bp)
470 uchar raddr[IPaddrlen], laddr[IPaddrlen];
479 upriv->ustats.rudpInDatagrams++;
481 uh = (Udphdr*)(bp->rp);
483 /* Put back pseudo header for checksum
484 * (remember old values for icmpnoconv())
488 len = nhgets(uh->udplen);
489 olen = nhgets(uh->udpplen);
490 hnputs(uh->udpplen, len);
492 v4tov6(raddr, uh->udpsrc);
493 v4tov6(laddr, uh->udpdst);
494 lport = nhgets(uh->udpdport);
495 rport = nhgets(uh->udpsport);
497 if(nhgets(uh->udpcksum)) {
498 if(ptclcsum(bp, UDP_IPHDR, len+UDP_PHDRSIZE)) {
499 upriv->ustats.rudpInErrors++;
501 netlog(f, Logrudp, "rudp: checksum error %I\n", raddr);
502 DPRINT("rudp: checksum error %I\n", raddr);
510 c = iphtlook(&upriv->ht, raddr, rport, laddr, lport);
512 /* no conversation found */
513 upriv->ustats.rudpNoPorts++;
515 netlog(f, Logudp, "udp: no conv %I!%d -> %I!%d\n", raddr, rport,
518 hnputs(uh->udpplen, olen);
523 ucb = (Rudpcb*)c->ptcl;
527 if(reliput(c, bp, raddr, rport) < 0){
534 * Trim the packet down to data size
537 len -= (UDP_RHDRSIZE-UDP_PHDRSIZE);
538 bp = trimblock(bp, UDP_IPHDR+UDP_RHDRSIZE, len);
540 netlog(f, Logrudp, "rudp: len err %I.%d -> %I.%d\n",
541 raddr, rport, laddr, lport);
542 DPRINT("rudp: len err %I.%d -> %I.%d\n",
543 raddr, rport, laddr, lport);
548 netlog(f, Logrudpmsg, "rudp: %I.%d -> %I.%d l %d\n",
549 raddr, rport, laddr, lport, len);
551 switch(ucb->headers){
553 /* pass the src address */
554 bp = padblock(bp, UDP_USEAD7);
556 ipmove(p, raddr); p += IPaddrlen;
557 ipmove(p, laddr); p += IPaddrlen;
558 ipmove(p, ifc->lifc->local); p += IPaddrlen;
559 hnputs(p, rport); p += 2;
563 /* connection oriented rudp */
564 if(ipcmp(c->raddr, IPnoaddr) == 0){
565 /* save the src address in the conversation */
566 ipmove(c->raddr, raddr);
569 /* reply with the same ip address (if not broadcast) */
570 if(ipforme(f, laddr) == Runi)
571 ipmove(c->laddr, laddr);
573 v4tov6(c->laddr, ifc->lifc->local);
578 bp = concatblock(bp);
581 netlog(f, Logrudp, "rudp: qfull %I.%d -> %I.%d\n", raddr, rport,
591 static char *rudpunknown = "unknown rudp ctl request";
594 rudpctl(Conv *c, char **f, int n)
600 ucb = (Rudpcb*)c->ptcl;
604 if(strcmp(f[0], "headers") == 0){
605 ucb->headers = 7; /* new headers format */
607 } else if(strcmp(f[0], "hangup") == 0){
610 if (parseip(ip, f[1]) == -1)
614 relforget(c, ip, x, 1);
617 } else if(strcmp(f[0], "randdrop") == 0){
618 x = 10; /* default is 10% */
622 return "illegal rudp drop rate";
630 rudpadvise(Proto *rudp, Block *bp, char *msg)
633 uchar source[IPaddrlen], dest[IPaddrlen];
634 ushort psource, pdest;
637 h = (Udphdr*)(bp->rp);
639 v4tov6(dest, h->udpdst);
640 v4tov6(source, h->udpsrc);
641 psource = nhgets(h->udpsport);
642 pdest = nhgets(h->udpdport);
644 /* Look for a connection */
645 for(p = rudp->conv; *p; p++) {
647 if(s->rport == pdest)
648 if(s->lport == psource)
649 if(ipcmp(s->raddr, dest) == 0)
650 if(ipcmp(s->laddr, source) == 0){
660 rudpstats(Proto *rudp, char *buf, int len)
665 return snprint(buf, len, "%lud %lud %lud %lud %lud %lud\n",
666 upriv->ustats.rudpInDatagrams,
667 upriv->ustats.rudpNoPorts,
668 upriv->ustats.rudpInErrors,
669 upriv->ustats.rudpOutDatagrams,
680 rudp = smalloc(sizeof(Proto));
681 rudp->priv = smalloc(sizeof(Rudppriv));
683 rudp->connect = rudpconnect;
684 rudp->announce = rudpannounce;
686 rudp->state = rudpstate;
687 rudp->create = rudpcreate;
688 rudp->close = rudpclose;
689 rudp->rcv = rudpiput;
690 rudp->advise = rudpadvise;
691 rudp->stats = rudpstats;
692 rudp->ipproto = IP_UDPPROTO;
694 rudp->ptclsize = sizeof(Rudpcb);
699 /*********************************************/
700 /* Here starts the reliable helper functions */
701 /*********************************************/
703 * Enqueue a copy of an unacked block for possible retransmissions
706 relackq(Reliable *r, Block *bp)
710 np = copyblock(bp, blocklen(bp));
712 r->unackedtail->list = np;
724 * retransmit unacked blocks
739 tsleep(&up->sleep, return0, 0, Rudptickms);
741 for(s = rudp->conv; *s; s++) {
743 ucb = (Rudpcb*)c->ptcl;
746 for(r = ucb->r; r; r = r->next) {
747 if(r->unacked != nil){
748 r->timeout += Rudptickms;
749 if(r->timeout > Rudprxms*r->xmits)
752 if(r->acksent != r->rcvseq)
761 * get the state record for a conversation
764 relstate(Rudpcb *ucb, uchar *addr, ushort port, char *from)
769 for(r = *l; r; r = *l){
770 if(memcmp(addr, r->addr, IPaddrlen) == 0 &&
776 /* no state for this addr/port, create some */
778 while(generation == 0)
781 DPRINT("from %s new state %lud for %I!%ud\n",
782 from, generation, addr, port);
784 r = smalloc(sizeof(Reliable));
785 memmove(r->addr, addr, IPaddrlen);
788 if(generation == Hangupgen)
790 r->sndgen = generation++;
799 incref(r); /* one reference for being in the list */
816 * forget a Reliable state
819 relforget(Conv *c, uchar *ip, int port, int originator)
824 ucb = (Rudpcb*)c->ptcl;
827 for(r = *l; r; r = *l){
828 if(ipcmp(ip, r->addr) == 0 && port == r->port){
833 relput(r); /* remove from the list */
841 * process a rcvd reliable packet. return -1 if not to be passed to user process,
844 * called with ucb locked.
847 reliput(Conv *c, Block *bp, uchar *addr, ushort port)
855 ulong seq, ack, sgen, agen, ackreal;
859 uh = (Udphdr*)(bp->rp);
861 seq = nhgetl(rh->relseq);
862 sgen = nhgetl(rh->relsgen);
863 ack = nhgetl(rh->relack);
864 agen = nhgetl(rh->relagen);
867 ucb = (Rudpcb*)c->ptcl;
868 r = relstate(ucb, addr, port, "input");
870 DPRINT("rcvd %lud/%lud, %lud/%lud, r->sndgen = %lud\n",
871 seq, sgen, ack, agen, r->sndgen);
873 /* if acking an incorrect generation, ignore */
874 if(ack && agen != r->sndgen)
877 /* Look for a hangup */
878 if(sgen == Hangupgen) {
879 if(agen == r->sndgen)
880 relforget(c, addr, port, 0);
884 /* make sure we're not talking to a new remote side */
885 if(r->rcvgen != sgen){
886 if(seq != 0 && seq != 1)
891 DPRINT("new con r->rcvgen = %lud, sgen = %lud\n", r->rcvgen, sgen);
897 /* dequeue acked packets */
898 if(ack && agen == r->sndgen){
900 while(r->unacked != nil && INSEQ(ack, r->ackrcvd, r->sndseq)){
902 r->unacked = nbp->list;
903 DPRINT("%lud/%lud acked, r->sndgen = %lud\n",
904 ack, agen, r->sndgen);
906 r->ackrcvd = NEXTSEQ(r->ackrcvd);
911 if(UNACKED(r) < Maxunacked/8 && r->blocked)
915 * retransmit next packet if the acked packet
916 * was transmitted more than once
918 if(ackreal && r->unacked != nil){
928 /* no message or input queue full */
929 if(seq == 0 || qfull(c->rq))
932 /* refuse out of order delivery */
933 if(seq != NEXTSEQ(r->rcvseq)){
934 relsendack(c, r, 0); /* tell him we got it already */
936 DPRINT("out of sequence %lud not %lud\n", seq, NEXTSEQ(r->rcvseq));
948 relsendack(Conv *c, Reliable *r, int hangup)
956 bp = allocb(UDP_IPHDR + UDP_RHDRSIZE);
959 bp->wp += UDP_IPHDR + UDP_RHDRSIZE;
961 uh = (Udphdr *)(bp->rp);
965 ptcllen = (UDP_RHDRSIZE-UDP_PHDRSIZE);
967 uh->udpproto = IP_UDPPROTO;
970 hnputs(uh->udpplen, ptcllen);
972 v6tov4(uh->udpdst, r->addr);
973 hnputs(uh->udpdport, r->port);
974 hnputs(uh->udpsport, c->lport);
975 if(ipcmp(c->laddr, IPnoaddr) == 0)
976 findlocalip(f, c->laddr, c->raddr);
977 v6tov4(uh->udpsrc, c->laddr);
978 hnputs(uh->udplen, ptcllen);
981 hnputl(rh->relsgen, Hangupgen);
983 hnputl(rh->relsgen, r->sndgen);
984 hnputl(rh->relseq, 0);
985 hnputl(rh->relagen, r->rcvgen);
986 hnputl(rh->relack, r->rcvseq);
988 if(r->acksent < r->rcvseq)
989 r->acksent = r->rcvseq;
993 hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, UDP_RHDRSIZE));
995 DPRINT("sendack: %lud/%lud, %lud/%lud\n", 0L, r->sndgen, r->rcvseq, r->rcvgen);
996 doipoput(c, f, bp, 0, c->ttl, c->tos);
1001 * called with ucb locked (and c locked if user initiated close)
1004 relhangup(Conv *c, Reliable *r)
1010 n = snprint(hup, sizeof(hup), "hangup %I!%d", r->addr, r->port);
1011 qproduce(c->eq, hup, n);
1014 * dump any unacked outgoing messages
1016 for(bp = r->unacked; bp != nil; bp = r->unacked){
1017 r->unacked = bp->list;
1025 if(generation == Hangupgen)
1027 r->sndgen = generation++;
1036 * called with ucb locked
1039 relrexmit(Conv *c, Reliable *r)
1048 if(r->xmits++ > Rudpmaxxmit){
1054 np = copyblock(r->unacked, blocklen(r->unacked));
1055 DPRINT("rxmit r->ackrvcd+1 = %lud\n", r->ackrcvd+1);
1056 doipoput(c, f, np, 0, c->ttl, c->tos);