10 #define min(x,y) ((x)<(y)?(x):(y))
13 #define max(x,y) ((x)>(y)?(x):(y))
17 #define offsetof(s, m) ((int)(&((s*)0)->m))
20 #if defined(WIN32) || defined(ARM_UCOS_LWIP)
21 #define SETEVENT(event) PA_SetEvent(event)
22 #elif defined(__LINUX__) || defined(__ANDROID__)
23 #define SETEVENT(event) pthread_cond_signal(&event)
25 #define SAFE_FREE(p) if(p) { free(p); p = NULL; }
26 //---------------------------------------------------------
27 PA_MUTEX mutex_sock_list;
31 static volatile int run = 1;
33 static PA_MUTEX mutex_pkt_pool;
34 static int n_free_pkt = 0;
35 static struct rudp_pkt *free_pkt = NULL;
37 //static int sockw_r = -1, sockw_s;
38 struct output_notify {
39 struct rudp_socket *s;
43 unsigned int rudp_now = 0;
44 //==========================================================
49 #define RO_REXMT 2 //rexmt as RCT_REXMT timer
51 #define RO_REXMT_FAST 4 //rexmt as duplicated ack received
53 #define INITIAL_SEQ_NO 0
56 #define FASTRETRANS2 1 //multiple packet lost
59 //value for should_ack
60 #define ACKT_DELAYED 1
61 #define ACKT_OPENWND 2
64 static char* IP2STR(unsigned int ip, char ips[16])
66 int len = sprintf(ips, "%d.", ip&0xFF);
67 len += sprintf(ips+len, "%d.", (ip>>8)&0xFF);
68 len += sprintf(ips+len, "%d.", (ip>>16)&0xFF);
69 len += sprintf(ips+len, "%d", (ip>>24)&0xFF);
73 static void _printTime()
76 #if defined(ARM_UCOS_LWIP)
80 sprintf(sf, "%.06f", t.wMilliseconds/1000.0);
81 PRINTF("%02d:%02d:%02d.%s ", t.wHour, t.wMinute, t.wSecond, sf+2);
85 gettimeofday(&tv, NULL);
86 localtime_r(&tv.tv_sec, &_tm);
87 sprintf(sf, "%.06f", tv.tv_usec/1000000.0);
88 PRINTF("%02d:%02d:%02d.%s ", _tm.tm_hour, _tm.tm_min, _tm.tm_sec, sf+2);
91 #define PHF_FROM 0x10000000
92 #define PHF_DATA 0x20000000
93 static void __printHdr(const struct rudp_pcb *pcb, const struct rudp_hdr *phdr, const struct sockaddr_in *pa, int phf, int data_len)
99 PRINTF("%s.%d > ", IP2STR(pa->sin_addr.s_addr, ip), (int)ntohs(pa->sin_port));
100 if(pcb) PRINTF("%s.%d ", IP2STR(pcb->local.sin_addr.s_addr, ip), (int)ntohs(pcb->local.sin_port));
104 if(pcb) PRINTF("%s.%d > ", IP2STR(pcb->local.sin_addr.s_addr, ip), (int)ntohs(pcb->local.sin_port));
105 PRINTF("%s.%d ", IP2STR(pa->sin_addr.s_addr, ip), (int)ntohs(pa->sin_port));
107 PRINTF("c:%d ", phdr->flags.chno);
111 PRINTF("P s:%u(%d) ", ntohl(phdr->seqno), data_len);
114 if(phdr->flags.syn) PRINTF("syn ");
115 if(phdr->flags.ack) PRINTF("ack %u(%d) ", ntohl(phdr->ackno), phdr->flags.n_loss);
116 if(phdr->flags.rst) PRINTF("rst ");
117 if(phdr->flags.fin) PRINTF("fin ");
118 PRINTF("win %d ", (int)WINDOW_NTOH(phdr->flags.window));
122 PRINTF("[rwnd %d cwnd %d ssth %d", pcb->channel[phdr->flags.chno].sbuf.rwnd, pcb->cwnd, pcb->ssthresh);
123 if(phdr->flags.ack && (phf&PHF_FROM))
125 PRINTF(" rto %d", pcb->rto);
126 PRINTF(" rtw %d", pcb->rtw_size);
127 if(phdr->flags.n_loss) PRINTF(" lost %d", phdr->flags.n_loss);
129 PRINTF(" una %d", pcb->channel[phdr->flags.chno].sbuf.n_unacked);
137 static void _printHdr(const struct rudp_pcb *pcb, const struct rudp_hdr *phdr, const struct sockaddr_in *pa)
139 __printHdr(pcb, phdr, pa, 0, 0);
141 static void _printPkt(const struct rudp_pcb *pcb, const struct rudp_pkt *pkt, int phf, const struct sockaddr_in *pa)
143 if(pkt->len) { phf |= PHF_DATA; }
144 __printHdr(pcb, &pkt->hdr, pa, phf, pkt->len);
147 #define _printHdr(a,b,c)
148 #define _printPkt(a,b,c,d)
152 #define DELAY_ACK_MS 100
153 #define RTT_UINT 200 //accuracy of RTT, ms
154 #define RTT_MIN (1000/RTT_UINT) //count in 1 second.
156 #define MAX_REXMT_ATTEMPT 6
157 static int rudp_backoff[MAX_REXMT_ATTEMPT+1] = { 1, 2, 4, 8, 16, 32, 32/*, 64, 64, 64, 64, 64*/ };
158 #define MAX_RECONN_ATTEMPT 5
159 static int conn_backoff[MAX_RECONN_ATTEMPT+1] = { RTT_MIN, 1*RTT_MIN, 2*RTT_MIN, 2*RTT_MIN, 2*RTT_MIN, 2*RTT_MIN }; //s
161 static struct rudp_pkt *_MBufGetPacket();
162 static void _MBufPutPacket(struct rudp_pkt *pkt);
163 static int _ProcessPacket(struct rudp_socket *s, struct rudp_pkt *pkt, const struct sockaddr *from, int from_len);
164 int _DispatchPacket(struct rudp_socket *s, struct rudp_pkt *pkt, const struct sockaddr *from, int from_len);
165 static INLINE void _sendPacket(struct rudp_socket *s, struct rudp_channel *pch, struct rudp_pkt *pkt, int opt);
166 static void _sendReset(struct rudp_socket *s, const struct sockaddr *to);
167 static /*INLINE */void _sendHeader(struct rudp_socket *s, struct rudp_hdr *phdr);
168 static void _sendSyn(struct rudp_socket *s);
169 static void _sendSynAck(struct rudp_socket *s);
170 static void _sendAck(struct rudp_socket *s, int chno);
171 static void _sendFin(struct rudp_socket *s);
173 typedef void (*TimerHandler)(struct rudp_socket *s);
174 static void _timerProc(TimerHandler handler);
175 static void _handleTimer500ms(struct rudp_socket *s);
176 static void _handleTimer200ms(struct rudp_socket *s);
178 static struct rudp_pkt *_MBufGetPacket()
182 PA_MutexLock(mutex_pkt_pool);
186 free_pkt = free_pkt->next;
191 p = (struct rudp_pkt*)malloc(sizeof(struct rudp_pkt));
193 PA_MutexUnlock(mutex_pkt_pool);
198 p->hdr.u32_flags = 0;
199 p->hdr.flags.rudp = RUDP_HEADER_TAG;
203 PA_MutexUnlock(mutex_pkt_pool);
207 static void _MBufPutPacket(struct rudp_pkt *pkt)
209 PA_MutexLock(mutex_pkt_pool);
216 pkt->next = free_pkt;
220 PA_MutexUnlock(mutex_pkt_pool);
223 static struct rudp_socket *_AllocRudpSocket()
225 struct rudp_socket *sock = (struct rudp_socket*)calloc(sizeof(struct rudp_socket), 1);
226 sock->tag = RUDP_SOCKET_TAG;
228 sock->state = RS_CLOSED;
229 sock->rcvbuf_sz = DEFAULT_RCVBUF_SIZE;
231 PA_MutexInit(sock->mutex_r);
232 PA_MutexInit(sock->mutex_w);
233 #if defined(WIN32) || defined(ARM_UCOS_LWIP)
234 PA_EventInit(sock->event_r);
235 PA_EventInit(sock->event_w);
237 pthread_cond_init(&sock->event_r, NULL);
238 pthread_cond_init(&sock->event_w, NULL);
241 INIT_LIST_HEAD(&sock->inst_list);
242 INIT_LIST_HEAD(&sock->listen_queue);
243 INIT_LIST_HEAD(&sock->accepted_list);
247 static struct rudp_pcb *_AllocRudpPcb(uint32_t rcvbuf_size, uint32_t initial_seqno, uint32_t peer_initial_seqno, int rawnd)
249 struct rudp_pcb *pcb;
251 pcb = (struct rudp_pcb*)calloc(sizeof(struct rudp_pcb), 1);
252 for(i=0; i<MAX_PHY_CHANNELS; i++)
254 struct sndbuf *psbuf;
255 struct rcvbuf *prbuf;
257 psbuf = &pcb->channel[i].sbuf;
258 psbuf->seqno = initial_seqno;
259 psbuf->max_pkts = DEFAULT_SNDBUF_SIZE;
260 psbuf->rwnd = psbuf->rawnd = rawnd;
262 prbuf = &pcb->channel[i].rbuf;
263 prbuf->expected_seqno = prbuf->first_seq = peer_initial_seqno;
264 prbuf->q_size = rcvbuf_size;;
265 prbuf->pkt_q = (struct rudp_pkt**)calloc(sizeof(void*), rcvbuf_size);
266 prbuf->win = prbuf->q_size - 1;
269 pcb->rtw_size = rawnd/2;//8;
270 pcb->rwin_size = rawnd;
271 pcb->ssthresh = rawnd;
275 pcb->rto = 6; //As [Jacobson 1988] rto = srtt + 2*sdev, but it seems too large for us
280 static void _terminateSocketInternally(struct rudp_socket *s, int err)
282 if(s->state == RS_DEAD) return;
284 PA_MutexLock(s->mutex_r);
285 PA_MutexLock(s->mutex_w);
293 for(i=0; i<MAX_PHY_CHANNELS; i++)
298 c = s->pcb->channel[i].sbuf.first;
301 struct rudp_pkt *p = c;
306 prb = &s->pcb->channel[i].rbuf;
307 for(; prb->head != prb->tail; prb->head = (prb->head+1)%prb->q_size)
308 if(prb->pkt_q[prb->head])
309 _MBufPutPacket(prb->pkt_q[prb->head]);
316 SETEVENT(s->event_r);
317 SETEVENT(s->event_w);
319 PA_MutexUnlock(s->mutex_w);
320 PA_MutexUnlock(s->mutex_r);
323 //Still in listening queue
324 if(list_empty(&s->inst_list) && !list_empty(&s->listen_queue))
326 list_del(&s->listen_queue);
330 //dbg_msg("################## _terminateSocketInternally ##########\n");
333 /// \brief Should be called with "mutex_sock_list" hold
334 // \param err error code for the reason to cleanup this socket
335 static void _CleanupSocket(struct rudp_socket *s, int err)
337 if(s->state == RS_DEAD) return;
339 _terminateSocketInternally(s, err);
341 PA_MutexUninit(s->mutex_r);
342 PA_MutexUninit(s->mutex_w);
343 #if defined(WIN32) || defined(ARM_UCOS_LWIP)
344 PA_EventUninit(s->event_r);
345 PA_EventUninit(s->event_w);
347 pthread_cond_destroy(&s->event_r);
348 pthread_cond_destroy(&s->event_w);
351 if(!list_empty(&s->listen_queue))
353 struct list_head *pp, *qq;
354 list_for_each_safe(pp, qq, &s->listen_queue)
356 struct rudp_socket *sl = list_entry(pp, struct rudp_socket, listen_queue);
357 _CleanupSocket(sl, 0);
367 static void _CleanAndFreeSocket(struct rudp_socket *s)
369 if(list_empty(&s->inst_list)) //accepted
371 list_del(&s->accepted_list);
375 list_del(&s->inst_list);
376 if(list_empty(&s->accepted_list))
377 PA_SocketClose(s->udp_sock);
380 struct rudp_socket *aa = list_entry(s->accepted_list.next, struct rudp_socket, accepted_list);
381 INIT_LIST_HEAD(&aa->inst_list);
382 list_add_tail(&aa->inst_list, &sock_list);
386 if(s->state != RS_DEAD) _CleanupSocket(s, 0);
391 void _timerProc(TimerHandler handler)
393 struct list_head *p, *q, *pp, *qq;
394 //PA_MutexLock(mutex_sock_list);
395 list_for_each_safe(p, q, &sock_list)
397 struct rudp_socket *s, *ss;
398 s = list_entry(p, struct rudp_socket, inst_list);
400 if(s->state == RS_DEAD) continue;
402 list_for_each_safe(pp, qq, &s->accepted_list)
404 ss = list_entry(pp, struct rudp_socket, accepted_list);
407 list_for_each_safe(pp, qq, &s->listen_queue) //a socket in listen_queue may be removed due to timeout
409 ss = list_entry(pp, struct rudp_socket, listen_queue);
415 //PA_MutexUnlock(mutex_sock_list);
418 static void _congestionAvoidance(struct rudp_socket *s)
420 if(s->pcb->cwnd < s->pcb->ssthresh)
425 if(s->pcb->ca_cnt >= s->pcb->rwin_size)
427 if(s->pcb->rtw_size < s->pcb->rwin_size)
431 {//congestion avoidance, increase cwnd slowly
433 if(s->pcb->ca_cnt >= s->pcb->cwnd)
436 if(s->pcb->cwnd < s->pcb->rwin_size)
439 //s->pcb->ssthresh++; ???
441 if(s->pcb->rtw_size < s->pcb->rwin_size)
447 static void _congestionDetected(struct rudp_socket *s, int chno, int what)
450 struct rudp_pcb *pcb;
452 //for(i=rawnd=0; i<MAX_CHANNELS; i++) rawnd += pcb->channel[chno].sbuf.rawnd;
455 pcb->ssthresh = min(pcb->channel[chno].sbuf.rawnd, pcb->cwnd)/2;
456 if(pcb->ssthresh < 2) pcb->ssthresh = 2;
458 if(what == CONGESTED) pcb->cwnd = 1;
459 else pcb->cwnd = pcb->ssthresh + 3;
466 if(pcb->rtw_size < 8) pcb->rtw_size = 8;
470 pcb->rtw_size -= pcb->rtw_size >> 2;
471 if(pcb->rtw_size < 8) pcb->rtw_size = 8;
476 static INLINE unsigned int _calcCurWnd(struct rudp_socket *s, struct sndbuf *psbuf)
478 //if receiver's rawnd is 0, still send one more packet then transmission can be
479 //started again by re-transfer timer, even when the receiver's OPENWND ACK(s) are lost
480 return min(s->pcb->cwnd, psbuf->rwnd);
482 //return min(min(s->pcb->cwnd, psbuf->rwnd),psbuf->rawnd);
483 //return psbuf->rwnd;
488 * mutex_w shall be hold before call _RudpOutput
490 * return: 1 - if a packet is sent; otherwise, no packet is sent
492 static int _RudpOutput(struct rudp_socket *s, int chno, int opt)
494 struct sndbuf *psbuf;
495 struct rcvbuf *prbuf;
496 struct rudp_channel *pch;
498 if(s->tag != RUDP_SOCKET_TAG) return -1;
499 if(s->state <= RS_CLOSED) return -1;
501 pch = &s->pcb->channel[chno];
505 //if(pch->congested && opt == RO_NORMAL) return;
506 if(opt == RO_REXMT || opt == RO_REXMT_FAST) //packets are queued before
508 struct rudp_pkt *pkt = opt == RO_REXMT_FAST ? psbuf->rexmt : psbuf->first;
510 if(prbuf->should_ack == ACKT_DELAYED)
512 prbuf->should_ack = 0;
513 pkt->hdr.flags.ack = 1;
514 pkt->hdr.ackno = ntohl(prbuf->expected_seqno);
515 pkt->hdr.flags.n_loss = prbuf->n_lost;
516 //if(psbuf->first->trans) psbuf->first->hdr.crc32 = calc_crc32(0, (char*)&psbuf->first->hdr, offsetof(struct rudp_hdr, crc32));
518 _sendPacket(s, pch, pkt, opt);
519 pkt->hdr.flags.ack = 0;
523 if(((prbuf->should_ack == ACKT_DELAYED) && !psbuf->first) || prbuf->should_ack == ACKT_OPENWND)
526 prbuf->should_ack = 0;
531 if(opt == 0 && pch->sbuf.rawnd == 0)
533 if(pch->timer[RCT_REXMT] == 0)
534 pch->timer[RCT_PERSIST] = s->pcb->rto * rudp_backoff[pch->sbuf.first?pch->sbuf.first->trans:0];
535 //signalOutput(s, chno);
542 pch->timer[RCT_PERSIST] = 0;
543 if(p && (opt == RO_FORCE /*|| prbuf->should_ack == ACKT_DELAYED */
544 || _calcCurWnd(s, psbuf) > psbuf->n_unacked
545 /* && psbuf->n_unacked < s->pcb->rtw_size*/))
546 //psbuf->n_unacked < psbuf->rawnd) )
548 if(prbuf->should_ack == ACKT_DELAYED)
550 prbuf->should_ack = 0;
551 p->hdr.flags.ack = 1;
552 p->hdr.ackno = ntohl(prbuf->expected_seqno);
553 p->hdr.flags.n_loss = prbuf->n_lost;
554 //if(p->trans) p->hdr.crc32 = calc_crc32(0, (char*)&p->hdr, offsetof(struct rudp_hdr, crc32));
556 //psbuf->n_unacked++;
557 _sendPacket(s, pch, p, opt);
558 p->hdr.flags.ack = 0;
560 //psbuf->not_sent = p = p->next;
561 if(p && p->seqno < psbuf->not_sent->seqno)
563 dbg_msg("#####################################################\n");
567 //if(psbuf->rwnd > 0)
570 if(opt == RO_ONLYONE) return 1;
572 if(_calcCurWnd(s, psbuf) && p)
574 //signalOutput(s, chno);
575 //struct output_notify notify = { s, chno };
576 //PA_Send(sockw_s, ¬ify, sizeof(notify), 0);
584 void _handleTimer500ms(struct rudp_socket *s)
587 struct rudp_pcb *pcb;
589 if(s->state == RS_LISTEN || s->state == RS_DEAD) return;
593 int sbuf_is_empty = 1;
596 PA_MutexLock(s->mutex_w);
597 for(i=0; i<MAX_PHY_CHANNELS; i++)
599 struct rudp_channel *pch = &pcb->channel[i];
600 if(pch->sbuf.first) sbuf_is_empty = 0;
601 for(j=0; j<RCT_CNT; j++)
603 if(pch->timer[j] == 0) continue;
605 if(pch->timer[j] == 0)
610 dbg_msg("Persist timeout.\n");
611 _RudpOutput(s, i, RO_FORCE);
616 if(pch->sbuf.first->trans >= MAX_REXMT_ATTEMPT)
618 PA_MutexUnlock(s->mutex_w);
619 _CleanupSocket(s, ERUDP_TIMEOUTED);
621 s->err = ERUDP_TIMEOUTED;
624 else if(pch->sbuf.first->trans)
626 _congestionDetected(s, i, CONGESTED);
627 _printTime(); dbg_msg("congested: cwnd=%d, ssthresh=%d\n", pcb->cwnd, pcb->ssthresh);
628 _congestionAvoidance(s);
629 _RudpOutput(s, i, RO_REXMT);
631 pch->sbuf.pkt_rttm_start = pch->sbuf.not_sent;
634 _RudpOutput(s, i, 0);
641 if(sbuf_is_empty && s->state == RS_FIN_QUEUED)
644 s->state = RS_FIN_WAIT_1;
645 s->timer[RT_KEEP] = RTT_MIN * RTV_KEEP_CLOSE;
647 PA_MutexUnlock(s->mutex_w);
650 for(i=0; i<RT_CNT; i++)
652 if(s->timer[i] == 0) continue;
659 case RT_KEEP: //for connecting timeout
660 if(s->state == RS_SYN_RCVD || s->state == RS_SYN_SENT)
662 if(s->pcb->retr_cnt >= MAX_RECONN_ATTEMPT)
664 if(list_empty(&s->inst_list))
666 list_del(&s->listen_queue);
667 INIT_LIST_HEAD(&s->listen_queue);
668 _CleanupSocket(s, ERUDP_TIMEOUTED);
673 //_CleanupSocket(s, ERUDP_TIMEOUTED);
674 s->err = ERUDP_TIMEOUTED;
675 s->state = RS_CLOSED;
676 SETEVENT(s->event_w);
682 s->timer[RT_KEEP] = conn_backoff[++s->pcb->retr_cnt];
683 if(s->state == RS_SYN_SENT)
689 else if(s->state >= RS_FIN_QUEUED)
691 dbg_msg("clean and free %p\n", s);
692 _CleanAndFreeSocket(s);
702 void _handleTimer200ms(struct rudp_socket *s)
704 if(s->pcb)// && (s->pcb->r_flags & RUDPF_DELAYACK))
707 for(i=0; i<MAX_PHY_CHANNELS; i++)
709 struct rcvbuf *prb = &s->pcb->channel[i].rbuf;
712 //_printTime(); dbg_msg("delayed ack.\n");
713 PA_MutexLock(s->mutex_w);
714 _RudpOutput(s, i, 0);
715 //signalOutput(s, i);
716 PA_MutexUnlock(s->mutex_w);
721 void _sendHeader(struct rudp_socket *s, struct rudp_hdr *phdr)
723 _printHdr(s->pcb, phdr, &s->pcb->peer);
724 phdr->crc32 = calc_crc32(0, (char*)phdr, offsetof(struct rudp_hdr, crc32));
725 PA_SendTo(s->udp_sock, phdr, sizeof(struct rudp_hdr), 0,
726 s->connected?NULL:(struct sockaddr*)&s->pcb->peer,
727 sizeof(struct sockaddr));
730 void _sendPacket(struct rudp_socket *s, struct rudp_channel *pch, struct rudp_pkt *pkt, int opt)
732 if(pch->timer[RCT_REXMT] == 0)
733 pch->timer[RCT_REXMT] = s->pcb->rto * rudp_backoff[pkt->trans];
735 //dbg_msg("............. seq %u, ts %d...........", pkt->seqno, rudp_now);
736 pkt->hdr.flags.window = WINDOW_HTON(pch->rbuf.win);
737 if(opt != RO_REXMT_FAST)
739 if(!pkt->trans) pch->sbuf.n_unacked++;
741 if(pkt->trans >= MAX_REXMT_ATTEMPT) pkt->trans = MAX_REXMT_ATTEMPT;
743 pkt->hdr.crc32 = calc_crc32(0, (char*)&pkt->hdr, offsetof(struct rudp_hdr, crc32));
745 _printPkt(s->pcb, pkt, PHF_DATA|pch->sbuf.rwnd, &s->pcb->peer);
746 PA_SendTo(s->udp_sock, &pkt->hdr, sizeof(struct rudp_hdr) + pkt->len, 0,
747 s->connected?NULL:(struct sockaddr*)&s->pcb->peer,
748 sizeof(struct sockaddr));
750 void _sendSyn(struct rudp_socket *s)
754 hdr.flags.rudp = RUDP_HEADER_TAG;
756 hdr.seqno = htonl(s->pcb->channel[0].sbuf.seqno);
758 hdr.flags.window = WINDOW_HTON(s->pcb->channel[0].rbuf.win);
759 _sendHeader(s, &hdr);
761 void _sendSynAck(struct rudp_socket *s)
765 hdr.flags.rudp = RUDP_HEADER_TAG;
767 hdr.seqno = htonl(s->pcb->channel[0].sbuf.seqno);
769 hdr.ackno = htonl(s->pcb->channel[0].rbuf.expected_seqno);
770 hdr.flags.window = WINDOW_HTON(s->pcb->channel[0].rbuf.win);
771 _sendHeader(s, &hdr);
774 void _sendEmptyAck(struct rudp_socket *s, int chno)
778 hdr.flags.rudp = RUDP_HEADER_TAG;
780 hdr.flags.chno = chno;
783 hdr.flags.window = 0;
784 _sendHeader(s, &hdr);
788 void _sendAck(struct rudp_socket *s, int chno)
791 struct rcvbuf *pr = &s->pcb->channel[chno].rbuf;
794 hdr.flags.rudp = RUDP_HEADER_TAG;
796 hdr.flags.chno = chno;
797 hdr.seqno = htonl(s->pcb->channel[chno].sbuf.seqno);
798 hdr.ackno = htonl(pr->expected_seqno);
799 hdr.flags.n_loss = pr->n_lost;
800 hdr.flags.window = WINDOW_HTON(pr->win);
801 _sendHeader(s, &hdr);
803 pr->acked_seqno = pr->expected_seqno;
806 void _sendFin(struct rudp_socket *s)
810 hdr.flags.rudp = RUDP_HEADER_TAG;
814 hdr.flags.window = 0;
815 _sendHeader(s, &hdr);
819 void _updateRTO(struct rudp_socket *s, int rtt)
821 int rto0 = s->pcb->rto;
823 /* [Jacobson 1988], refresh rto */
824 int drtt = rtt - s->pcb->srtt;
827 s->pcb->srtt += drtt >> 3;
828 if(drtt > s->pcb->sdev)
829 s->pcb->sdev += (drtt - s->pcb->sdev) >> 2;
831 s->pcb->sdev -= (s->pcb->sdev - drtt) >> 2;
836 s->pcb->srtt -= drtt >> 3;
837 if(drtt > s->pcb->sdev)
838 s->pcb->sdev += (drtt - s->pcb->sdev) >> 2;
840 s->pcb->sdev -= (s->pcb->sdev - drtt) >> 2;
842 s->pcb->rto = s->pcb->srtt + (s->pcb->sdev > 0) ?(s->pcb->sdev << 2):(-s->pcb->sdev << 2);
843 if(s->pcb->rto < 2) s->pcb->rto = 2;
844 dbg_msg("rtt = %d, rto = %d\n", rtt, s->pcb->rto);
846 if(rtt < RTT_MIN) rtt = RTT_MIN;
847 //if(rtt < 2) rtt = 2;
850 if(s->pcb->rto - rto0 > 0)
852 s->pcb->cwnd -= s->pcb->cwnd >> 2;
853 //s->pcb->rtw_size -= s->pcb->rtw_size >> 2;
857 INLINE BOOL _isPacketValid(struct rudp_pkt *pkt)
860 return (calc_crc32(0, (char*)&pkt->hdr, offsetof(struct rudp_hdr, crc32)) == pkt->hdr.crc32)?TRUE:(printf("Invalid packet!\n"),FALSE);
862 return calc_crc32(0, (char*)&pkt->hdr, offsetof(struct rudp_hdr, crc32)) == pkt->hdr.crc32;
866 int _DispatchPacket(struct rudp_socket *s, struct rudp_pkt *pkt, const struct sockaddr *from, int from_len)
868 struct list_head *pp, *qq;
869 struct rudp_socket *sa;
870 struct sockaddr_in *sp, *sf;
872 sf = (struct sockaddr_in*)from;
874 /* We must search each queue to make sure there is no duplicated connection for a listening socket */
876 if(s->state == RS_LISTEN)
878 list_for_each_safe(pp, qq, &s->listen_queue)
880 sa = list_entry(pp, struct rudp_socket, listen_queue);
881 if(!sa->pcb) continue;
882 sp = (struct sockaddr_in*)&sa->pcb->peer;
883 if(sp->sin_addr.s_addr == sf->sin_addr.s_addr && sp->sin_port == sf->sin_port)
886 if(pkt->hdr.flags.rst)
888 list_del(&sa->listen_queue);
889 INIT_LIST_HEAD(&sa->listen_queue);
890 _CleanupSocket(sa, ERUDP_RESETED);
896 if(pkt->hdr.flags.ack && sa->state == RS_SYN_RCVD)
898 if(_ProcessPacket(sa, pkt, from, from_len) < 0)
900 if(check_again && sa->state == RS_ESTABLISHED)
901 SETEVENT(s->event_r);
907 list_for_each(pp, &s->accepted_list)
909 sa = list_entry(pp, struct rudp_socket, accepted_list);
910 if(sa->state >= RS_ESTABLISHED)
912 sp = (struct sockaddr_in*)&sa->pcb->peer;
913 if(sp->sin_addr.s_addr == sf->sin_addr.s_addr && sp->sin_port == sf->sin_port)
915 if(_ProcessPacket(sa, pkt, from, from_len) < 0)
922 /* It's time to Me */
923 if(s->state == RS_LISTEN/*listening socket*/ ||
924 ( s->pcb && (s->connected/*client*/
925 || (((sp = (struct sockaddr_in*)&s->pcb->peer), sp->sin_addr.s_addr == sf->sin_addr.s_addr)
926 && (sp->sin_port == sf->sin_port)/*accepted(listening socket is closed)*/) )
930 if(_ProcessPacket(s, pkt, from, from_len) < 0)
935 /* Nobody want this packet */
936 if(!(pkt->hdr.flags.rst && s->state <= 0))
942 static int _PPState_Established(struct rudp_socket *s, struct rudp_pkt *pkt, const struct sockaddr *from, int from_len)
944 struct rudp_channel *pch;
945 struct sndbuf *psbuf;
946 struct rcvbuf *prbuf;
950 if(pkt->hdr.flags.syn)
952 if(pkt->hdr.flags.ack)
958 if(pkt->hdr.flags.fin)
961 PA_MutexLock(s->mutex_w);
962 s->state = RS_CLOSE_WAIT;
964 s->timer[RT_KEEP] = RTT_MIN * RTV_KEEP_CLOSE;
969 for(i=0; i<MAX_PHY_CHANNELS; i++)
971 struct sndbuf *psb = &s->pcb->channel[i].sbuf;
972 struct rudp_pkt *c = psb->first;
975 struct rudp_pkt *p = c;
979 memset(psb, 0, sizeof(struct sndbuf));
980 for(ii=0; ii<RCT_CNT; ii++) s->pcb->channel[i].timer[ii] = 0;
982 for(i=0; i<RT_CNT; i++) s->timer[i] = 0;
984 PA_MutexUnlock(s->mutex_w);
988 chno = pkt->hdr.flags.chno;
989 pch = &s->pcb->channel[chno];
991 old_rawnd = psbuf->rawnd;
992 psbuf->rawnd = WINDOW_NTOH(pkt->hdr.flags.window);
993 //psbuf->rwnd += old_rawnd - psbuf->rawnd;
994 psbuf->rwnd = psbuf->rawnd - psbuf->n_unacked;
995 if(psbuf->rwnd < 0) psbuf->rwnd = 0;
998 if(pkt->hdr.flags.ack && psbuf->first)
1000 uint32_t ackno = ntohl(pkt->hdr.ackno);
1002 PA_MutexLock(s->mutex_w);
1003 if(SEQ_LE(ackno, psbuf->first->seqno)/* && !pch->congested*/) //duplicated ACK
1004 //if(ackno == psbuf->first->seqno) //duplicated ACK
1006 if(old_rawnd == 0 && psbuf->rawnd > 0) //Recevier's window opened
1008 _RudpOutput(s, pkt->hdr.flags.chno, RO_FORCE);
1009 PA_MutexUnlock(s->mutex_w);
1014 if(psbuf->dup_ack >= 3)
1016 if(psbuf->dup_ack == 3)
1018 psbuf->rlost = pkt->hdr.flags.n_loss;
1019 if(psbuf->rlost == 0) psbuf->rlost = 1;
1020 psbuf->rexmt = psbuf->first;
1021 _congestionDetected(s, chno, psbuf->rlost>1?FASTRETRANS2:FASTRETRANS);
1022 _printTime(); dbg_msg("duplicated ACKs(%d): rlost=%d, cwnd=%d, ssthresh=%d\n",
1023 ackno, psbuf->rlost, s->pcb->cwnd, s->pcb->ssthresh);
1025 pch->timer[RCT_REXMT] = s->pcb->rto * rudp_backoff[0] + 1;
1027 psbuf->pkt_rttm_start = psbuf->not_sent;
1028 psbuf->fastretr_end_seq = psbuf->first->seqno + psbuf->n_unacked - 1;
1031 if(psbuf->rlost && psbuf->rexmt)
1033 _RudpOutput(s, pkt->hdr.flags.chno, RO_REXMT_FAST);
1034 psbuf->rexmt = psbuf->rexmt->next;
1035 psbuf->rlost --; //if(psbuf->rlsot == 0) psbuf->rexmt = NULL;
1037 else if(psbuf->dup_ack > 3)
1040 // Send one new packet every two duplicated acks.
1041 // Because each ack means a packet(very possible being valid) is received by peer,
1042 // so we can inject new packet into network
1043 if(psbuf->dup_ack & 1) _RudpOutput(s, pkt->hdr.flags.chno, RO_FORCE);
1045 if(s->pcb->cwnd > s->pcb->ssthresh)
1046 s->pcb->cwnd = s->pcb->ssthresh;
1047 //psbuf->dup_ack = 0;
1051 _RudpOutput(s, pkt->hdr.flags.chno, RO_ONLYONE);
1053 else// if(SEQ_GT(ackno, psbuf->first->seqno))
1055 struct rudp_pkt *p, *p2;
1056 int fast_rxmt_end = 0; //fast retransmission
1059 //while(p && p->seqno != ackno && psbuf->n_unacked)
1060 while(p && p->seqno < ackno)
1061 {//remove acked packets
1062 if(psbuf->pkt_rttm_start == p)
1063 psbuf->pkt_rttm_start = NULL;
1064 if(p->trans == 1 && psbuf->pkt_rttm_start == NULL)
1065 _updateRTO(s, rudp_now - p->ts);
1067 if(p->seqno == psbuf->fastretr_end_seq && psbuf->dup_ack >= 3)
1072 psbuf->n_unacked --;
1073 assert(psbuf->n_unacked>=0);
1077 // Slow start && congestion avoidance
1078 _congestionAvoidance(s);
1080 // If recovery from congestion, or, after a fast retransmission,
1081 // there's another hole in the unacked queue, continue performing
1082 // fast retransmission.
1083 // Otherwise, stop fast recovery
1084 if(psbuf->rlost && psbuf->rexmt)
1086 _RudpOutput(s, pkt->hdr.flags.chno, RO_REXMT_FAST);
1087 psbuf->rexmt = psbuf->rexmt->next;
1088 psbuf->rlost --; //if(psbuf->rlsot == 0) psbuf->rexmt = NULL;
1090 else if(pch->congested)
1093 if(psbuf->dup_ack & 1) _RudpOutput(s, pkt->hdr.flags.chno, RO_FORCE);
1095 if(s->pcb->cwnd > s->pcb->ssthresh)
1097 s->pcb->cwnd = s->pcb->ssthresh;
1104 if(old_rawnd == 0 && psbuf->rawnd > 0) //Recevier's window opened
1105 _RudpOutput(s, pkt->hdr.flags.chno, RO_FORCE);
1107 while(_RudpOutput(s, pkt->hdr.flags.chno, 0));
1114 psbuf->pkt_rttm_start = psbuf->not_sent = psbuf->last = NULL; //enable rtt measurement
1115 pch->timer[RCT_REXMT] = 0;
1117 assert(psbuf->n_pkt==0);
1120 pch->timer[RCT_REXMT] = s->pcb->rto * rudp_backoff[pkt->trans];
1122 //signalOutput(s, pkt->hdr.flags.chno);
1123 SETEVENT(s->event_w);
1125 PA_MutexUnlock(s->mutex_w);
1129 if(pkt->len) //length of data > 0
1131 uint32_t pos, seqno;
1134 PA_MutexLock(s->mutex_r);
1135 seqno = ntohl(pkt->hdr.seqno);
1138 delta = (int)(seqno - prbuf->first_seq);
1139 if(delta < 0) //old packet, just ignore it
1141 //PA_MutexUnlock(s->mutex_r);
1142 dbg_msg("packet %d is ignored, expected_seqno=%d\n", pkt->seqno, prbuf->expected_seqno);
1143 goto _sendack_and_discard_pkt;
1145 if(delta < prbuf->q_size-1) //in the buffer
1147 pos = (prbuf->head + delta)%prbuf->q_size;
1148 if(prbuf->pkt_q[pos]) //duplicated
1150 dbg_msg("packet %d is duplicated.\n", pkt->seqno);
1151 goto _sendack_and_discard_pkt;
1154 prbuf->pkt_q[pos] = pkt;
1156 //update pointers & window
1157 if((prbuf->tail + prbuf->q_size - prbuf->head)%prbuf->q_size <=
1158 (pos + prbuf->q_size - prbuf->head)%prbuf->q_size)
1159 prbuf->tail = (pos+1)%prbuf->q_size;
1160 prbuf->win = (prbuf->q_size + prbuf->head - prbuf->tail - 1)%prbuf->q_size;
1162 if(prbuf->loss == pos) //just the one we expected
1164 //move "loss" to next empty slot, if any
1165 while(prbuf->loss != prbuf->tail && prbuf->pkt_q[prbuf->loss])
1167 prbuf->loss = (prbuf->loss + 1) % prbuf->q_size;
1168 prbuf->expected_seqno++;
1170 prbuf->should_ack = ACKT_DELAYED;
1171 SETEVENT(s->event_r);
1174 //(re-)calculate the size of (next, or current) hole.
1176 int loss = prbuf->loss, n_lost=0;
1177 while(loss != prbuf->tail && !prbuf->pkt_q[loss] && n_lost < MAX_LOSS_REPORT)
1179 loss = (loss + 1) % prbuf->q_size;
1182 prbuf->n_lost = n_lost;
1183 if(n_lost) prbuf->should_ack = 0;
1186 if(!prbuf->should_ack ||
1187 ((prbuf->should_ack == ACKT_DELAYED) && ((prbuf->expected_seqno - prbuf->acked_seqno) >= 3))
1190 _sendAck(s, pkt->hdr.flags.chno);
1191 prbuf->should_ack = 0;
1194 PA_MutexUnlock(s->mutex_r);
1198 dbg_msg("exceeds receiver's buffer: %d\n", pkt->seqno);
1199 _sendack_and_discard_pkt:
1200 PA_MutexUnlock(s->mutex_r);
1201 _sendAck(s, pkt->hdr.flags.chno);
1204 else if(!pkt->hdr.flags.ack) //What's this? fin?
1206 if(s->state == RS_CLOSE_WAIT)
1208 PA_MutexLock(s->mutex_r);
1209 s->err = ERUDP_PEER_CLOSED;
1210 SETEVENT(s->event_r);
1211 PA_MutexUnlock(s->mutex_r);
1215 PA_MutexLock(s->mutex_w);
1216 _RudpOutput(s, pkt->hdr.flags.chno, 0);
1217 PA_MutexUnlock(s->mutex_w);
1226 /// \brief Process packet
1227 // @return -1 -- if the packet will be released
1228 int _ProcessPacket(struct rudp_socket *s, struct rudp_pkt *pkt, const struct sockaddr *from, int from_len)
1230 struct rudp_channel *pch;
1231 struct sndbuf *psbuf;
1233 if((pkt->hdr.flags.rst) && s->state != RS_LISTEN)
1235 if(s->state == RS_SYN_SENT)
1236 {//To support simultanous connection, ignore it
1237 //s->state = RS_CLOSED;
1238 //s->err = ERUDP_RESETED;
1242 _terminateSocketInternally(s, ERUDP_RESETED);
1250 if(s->err) return -1;
1251 if((s->flags & RF_ADHOC) && !s->pcb && pkt->hdr.flags.fin)
1258 if(pkt->hdr.flags.syn && !pkt->hdr.flags.ack)
1260 struct rudp_socket *ss;
1263 ss = _AllocRudpSocket();
1264 ss->rcvbuf_sz = s->rcvbuf_sz;
1265 ss->udp_sock = s->udp_sock;
1266 ss->pcb = _AllocRudpPcb(ss->rcvbuf_sz, INITIAL_SEQ_NO, ntohl(pkt->hdr.seqno), WINDOW_NTOH(pkt->hdr.flags.window));
1267 memcpy(&ss->pcb->peer, from, from_len);
1268 sa_len = sizeof(struct sockaddr_in);
1269 PA_GetSockName(s->udp_sock, (struct sockaddr*)&ss->pcb->local, &sa_len);
1271 ss->state = RS_SYN_RCVD;
1272 ss->timer[RT_KEEP] = conn_backoff[0];
1273 list_add_tail(&ss->listen_queue, &s->listen_queue);
1282 if(pkt->hdr.flags.syn)
1285 struct rudp_hdr hdr;
1287 pch = &s->pcb->channel[0];
1290 for(i=0; i<MAX_PHY_CHANNELS; i++)
1292 pch[i].rbuf.expected_seqno = pch[i].rbuf.first_seq = ntohl(pkt->hdr.seqno);
1293 s->pcb->rwin_size = pch[i].sbuf.rwnd = pch[i].sbuf.rawnd = WINDOW_NTOH(pkt->hdr.flags.window);
1294 s->pcb->rtw_size = s->pcb->rwin_size/2;//8;
1296 s->pcb->ssthresh = s->pcb->rwin_size; //s->pcb->rtw_size;
1298 /* Send ACK for normal connection, finish handshake.
1300 * Send SYN & ACK for simultaneous open.
1302 memset(&hdr, 0, sizeof(hdr));
1304 hdr.flags.rudp = RUDP_HEADER_TAG;
1306 hdr.ackno = pkt->hdr.seqno;
1307 hdr.flags.window = WINDOW_HTON(s->pcb->channel[0].rbuf.win);
1309 PA_MutexLock(s->mutex_w);
1311 if(pkt->hdr.flags.ack) //Normal open
1313 _sendHeader(s, &hdr);
1314 s->state = RS_ESTABLISHED;
1315 SETEVENT(s->event_w);
1317 else // Simultaneous open
1321 hdr.seqno = htonl(psbuf->seqno);
1322 _sendHeader(s, &hdr);
1324 s->state = RS_SYN_RCVD;
1326 PA_MutexUnlock(s->mutex_w);
1332 if(pkt->hdr.flags.ack)// && pkt->hdr.ackno == s->pcb->channel[0].sbuf.seqno)
1334 pch = &s->pcb->channel[0];
1337 /* Simultaneous open, wakeup thread wait on RUDPConnect(...) */
1338 if(list_empty(&s->listen_queue))
1340 PA_MutexLock(s->mutex_w);
1341 s->state = RS_ESTABLISHED;
1342 SETEVENT(s->event_w);
1343 PA_MutexUnlock(s->mutex_w);
1345 /* Accepted, wakeup thread wait on RUDPAccept(...) */
1348 //waiting thread is wakedup in _DispatchPacket...
1349 //since s is not returned by Accept yet, it's safe without lock
1350 s->state = RS_ESTABLISHED;
1354 else if(pkt->hdr.flags.syn)
1361 case RS_ESTABLISHED:
1362 return _PPState_Established(s, pkt, from, from_len);
1366 if(pkt->hdr.flags.ack)
1368 if(pkt->hdr.flags.fin)
1370 _sendEmptyAck(s, 0);
1371 s->state = RS_TIME_WAIT;
1374 s->state = RS_FIN_WAIT_2;
1376 else if(pkt->hdr.flags.fin)
1378 _sendEmptyAck(s, 0);
1379 s->state = RS_CLOSING;
1381 s->timer[RT_KEEP] = RTT_MIN * RTV_KEEP_CLOSE;
1385 if(pkt->hdr.flags.fin)
1388 s->state = RS_TIME_WAIT;
1397 _sendReset(s, from);
1401 /* pcb might not be allocated, cann't replaced by _sendHeader */
1402 void _sendReset(struct rudp_socket *s, const struct sockaddr *to)
1404 struct rudp_hdr hdr;
1406 memset(&hdr, 0, sizeof(hdr));
1408 hdr.flags.rudp = RUDP_HEADER_TAG;
1410 hdr.crc32 = calc_crc32(0, (char*)&hdr, offsetof(struct rudp_hdr, crc32));
1411 _printHdr(s->pcb, &hdr, (struct sockaddr_in*)to);
1412 PA_SendTo(s->udp_sock, &hdr, sizeof(struct rudp_hdr), 0,
1413 s->connected?NULL:to,
1414 sizeof(struct sockaddr));
1417 PA_THREAD_RETTYPE __STDCALL _RUDPServiceThread(void *pdata)
1421 struct list_head *p;
1423 unsigned long t200, t500, tnow;
1425 t200 = t500 = PA_GetTickCount();
1429 tv.tv_sec = 0; tv.tv_usec = 25*1000;
1430 //FD_SET(sockw_r, &rfds);
1434 PA_MutexLock(mutex_sock_list);
1435 list_for_each(p, &sock_list)
1437 struct rudp_socket *s = list_entry(p, struct rudp_socket, inst_list);
1438 //accepted socket's peer reseted ???
1439 if(1)//s->state != RS_DEAD || !list_empty(&s->accepted_list))
1441 FD_SET(s->udp_sock, &rfds);
1442 max_fd = max(max_fd, s->udp_sock);
1445 PA_MutexUnlock(mutex_sock_list);
1447 if(select(max_fd+1, &rfds, NULL, NULL, &tv) > 0)
1449 PA_MutexLock(mutex_sock_list);
1450 list_for_each(p, &sock_list)
1452 struct rudp_socket *s = list_entry(p, struct rudp_socket, inst_list);
1453 if(FD_ISSET(s->udp_sock, &rfds))
1455 struct sockaddr from;
1457 struct rudp_pkt *pkt;
1461 pkt = _MBufGetPacket();
1462 from_len = sizeof(from);
1465 len = PA_RecvFrom(s->udp_sock, &pkt->hdr, MAX_PACKET_SIZE, 0, &from, &from_len);
1466 if(len < 0 || len < sizeof(struct rudp_hdr) || pkt->hdr.flags.rudp != RUDP_HEADER_TAG ||
1467 !_isPacketValid(pkt))
1473 int err = WSAGetLastError();
1474 if(err != WSAEWOULDBLOCK)
1475 dbg_msg("recvfrom error: %d\n", err);
1476 #elif defined(ARM_UCOS_LWIP)
1477 int err = lwip_get_error(s->udp_sock);
1478 if(err != EWOULDBLOCK)
1479 dbg_msg("recvfrom: %s\n", lwip_strerr(err));
1481 if(errno != EWOULDBLOCK)
1482 dbg_msg("recvfrom: %s\n", strerror(errno));
1485 //_terminateSocketInternally(s, ERUDP_RESETED);
1487 else if(s->non_rudp_pkt_cb)
1488 s->non_rudp_pkt_cb((const uint8_t*)&pkt->hdr, len, s->p_user);
1489 _MBufPutPacket(pkt);
1493 pkt->len = len - sizeof(struct rudp_hdr);
1494 pkt->seqno = ntohl(pkt->hdr.seqno);
1495 _printPkt(s->pcb, pkt, PHF_FROM, (struct sockaddr_in*)&from);
1496 _DispatchPacket(s, pkt, &from, from_len);
1503 PA_MutexUnlock(mutex_sock_list);
1506 tnow = PA_GetTickCount();
1507 PA_MutexLock(mutex_sock_list);
1508 if(tnow - t200 >= DELAY_ACK_MS-4)
1510 //t200 += DELAY_ACK_MS;
1512 _timerProc(_handleTimer200ms);
1514 if(tnow - t500 >= RTT_UINT-4)
1519 _timerProc(_handleTimer500ms);
1521 PA_MutexUnlock(mutex_sock_list);
1525 return (PA_THREAD_RETTYPE)(0);
1529 ////////////////////////////////////////////////////////////////////////////////////////////
1530 extern void initRudpTimer();
1531 extern void uninitRudpTimer();
1536 struct sockaddr_in sai;
1539 sockw_r = socket(AF_INET, SOCK_DGRAM, 0);
1540 sockw_s = socket(AF_INET, SOCK_DGRAM, 0);
1542 memset(&sai, 0, sizeof(sai));
1543 sai.sin_family = AF_INET;
1544 sai.sin_addr.s_addr = inet_addr("127.0.0.1");
1545 bind(sockw_r, (struct sockaddr*)&sai, sizeof(sai));
1546 salen = sizeof(sai);
1547 PA_GetSockName(sockw_r, (struct sockaddr*)&sai, &salen);
1549 if(connect(sockw_s, (struct sockaddr*)&sai, sizeof(sai)) < 0)
1551 perror("connect to output notification slot");
1555 PA_MutexInit(mutex_sock_list);
1556 PA_MutexInit(mutex_pkt_pool);
1559 hthd = PA_ThreadCreate(_RUDPServiceThread, NULL);
1567 struct list_head *p, *q;
1569 //uninitRudpTimer();
1572 PA_ThreadWaitUntilTerminate(hthd);
1573 PA_ThreadCloseHandle(hthd);
1574 //PA_SocketClose(sockw_r);
1575 //PA_SocketClose(sockw_s);
1577 PA_MutexLock(mutex_sock_list);
1578 list_for_each_safe(p, q, &sock_list)
1580 struct rudp_socket *s = list_entry(p, struct rudp_socket, inst_list);
1581 if(!list_empty(&s->listen_queue))
1583 struct list_head *pp, *qq;
1584 list_for_each_safe(pp, qq, &s->listen_queue)
1586 struct rudp_socket *ss = list_entry(pp, struct rudp_socket, accepted_list);
1587 _sendReset(ss, (struct sockaddr*)&ss->pcb->peer);
1588 _CleanupSocket(ss, 0);
1593 if(s->state == RS_ESTABLISHED)
1594 _sendReset(s, (struct sockaddr*)&s->pcb->peer);
1595 _CleanupSocket(s, 0);
1597 PA_SocketClose(s->udp_sock);
1600 PA_MutexUnlock(mutex_sock_list);
1602 PA_MutexUninit(mutex_pkt_pool);
1605 struct rudp_pkt *p = free_pkt;
1606 free_pkt = free_pkt->next;
1610 PA_MutexUninit(mutex_sock_list);
1615 RUDPSOCKET RUDPSocket()
1617 struct rudp_socket *sock = _AllocRudpSocket();
1618 sock->udp_sock = socket(AF_INET, SOCK_DGRAM, 0);
1619 PA_SocketSetNBlk(sock->udp_sock, 1);
1620 PA_MutexLock(mutex_sock_list);
1621 list_add_tail(&sock->inst_list, &sock_list);
1622 PA_MutexUnlock(mutex_sock_list);
1624 return (RUDPSOCKET)sock;
1627 RUDPSOCKET RUDPSocketFromUdp(int udpsock)
1629 struct rudp_socket *s = _AllocRudpSocket();
1630 s->udp_sock = udpsock;
1631 PA_SocketSetNBlk(udpsock, 1);
1632 PA_MutexLock(mutex_sock_list);
1633 list_add_tail(&s->inst_list, &sock_list);
1634 PA_MutexUnlock(mutex_sock_list);
1636 return (RUDPSOCKET)s;
1639 int RUDPSetInvalidPacketCB(RUDPSOCKET sock, NONRUDPPACKETCB pkt_cb, void *p_user)
1641 struct rudp_socket *s = (struct rudp_socket*)sock;
1642 if(s->tag != RUDP_SOCKET_TAG) return ERUDP_NOT_SOCKET;
1643 s->non_rudp_pkt_cb = (_NONRUDPPACKETCB)pkt_cb;
1648 int RUDPClose(RUDPSOCKET sock)
1650 struct rudp_socket *s = (struct rudp_socket*)sock;
1651 if(s->tag != RUDP_SOCKET_TAG) return ERUDP_NOT_SOCKET;
1659 struct list_head *p, *q;
1660 PA_MutexLock(mutex_sock_list);
1661 list_for_each_safe(p, q, &s->listen_queue)
1663 struct rudp_socket *aa = list_entry(p, struct rudp_socket, accepted_list);
1664 _sendReset(aa, (struct sockaddr*)&aa->pcb->peer);
1665 _CleanupSocket(aa, 0);
1669 PA_MutexUnlock(mutex_sock_list);
1674 _sendReset(s, (struct sockaddr*)&s->pcb->peer);
1676 case RS_ESTABLISHED:
1678 int i, sb_is_empty = 1;
1679 PA_MutexLock(s->mutex_w);
1680 for(i=0; i<MAX_PHY_CHANNELS; i++)
1682 struct sndbuf *psb = &s->pcb->channel[i].sbuf;
1683 struct rcvbuf *prb = &s->pcb->channel[i].rbuf;
1684 if(psb->first != NULL) { sb_is_empty = 0; }
1686 for(; prb->head != prb->tail; prb->head = (prb->head+1)%prb->q_size)
1687 if(prb->pkt_q[prb->head])
1689 _MBufPutPacket(prb->pkt_q[prb->head]);
1690 prb->pkt_q[prb->head] = NULL;
1696 s->state = RS_FIN_WAIT_1;
1697 s->timer[RT_KEEP] = RTT_MIN * RTV_KEEP_CLOSE;
1701 s->state = RS_FIN_QUEUED;
1703 PA_MutexUnlock(s->mutex_w);
1713 return ERUDP_NOT_ALLOWED;
1717 * Remove from inst_list first, then cleanup the resources.
1718 * If we are managed to maintain the closing states in future, cleanup
1719 * should be executed in timerProc()
1722 if(1)//s->state < RS_ESTABLISHED)
1724 PA_MutexLock(mutex_sock_list);
1725 _CleanAndFreeSocket(s);
1726 PA_MutexUnlock(mutex_sock_list);
1732 int RUDPListen(RUDPSOCKET sock, int n)
1734 struct rudp_socket *s = (struct rudp_socket*)sock;
1735 if(s->tag != RUDP_SOCKET_TAG) return ERUDP_NOT_SOCKET;
1737 if(s->state != RS_CLOSED) return ERUDP_NOT_ALLOWED;
1738 INIT_LIST_HEAD(&s->listen_queue);
1739 s->state = RS_LISTEN;
1741 #if defined(WIN32) || defined(ARM_UCOS_LWIP)
1744 opt = 100*1024;//1500*s->pcb->channel[0].sbuf.rwin_size/2;
1745 setsockopt(s->udp_sock, SOL_SOCKET, SO_RCVBUF, (const char*)&opt, sizeof(int));
1746 //setsockopt(s->udp_sock, SOL_SOCKET, SO_SNDBUF, &opt, sizeof(int));
1753 int RUDPAccept(RUDPSOCKET sock, RUDPSOCKET *accepted, struct sockaddr *addr, int *addrlen)
1755 struct rudp_socket *s, *a;
1756 struct list_head *p, *q;
1758 s = (struct rudp_socket*)sock;
1759 if(s->tag != RUDP_SOCKET_TAG) return ERUDP_NOT_SOCKET;
1763 PA_MutexLock(s->mutex_r);
1764 #if defined(WIN32) || defined(ARM_UCOS_LWIP)
1765 if(list_empty(&s->listen_queue))
1767 PA_MutexUnlock(s->mutex_r);
1768 PA_EventWait(s->event_r);
1769 PA_MutexLock(s->mutex_r);
1772 while(list_empty(&s->listen_queue))
1773 pthread_cond_wait(&s->event_r, &s->mutex_r);
1775 PA_MutexUnlock(s->mutex_r);
1778 PA_MutexLock(mutex_sock_list);
1780 list_for_each_safe(p, q, &s->listen_queue)
1782 a = list_entry(p, struct rudp_socket, listen_queue);
1783 if(a->state == RS_ESTABLISHED)
1787 INIT_LIST_HEAD(&a->accepted_list);
1788 list_add_tail(&a->accepted_list, &s->accepted_list);
1790 INIT_LIST_HEAD(&a->listen_queue);
1791 *accepted = (RUDPSOCKET)a;
1792 memcpy(addr, &a->pcb->peer, sizeof(struct sockaddr));
1793 *addrlen = sizeof(struct sockaddr);
1799 PA_MutexUnlock(mutex_sock_list);
1803 if(s->flags & RF_NBLK)
1811 int RUDPBind(RUDPSOCKET sock, const struct sockaddr *addr, int addrlen)
1813 struct rudp_socket *s = (struct rudp_socket*)sock;
1814 if(s->tag != RUDP_SOCKET_TAG) return ERUDP_NOT_SOCKET;
1816 if(bind(s->udp_sock, addr, addrlen) == 0) return 0;
1821 /* For simultaneous open:
1822 * 1. Call RUDPBind(...) to bind to a local port, call RUDPAccept(...) on NEITHER sides
1823 * 2. Call RUDPConnect(...) on both sides
1825 int RUDPConnect(RUDPSOCKET sock, const struct sockaddr* addr, int addr_len)
1827 struct rudp_socket *s;
1830 s = (struct rudp_socket*)sock;
1831 if(s->tag != RUDP_SOCKET_TAG) return ERUDP_NOT_SOCKET;
1832 if(s->state == RS_ESTABLISHED) return ERUDP_CONNECTED;
1833 if(s->state == RS_SYN_SENT) return ERUDP_IN_PROGRESS;
1834 if(s->state != RS_CLOSED/* || s->pcb*/) return ERUDP_NOT_ALLOWED;
1838 if(!s->pcb) s->pcb = _AllocRudpPcb(s->rcvbuf_sz, INITIAL_SEQ_NO, 0, 1);
1839 memcpy(&s->pcb->peer, addr, sizeof(struct sockaddr));
1841 if(connect(s->udp_sock, addr, addr_len) == 0)
1842 s->connected = TRUE;
1844 dbg_msg("call connect() failed.\n");
1846 sa_len = sizeof(struct sockaddr_in);
1847 PA_GetSockName(s->udp_sock, (struct sockaddr*)&s->pcb->local, &sa_len);
1848 s->state = RS_SYN_SENT;
1849 s->timer[RT_KEEP] = conn_backoff[0];
1853 s->pcb->retr_cnt = 0;
1855 if(s->flags & RF_NBLK)
1858 #if defined(WIN32) || defined(ARM_UCOS_LWIP)
1859 PA_EventWait(s->event_w);
1861 PA_MutexLock(s->mutex_w);
1862 while(s->state == RS_SYN_SENT)
1863 pthread_cond_wait(&s->event_w, &s->mutex_w);
1864 PA_MutexUnlock(s->mutex_w);
1867 if(s->state == RS_ESTABLISHED)
1872 setsockopt(s->udp_sock, SOL_SOCKET, SO_RCVBUF, (const char*)&opt, sizeof(int));
1878 return ERUDP_CONN_FAILED;
1881 /** Set rudp socket to connected state(RS_ESTABLSHED) with default setting(receiver's buffer, ...)
1883 int RUDPConnected(RUDPSOCKET sock, const struct sockaddr* addr, int peer_rbuf_sz)
1885 struct rudp_socket *s;
1888 s = (struct rudp_socket*)sock;
1889 if(s->tag != RUDP_SOCKET_TAG) return ERUDP_NOT_SOCKET;
1890 if(s->state == RS_ESTABLISHED) return ERUDP_CONNECTED;
1891 if(s->state == RS_SYN_SENT) return ERUDP_IN_PROGRESS;
1892 if(s->state != RS_CLOSED/* || s->pcb*/) return ERUDP_NOT_ALLOWED;
1896 if(peer_rbuf_sz) s->rcvbuf_sz = peer_rbuf_sz;
1897 if(!s->pcb) s->pcb = _AllocRudpPcb(s->rcvbuf_sz, INITIAL_SEQ_NO, 0, 1);
1898 memcpy(&s->pcb->peer, addr, sizeof(struct sockaddr));
1900 if(connect(s->udp_sock, addr, sizeof(struct sockaddr)) == 0)
1901 s->connected = TRUE;
1903 dbg_msg("call connect() failed.\n");
1905 sa_len = sizeof(struct sockaddr_in);
1906 PA_GetSockName(s->udp_sock, (struct sockaddr*)&s->pcb->local, &sa_len);
1907 //PA_MutexLock(s->mutex_w);
1908 s->state = RS_ESTABLISHED;
1909 SETEVENT(s->event_w);
1910 //PA_MutexUnlock(s->mutex_w);
1913 setsockopt(s->udp_sock, SOL_SOCKET, SO_RCVBUF, (const char*)&opt, sizeof(int));
1918 //! \retval >=0 bytes sent
1919 //! \retval <0 error code
1920 int RUDPSendV(RUDPSOCKET sock, int chno, const PA_IOVEC *v, unsigned int size, int flags)
1922 struct rudp_socket *s;
1924 unsigned int i, len, byt_sent;
1927 for(i=0; i<size; i++) len += PA_IoVecGetLen(&v[i]);
1928 if(len == 0) return 0;
1930 s = (struct rudp_socket*)sock;
1931 if(s->tag != RUDP_SOCKET_TAG) return ERUDP_NOT_SOCKET;
1932 if(s->err) return s->err;
1933 if(s->state != RS_ESTABLISHED) return ERUDP_NO_CONN;
1936 ps = &s->pcb->channel[PHY_CHN(chno)].sbuf;
1937 PA_MutexLock(s->mutex_w);
1938 if(s->state == RS_DEAD)
1940 PA_MutexLock(s->mutex_w);
1945 #if defined(WIN32) || defined(ARM_UCOS_LWIP)
1946 if(ps->n_pkt >= ps->max_pkts && s->state != RS_DEAD)
1948 if((flags & RUDPMSG_DONTWAIT) || (s->flags & RF_NBLK))
1950 PA_MutexUnlock(s->mutex_w);
1953 PA_MutexUnlock(s->mutex_w);
1954 PA_EventWait(s->event_w);
1955 PA_MutexLock(s->mutex_w);
1958 while(ps->n_pkt >= ps->max_pkts && s->state != RS_DEAD)
1960 if((flags & RUDPMSG_DONTWAIT) || (s->flags & RF_NBLK))
1962 PA_MutexUnlock(s->mutex_w);
1965 pthread_cond_wait(&s->event_w, &s->mutex_w);
1970 unsigned int t, copied = 0/*byte copied in an IOVEC*/;
1971 struct rudp_pkt *last = ps->last;
1974 //if last packet has the same chno, fill it
1975 // <<<-- always has the same chno since we have sending-queue for each chno(20150203)
1976 if(last && last->trans == 0 && last->hdr.flags.chno == chno)
1978 for(; i<size && last->len < MAX_DATA_SIZE; i++)
1980 t = min(PA_IoVecGetLen(&v[i]), MAX_DATA_SIZE - last->len);
1981 memcpy(last->data + last->len, PA_IoVecGetPtr(&v[i]), t);
1984 if(PA_IoVecGetLen(&v[i]) != t) //last packet is full, but there still are data in v[i]
1994 struct rudp_pkt *pkt;
1996 pkt = _MBufGetPacket();
1999 pkt->hdr.flags.chno = chno;
2001 for(; i<size && pkt->len < MAX_DATA_SIZE; i++, copied = 0)
2003 t = min(PA_IoVecGetLen(&v[i]) - copied, MAX_DATA_SIZE - pkt->len);
2004 memcpy(pkt->data + pkt->len, (char*)PA_IoVecGetPtr(&v[i]) + copied, t);
2007 if(PA_IoVecGetLen(&v[i]) - copied != t) //pkt is full
2015 _MBufPutPacket(pkt);
2019 pkt->seqno = s->pcb->channel[PHY_CHN(chno)].sbuf.seqno++;
2020 pkt->hdr.seqno = htonl(pkt->seqno);
2023 ps->last->next = pkt;
2025 //ps->last = ps->last->next = pkt;
2026 if(!ps->not_sent) ps->not_sent = ps->last;
2029 ps->first = ps->last = ps->not_sent = pkt;
2034 //signalOutput(s, PHY_CHN(chno));
2035 //_RudpOutput(s, PHY_CHN(chno), 0);
2036 while(_RudpOutput(s, PHY_CHN(chno), 0) > 0);
2041 PA_MutexUnlock(s->mutex_w);
2046 //! \param priority 0~15. Low value has a higher priority
2047 //! \retval >=0 bytes sent
2048 //! \retval <0 error code
2049 int RUDPSendVEx(RUDPSOCKET sock, int chno, int priority, const PA_IOVEC *v, unsigned int size, int flags)
2051 struct rudp_socket *s;
2053 unsigned int i, len, byt_sent;
2056 for(i=0; i<size; i++) len += PA_IoVecGetLen(&v[i]);
2057 if(len == 0) return 0;
2059 s = (struct rudp_socket*)sock;
2060 if(s->tag != RUDP_SOCKET_TAG) return ERUDP_NOT_SOCKET;
2061 if(s->err) return s->err;
2062 if(s->state != RS_ESTABLISHED) return ERUDP_NO_CONN;
2065 ps = &s->pcb->channel[PHY_CHN(chno)].sbuf;
2066 PA_MutexLock(s->mutex_w);
2067 if(s->state == RS_DEAD)
2069 PA_MutexLock(s->mutex_w);
2074 #if defined(WIN32) || defined(ARM_UCOS_LWIP)
2075 if(ps->n_pkt >= ps->max_pkts && s->state != RS_DEAD)
2077 if((flags & RUDPMSG_DONTWAIT) || (s->flags & RF_NBLK))
2079 PA_MutexUnlock(s->mutex_w);
2082 PA_MutexUnlock(s->mutex_w);
2083 PA_EventWait(s->event_w);
2084 PA_MutexLock(s->mutex_w);
2087 while(ps->n_pkt >= ps->max_pkts && s->state != RS_DEAD)
2089 if((flags & RUDPMSG_DONTWAIT) || (s->flags & RF_NBLK))
2091 PA_MutexUnlock(s->mutex_w);
2094 pthread_cond_wait(&s->event_w, &s->mutex_w);
2099 unsigned int t, copied = 0/*byte copied in an IOVEC*/;
2100 struct rudp_pkt *next_sav, *pos = ps->not_sent;
2102 if(pos == NULL) pos = ps->last;
2104 while(pos != ps->last && pos->priority <= priority)
2108 next_sav = pos->next;
2110 //if the packet has the same priority, fill it.
2111 if(pos && pos->trans == 0 && pos->hdr.flags.chno == chno && pos->priority == priority)
2113 for(; i<size && pos->len < MAX_DATA_SIZE; i++)
2115 t = min(PA_IoVecGetLen(&v[i]), MAX_DATA_SIZE - pos->len);
2116 memcpy(pos->data + pos->len, PA_IoVecGetPtr(&v[i]), t);
2119 if(PA_IoVecGetLen(&v[i]) != t) //packet is full, but there still are data in v[i]
2129 struct rudp_pkt *pkt;
2131 pkt = _MBufGetPacket();
2134 pkt->hdr.flags.chno = chno;
2136 for(; i<size && pkt->len < MAX_DATA_SIZE; i++, copied = 0)
2138 t = min(PA_IoVecGetLen(&v[i]) - copied, MAX_DATA_SIZE - pkt->len);
2139 memcpy(pkt->data + pkt->len, (char*)PA_IoVecGetPtr(&v[i]) + copied, t);
2142 if(PA_IoVecGetLen(&v[i]) - copied != t) //pkt is full
2150 _MBufPutPacket(pkt);
2154 pkt->seqno = s->pcb->channel[PHY_CHN(chno)].sbuf.seqno++;
2155 pkt->hdr.seqno = htonl(pkt->seqno);
2160 if(!ps->not_sent) ps->not_sent = pos;
2163 ps->first = ps->last = ps->not_sent = pkt;
2168 pos->next = next_sav;
2170 //signalOutput(s, PHY_CHN(chno));
2171 //_RudpOutput(s, PHY_CHN(chno), 0);
2172 while(_RudpOutput(s, PHY_CHN(chno), 0) > 0);
2177 PA_MutexUnlock(s->mutex_w);
2182 int RUDPSend(RUDPSOCKET sock, int chno, const void *ptr, int len, int flags)
2184 struct rudp_socket *s;
2188 if(len == 0) return 0;
2189 s = (struct rudp_socket*)sock;
2190 if(s->tag != RUDP_SOCKET_TAG) return ERUDP_NOT_SOCKET;
2191 if(s->err) return s->err;
2192 if(s->state != RS_ESTABLISHED) return ERUDP_NO_CONN;
2195 ps = &s->pcb->channel[PHY_CHN(chno)].sbuf;
2196 PA_MutexLock(s->mutex_w);
2197 if(s->state == RS_DEAD)
2199 PA_MutexLock(s->mutex_w);
2204 /* Put a packet on the end of sending-queue then return.
2205 * If the sending-queue is full, wait on this queue.
2206 * The actual sending is done in the service thread.
2208 #if defined(WIN32) || defined(ARM_UCOS_LWIP)
2209 while(ps->n_pkt >= ps->max_pkts && s->state != RS_DEAD)
2211 if((flags & RUDPMSG_DONTWAIT) || (s->flags & RF_NBLK))
2213 PA_MutexUnlock(s->mutex_w);
2216 PA_MutexUnlock(s->mutex_w);
2217 PA_EventWait(s->event_w);
2218 PA_MutexLock(s->mutex_w);
2221 while(ps->n_pkt >= ps->max_pkts && s->state != RS_DEAD)
2223 if((flags & RUDPMSG_DONTWAIT) || (s->flags & RF_NBLK))
2225 PA_MutexUnlock(s->mutex_w);
2228 pthread_cond_wait(&s->event_w, &s->mutex_w);
2236 struct rudp_pkt *last, *pkt;
2241 //Merge small packets with the same chno
2242 if(last && last->trans == 0 && last->hdr.flags.chno == chno && last->len < MAX_DATA_SIZE)
2244 t = min(len, MAX_DATA_SIZE - last->len);
2245 memcpy(last->data + last->len, data, t);
2254 pkt = _MBufGetPacket();
2257 pkt->hdr.flags.chno = chno;
2259 t = min(len, MAX_DATA_SIZE);
2260 memcpy(pkt->data, data, t);
2263 pkt->seqno = ps->seqno++;
2264 pkt->hdr.seqno = htonl(pkt->seqno);
2267 ps->last->next = pkt;
2269 //ps->last = ps->last->next = pkt;
2270 if(!ps->not_sent) ps->not_sent = ps->last;
2273 ps->first = ps->last = ps->not_sent = pkt;
2281 //signalOutput(s, PHY_CHN(chno));
2282 //_RudpOutput(s, PHY_CHN(chno), 0);
2283 while(_RudpOutput(s, PHY_CHN(chno), 0) > 0);
2288 PA_MutexUnlock(s->mutex_w);
2290 //_sendReset(s, &s->pcb->peer);
2294 int RUDPSendEx(RUDPSOCKET sock, int chno, int priority, const void *ptr, int len, int flags)
2296 struct rudp_socket *s;
2300 if(len == 0) return 0;
2301 s = (struct rudp_socket*)sock;
2302 if(s->tag != RUDP_SOCKET_TAG) return ERUDP_NOT_SOCKET;
2303 if(s->err) return s->err;
2304 if(s->state != RS_ESTABLISHED) return ERUDP_NO_CONN;
2307 ps = &s->pcb->channel[PHY_CHN(chno)].sbuf;
2308 PA_MutexLock(s->mutex_w);
2309 if(s->state == RS_DEAD)
2311 PA_MutexLock(s->mutex_w);
2316 /* Put a packet on the end of sending-queue then return.
2317 * If the sending-queue is full, wait on this queue.
2318 * The actual sending is done in the service thread.
2320 #if defined(WIN32) || defined(ARM_UCOS_LWIP)
2321 while(ps->n_pkt >= ps->max_pkts && s->state != RS_DEAD)
2323 if((flags & RUDPMSG_DONTWAIT) || (s->flags & RF_NBLK))
2325 PA_MutexUnlock(s->mutex_w);
2328 PA_MutexUnlock(s->mutex_w);
2329 PA_EventWait(s->event_w);
2330 PA_MutexLock(s->mutex_w);
2333 while(ps->n_pkt >= ps->max_pkts && s->state != RS_DEAD)
2335 if((flags & RUDPMSG_DONTWAIT) || (s->flags & RF_NBLK))
2337 PA_MutexUnlock(s->mutex_w);
2340 pthread_cond_wait(&s->event_w, &s->mutex_w);
2348 struct rudp_pkt *pos, *pkt, *next_sav;
2352 while(pos && pos != ps->last && pos->priority <= priority)
2355 next_sav = pos->next;
2357 //Merge small packets with the same chno
2358 if(pos && pos->trans == 0 && pos->hdr.flags.chno == chno && pos->len < MAX_DATA_SIZE)
2360 t = min(len, MAX_DATA_SIZE - pos->len);
2361 memcpy(pos->data + pos->len, data, t);
2367 while(byt_sent < len)
2369 pkt = _MBufGetPacket();
2372 pkt->hdr.flags.chno = chno;
2374 t = min(len, MAX_DATA_SIZE);
2375 memcpy(pkt->data, data, t);
2378 pkt->seqno = ps->seqno++;
2379 pkt->hdr.seqno = htonl(pkt->seqno);
2384 if(!ps->not_sent) ps->not_sent = pos;
2387 ps->first = ps->last = ps->not_sent = pkt;
2394 //signalOutput(s, PHY_CHN(chno));
2395 //_RudpOutput(s, PHY_CHN(chno), 0);
2396 while(_RudpOutput(s, PHY_CHN(chno), 0) > 0);
2401 PA_MutexUnlock(s->mutex_w);
2403 //_sendReset(s, &s->pcb->peer);
2406 /** Receive a packet
2407 \param chno channel of packet[out]
2409 \return length of data received
2411 int RUDPRecv(RUDPSOCKET sock, int *chno, void *ptr, int len, int flags)
2413 struct rudp_socket *s;
2414 struct rudp_pcb *pcb;
2419 s = (struct rudp_socket*)sock;
2420 if(s->tag != RUDP_SOCKET_TAG) return ERUDP_NOT_SOCKET;
2421 if(s->err && s->err != ERUDP_PEER_CLOSED) return s->err;
2422 if(s->state != RS_ESTABLISHED && s->state != RS_CLOSE_WAIT) return ERUDP_NO_CONN;
2425 PA_MutexLock(s->mutex_r);
2426 if(s->state == RS_DEAD)
2428 PA_MutexUnlock(s->mutex_r);
2433 #if defined(WIN32) || defined(ARM_UCOS_LWIP)
2434 for(i=0; i<MAX_PHY_CHANNELS; i++)
2436 prb = &pcb->channel[i].rbuf;
2437 if(prb->pkt_q[prb->head]) { no_data = 0; break; }
2441 PA_MutexUnlock(s->mutex_r);
2442 if(s->state == RS_CLOSE_WAIT) return 0;
2443 if((flags & RUDPMSG_DONTWAIT) || (s->flags & RF_NBLK))
2445 PA_EventWait(s->event_r);
2446 PA_MutexLock(s->mutex_r);
2449 while(s->state == RS_ESTABLISHED)
2451 for(i=0; i<MAX_PHY_CHANNELS; i++)
2453 prb = &pcb->channel[i].rbuf;
2454 if(prb->pkt_q[prb->head]) { no_data = 0; break; }
2457 if(s->state == RS_CLOSE_WAIT)
2459 PA_MutexUnlock(s->mutex_r);
2462 if((flags & RUDPMSG_DONTWAIT) || (s->flags & RF_NBLK))
2464 PA_MutexUnlock(s->mutex_r);
2467 pthread_cond_wait(&s->event_r, &s->mutex_r);
2473 for(i=0; i<MAX_PHY_CHANNELS; i++)
2475 struct rudp_pkt *pkt;
2477 prb = &pcb->channel[i].rbuf;
2478 pkt = prb->pkt_q[prb->head];
2484 memcpy(ptr, pkt->pdata, len);
2491 memcpy(ptr, pkt->pdata, rlen);
2492 *chno = pkt->hdr.flags.chno;
2494 _MBufPutPacket(pkt);
2496 prb->pkt_q[prb->head] = NULL;
2497 /* "first_seq" always updates with "head",
2498 * and is used to calculate the inserted posistion
2499 * when a packet is received
2501 prb->head = (prb->head+1)%prb->q_size;
2506 if(prb->win == 1 && !prb->should_ack)
2508 prb->should_ack = ACKT_OPENWND;
2509 _RudpOutput(s, PHY_CHN(*chno), 0);
2510 //signalOutput(s, *chno);
2518 if(s->flags & RF_NBLK)
2519 rlen = -ERUDP_AGAIN;
2524 else if(s->err == ERUDP_PEER_CLOSED)
2528 PA_MutexUnlock(s->mutex_r);
2533 /** Receive a packet
2534 \param chno channel of packet[out]
2536 \return length of data received
2538 int RUDPRecvChn(RUDPSOCKET sock, int *chno, void *ptr, int len, int flags)
2540 struct rudp_socket *s;
2541 struct rudp_pcb *pcb;
2545 s = (struct rudp_socket*)sock;
2546 if(s->tag != RUDP_SOCKET_TAG) return ERUDP_NOT_SOCKET;
2547 if(s->err && s->err != ERUDP_PEER_CLOSED) return s->err;
2548 if(s->state != RS_ESTABLISHED && s->state != RS_CLOSE_WAIT) return ERUDP_NO_CONN;
2551 PA_MutexLock(s->mutex_r);
2552 if(s->state == RS_DEAD)
2554 PA_MutexUnlock(s->mutex_r);
2557 prb = &pcb->channel[PHY_CHN(*chno)].rbuf;
2560 #if defined(WIN32) || defined(ARM_UCOS_LWIP)
2561 if(!prb->pkt_q[prb->head])
2563 PA_MutexUnlock(s->mutex_r);
2564 if(s->state == RS_CLOSE_WAIT) return 0;
2565 if(s->flags & RF_NBLK) return ERUDP_AGAIN;
2566 PA_EventWait(s->event_r);
2567 PA_MutexLock(s->mutex_r);
2570 while(s->state == RS_ESTABLISHED)
2572 if(prb->pkt_q[prb->head]) break;
2574 if(s->state == RS_CLOSE_WAIT)
2576 PA_MutexUnlock(s->mutex_r);
2579 if(s->flags & RF_NBLK)
2581 PA_MutexUnlock(s->mutex_r);
2584 pthread_cond_wait(&s->event_r, &s->mutex_r);
2589 struct rudp_pkt *pkt;
2591 pkt = prb->pkt_q[prb->head];
2597 memcpy(ptr, pkt->pdata, len);
2598 *chno = pkt->hdr.flags.chno;
2606 memcpy(ptr, pkt->pdata, rlen);
2607 *chno = pkt->hdr.flags.chno;
2609 _MBufPutPacket(pkt);
2611 prb->pkt_q[prb->head] = NULL;
2612 /* "first_seq" always updates with "head",
2613 * and is used to calculate the inserted posistion
2614 * when a packet is received
2616 prb->head = (prb->head+1)%prb->q_size;
2621 if(prb->win == 1 && !prb->should_ack)
2623 prb->should_ack = ACKT_OPENWND;
2624 _RudpOutput(s, PHY_CHN(*chno), 0);
2625 //signalOutput(s, *chno);
2631 if(s->flags & RF_NBLK)
2632 rlen = -ERUDP_AGAIN;
2637 else if(s->err == ERUDP_PEER_CLOSED)
2641 PA_MutexUnlock(s->mutex_r);
2646 //return: <0 - failed
2649 int RUDPSelectSock(RUDPSOCKET sock, int chno, int flag, const struct timeval *timeout)
2651 struct rudp_socket *s;
2652 #if defined(__LINUX__) || defined(__ANDROID__)
2655 s = (struct rudp_socket*)sock;
2656 if(s->tag != RUDP_SOCKET_TAG) return ERUDP_NOT_SOCKET;
2659 #if defined(__LINUX__) || defined(__ANDROID__)
2662 clock_gettime(CLOCK_REALTIME, &ts);
2663 ts.tv_sec += timeout->tv_sec;
2664 ts.tv_nsec += 1000 * timeout->tv_usec;
2665 if(ts.tv_nsec > 1000000000) {
2666 ts.tv_nsec -= 1000000000;
2672 if(flag == RUDPSELECT_READABLE)
2676 PA_MutexLock(s->mutex_r);
2677 if(s->err == ERUDP_RESETED || s->err == ERUDP_PEER_CLOSED)
2679 PA_MutexUnlock(s->mutex_r);
2684 if(no_data/* && s->state == RS_ESTABLISHED*/)
2687 if(s->state == RS_LISTEN)
2689 struct list_head *p;
2690 list_for_each(p, &s->listen_queue)
2692 struct rudp_socket *ss = list_entry(p, struct rudp_socket, listen_queue);
2693 if(ss->state == RS_ESTABLISHED) //established socket, can be returned by RUDPAccept
2700 else if(s->state == RS_ESTABLISHED)
2702 for(i=0; i<MAX_PHY_CHANNELS; i++)
2704 if(chno < 0 || chno == i)
2706 prb = &s->pcb->channel[i].rbuf;
2707 if(prb->pkt_q[prb->head]) { no_data = 0; break; }
2714 #if defined(WIN32) || defined(ARM_UCOS_LWIP)
2715 PA_MutexUnlock(s->mutex_r);
2717 no_data = !PA_EventWaitTimed(s->event_r, timeout->tv_sec*1000+timeout->tv_usec/1000);
2719 PA_EventWait(s->event_r);
2722 PA_MutexLock(s->mutex_r);
2725 no_data = pthread_cond_timedwait(&s->event_r, &s->mutex_r, &ts) == ETIMEDOUT ? 1 : 0;
2727 no_data = pthread_cond_wait(&s->event_r, &s->mutex_r) != 0;
2732 PA_MutexUnlock(s->mutex_r);
2733 if(s->err) return s->err;
2734 else return !no_data;
2738 if(flag == RUDPSELECT_WRITABLE) do
2743 if(chno < 0) return ERUDP_INVALID;
2744 if(s->state != RS_ESTABLISHED) break;
2746 PA_MutexLock(s->mutex_w);
2747 if(s->state == RS_DEAD)
2749 PA_MutexLock(s->mutex_w);
2753 ps = &s->pcb->channel[chno].sbuf;
2754 if(ps->n_pkt >= ps->max_pkts && s->state != RS_DEAD)
2756 #if defined(WIN32) || defined(ARM_UCOS_LWIP)
2757 PA_MutexUnlock(s->mutex_w);
2759 writable = PA_EventWaitTimed(s->event_w, timeout->tv_sec*1000+timeout->tv_usec/1000);
2761 PA_EventWait(s->event_w);
2764 PA_MutexLock(s->mutex_w);
2767 writable = pthread_cond_timedwait(&s->event_w, &s->mutex_w, &ts) == ETIMEDOUT ? 0 : 1;
2769 writable = pthread_cond_wait(&s->event_w, &s->mutex_w) == 0;
2773 PA_MutexUnlock(s->mutex_w);
2774 if(s->err) return s->err;
2775 else return writable;
2781 int RUDPSelect(RUDPSOCKCHNO *r_rscs, int *n_rrscs, RUDPSOCKCHNO *w_rscs, int *n_wrscs,
2782 RUDPSOCKCHNO *e_rscs, int *n_erscs, const struct timeval *timeout)
2786 struct list_head *p;
2787 struct rudp_socket *s, *ss;
2788 unsigned int wait_ticks, t0;
2791 int _rfds[64], _wfds[64], _efds[64];
2793 fd_set _rfds_, _wfds_, _efds_;
2797 wait_ticks = timeout->tv_sec * 1000 + timeout->tv_usec / 1000;
2801 /* Extract udp sockets */
2802 _nr = _nw = _ne = 0;
2805 for(i=0; i<*n_rrscs; i++)
2807 if(r_rscs[i].sock == NULL)
2809 _rfds[_nr++] = r_rscs[i].chno;
2810 max_fd = max(max_fd, r_rscs[i].chno);
2816 for(i=0; i<*n_wrscs; i++)
2818 if(w_rscs[i].sock == NULL)
2820 _wfds[_nw++] = w_rscs[i].chno;
2821 max_fd = max(max_fd, w_rscs[i].chno);
2827 for(i=0; i<*n_erscs; i++)
2829 if(e_rscs[i].sock == NULL)
2831 _efds[_ne++] = e_rscs[i].chno;
2832 max_fd = max(max_fd, e_rscs[i].chno);
2839 t0 = PA_GetTickCount();
2841 PA_MutexLock(mutex_sock_list);
2845 for(i = 0; i < n; i++)
2847 if(r_rscs[i].sock == NULL) continue;
2848 s = (struct rudp_socket*)r_rscs[i].sock;
2849 if(s->err == ERUDP_RESETED || s->err == ERUDP_PEER_CLOSED)
2851 RUDP_SET(s, -1, r_rscs, nr);
2855 if(s->state <= RS_CLOSED || s->state >= RS_FIN_QUEUED)
2858 //PA_MutexUnlock(mutex_sock_list);
2859 //return ERUDP_NOT_SOCKET;
2861 PA_MutexLock(s->mutex_r);
2862 if(s->state == RS_LISTEN)
2864 list_for_each(p, &s->listen_queue)
2866 ss = list_entry(p, struct rudp_socket, listen_queue);
2867 if(ss->state == RS_ESTABLISHED) //established socket, can be returned by RUDPAccept
2869 r_rscs[nr++].sock = s;
2875 else if(s->state == RS_CLOSE_WAIT)
2877 RUDP_SET(s, -1, r_rscs, nr);
2882 for(ii=0; ii<MAX_PHY_CHANNELS; ii++)
2884 if(r_rscs[i].chno < 0 || ii == r_rscs[i].chno)
2886 prb = &s->pcb->channel[ii/*r_rscs[i].chno*/].rbuf;
2887 if(prb->pkt_q[prb->head])
2889 RUDP_SET(r_rscs[i].sock, ii, r_rscs, nr);
2895 PA_MutexUnlock(s->mutex_r);
2898 if(nr) { *n_rrscs = nr; r_rscs[nr].sock = INVALID_RUDPSOCKET; }
2904 for(i = 0; i < n; i++)
2906 if(w_rscs[i].sock == NULL) continue;
2907 s = (struct rudp_socket*)w_rscs[i].sock;
2908 if(s->state <= RS_CLOSED || s->state >= RS_FIN_QUEUED)
2910 //PA_MutexUnlock(mutex_sock_list);
2911 //return ERUDP_NOT_SOCKET;
2914 PA_MutexLock(s->mutex_w);
2915 if(s->state == RS_ESTABLISHED)
2917 for(ii=0; ii<MAX_PHY_CHANNELS; ii++)
2919 if(w_rscs[i].chno < 0 || w_rscs[i].chno == ii)
2921 struct sndbuf *psb = &s->pcb->channel[ii].sbuf;
2922 if(psb->n_pkt < psb->max_pkts)
2924 RUDP_SET(w_rscs[i].sock, ii, w_rscs, nw);
2930 else if(s->state == RS_CLOSE_WAIT)
2932 RUDP_SET(w_rscs[i].sock, -1, w_rscs, nw);
2934 PA_MutexUnlock(s->mutex_w);
2936 if(nw) { *n_wrscs = nw; w_rscs[nw].sock = INVALID_RUDPSOCKET; }
2940 for(i=0; i<*n_erscs; i++)
2942 if(e_rscs[i].sock == NULL) continue;
2943 s = (struct rudp_socket*)e_rscs[i].sock;
2944 if(s->err) RUDP_SET(s, -1, e_rscs, ne);
2946 if(ne) { *n_erscs = ne; e_rscs[ne].sock = INVALID_RUDPSOCKET; }
2948 PA_MutexUnlock(mutex_sock_list);
2950 //if(nr || nw || ne) break; //normal socket will be starved
2951 if(_nr || _nw || _ne)
2953 tv.tv_sec = 0; tv.tv_usec = 0;
2954 if(_nr) { FD_ZERO(&_rfds_); for(i=0; i<_nr; i++) FD_SET(_rfds[i], &_rfds_); }
2955 if(_nw) { FD_ZERO(&_wfds_); for(i=0; i<_nw; i++) FD_SET(_wfds[i], &_wfds_); }
2956 if(_ne) { FD_ZERO(&_efds_); for(i=0; i<_ne; i++) FD_SET(_efds[i], &_efds_); }
2957 if(select(max_fd+1, _nr?&_rfds_:NULL, _nw?&_wfds_:NULL, _ne?&_efds_:NULL, &tv) > 0)
2959 if(_nr) for(i=0; i<_nr; i++)
2960 if(FD_ISSET(_rfds[i], &_rfds_))
2962 r_rscs[nr].sock = NULL;
2963 r_rscs[nr++].chno = _rfds[i];
2965 if(_nw) for(i=0; i<_nw; i++)
2966 if(FD_ISSET(_wfds[i], &_wfds_))
2968 w_rscs[nw].sock = NULL;
2969 w_rscs[nw++].chno = _wfds[i];
2971 if(_ne) for(i=0; i<_ne; i++)
2972 if(FD_ISSET(_efds[i], &_efds_))
2974 e_rscs[ne].sock = NULL;
2975 e_rscs[ne++].chno = _efds[i];
2980 if(nr || nw || ne) break;
2982 } while(PA_GetTickCount() - t0 < wait_ticks);
2984 if(n_erscs) *n_erscs = ne;
2985 if(n_rrscs) *n_rrscs = nr;
2986 if(n_wrscs) *n_wrscs = nw;
2988 return nr + nw + ne;
2991 int RUDP_FD_ISSET(int fd, const RUDPSOCKCHNO *prc, int size)
2994 if(fd < 0) return 0;
2995 for(i=0; i<size; i++)
2997 if(prc[i].sock == NULL && prc[i].chno == fd) return 1;
3002 int RUDP_ISSET(RUDPSOCKET s, const RUDPSOCKCHNO *prc, int size)
3005 if(s == INVALID_RUDPSOCKET) return 0;
3006 for(i=0; i<size; i++)
3008 if(prc[i].sock == s) return 1;
3013 int RUDPGetSockOpt(RUDPSOCKET sock, int opt, void *optval, int *optlen)
3015 struct rudp_socket *s = (struct rudp_socket*)sock;
3016 if(s->tag != RUDP_SOCKET_TAG) return ERUDP_NOT_SOCKET;
3020 case OPT_UDP_SNDBUF:
3021 if(*optlen != sizeof(int)) return ERUDP_INVALID;
3022 PA_GetSockOpt(s->udp_sock, SOL_SOCKET, SO_SNDBUF, (char*)&opt, optlen);
3024 case OPT_UDP_RCVBUF:
3025 if(*optlen != sizeof(int)) return ERUDP_INVALID;
3026 PA_GetSockOpt(s->udp_sock, SOL_SOCKET, SO_RCVBUF, (char*)&opt, optlen);
3028 case OPT_RUDP_SNDBUF:
3029 if(*optlen != sizeof(int)) return ERUDP_INVALID;
3030 *((int*)optval) = s->pcb->channel[0].sbuf.max_pkts;
3032 case OPT_RUDP_RCVBUF:
3033 if(*optlen != sizeof(int)) return ERUDP_INVALID;
3034 *((int*)optval) = s->pcb->channel[0].rbuf.q_size;
3042 if(*optlen != sizeof(int)) return ERUDP_INVALID;
3043 *((int*)optval) = (s->flags & RF_ADHOC)?1:0;
3047 if(*optlen != sizeof(int)) return ERUDP_INVALID;
3048 *((int*)optval) = (s->flags & RF_NBLK)?1:0;
3052 if(*optlen != sizeof(int)) return ERUDP_INVALID;
3053 PA_GetSockOpt(s->udp_sock, SOL_SOCKET, SO_REUSEADDR, (char*)optval, optlen);
3056 if(*optlen != sizeof(int)) return ERUDP_INVALID;
3057 *((int*)optval) = s->err;
3060 return ERUDP_INVALID;
3065 int RUDPSetSockOpt(RUDPSOCKET sock, int opt, const void *optval, int optlen)
3068 struct rudp_socket *s = (struct rudp_socket*)sock;
3069 if(s->tag != RUDP_SOCKET_TAG) return ERUDP_NOT_SOCKET;
3073 case OPT_UDP_SNDBUF:
3074 if(optlen != sizeof(int)) return ERUDP_INVALID;
3075 setsockopt(s->udp_sock, SOL_SOCKET, SO_SNDBUF, (char*)&opt, optlen);
3077 case OPT_UDP_RCVBUF:
3078 if(optlen != sizeof(int)) return ERUDP_INVALID;
3079 setsockopt(s->udp_sock, SOL_SOCKET, SO_RCVBUF, (char*)&opt, optlen);
3081 case OPT_RUDP_SNDBUF:
3082 if(optlen != sizeof(int)) return ERUDP_INVALID;
3085 val = *((int*)optval);
3086 if(val > MAX_WINDOW) val = MAX_WINDOW;
3087 if(val < 64) val = 64;
3088 for(i=0; i<MAX_PHY_CHANNELS; i++)
3089 s->pcb->channel[i].sbuf.max_pkts = val;
3092 case OPT_RUDP_RCVBUF:
3093 if(optlen != sizeof(int)) return ERUDP_INVALID;
3094 if(!s->pcb && s->state == RS_CLOSED)
3096 val = *((int*)optval);
3097 if(val > MAX_WINDOW) val = MAX_WINDOW;
3098 if(val < 64) val = 64;
3108 if(optlen != sizeof(int)) return ERUDP_INVALID;
3109 if(s->state != RS_CLOSED || s->pcb) return ERUDP_NOT_ALLOWED;
3110 if(*((int*)optval)) s->flags |= RF_ADHOC;
3111 else s->flags &= ~RF_ADHOC;
3114 if(optlen != sizeof(int)) return ERUDP_INVALID;
3115 setsockopt(s->udp_sock, SOL_SOCKET, SO_REUSEADDR, (char*)optval, optlen);
3119 if(optlen != sizeof(int)) return ERUDP_INVALID;
3120 if(s->state < RS_CLOSED) return ERUDP_NOT_ALLOWED;
3121 if(*((int*)optval)) s->flags |= RF_NBLK;
3122 else s->flags &= ~RF_NBLK;
3126 return ERUDP_INVALID;
3131 int RUDPGetSockName(RUDPSOCKET sock, struct sockaddr *name)
3133 struct rudp_socket *s = (struct rudp_socket*)sock;
3134 socklen_t len = sizeof(struct sockaddr);
3135 return getsockname(s->udp_sock, name, &len);
3138 int RUDPGetPeerName(RUDPSOCKET sock, struct sockaddr *name)
3140 struct rudp_socket *s = (struct rudp_socket*)sock;
3143 memcpy(name, &s->pcb->peer, sizeof(struct sockaddr));
3146 memset(name, 0, sizeof(struct sockaddr));