2 * Reliable User Datagram Protocol, currently only for IPv4.
3 * This protocol is compatible with UDP's packet format.
4 * It could be done over UDP if need be.
7 #include "../port/lib.h"
11 #include "../port/error.h"
/*
 * Debug print, enabled or disabled by DEBUG.
 * Written as a complete if/else so that a DPRINT(...) used as the body
 * of an unbraced `if' cannot capture an `else' that follows it.
 */
#define DPRINT if(!DEBUG){}else print
18 #define SEQDIFF(a,b) ( (a)>=(b)?\
20 0xffffffffUL-((b)-(a)) )
/*
 * True when sequence number seq lies in the window (lo, hi]: strictly
 * after lo and no later than hi.  The window may wrap; when lo > hi it
 * runs from lo to the top of the number space and continues from the
 * bottom up to hi.  Arguments are evaluated more than once.
 */
#define INSEQ(seq,lo,hi) \
	( (lo) > (hi) \
		? ( (seq) > (lo) || (seq) <= (hi) ) \
		: ( (seq) > (lo) && (seq) <= (hi) ) )
/*
 * Distance, as computed by SEQDIFF, between r->sndseq and r->ackrcvd,
 * i.e. how far sending has run ahead of the acks received from this peer.
 * The argument is parenthesised so that any pointer-valued expression
 * (p + 1, &state, ...) can be passed, not just a plain identifier.
 */
#define UNACKED(r) SEQDIFF((r)->sndseq, (r)->ackrcvd)
/*
 * Successor of a sequence number.  Sequence numbers occupy 4 bytes in
 * the header and 0 is reserved to mean "no packet", so the successor
 * of 0xffffffff is 1, never 0.  The sum is masked to 32 bits so the
 * wrap does not depend on how wide ulong happens to be.
 * The argument is evaluated more than once.
 */
#define NEXTSEQ(a) ( (((a)+1) & 0xffffffffUL) == 0 ? 1 : (((a)+1) & 0xffffffffUL) )
29 UDP_PHDRSIZE = 12, /* pseudo header */
30 // UDP_HDRSIZE = 20, /* pseudo header + udp header */
31 UDP_RHDRSIZE = 36, /* pseudo header + udp header + rudp header */
32 UDP_IPHDR = 8, /* ip header */
34 UDP_USEAD7 = 52, /* size of new ipv6 headers struct */
/*
 * Generation value used only in hangup messages: relsendack writes it
 * into relsgen for a hangup, and reliput treats an incoming relsgen
 * equal to it as a hangup.  Typed unsigned long to match the ulong
 * generations it is compared with and the 0xffffffffUL used by SEQDIFF.
 */
