]> git.lizzy.rs Git - mt_rudp.git/blobdiff - src/share.rs
Don't spawn tasks
[mt_rudp.git] / src / share.rs
index a2afc4c2f9c3bd40ff7c4e91aee5224e61fcbb21..02e37b2dc76de444ffd8770dfc3665b373ab142d 100644 (file)
@@ -1,10 +1,6 @@
 use super::*;
-use drop_bomb::DropBomb;
-use std::{borrow::Cow, collections::HashMap, io, sync::Arc, time::Duration};
-use tokio::{
-    sync::{mpsc, watch, Mutex, RwLock},
-    task::JoinSet,
-};
+use std::{borrow::Cow, collections::HashMap, io, sync::Arc};
+use tokio::sync::{watch, Mutex, RwLock};
 
 #[derive(Debug)]
 pub(crate) struct Ack {
@@ -20,96 +16,72 @@ pub(crate) struct Chan {
 }
 
 #[derive(Debug)]
-pub(crate) struct RudpShare<S: UdpSender> {
+pub(crate) struct RudpShare<P: UdpPeer> {
     pub(crate) id: u16,
     pub(crate) remote_id: RwLock<u16>,
-    pub(crate) chans: Vec<Mutex<Chan>>,
-    pub(crate) udp_tx: S,
-    pub(crate) close_tx: watch::Sender<bool>,
-    pub(crate) tasks: Mutex<JoinSet<()>>,
-    pub(crate) bomb: Mutex<DropBomb>,
+    pub(crate) chans: [Mutex<Chan>; NUM_CHANS],
+    pub(crate) udp_tx: P::Sender,
+    pub(crate) close: watch::Sender<bool>,
 }
 
-pub async fn new<S: UdpSender, R: UdpReceiver>(
+pub async fn new<P: UdpPeer>(
     id: u16,
     remote_id: u16,
-    udp_tx: S,
-    udp_rx: R,
-) -> io::Result<(RudpSender<S>, RudpReceiver<S>)> {
-    let (pkt_tx, pkt_rx) = mpsc::unbounded_channel();
+    udp_tx: P::Sender,
+    udp_rx: P::Receiver,
+) -> io::Result<(RudpSender<P>, RudpReceiver<P>)> {
     let (close_tx, close_rx) = watch::channel(false);
 
     let share = Arc::new(RudpShare {
         id,
         remote_id: RwLock::new(remote_id),
         udp_tx,
-        close_tx,
-        chans: (0..NUM_CHANS)
-            .map(|_| {
-                Mutex::new(Chan {
-                    acks: HashMap::new(),
-                    seqnum: INIT_SEQNUM,
-                })
+        close: close_tx,
+        chans: std::array::from_fn(|_| {
+            Mutex::new(Chan {
+                acks: HashMap::new(),
+                seqnum: INIT_SEQNUM,
             })
-            .collect(),
-        tasks: Mutex::new(JoinSet::new()),
-        bomb: Mutex::new(DropBomb::new("rudp connection must be explicitly closed")),
+        }),
     });
 
-    let mut tasks = share.tasks.lock().await;
+    Ok((
+        RudpSender {
+            share: Arc::clone(&share),
+        },
+        RudpReceiver::new(udp_rx, share, close_rx),
+    ))
+}
 
-    let recv_share = Arc::clone(&share);
-    let recv_close = close_rx.clone();
-    tasks
-        /*.build_task()
-        .name("recv")*/
-        .spawn(async move {
-            let worker = RecvWorker::new(udp_rx, recv_share, recv_close, pkt_tx);
-            worker.run().await;
-        });
+macro_rules! impl_share {
+    ($T:ident) => {
+        impl<P: UdpPeer> $T<P> {
+            pub async fn peer_id(&self) -> u16 {
+                self.share.id
+            }
 
-    let resend_share = Arc::clone(&share);
-    let mut resend_close = close_rx.clone();
-    tasks
-        /*.build_task()
-        .name("resend")*/
-        .spawn(async move {
-            ticker!(Duration::from_millis(500), resend_close, {
-                for chan in resend_share.chans.iter() {
-                    for (_, ack) in chan.lock().await.acks.iter() {
-                        resend_share.send_raw(&ack.data).await.ok(); // TODO: handle error (?)
-                    }
-                }
-            });
-        });
+            pub async fn is_server(&self) -> bool {
+                self.share.id == PeerID::Srv as u16
+            }
 
-    let ping_share = Arc::clone(&share);
-    let mut ping_close = close_rx.clone();
-    tasks
-        /*.build_task()
-        .name("ping")*/
-        .spawn(async move {
-            ticker!(Duration::from_secs(PING_TIMEOUT), ping_close, {
-                ping_share
+            pub async fn close(self) {
+                self.share.close.send(true).ok(); // FIXME: handle err?
+
+                self.share
                     .send(
                         PktType::Ctl,
                         Pkt {
+                            unrel: true,
                             chan: 0,
-                            unrel: false,
-                            data: Cow::Borrowed(&[CtlType::Ping as u8]),
+                            data: Cow::Borrowed(&[CtlType::Disco as u8]),
                         },
                     )
                     .await
                     .ok();
-            });
-        });
-
-    drop(tasks);
-
-    Ok((
-        RudpSender {
-            share: Arc::clone(&share),
-        },
-        RudpReceiver { share, pkt_rx },
-    ))
+            }
+        }
+    };
 }
+
+impl_share!(RudpReceiver);
+impl_share!(RudpSender);