]> git.lizzy.rs Git - mt_rudp.git/blob - src/share.rs
DropBomb to ensure close() is called
[mt_rudp.git] / src / share.rs
1 use super::*;
2 use drop_bomb::DropBomb;
3 use std::{borrow::Cow, collections::HashMap, io, sync::Arc, time::Duration};
4 use tokio::{
5     sync::{mpsc, watch, Mutex, RwLock},
6     task::JoinSet,
7 };
8
9 #[derive(Debug)]
10 pub(crate) struct Ack {
11     pub(crate) tx: watch::Sender<bool>,
12     pub(crate) rx: watch::Receiver<bool>,
13     pub(crate) data: Vec<u8>,
14 }
15
16 #[derive(Debug)]
17 pub(crate) struct Chan {
18     pub(crate) acks: HashMap<u16, Ack>,
19     pub(crate) seqnum: u16,
20 }
21
22 #[derive(Debug)]
23 pub(crate) struct RudpShare<S: UdpSender> {
24     pub(crate) id: u16,
25     pub(crate) remote_id: RwLock<u16>,
26     pub(crate) chans: Vec<Mutex<Chan>>,
27     pub(crate) udp_tx: S,
28     pub(crate) close_tx: watch::Sender<bool>,
29     pub(crate) tasks: Mutex<JoinSet<()>>,
30     pub(crate) bomb: Mutex<DropBomb>,
31 }
32
33 pub async fn new<S: UdpSender, R: UdpReceiver>(
34     id: u16,
35     remote_id: u16,
36     udp_tx: S,
37     udp_rx: R,
38 ) -> io::Result<(RudpSender<S>, RudpReceiver<S>)> {
39     let (pkt_tx, pkt_rx) = mpsc::unbounded_channel();
40     let (close_tx, close_rx) = watch::channel(false);
41
42     let share = Arc::new(RudpShare {
43         id,
44         remote_id: RwLock::new(remote_id),
45         udp_tx,
46         close_tx,
47         chans: (0..NUM_CHANS)
48             .map(|_| {
49                 Mutex::new(Chan {
50                     acks: HashMap::new(),
51                     seqnum: INIT_SEQNUM,
52                 })
53             })
54             .collect(),
55         tasks: Mutex::new(JoinSet::new()),
56         bomb: Mutex::new(DropBomb::new("rudp connection must be explicitly closed")),
57     });
58
59     let mut tasks = share.tasks.lock().await;
60
61     let recv_share = Arc::clone(&share);
62     let recv_close = close_rx.clone();
63     tasks
64         /*.build_task()
65         .name("recv")*/
66         .spawn(async move {
67             let worker = RecvWorker::new(udp_rx, recv_share, recv_close, pkt_tx);
68             worker.run().await;
69         });
70
71     let resend_share = Arc::clone(&share);
72     let mut resend_close = close_rx.clone();
73     tasks
74         /*.build_task()
75         .name("resend")*/
76         .spawn(async move {
77             ticker!(Duration::from_millis(500), resend_close, {
78                 for chan in resend_share.chans.iter() {
79                     for (_, ack) in chan.lock().await.acks.iter() {
80                         resend_share.send_raw(&ack.data).await.ok(); // TODO: handle error (?)
81                     }
82                 }
83             });
84         });
85
86     let ping_share = Arc::clone(&share);
87     let mut ping_close = close_rx.clone();
88     tasks
89         /*.build_task()
90         .name("ping")*/
91         .spawn(async move {
92             ticker!(Duration::from_secs(PING_TIMEOUT), ping_close, {
93                 ping_share
94                     .send(
95                         PktType::Ctl,
96                         Pkt {
97                             chan: 0,
98                             unrel: false,
99                             data: Cow::Borrowed(&[CtlType::Ping as u8]),
100                         },
101                     )
102                     .await
103                     .ok();
104             });
105         });
106
107     drop(tasks);
108
109     Ok((
110         RudpSender {
111             share: Arc::clone(&share),
112         },
113         RudpReceiver { share, pkt_rx },
114     ))
115 }