]> git.lizzy.rs Git - mt_rudp.git/blob - src/new.rs
timeouts
[mt_rudp.git] / src / new.rs
1 use crate::*;
2
3 pub async fn new<S: UdpSender, R: UdpReceiver>(
4     id: u16,
5     remote_id: u16,
6     udp_tx: S,
7     udp_rx: R,
8 ) -> io::Result<(RudpSender<S>, RudpReceiver<S>)> {
9     let (pkt_tx, pkt_rx) = mpsc::unbounded_channel();
10     let (close_tx, close_rx) = watch::channel(false);
11
12     let share = Arc::new(RudpShare {
13         id,
14         remote_id: RwLock::new(remote_id),
15         udp_tx,
16         close_tx,
17         chans: (0..NUM_CHANS)
18             .map(|_| {
19                 Mutex::new(Chan {
20                     acks: HashMap::new(),
21                     seqnum: INIT_SEQNUM,
22                 })
23             })
24             .collect(),
25         tasks: Mutex::new(JoinSet::new()),
26     });
27
28     let mut tasks = share.tasks.lock().await;
29
30     let recv_share = Arc::clone(&share);
31     let recv_close = close_rx.clone();
32     tasks
33         /*.build_task()
34         .name("recv")*/
35         .spawn(async move {
36             let worker = recv::RecvWorker::new(udp_rx, recv_share, recv_close, pkt_tx);
37             worker.run().await;
38         });
39
40     let resend_share = Arc::clone(&share);
41     let mut resend_close = close_rx.clone();
42     tasks
43         /*.build_task()
44         .name("resend")*/
45         .spawn(async move {
46             ticker!(Duration::from_millis(500), resend_close, {
47                 for chan in resend_share.chans.iter() {
48                     for (_, ack) in chan.lock().await.acks.iter() {
49                         resend_share.send_raw(&ack.data).await.ok(); // TODO: handle error (?)
50                     }
51                 }
52             });
53         });
54
55     let ping_share = Arc::clone(&share);
56     let mut ping_close = close_rx.clone();
57     tasks
58         /*.build_task()
59         .name("ping")*/
60         .spawn(async move {
61             ticker!(Duration::from_secs(PING_TIMEOUT), ping_close, {
62                 ping_share
63                     .send(
64                         PktType::Ctl,
65                         Pkt {
66                             chan: 0,
67                             unrel: false,
68                             data: &[CtlType::Ping as u8],
69                         },
70                     )
71                     .await
72                     .ok();
73             });
74         });
75
76     drop(tasks);
77
78     Ok((
79         RudpSender {
80             share: Arc::clone(&share),
81         },
82         RudpReceiver { share, pkt_rx },
83     ))
84 }