]> git.lizzy.rs Git - mt_rudp.git/commitdiff
Rework structure
authorLizzy Fleckenstein <eliasfleckenstein@web.de>
Wed, 15 Feb 2023 20:54:25 +0000 (21:54 +0100)
committerLizzy Fleckenstein <eliasfleckenstein@web.de>
Wed, 15 Feb 2023 20:55:47 +0000 (21:55 +0100)
Cargo.toml
examples/example.rs
src/client.rs
src/common.rs [new file with mode: 0644]
src/error.rs
src/lib.rs
src/new.rs [deleted file]
src/recv.rs
src/send.rs
src/share.rs [new file with mode: 0644]

index 0b5f690e4eb242b76947ef59df8f99387bae9589..8e5f7ef186d2b67403b04f12347ec6e7bca9cff0 100644 (file)
@@ -8,6 +8,7 @@ async-recursion = "1.0.0"
 async-trait = "0.1.60"
 byteorder = "1.4.3"
 delegate = "0.9.0"
+drop_bomb = "0.1.5"
 num_enum = "0.5.7"
 thiserror = "1.0.38"
 tokio = { version = "1.23.0", features = ["sync", "time", "net", "signal", "macros", "rt"] }
index fc5f1fbc6130e4605cd9cb0e4923d0f942713dd0..8625243ebb7cd5290bdc812056fbf613b9e372a1 100644 (file)
@@ -1,9 +1,9 @@
 use byteorder::{BigEndian, WriteBytesExt};
-use mt_rudp::{Client, RudpReceiver, RudpSender};
+use mt_rudp::{RudpReceiver, RudpSender, ToSrv};
 use pretty_hex::PrettyHex;
 use std::io::{self, Write};
 