#define Hangupgen 0xffffffffUL
44 typedef struct Udphdr Udphdr;
48 uchar vihl; /* Version and header length */
49 uchar tos; /* Type of service */
50 uchar length[2]; /* packet length */
51 uchar id[2]; /* Identification */
52 uchar frag[2]; /* Fragment information */
54 /* pseudo header starts here */
56 uchar udpproto; /* Protocol */
57 uchar udpplen[2]; /* Header plus data length */
58 uchar udpsrc[4]; /* Ip source */
59 uchar udpdst[4]; /* Ip destination */
62 uchar udpsport[2]; /* Source port */
63 uchar udpdport[2]; /* Destination port */
64 uchar udplen[2]; /* data length */
65 uchar udpcksum[2]; /* Checksum */
68 typedef struct Rudphdr Rudphdr;
72 uchar vihl; /* Version and header length */
73 uchar tos; /* Type of service */
74 uchar length[2]; /* packet length */
75 uchar id[2]; /* Identification */
76 uchar frag[2]; /* Fragment information */
78 /* pseudo header starts here */
80 uchar udpproto; /* Protocol */
81 uchar udpplen[2]; /* Header plus data length */
82 uchar udpsrc[4]; /* Ip source */
83 uchar udpdst[4]; /* Ip destination */
86 uchar udpsport[2]; /* Source port */
87 uchar udpdport[2]; /* Destination port */
88 uchar udplen[2]; /* data length (includes rudp header) */
89 uchar udpcksum[2]; /* Checksum */
92 uchar relseq[4]; /* id of this packet (or 0) */
93 uchar relsgen[4]; /* generation/time stamp */
94 uchar relack[4]; /* packet being acked (or 0) */
95 uchar relagen[4]; /* generation/time stamp */
100 * one state structure per destination
102 typedef struct Reliable Reliable;
109 uchar addr[IPaddrlen]; /* always V6 when put here */
112 Block *unacked; /* unacked msg list */
113 Block *unackedtail; /* and its tail */
115 int timeout; /* time since first unacked msg sent */
116 int xmits; /* number of times first unacked msg sent */
118 ulong sndseq; /* next packet to be sent */
119 ulong sndgen; /* and its generation */
121 ulong rcvseq; /* last packet received */
122 ulong rcvgen; /* and its generation */
124 ulong acksent; /* last ack sent */
125 ulong ackrcvd; /* last msg for which ack was rcvd */
135 /* MIB II counters */
136 typedef struct Rudpstats Rudpstats;
139 ulong rudpInDatagrams;
142 ulong rudpOutDatagrams;
145 typedef struct Rudppriv Rudppriv;
154 ulong csumerr; /* checksum errors */
155 ulong lenerr; /* short packet */
156 ulong rxmits; /* # of retransmissions */
157 ulong orders; /* # of out of order pkts */
159 /* keeping track of the ack kproc */
/*
 * Counter from which send generations are drawn (r->sndgen = generation++).
 * NOTE(review): relstate loops while this is 0, and both places that draw
 * a value test for Hangupgen first; the bodies of those checks are not
 * visible here -- confirm how 0 and Hangupgen are skipped.
 */
