]> git.lizzy.rs Git - mt_rudp.git/blob - src/share.rs
Rework structure
[mt_rudp.git] / src / share.rs
1 use super::*;
2 use std::{collections::HashMap, io, sync::Arc, time::Duration};
3 use tokio::{
4     sync::{mpsc, watch, Mutex, RwLock},
5     task::JoinSet,
6 };
7
8 #[derive(Debug)]
9 pub(crate) struct Ack {
10     pub(crate) tx: watch::Sender<bool>,
11     pub(crate) rx: watch::Receiver<bool>,
12     pub(crate) data: Vec<u8>,
13 }
14
15 #[derive(Debug)]
16 pub(crate) struct Chan {
17     pub(crate) acks: HashMap<u16, Ack>,
18     pub(crate) seqnum: u16,
19 }
20
21 #[derive(Debug)]
22 pub(crate) struct RudpShare<S: UdpSender> {
23     pub(crate) id: u16,
24     pub(crate) remote_id: RwLock<u16>,
25     pub(crate) chans: Vec<Mutex<Chan>>,
26     pub(crate) udp_tx: S,
27     pub(crate) close_tx: watch::Sender<bool>,
28     pub(crate) tasks: Mutex<JoinSet<()>>,
29 }
30
31 pub async fn new<S: UdpSender, R: UdpReceiver>(
32     id: u16,
33     remote_id: u16,
34     udp_tx: S,
35     udp_rx: R,
36 ) -> io::Result<(RudpSender<S>, RudpReceiver<S>)> {
37     let (pkt_tx, pkt_rx) = mpsc::unbounded_channel();
38     let (close_tx, close_rx) = watch::channel(false);
39
40     let share = Arc::new(RudpShare {
41         id,
42         remote_id: RwLock::new(remote_id),
43         udp_tx,
44         close_tx,
45         chans: (0..NUM_CHANS)
46             .map(|_| {
47                 Mutex::new(Chan {
48                     acks: HashMap::new(),
49                     seqnum: INIT_SEQNUM,
50                 })
51             })
52             .collect(),
53         tasks: Mutex::new(JoinSet::new()),
54     });
55
56     let mut tasks = share.tasks.lock().await;
57
58     let recv_share = Arc::clone(&share);
59     let recv_close = close_rx.clone();
60     tasks
61         /*.build_task()
62         .name("recv")*/
63         .spawn(async move {
64             let worker = RecvWorker::new(udp_rx, recv_share, recv_close, pkt_tx);
65             worker.run().await;
66         });
67
68     let resend_share = Arc::clone(&share);
69     let mut resend_close = close_rx.clone();
70     tasks
71         /*.build_task()
72         .name("resend")*/
73         .spawn(async move {
74             ticker!(Duration::from_millis(500), resend_close, {
75                 for chan in resend_share.chans.iter() {
76                     for (_, ack) in chan.lock().await.acks.iter() {
77                         resend_share.send_raw(&ack.data).await.ok(); // TODO: handle error (?)
78                     }
79                 }
80             });
81         });
82
83     let ping_share = Arc::clone(&share);
84     let mut ping_close = close_rx.clone();
85     tasks
86         /*.build_task()
87         .name("ping")*/
88         .spawn(async move {
89             ticker!(Duration::from_secs(PING_TIMEOUT), ping_close, {
90                 ping_share
91                     .send(
92                         PktType::Ctl,
93                         Pkt {
94                             chan: 0,
95                             unrel: false,
96                             data: &[CtlType::Ping as u8],
97                         },
98                     )
99                     .await
100                     .ok();
101             });
102         });
103
104     drop(tasks);
105
106     Ok((
107         RudpSender {
108             share: Arc::clone(&share),
109         },
110         RudpReceiver { share, pkt_rx },
111     ))
112 }