-async fn example(tx: &RudpSender<Client>, rx: &mut RudpReceiver<Client>) -> io::Result<()> {
+async fn example(tx: &RudpSender<ToSrv>, rx: &mut RudpReceiver<ToSrv>) -> io::Result<()> {
     // send hello packet
     let mut pkt = vec![];
     pkt.write_u16::<BigEndian>(2)?; // high level type
index 6785a1f0f964f034317c2962274ca3809e6c8d09..c4922ec9c7d45c7bc45440f2f3bd733efb7906de 100644 (file)
@@ -1,48 +1,44 @@
-use crate::prelude::*;
+use super::*;
 use async_trait::async_trait;
 use std::{io, sync::Arc};
 use tokio::net;
 
-pub struct Sender {
-    sock: Arc<net::UdpSocket>,
-}
+#[derive(Debug)]
+pub struct ToSrv(Arc<net::UdpSocket>);
+
+#[derive(Debug)]
+pub struct FromSrv(Arc<net::UdpSocket>);
 
 #[async_trait]
-impl UdpSender for Sender {
+impl UdpSender for ToSrv {
     async fn send(&self, data: &[u8]) -> io::Result<()> {
-        self.sock.send(data).await?;
+        self.0.send(data).await?;
         Ok(())
     }
 }
 
-pub struct Receiver {
-    sock: Arc<net::UdpSocket>,
-}
-
 #[async_trait]
-impl UdpReceiver for Receiver {
+impl UdpReceiver for FromSrv {
     async fn recv(&self) -> io::Result<Vec<u8>> {
         let mut buffer = Vec::new();
         buffer.resize(UDP_PKT_SIZE, 0);
 
-        let len = self.sock.recv(&mut buffer).await?;
+        let len = self.0.recv(&mut buffer).await?;
         buffer.truncate(len);
 
         Ok(buffer)
     }
 }
 
-pub async fn connect(addr: &str) -> io::Result<(RudpSender<Sender>, RudpReceiver<Sender>)> {
+pub async fn connect(addr: &str) -> io::Result<(RudpSender<ToSrv>, RudpReceiver<ToSrv>)> {
     let sock = Arc::new(net::UdpSocket::bind("0.0.0.0:0").await?);
     sock.connect(addr).await?;
 
     new(
         PeerID::Srv as u16,
         PeerID::Nil as u16,
-        Sender {
-            sock: Arc::clone(&sock),
-        },
-        Receiver { sock },
+        ToSrv(Arc::clone(&sock)),
+        FromSrv(sock),
     )
     .await
 }
diff --git a/src/common.rs b/src/common.rs
new file mode 100644 (file)
index 0000000..797ccd1
--- /dev/null
@@ -0,0 +1,104 @@
+use super::*;
+use async_trait::async_trait;
+use delegate::delegate;
+use num_enum::TryFromPrimitive;
+use std::{io, sync::Arc};
+use tokio::sync::mpsc;
+
+pub const PROTO_ID: u32 = 0x4f457403;
+pub const UDP_PKT_SIZE: usize = 512;
+pub const NUM_CHANS: usize = 3;
+pub const REL_BUFFER: usize = 0x8000;
+pub const INIT_SEQNUM: u16 = 65500;
+pub const TIMEOUT: u64 = 30;
+pub const PING_TIMEOUT: u64 = 5;
+
+#[async_trait]
+pub trait UdpSender: Send + Sync + 'static {
+    async fn send(&self, data: &[u8]) -> io::Result<()>;
+}
+
+#[async_trait]
+pub trait UdpReceiver: Send + Sync + 'static {
+    async fn recv(&self) -> io::Result<Vec<u8>>;
+}
+
+#[derive(Debug, Copy, Clone, PartialEq)]
+#[repr(u16)]
+pub enum PeerID {
+    Nil = 0,
+    Srv,
+    CltMin,
+}
+
+#[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
+#[repr(u8)]
+pub enum PktType {
+    Ctl = 0,
+    Orig,
+    Split,
+    Rel,
+}
+
+#[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
+#[repr(u8)]
+pub enum CtlType {
+    Ack = 0,
+    SetPeerID,
+    Ping,
+    Disco,
+}
+
+#[derive(Debug)]
+pub struct Pkt<T> {
+    pub unrel: bool,
+    pub chan: u8,
+    pub data: T,
+}
+
+pub type InPkt = Result<Pkt<Vec<u8>>, error::Error>;
+
+#[derive(Debug)]
+pub struct RudpReceiver<S: UdpSender> {
+    pub(crate) share: Arc<RudpShare<S>>,
+    pub(crate) pkt_rx: mpsc::UnboundedReceiver<InPkt>,
+}
+
+#[derive(Debug)]
+pub struct RudpSender<S: UdpSender> {
+    pub(crate) share: Arc<RudpShare<S>>,
+}
+
+macro_rules! impl_share {
+    ($T:ident) => {
+        impl<S: UdpSender> $T<S> {
+            pub async fn peer_id(&self) -> u16 {
+                self.share.id
+            }
+
+            pub async fn is_server(&self) -> bool {
+                self.share.id == PeerID::Srv as u16
+            }
+
+            pub async fn close(self) {
+                self.share.close_tx.send(true).ok();
+
+                let mut tasks = self.share.tasks.lock().await;
+                while let Some(res) = tasks.join_next().await {
+                    res.ok(); // TODO: handle error (?)
+                }
+            }
+        }
+    };
+}
+
+impl_share!(RudpReceiver);
+impl_share!(RudpSender);
+
+impl<S: UdpSender> RudpReceiver<S> {
+    delegate! {
+        to self.pkt_rx {
+            pub async fn recv(&mut self) -> Option<InPkt>;
+        }
+    }
+}
index bac843aaf3462d048c900bc3371ffc45f0285f52..7cfc0578572766d985e5ac6869cfa2c0a1c1d8bb 100644 (file)
@@ -1,4 +1,4 @@
-use crate::prelude::*;
+use super::*;
 use num_enum::TryFromPrimitiveError;
 use thiserror::Error;
 use tokio::sync::mpsc::error::SendError;
index 76f0311dca127c322aea84bc89b5ae5ae9f42dd8..a02eb206363e9f86673d6aa8b86be0f735c510d3 100644 (file)
 #![feature(hash_drain_filter)]
 #![feature(once_cell)]
 mod client;
+mod common;
 mod error;
-mod new;
 mod recv;
 mod send;
+mod share;
 
-pub use prelude::*;
-
-use async_trait::async_trait;
-use delegate::delegate;
-use num_enum::TryFromPrimitive;
-use std::{cell::OnceCell, collections::HashMap, io, sync::Arc, time::Instant};
-use tokio::{
-    sync::{mpsc, watch, Mutex, RwLock},
-    task::JoinSet,
-};
-
-#[async_trait]
-pub trait UdpSender: Send + Sync + 'static {
-    async fn send(&self, data: &[u8]) -> io::Result<()>;
-}
-
-#[async_trait]
-pub trait UdpReceiver: Send + Sync + 'static {
-    async fn recv(&self) -> io::Result<Vec<u8>>;
-}
-
-#[derive(Debug, Copy, Clone, PartialEq)]
-#[repr(u16)]
-pub enum PeerID {
-    Nil = 0,
-    Srv,
-    CltMin,
-}
-
-#[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
-#[repr(u8)]
-pub enum PktType {
-    Ctl = 0,
-    Orig,
-    Split,
-    Rel,
-}
-
-#[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
-#[repr(u8)]
-pub enum CtlType {
-    Ack = 0,
-    SetPeerID,
-    Ping,
-    Disco,
-}
-
-#[derive(Debug)]
-pub struct Pkt<T> {
-    pub unrel: bool,
-    pub chan: u8,
-    pub data: T,
-}
-
-pub type InPkt = Result<Pkt<Vec<u8>>, error::Error>;
-
-#[derive(Debug)]
-struct Ack {
-    tx: watch::Sender<bool>,
-    rx: watch::Receiver<bool>,
-    data: Vec<u8>,
-}
-
-#[derive(Debug)]
-struct Chan {
-    acks: HashMap<u16, Ack>,
-    seqnum: u16,
-}
-
-#[derive(Debug)]
-struct RudpShare<S: UdpSender> {
-    id: u16,
-    remote_id: RwLock<u16>,
-    chans: Vec<Mutex<Chan>>,
-    udp_tx: S,
-    close_tx: watch::Sender<bool>,
-    tasks: Mutex<JoinSet<()>>,
-}
-
-#[derive(Debug)]
-pub struct RudpReceiver<S: UdpSender> {
-    share: Arc<RudpShare<S>>,
-    pkt_rx: mpsc::UnboundedReceiver<InPkt>,
-}
-
-#[derive(Debug)]
-pub struct RudpSender<S: UdpSender> {
-    share: Arc<RudpShare<S>>,
-}
-
-macro_rules! impl_share {
-    ($T:ident) => {
-        impl<S: UdpSender> $T<S> {
-            pub async fn peer_id(&self) -> u16 {
-                self.share.id
-            }
-
-            pub async fn is_server(&self) -> bool {
-                self.share.id == PeerID::Srv as u16
-            }
-
-            pub async fn close(self) {
-                self.share.close_tx.send(true).ok();
-
-                let mut tasks = self.share.tasks.lock().await;
-                while let Some(res) = tasks.join_next().await {
-                    res.ok(); // TODO: handle error (?)
-                }
-            }
-        }
-    };
-}
-
-impl_share!(RudpReceiver);
-impl_share!(RudpSender);
-
-impl<S: UdpSender> RudpReceiver<S> {
-    delegate! {
-        to self.pkt_rx {
-            pub async fn recv(&mut self) -> Option<InPkt>;
-        }
-    }
-}
-
-#[derive(Debug)]
-struct Split {
-    timestamp: Option<Instant>,
-    chunks: Vec<OnceCell<Vec<u8>>>,
-    got: usize,
-}
-
-struct RecvChan {
-    packets: Vec<Option<Vec<u8>>>, // char ** ðŸ˜›
-    splits: HashMap<u16, Split>,
-    seqnum: u16,
-    num: u8,
-}
-
-struct RecvWorker<R: UdpReceiver, S: UdpSender> {
-    share: Arc<RudpShare<S>>,
-    close: watch::Receiver<bool>,
-    chans: Arc<Vec<Mutex<RecvChan>>>,
-    pkt_tx: mpsc::UnboundedSender<InPkt>,
-    udp_rx: R,
-}
-
-mod prelude {
-    pub const PROTO_ID: u32 = 0x4f457403;
-    pub const UDP_PKT_SIZE: usize = 512;
-    pub const NUM_CHANS: usize = 3;
-    pub const REL_BUFFER: usize = 0x8000;
-    pub const INIT_SEQNUM: u16 = 65500;
-    pub const TIMEOUT: u64 = 30;
-    pub const PING_TIMEOUT: u64 = 5;
-
-    pub use super::{
-        client::{connect, Sender as Client},
-        error::Error,
-        new::new,
-        CtlType, InPkt, PeerID, Pkt, PktType, RudpReceiver, RudpSender, UdpReceiver, UdpSender,
-    };
+pub use client::*;
+pub use common::*;
+pub use error::*;
+use recv::*;
+pub use send::*;
+pub use share::*;
+pub use ticker_mod::*;
 
+mod ticker_mod {
     #[macro_export]
     macro_rules! ticker {
                ($duration:expr, $close:expr, $body:block) => {
diff --git a/src/new.rs b/src/new.rs
deleted file mode 100644 (file)
index b17f518..0000000
+++ /dev/null
@@ -1,89 +0,0 @@
-use crate::{prelude::*, ticker, Chan, RecvWorker, RudpShare};
-use std::{collections::HashMap, io, sync::Arc, time::Duration};
-use tokio::{
-    sync::{mpsc, watch, Mutex, RwLock},
-    task::JoinSet,
-};
-
-pub async fn new<S: UdpSender, R: UdpReceiver>(
-    id: u16,
-    remote_id: u16,
-    udp_tx: S,
-    udp_rx: R,
-) -> io::Result<(RudpSender<S>, RudpReceiver<S>)> {
-    let (pkt_tx, pkt_rx) = mpsc::unbounded_channel();
-    let (close_tx, close_rx) = watch::channel(false);
-
-    let share = Arc::new(RudpShare {
-        id,
-        remote_id: RwLock::new(remote_id),
-        udp_tx,
-        close_tx,
-        chans: (0..NUM_CHANS)
-            .map(|_| {
-                Mutex::new(Chan {
-                    acks: HashMap::new(),
-                    seqnum: INIT_SEQNUM,
-                })
-            })
-            .collect(),
-        tasks: Mutex::new(JoinSet::new()),
-    });
-
-    let mut tasks = share.tasks.lock().await;
-
-    let recv_share = Arc::clone(&share);
-    let recv_close = close_rx.clone();
-    tasks
-        /*.build_task()
-        .name("recv")*/
-        .spawn(async move {
-            let worker = RecvWorker::new(udp_rx, recv_share, recv_close, pkt_tx);
-            worker.run().await;
-        });
-
-    let resend_share = Arc::clone(&share);
-    let mut resend_close = close_rx.clone();
-    tasks
-        /*.build_task()
-        .name("resend")*/
-        .spawn(async move {
-            ticker!(Duration::from_millis(500), resend_close, {
-                for chan in resend_share.chans.iter() {
-                    for (_, ack) in chan.lock().await.acks.iter() {
-                        resend_share.send_raw(&ack.data).await.ok(); // TODO: handle error (?)
-                    }
-                }
-            });
-        });
-
-    let ping_share = Arc::clone(&share);
-    let mut ping_close = close_rx.clone();
-    tasks
-        /*.build_task()
-        .name("ping")*/
-        .spawn(async move {
-            ticker!(Duration::from_secs(PING_TIMEOUT), ping_close, {
-                ping_share
-                    .send(
-                        PktType::Ctl,
-                        Pkt {
-                            chan: 0,
-                            unrel: false,
-                            data: &[CtlType::Ping as u8],
-                        },
-                    )
-                    .await
-                    .ok();
-            });
-        });
-
-    drop(tasks);
-
-    Ok((
-        RudpSender {
-            share: Arc::clone(&share),
-        },
-        RudpReceiver { share, pkt_rx },
-    ))
-}
index a88426f847c4a4f650d1ad428461bb458f063dec..572b17ea3ea45b1067065858eb6be0e338efe11f 100644 (file)
@@ -1,4 +1,4 @@
-use crate::{prelude::*, ticker, RecvChan, RecvWorker, RudpShare, Split};
+use super::*;
 use async_recursion::async_recursion;
 use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
 use std::{
@@ -17,6 +17,28 @@ fn to_seqnum(seqnum: u16) -> usize {
 
 type Result<T> = std::result::Result<T, Error>;
 
+#[derive(Debug)]
+struct Split {
+    timestamp: Option<Instant>,
+    chunks: Vec<OnceCell<Vec<u8>>>,
+    got: usize,
+}
+
+struct RecvChan {
+    packets: Vec<Option<Vec<u8>>>, // char ** ðŸ˜›
+    splits: HashMap<u16, Split>,
+    seqnum: u16,
+    num: u8,
+}
+
+pub(crate) struct RecvWorker<R: UdpReceiver, S: UdpSender> {
+    share: Arc<RudpShare<S>>,
+    close: watch::Receiver<bool>,
+    chans: Arc<Vec<Mutex<RecvChan>>>,
+    pkt_tx: mpsc::UnboundedSender<InPkt>,
+    udp_rx: R,
+}
+
 impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
     pub fn new(
         udp_rx: R,
index 0bbce47b2892d0366aa3ffb574285c043a2864cb..e0c2fa3a4407f9992ea15ab5f48864181ec0ed82 100644 (file)
@@ -1,9 +1,9 @@
-use crate::{prelude::*, Ack, RudpShare};
+use super::*;
 use byteorder::{BigEndian, WriteBytesExt};
 use std::io::{self, Write};
 use tokio::sync::watch;
 
-type AckResult = io::Result<Option<watch::Receiver<bool>>>;
+pub type AckResult = io::Result<Option<watch::Receiver<bool>>>;
 
 impl<S: UdpSender> RudpSender<S> {
     pub async fn send(&self, pkt: Pkt<&[u8]>) -> AckResult {
diff --git a/src/share.rs b/src/share.rs
new file mode 100644 (file)
index 0000000..e0d2d2b
--- /dev/null
@@ -0,0 +1,112 @@
+use super::*;
+use std::{collections::HashMap, io, sync::Arc, time::Duration};
+use tokio::{
+    sync::{mpsc, watch, Mutex, RwLock},
+    task::JoinSet,
+};
+
+#[derive(Debug)]
+pub(crate) struct Ack {
+    pub(crate) tx: watch::Sender<bool>,
+    pub(crate) rx: watch::Receiver<bool>,
+    pub(crate) data: Vec<u8>,
+}
+
+#[derive(Debug)]
+pub(crate) struct Chan {
+    pub(crate) acks: HashMap<u16, Ack>,
+    pub(crate) seqnum: u16,
+}
+
+#[derive(Debug)]
+pub(crate) struct RudpShare<S: UdpSender> {
+    pub(crate) id: u16,
+    pub(crate) remote_id: RwLock<u16>,
+    pub(crate) chans: Vec<Mutex<Chan>>,
+    pub(crate) udp_tx: S,
+    pub(crate) close_tx: watch::Sender<bool>,
+    pub(crate) tasks: Mutex<JoinSet<()>>,
+}
+
+pub async fn new<S: UdpSender, R: UdpReceiver>(
+    id: u16,
+    remote_id: u16,
+    udp_tx: S,
+    udp_rx: R,
+) -> io::Result<(RudpSender<S>, RudpReceiver<S>)> {
+    let (pkt_tx, pkt_rx) = mpsc::unbounded_channel();
+    let (close_tx, close_rx) = watch::channel(false);
+
+    let share = Arc::new(RudpShare {
+        id,
+        remote_id: RwLock::new(remote_id),
+        udp_tx,
+        close_tx,
+        chans: (0..NUM_CHANS)
+            .map(|_| {
+                Mutex::new(Chan {
+                    acks: HashMap::new(),
+                    seqnum: INIT_SEQNUM,
+                })
+            })
+            .collect(),
+        tasks: Mutex::new(JoinSet::new()),
+    });
+
+    let mut tasks = share.tasks.lock().await;
+
+    let recv_share = Arc::clone(&share);
+    let recv_close = close_rx.clone();
+    tasks
+        /*.build_task()
+        .name("recv")*/
+        .spawn(async move {
+            let worker = RecvWorker::new(udp_rx, recv_share, recv_close, pkt_tx);
+            worker.run().await;
+        });
+
+    let resend_share = Arc::clone(&share);
+    let mut resend_close = close_rx.clone();
+    tasks
+        /*.build_task()
+        .name("resend")*/
+        .spawn(async move {
+            ticker!(Duration::from_millis(500), resend_close, {
+                for chan in resend_share.chans.iter() {
+                    for (_, ack) in chan.lock().await.acks.iter() {
+                        resend_share.send_raw(&ack.data).await.ok(); // TODO: handle error (?)
+                    }
+                }
+            });
+        });
+
+    let ping_share = Arc::clone(&share);
+    let mut ping_close = close_rx.clone();
+    tasks
+        /*.build_task()
+        .name("ping")*/
+        .spawn(async move {
+            ticker!(Duration::from_secs(PING_TIMEOUT), ping_close, {
+                ping_share
+                    .send(
+                        PktType::Ctl,
+                        Pkt {
+                            chan: 0,
+                            unrel: false,
+                            data: &[CtlType::Ping as u8],
+                        },
+                    )
+                    .await
+                    .ok();
+            });
+        });
+
+    drop(tasks);
+
+    Ok((
+        RudpSender {
+            share: Arc::clone(&share),
+        },
+        RudpReceiver { share, pkt_rx },
+    ))
+}