use super::*;
-use drop_bomb::DropBomb;
-use std::{borrow::Cow, collections::HashMap, io, sync::Arc, time::Duration};
-use tokio::{
- sync::{mpsc, watch, Mutex, RwLock},
- task::JoinSet,
-};
+use std::{borrow::Cow, collections::HashMap, io, sync::Arc};
+use tokio::sync::{watch, Mutex, RwLock};
#[derive(Debug)]
pub(crate) struct Ack {
}
#[derive(Debug)]
-pub(crate) struct RudpShare<S: UdpSender> {
+pub(crate) struct RudpShare<P: UdpPeer> {
pub(crate) id: u16,
pub(crate) remote_id: RwLock<u16>,
- pub(crate) chans: Vec<Mutex<Chan>>,
- pub(crate) udp_tx: S,
- pub(crate) close_tx: watch::Sender<bool>,
- pub(crate) tasks: Mutex<JoinSet<()>>,
- pub(crate) bomb: Mutex<DropBomb>,
+ pub(crate) chans: [Mutex<Chan>; NUM_CHANS],
+ pub(crate) udp_tx: P::Sender,
+ pub(crate) close: watch::Sender<bool>,
}
-pub async fn new<S: UdpSender, R: UdpReceiver>(
+pub async fn new<P: UdpPeer>(
id: u16,
remote_id: u16,
- udp_tx: S,
- udp_rx: R,
-) -> io::Result<(RudpSender<S>, RudpReceiver<S>)> {
- let (pkt_tx, pkt_rx) = mpsc::unbounded_channel();
+ udp_tx: P::Sender,
+ udp_rx: P::Receiver,
+) -> io::Result<(RudpSender<P>, RudpReceiver<P>)> {
let (close_tx, close_rx) = watch::channel(false);
let share = Arc::new(RudpShare {
id,
remote_id: RwLock::new(remote_id),
udp_tx,
- close_tx,
- chans: (0..NUM_CHANS)
- .map(|_| {
- Mutex::new(Chan {
- acks: HashMap::new(),
- seqnum: INIT_SEQNUM,
- })
+ close: close_tx,
+ chans: std::array::from_fn(|_| {
+ Mutex::new(Chan {
+ acks: HashMap::new(),
+ seqnum: INIT_SEQNUM,
})
- .collect(),
- tasks: Mutex::new(JoinSet::new()),
- bomb: Mutex::new(DropBomb::new("rudp connection must be explicitly closed")),
+ }),
});
- let mut tasks = share.tasks.lock().await;
+ Ok((
+ RudpSender {
+ share: Arc::clone(&share),
+ },
+ RudpReceiver::new(udp_rx, share, close_rx),
+ ))
+}
- let recv_share = Arc::clone(&share);
- let recv_close = close_rx.clone();
- tasks
- /*.build_task()
- .name("recv")*/
- .spawn(async move {
- let worker = RecvWorker::new(udp_rx, recv_share, recv_close, pkt_tx);
- worker.run().await;
- });
+macro_rules! impl_share {
+ ($T:ident) => {
+ impl<P: UdpPeer> $T<P> {
+ pub async fn peer_id(&self) -> u16 {
+ self.share.id
+ }
- let resend_share = Arc::clone(&share);
- let mut resend_close = close_rx.clone();
- tasks
- /*.build_task()
- .name("resend")*/
- .spawn(async move {
- ticker!(Duration::from_millis(500), resend_close, {
- for chan in resend_share.chans.iter() {
- for (_, ack) in chan.lock().await.acks.iter() {
- resend_share.send_raw(&ack.data).await.ok(); // TODO: handle error (?)
- }
- }
- });
- });
+ pub async fn is_server(&self) -> bool {
+ self.share.id == PeerID::Srv as u16
+ }
- let ping_share = Arc::clone(&share);
- let mut ping_close = close_rx.clone();
- tasks
- /*.build_task()
- .name("ping")*/
- .spawn(async move {
- ticker!(Duration::from_secs(PING_TIMEOUT), ping_close, {
- ping_share
+ pub async fn close(self) {
+ self.share.close.send(true).ok(); // FIXME: handle err?
+
+ self.share
.send(
PktType::Ctl,
Pkt {
+ unrel: true,
chan: 0,
- unrel: false,
- data: Cow::Borrowed(&[CtlType::Ping as u8]),
+ data: Cow::Borrowed(&[CtlType::Disco as u8]),
},
)
.await
.ok();
- });
- });
-
- drop(tasks);
-
- Ok((
- RudpSender {
- share: Arc::clone(&share),
- },
- RudpReceiver { share, pkt_rx },
- ))
+ }
+ }
+ };
}
+
+impl_share!(RudpReceiver);
+impl_share!(RudpSender);