1 use crate::{prelude::*, ticker, Chan, RecvWorker, RudpShare};
2 use std::{collections::HashMap, io, sync::Arc, time::Duration};
4 sync::{mpsc, watch, Mutex, RwLock},
8 pub async fn new<S: UdpSender, R: UdpReceiver>(
13 ) -> io::Result<(RudpSender<S>, RudpReceiver<S>)> {
14 let (pkt_tx, pkt_rx) = mpsc::unbounded_channel();
15 let (close_tx, close_rx) = watch::channel(false);
17 let share = Arc::new(RudpShare {
19 remote_id: RwLock::new(remote_id),
30 tasks: Mutex::new(JoinSet::new()),
33 let mut tasks = share.tasks.lock().await;
35 let recv_share = Arc::clone(&share);
36 let recv_close = close_rx.clone();
41 let worker = RecvWorker::new(udp_rx, recv_share, recv_close, pkt_tx);
45 let resend_share = Arc::clone(&share);
46 let mut resend_close = close_rx.clone();
51 ticker!(Duration::from_millis(500), resend_close, {
52 for chan in resend_share.chans.iter() {
53 for (_, ack) in chan.lock().await.acks.iter() {
54 resend_share.send_raw(&ack.data).await.ok(); // TODO: handle error (?)
60 let ping_share = Arc::clone(&share);
61 let mut ping_close = close_rx.clone();
66 ticker!(Duration::from_secs(PING_TIMEOUT), ping_close, {
73 data: &[CtlType::Ping as u8],
85 share: Arc::clone(&share),
87 RudpReceiver { share, pkt_rx },