2 use drop_bomb::DropBomb;
3 use std::{borrow::Cow, collections::HashMap, io, sync::Arc, time::Duration};
5 sync::{mpsc, watch, Mutex, RwLock},
/// Bookkeeping for one tracked packet; instances are stored in `Chan::acks`.
10 pub(crate) struct Ack {
/// Sending half of a `watch` channel carrying a `bool`.
/// NOTE(review): presumably set to `true` once the packet is acknowledged — confirm against the receive path.
11 pub(crate) tx: watch::Sender<bool>,
/// Receiving half of a `watch` channel carrying a `bool`.
/// NOTE(review): presumably the counterpart of `tx` — confirm where the pair is created.
12 pub(crate) rx: watch::Receiver<bool>,
/// Raw packet bytes. The resend ticker in `new` hands these to `send_raw` on every tick
/// for as long as this entry is present in the channel's `acks` map.
13 pub(crate) data: Vec<u8>,
/// Per-channel state; `RudpShare::chans` holds one `Mutex<Chan>` per entry.
17 pub(crate) struct Chan {
/// Tracked packets, keyed by a `u16`.
/// NOTE(review): the key is presumably the packet's sequence number — confirm.
18 pub(crate) acks: HashMap<u16, Ack>,
/// NOTE(review): presumably the next outgoing sequence number for this channel — confirm against the send path.
19 pub(crate) seqnum: u16,
/// Connection state that `new` wraps in an `Arc` and shares between the returned
/// sender, the returned receiver and the background workers it sets up.
23 pub(crate) struct RudpShare<S: UdpSender> {
/// Initialised from the `remote_id` value in `new`; guarded by an `RwLock`.
/// NOTE(review): presumably the peer id assigned by the remote side — confirm.
25 pub(crate) remote_id: RwLock<u16>,
/// One independently locked `Chan` per entry; the resend ticker walks all of them.
26 pub(crate) chans: Vec<Mutex<Chan>>,
/// Sending half of a `bool` `watch` channel.
/// NOTE(review): presumably the sender of the close channel created in `new`, whose
/// receivers are cloned into every background worker — confirm at the construction site.
28 pub(crate) close_tx: watch::Sender<bool>,
/// Task set for the connection; `new` locks it while setting up the workers.
/// NOTE(review): the spawn calls themselves are not visible in this excerpt.
29 pub(crate) tasks: Mutex<JoinSet<()>>,
/// Created with the message "rudp connection must be explicitly closed".
/// NOTE(review): a `DropBomb` is expected to panic on drop unless defused, so some
/// explicit close path presumably defuses it — confirm.
30 pub(crate) bomb: Mutex<DropBomb>,
/// Builds the shared connection state and returns the `(RudpSender, RudpReceiver)` pair.
///
/// Visible setup steps: a receive worker fed from `udp_rx`, a resend ticker with a
/// 500 ms period, and a ping ticker with a period of `PING_TIMEOUT` seconds. Each of
/// them gets its own clone of the shared state and of the close-signal receiver.
///
/// # Errors
///
/// Returns `io::Result`. NOTE(review): the fallible operations are not visible in this
/// excerpt — confirm which calls can fail.
33 pub async fn new<S: UdpSender, R: UdpReceiver>(
38 ) -> io::Result<(RudpSender<S>, RudpReceiver<S>)> {
// Packet queue: the receive worker gets `pkt_tx`, the returned `RudpReceiver` keeps `pkt_rx`.
39 let (pkt_tx, pkt_rx) = mpsc::unbounded_channel();
// Close signal, initially `false`; every worker below receives a clone of `close_rx`.
40 let (close_tx, close_rx) = watch::channel(false);
// State shared by the sender, the receiver and all workers.
42 let share = Arc::new(RudpShare {
44 remote_id: RwLock::new(remote_id),
55 tasks: Mutex::new(JoinSet::new()),
56 bomb: Mutex::new(DropBomb::new("rudp connection must be explicitly closed")),
// Take the task-set lock for the worker setup that follows.
59 let mut tasks = share.tasks.lock().await;
// --- receive worker ---
61 let recv_share = Arc::clone(&share);
62 let recv_close = close_rx.clone();
67 let worker = RecvWorker::new(udp_rx, recv_share, recv_close, pkt_tx);
// --- resend ticker ---
71 let resend_share = Arc::clone(&share);
72 let mut resend_close = close_rx.clone();
// Every 500 ms, re-send the stored bytes of every entry in every channel's `acks` map.
77 ticker!(Duration::from_millis(500), resend_close, {
78 for chan in resend_share.chans.iter() {
// The guard from `chan.lock()` is a temporary of the loop's iterator expression, so the
// channel stays locked for the whole inner loop, including each `send_raw(...).await`.
79 for (_, ack) in chan.lock().await.acks.iter() {
// Best effort: `.ok()` discards any send error.
80 resend_share.send_raw(&ack.data).await.ok(); // TODO: handle error (?)
// --- ping ticker ---
86 let ping_share = Arc::clone(&share);
87 let mut ping_close = close_rx.clone();
// Fires every `PING_TIMEOUT` seconds.
92 ticker!(Duration::from_secs(PING_TIMEOUT), ping_close, {
// Payload is a single byte: the `CtlType::Ping` discriminant.
99 data: Cow::Borrowed(&[CtlType::Ping as u8]),
// The sender gets its own `Arc` clone; the receiver takes the original `share` plus `pkt_rx`.
111 share: Arc::clone(&share),
113 RudpReceiver { share, pkt_rx },