165 static ulong generation = 0;
169 * protocol specific part of Conv
171 typedef struct Rudpcb Rudpcb;
183 void relsendack(Conv*, Reliable*, int);
184 int reliput(Conv*, Block*, uchar*, ushort);
185 Reliable *relstate(Rudpcb*, uchar*, ushort, char*);
186 void relput(Reliable*);
187 void relforget(Conv *, uchar*, int, int);
188 void relackproc(void *);
189 void relackq(Reliable *, Block*);
190 void relhangup(Conv *, Reliable*);
191 void relrexmit(Conv *, Reliable*);
192 void relput(Reliable*);
193 void rudpkick(void *x);
196 rudpstartackproc(Proto *rudp)
199 char kpname[KNAMELEN];
202 if(rpriv->ackprocstarted == 0){
204 if(rpriv->ackprocstarted == 0){
205 sprint(kpname, "#I%drudpack", rudp->f->dev);
206 kproc(kpname, relackproc, rudp);
207 rpriv->ackprocstarted = 1;
209 qunlock(&rpriv->apl);
214 rudpconnect(Conv *c, char **argv, int argc)
220 rudpstartackproc(c->p);
221 e = Fsstdconnect(c, argv, argc);
223 iphtadd(&upriv->ht, c);
230 rudpstate(Conv *c, char *state, int n)
236 m = snprint(state, n, "%s", c->inuse?"Open":"Closed");
237 ucb = (Rudpcb*)c->ptcl;
239 for(r = ucb->r; r; r = r->next)
240 m += snprint(state+m, n-m, " %I/%ld", r->addr, UNACKED(r));
241 m += snprint(state+m, n-m, "\n");
247 rudpannounce(Conv *c, char** argv, int argc)
253 rudpstartackproc(c->p);
254 e = Fsstdannounce(c, argv, argc);
258 iphtadd(&upriv->ht, c);
266 c->rq = qopen(64*1024, Qmsg, 0, 0);
267 c->wq = qopen(64*1024, Qkick, rudpkick, c);
278 iphtrem(&upriv->ht, c);
280 /* force out any delayed acks */
281 ucb = (Rudpcb*)c->ptcl;
283 for(r = ucb->r; r; r = r->next){
284 if(r->acksent != r->rcvseq)
292 ipmove(c->laddr, IPnoaddr);
293 ipmove(c->raddr, IPnoaddr);
300 for(r = ucb->r; r; r = nr){
301 if(r->acksent != r->rcvseq)
313 * randomly don't send packets
316 doipoput(Conv *c, Fs *f, Block *bp, int x, int ttl, int tos)
320 ucb = (Rudpcb*)c->ptcl;
321 if(ucb->randdrop && nrand(100) < ucb->randdrop)
324 ipoput4(f, bp, x, ttl, tos, nil);
332 return UNACKED(r) <= Maxunacked;
341 uchar laddr[IPaddrlen], raddr[IPaddrlen];
353 netlog(c->p->f, Logrudp, "rudp: kick\n");
358 ucb = (Rudpcb*)c->ptcl;
359 switch(ucb->headers) {
361 /* get user specified addresses */
362 bp = pullupblock(bp, UDP_USEAD7);
365 ipmove(raddr, bp->rp);
367 ipmove(laddr, bp->rp);
369 /* pick interface closest to dest */
370 if(ipforme(f, laddr) != Runi)
371 findlocalip(f, laddr, raddr);
372 bp->rp += IPaddrlen; /* Ignore ifc address */
373 rport = nhgets(bp->rp);
374 bp->rp += 2+2; /* Ignore local port */
377 ipmove(raddr, c->raddr);
378 ipmove(laddr, c->laddr);
385 /* Make space to fit rudp & ip header */
386 bp = padblock(bp, UDP_IPHDR+UDP_RHDRSIZE);
387 uh = (Udphdr *)(bp->rp);
392 ptcllen = dlen + (UDP_RHDRSIZE-UDP_PHDRSIZE);
394 uh->udpproto = IP_UDPPROTO;
397 hnputs(uh->udpplen, ptcllen);
398 switch(ucb->headers){
400 v6tov4(uh->udpdst, raddr);
401 hnputs(uh->udpdport, rport);
402 v6tov4(uh->udpsrc, laddr);
405 v6tov4(uh->udpdst, c->raddr);
406 hnputs(uh->udpdport, c->rport);
407 if(ipcmp(c->laddr, IPnoaddr) == 0)
408 findlocalip(f, c->laddr, c->raddr);
409 v6tov4(uh->udpsrc, c->laddr);
412 hnputs(uh->udpsport, c->lport);
413 hnputs(uh->udplen, ptcllen);
418 r = relstate(ucb, raddr, rport, "kick");
419 r->sndseq = NEXTSEQ(r->sndseq);
420 hnputl(rh->relseq, r->sndseq);
421 hnputl(rh->relsgen, r->sndgen);
423 hnputl(rh->relack, r->rcvseq); /* ACK last rcvd packet */
424 hnputl(rh->relagen, r->rcvgen);
426 if(r->rcvseq != r->acksent)
427 r->acksent = r->rcvseq;
429 hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, dlen+UDP_RHDRSIZE));
434 upriv->ustats.rudpOutDatagrams++;
436 DPRINT("sent: %lud/%lud, %lud/%lud\n",
437 r->sndseq, r->sndgen, r->rcvseq, r->rcvgen);
439 doipoput(c, f, bp, 0, c->ttl, c->tos);
447 /* flow control of sorts */
449 if(UNACKED(r) > Maxunacked){
451 sleep(&r->vous, flow, r);
461 rudpiput(Proto *rudp, Ipifc *ifc, Block *bp)
467 uchar raddr[IPaddrlen], laddr[IPaddrlen];
476 upriv->ustats.rudpInDatagrams++;
478 uh = (Udphdr*)(bp->rp);
480 /* Put back pseudo header for checksum
481 * (remember old values for icmpnoconv())
485 len = nhgets(uh->udplen);
486 olen = nhgets(uh->udpplen);
487 hnputs(uh->udpplen, len);
489 v4tov6(raddr, uh->udpsrc);
490 v4tov6(laddr, uh->udpdst);
491 lport = nhgets(uh->udpdport);
492 rport = nhgets(uh->udpsport);
494 if(nhgets(uh->udpcksum)) {
495 if(ptclcsum(bp, UDP_IPHDR, len+UDP_PHDRSIZE)) {
496 upriv->ustats.rudpInErrors++;
498 netlog(f, Logrudp, "rudp: checksum error %I\n", raddr);
499 DPRINT("rudp: checksum error %I\n", raddr);
507 c = iphtlook(&upriv->ht, raddr, rport, laddr, lport);
509 /* no conversation found */
510 upriv->ustats.rudpNoPorts++;
512 netlog(f, Logudp, "udp: no conv %I!%d -> %I!%d\n", raddr, rport,
515 hnputs(uh->udpplen, olen);
520 ucb = (Rudpcb*)c->ptcl;
524 if(reliput(c, bp, raddr, rport) < 0){
531 * Trim the packet down to data size
534 len -= (UDP_RHDRSIZE-UDP_PHDRSIZE);
535 bp = trimblock(bp, UDP_IPHDR+UDP_RHDRSIZE, len);
537 netlog(f, Logrudp, "rudp: len err %I.%d -> %I.%d\n",
538 raddr, rport, laddr, lport);
539 DPRINT("rudp: len err %I.%d -> %I.%d\n",
540 raddr, rport, laddr, lport);
545 netlog(f, Logrudpmsg, "rudp: %I.%d -> %I.%d l %d\n",
546 raddr, rport, laddr, lport, len);
548 switch(ucb->headers){
550 /* pass the src address */
551 bp = padblock(bp, UDP_USEAD7);
553 ipmove(p, raddr); p += IPaddrlen;
554 ipmove(p, laddr); p += IPaddrlen;
555 ipmove(p, ifc->lifc->local); p += IPaddrlen;
556 hnputs(p, rport); p += 2;
560 /* connection oriented rudp */
561 if(ipcmp(c->raddr, IPnoaddr) == 0){
562 /* save the src address in the conversation */
563 ipmove(c->raddr, raddr);
566 /* reply with the same ip address (if not broadcast) */
567 if(ipforme(f, laddr) == Runi)
568 ipmove(c->laddr, laddr);
570 v4tov6(c->laddr, ifc->lifc->local);
576 netlog(f, Logrudp, "rudp: qfull %I.%d -> %I.%d\n", raddr, rport,
581 bp = concatblock(bp);
/*
 * Error text for an unrecognised ctl request.
 * NOTE(review): presumably returned by rudpctl when no command matches;
 * the use site is not visible here -- confirm.
 */
587 static char *rudpunknown = "unknown rudp ctl request";
590 rudpctl(Conv *c, char **f, int n)
596 ucb = (Rudpcb*)c->ptcl;
600 if(strcmp(f[0], "headers") == 0){
601 ucb->headers = 7; /* new headers format */
603 } else if(strcmp(f[0], "hangup") == 0){
606 if (parseip(ip, f[1]) == -1)
610 relforget(c, ip, x, 1);
613 } else if(strcmp(f[0], "randdrop") == 0){
614 x = 10; /* default is 10% */
618 return "illegal rudp drop rate";
626 rudpadvise(Proto *rudp, Block *bp, char *msg)
629 uchar source[IPaddrlen], dest[IPaddrlen];
630 ushort psource, pdest;
633 h = (Udphdr*)(bp->rp);
635 v4tov6(dest, h->udpdst);
636 v4tov6(source, h->udpsrc);
637 psource = nhgets(h->udpsport);
638 pdest = nhgets(h->udpdport);
640 /* Look for a connection */
641 for(p = rudp->conv; *p; p++) {
643 if(s->rport == pdest)
644 if(s->lport == psource)
645 if(ipcmp(s->raddr, dest) == 0)
646 if(ipcmp(s->laddr, source) == 0){
658 rudpstats(Proto *rudp, char *buf, int len)
663 return snprint(buf, len, "%lud %lud %lud %lud %lud %lud\n",
664 upriv->ustats.rudpInDatagrams,
665 upriv->ustats.rudpNoPorts,
666 upriv->ustats.rudpInErrors,
667 upriv->ustats.rudpOutDatagrams,
678 rudp = smalloc(sizeof(Proto));
679 rudp->priv = smalloc(sizeof(Rudppriv));
681 rudp->connect = rudpconnect;
682 rudp->announce = rudpannounce;
684 rudp->state = rudpstate;
685 rudp->create = rudpcreate;
686 rudp->close = rudpclose;
687 rudp->rcv = rudpiput;
688 rudp->advise = rudpadvise;
689 rudp->stats = rudpstats;
690 rudp->ipproto = IP_UDPPROTO;
692 rudp->ptclsize = sizeof(Rudpcb);
697 /*********************************************/
698 /* Here start the reliable helper functions  */
699 /*********************************************/
701 * Enqueue a copy of an unacked block for possible retransmissions
704 relackq(Reliable *r, Block *bp)
708 np = copyblock(bp, blocklen(bp));
710 r->unackedtail->list = np;
722 * retransmit unacked blocks
737 tsleep(&up->sleep, return0, 0, Rudptickms);
739 for(s = rudp->conv; *s; s++) {
741 ucb = (Rudpcb*)c->ptcl;
744 for(r = ucb->r; r; r = r->next) {
745 if(r->unacked != nil){
746 r->timeout += Rudptickms;
747 if(r->timeout > Rudprxms*r->xmits)
750 if(r->acksent != r->rcvseq)
759 * get the state record for a conversation
762 relstate(Rudpcb *ucb, uchar *addr, ushort port, char *from)
767 for(r = *l; r; r = *l){
768 if(memcmp(addr, r->addr, IPaddrlen) == 0 &&
774 /* no state for this addr/port, create some */
776 while(generation == 0)
779 DPRINT("from %s new state %lud for %I!%ud\n",
780 from, generation, addr, port);
782 r = smalloc(sizeof(Reliable));
783 memmove(r->addr, addr, IPaddrlen);
786 if(generation == Hangupgen)
788 r->sndgen = generation++;
797 incref(r); /* one reference for being in the list */
814 * forget a Reliable state
817 relforget(Conv *c, uchar *ip, int port, int originator)
822 ucb = (Rudpcb*)c->ptcl;
825 for(r = *l; r; r = *l){
826 if(ipcmp(ip, r->addr) == 0 && port == r->port){
831 relput(r); /* remove from the list */
839 * process a rcvd reliable packet. return -1 if not to be passed to user process,
842 * called with ucb locked.
845 reliput(Conv *c, Block *bp, uchar *addr, ushort port)
853 ulong seq, ack, sgen, agen, ackreal;
857 uh = (Udphdr*)(bp->rp);
859 seq = nhgetl(rh->relseq);
860 sgen = nhgetl(rh->relsgen);
861 ack = nhgetl(rh->relack);
862 agen = nhgetl(rh->relagen);
865 ucb = (Rudpcb*)c->ptcl;
866 r = relstate(ucb, addr, port, "input");
868 DPRINT("rcvd %lud/%lud, %lud/%lud, r->sndgen = %lud\n",
869 seq, sgen, ack, agen, r->sndgen);
871 /* if acking an incorrect generation, ignore */
872 if(ack && agen != r->sndgen)
875 /* Look for a hangup */
876 if(sgen == Hangupgen) {
877 if(agen == r->sndgen)
878 relforget(c, addr, port, 0);
882 /* make sure we're not talking to a new remote side */
883 if(r->rcvgen != sgen){
884 if(seq != 0 && seq != 1)
889 DPRINT("new con r->rcvgen = %lud, sgen = %lud\n", r->rcvgen, sgen);
895 /* dequeue acked packets */
896 if(ack && agen == r->sndgen){
898 while(r->unacked != nil && INSEQ(ack, r->ackrcvd, r->sndseq)){
900 r->unacked = nbp->list;
901 DPRINT("%lud/%lud acked, r->sndgen = %lud\n",
902 ack, agen, r->sndgen);
904 r->ackrcvd = NEXTSEQ(r->ackrcvd);
909 if(UNACKED(r) < Maxunacked/8 && r->blocked)
913 * retransmit next packet if the acked packet
914 * was transmitted more than once
916 if(ackreal && r->unacked != nil){
926 /* no message or input queue full */
927 if(seq == 0 || qfull(c->rq))
930 /* refuse out of order delivery */
931 if(seq != NEXTSEQ(r->rcvseq)){
932 relsendack(c, r, 0); /* tell him we got it already */
934 DPRINT("out of sequence %lud not %lud\n", seq, NEXTSEQ(r->rcvseq));
946 relsendack(Conv *c, Reliable *r, int hangup)
954 bp = allocb(UDP_IPHDR + UDP_RHDRSIZE);
955 bp->wp += UDP_IPHDR + UDP_RHDRSIZE;
957 uh = (Udphdr *)(bp->rp);
961 ptcllen = (UDP_RHDRSIZE-UDP_PHDRSIZE);
963 uh->udpproto = IP_UDPPROTO;
966 hnputs(uh->udpplen, ptcllen);
968 v6tov4(uh->udpdst, r->addr);
969 hnputs(uh->udpdport, r->port);
970 hnputs(uh->udpsport, c->lport);
971 if(ipcmp(c->laddr, IPnoaddr) == 0)
972 findlocalip(f, c->laddr, c->raddr);
973 v6tov4(uh->udpsrc, c->laddr);
974 hnputs(uh->udplen, ptcllen);
977 hnputl(rh->relsgen, Hangupgen);
979 hnputl(rh->relsgen, r->sndgen);
980 hnputl(rh->relseq, 0);
981 hnputl(rh->relagen, r->rcvgen);
982 hnputl(rh->relack, r->rcvseq);
984 if(r->acksent < r->rcvseq)
985 r->acksent = r->rcvseq;
989 hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, UDP_RHDRSIZE));
991 DPRINT("sendack: %lud/%lud, %lud/%lud\n", 0L, r->sndgen, r->rcvseq, r->rcvgen);
992 doipoput(c, f, bp, 0, c->ttl, c->tos);
997 * called with ucb locked (and c locked if user initiated close)
1000 relhangup(Conv *c, Reliable *r)
1006 n = snprint(hup, sizeof(hup), "hangup %I!%d", r->addr, r->port);
1007 qproduce(c->eq, hup, n);
1010 * dump any unacked outgoing messages
1012 for(bp = r->unacked; bp != nil; bp = r->unacked){
1013 r->unacked = bp->list;
1021 if(generation == Hangupgen)
1023 r->sndgen = generation++;
1032 * called with ucb locked
1035 relrexmit(Conv *c, Reliable *r)
1044 if(r->xmits++ > Rudpmaxxmit){
1050 np = copyblock(r->unacked, blocklen(r->unacked));
1051 DPRINT("rxmit r->ackrvcd+1 = %lud\n", r->ackrcvd+1);
1052 doipoput(c, f, np, 0, c->ttl, c->tos);