]> git.lizzy.rs Git - mt_rudp.git/blob - src/new.rs
b17f518f2bf87138eada268efdebe3717af12401
[mt_rudp.git] / src / new.rs
1 use crate::{prelude::*, ticker, Chan, RecvWorker, RudpShare};
2 use std::{collections::HashMap, io, sync::Arc, time::Duration};
3 use tokio::{
4     sync::{mpsc, watch, Mutex, RwLock},
5     task::JoinSet,
6 };
7
8 pub async fn new<S: UdpSender, R: UdpReceiver>(
9     id: u16,
10     remote_id: u16,
11     udp_tx: S,
12     udp_rx: R,
13 ) -> io::Result<(RudpSender<S>, RudpReceiver<S>)> {
14     let (pkt_tx, pkt_rx) = mpsc::unbounded_channel();
15     let (close_tx, close_rx) = watch::channel(false);
16
17     let share = Arc::new(RudpShare {
18         id,
19         remote_id: RwLock::new(remote_id),
20         udp_tx,
21         close_tx,
22         chans: (0..NUM_CHANS)
23             .map(|_| {
24                 Mutex::new(Chan {
25                     acks: HashMap::new(),
26                     seqnum: INIT_SEQNUM,
27                 })
28             })
29             .collect(),
30         tasks: Mutex::new(JoinSet::new()),
31     });
32
33     let mut tasks = share.tasks.lock().await;
34
35     let recv_share = Arc::clone(&share);
36     let recv_close = close_rx.clone();
37     tasks
38         /*.build_task()
39         .name("recv")*/
40         .spawn(async move {
41             let worker = RecvWorker::new(udp_rx, recv_share, recv_close, pkt_tx);
42             worker.run().await;
43         });
44
45     let resend_share = Arc::clone(&share);
46     let mut resend_close = close_rx.clone();
47     tasks
48         /*.build_task()
49         .name("resend")*/
50         .spawn(async move {
51             ticker!(Duration::from_millis(500), resend_close, {
52                 for chan in resend_share.chans.iter() {
53                     for (_, ack) in chan.lock().await.acks.iter() {
54                         resend_share.send_raw(&ack.data).await.ok(); // TODO: handle error (?)
55                     }
56                 }
57             });
58         });
59
60     let ping_share = Arc::clone(&share);
61     let mut ping_close = close_rx.clone();
62     tasks
63         /*.build_task()
64         .name("ping")*/
65         .spawn(async move {
66             ticker!(Duration::from_secs(PING_TIMEOUT), ping_close, {
67                 ping_share
68                     .send(
69                         PktType::Ctl,
70                         Pkt {
71                             chan: 0,
72                             unrel: false,
73                             data: &[CtlType::Ping as u8],
74                         },
75                     )
76                     .await
77                     .ok();
78             });
79         });
80
81     drop(tasks);
82
83     Ok((
84         RudpSender {
85             share: Arc::clone(&share),
86         },
87         RudpReceiver { share, pkt_rx },
88     ))
89 }