From: Lizzy Fleckenstein Date: Wed, 15 Feb 2023 20:54:25 +0000 (+0100) Subject: Rework structure X-Git-Url: https://git.lizzy.rs/?a=commitdiff_plain;h=1d4ebed25ff3e05d2fac70a040901fd3ea3029eb;p=mt_rudp.git Rework structure --- diff --git a/Cargo.toml b/Cargo.toml index 0b5f690..8e5f7ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ async-recursion = "1.0.0" async-trait = "0.1.60" byteorder = "1.4.3" delegate = "0.9.0" +drop_bomb = "0.1.5" num_enum = "0.5.7" thiserror = "1.0.38" tokio = { version = "1.23.0", features = ["sync", "time", "net", "signal", "macros", "rt"] } diff --git a/examples/example.rs b/examples/example.rs index fc5f1fb..8625243 100644 --- a/examples/example.rs +++ b/examples/example.rs @@ -1,9 +1,9 @@ use byteorder::{BigEndian, WriteBytesExt}; -use mt_rudp::{Client, RudpReceiver, RudpSender}; +use mt_rudp::{RudpReceiver, RudpSender, ToSrv}; use pretty_hex::PrettyHex; use std::io::{self, Write}; -async fn example(tx: &RudpSender, rx: &mut RudpReceiver) -> io::Result<()> { +async fn example(tx: &RudpSender, rx: &mut RudpReceiver) -> io::Result<()> { // send hello packet let mut pkt = vec![]; pkt.write_u16::(2)?; // high level type diff --git a/src/client.rs b/src/client.rs index 6785a1f..c4922ec 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,48 +1,44 @@ -use crate::prelude::*; +use super::*; use async_trait::async_trait; use std::{io, sync::Arc}; use tokio::net; -pub struct Sender { - sock: Arc, -} +#[derive(Debug)] +pub struct ToSrv(Arc); + +#[derive(Debug)] +pub struct FromSrv(Arc); #[async_trait] -impl UdpSender for Sender { +impl UdpSender for ToSrv { async fn send(&self, data: &[u8]) -> io::Result<()> { - self.sock.send(data).await?; + self.0.send(data).await?; Ok(()) } } -pub struct Receiver { - sock: Arc, -} - #[async_trait] -impl UdpReceiver for Receiver { +impl UdpReceiver for FromSrv { async fn recv(&self) -> io::Result> { let mut buffer = Vec::new(); buffer.resize(UDP_PKT_SIZE, 0); - let len = self.sock.recv(&mut buffer).await?; + let len = self.0.recv(&mut buffer).await?; buffer.truncate(len); Ok(buffer) } } -pub async fn connect(addr: &str) -> io::Result<(RudpSender, RudpReceiver)> { +pub async fn connect(addr: &str) -> io::Result<(RudpSender, RudpReceiver)> { let sock = Arc::new(net::UdpSocket::bind("0.0.0.0:0").await?); sock.connect(addr).await?; new( PeerID::Srv as u16, PeerID::Nil as u16, - Sender { - sock: Arc::clone(&sock), - }, - Receiver { sock }, + ToSrv(Arc::clone(&sock)), + FromSrv(sock), ) .await } diff --git a/src/common.rs b/src/common.rs new file mode 100644 index 0000000..797ccd1 --- /dev/null +++ b/src/common.rs @@ -0,0 +1,104 @@ +use super::*; +use async_trait::async_trait; +use delegate::delegate; +use num_enum::TryFromPrimitive; +use std::{io, sync::Arc}; +use tokio::sync::mpsc; + +pub const PROTO_ID: u32 = 0x4f457403; +pub const UDP_PKT_SIZE: usize = 512; +pub const NUM_CHANS: usize = 3; +pub const REL_BUFFER: usize = 0x8000; +pub const INIT_SEQNUM: u16 = 65500; +pub const TIMEOUT: u64 = 30; +pub const PING_TIMEOUT: u64 = 5; + +#[async_trait] +pub trait UdpSender: Send + Sync + 'static { + async fn send(&self, data: &[u8]) -> io::Result<()>; +} + +#[async_trait] +pub trait UdpReceiver: Send + Sync + 'static { + async fn recv(&self) -> io::Result>; +} + +#[derive(Debug, Copy, Clone, PartialEq)] +#[repr(u16)] +pub enum PeerID { + Nil = 0, + Srv, + CltMin, +} + +#[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)] +#[repr(u8)] +pub enum PktType { + Ctl = 0, + Orig, + Split, + Rel, +} + +#[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)] +#[repr(u8)] +pub enum CtlType { + Ack = 0, + SetPeerID, + Ping, + Disco, +} + +#[derive(Debug)] +pub struct Pkt { + pub unrel: bool, + pub chan: u8, + pub data: T, +} + +pub type InPkt = Result>, error::Error>; + +#[derive(Debug)] +pub struct RudpReceiver { + pub(crate) share: Arc>, + pub(crate) pkt_rx: mpsc::UnboundedReceiver, +} + +#[derive(Debug)] +pub struct RudpSender { + pub(crate) share: Arc>, +} + +macro_rules! impl_share { + ($T:ident) => { + impl $T { + pub async fn peer_id(&self) -> u16 { + self.share.id + } + + pub async fn is_server(&self) -> bool { + self.share.id == PeerID::Srv as u16 + } + + pub async fn close(self) { + self.share.close_tx.send(true).ok(); + + let mut tasks = self.share.tasks.lock().await; + while let Some(res) = tasks.join_next().await { + res.ok(); // TODO: handle error (?) + } + } + } + }; +} + +impl_share!(RudpReceiver); +impl_share!(RudpSender); + +impl RudpReceiver { + delegate! { + to self.pkt_rx { + pub async fn recv(&mut self) -> Option; + } + } +} diff --git a/src/error.rs b/src/error.rs index bac843a..7cfc057 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,4 +1,4 @@ -use crate::prelude::*; +use super::*; use num_enum::TryFromPrimitiveError; use thiserror::Error; use tokio::sync::mpsc::error::SendError; diff --git a/src/lib.rs b/src/lib.rs index 76f0311..a02eb20 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,173 +2,21 @@ #![feature(hash_drain_filter)] #![feature(once_cell)] mod client; +mod common; mod error; -mod new; mod recv; mod send; +mod share; -pub use prelude::*; - -use async_trait::async_trait; -use delegate::delegate; -use num_enum::TryFromPrimitive; -use std::{cell::OnceCell, collections::HashMap, io, sync::Arc, time::Instant}; -use tokio::{ - sync::{mpsc, watch, Mutex, RwLock}, - task::JoinSet, -}; - -#[async_trait] -pub trait UdpSender: Send + Sync + 'static { - async fn send(&self, data: &[u8]) -> io::Result<()>; -} - -#[async_trait] -pub trait UdpReceiver: Send + Sync + 'static { - async fn recv(&self) -> io::Result>; -} - -#[derive(Debug, Copy, Clone, PartialEq)] -#[repr(u16)] -pub enum PeerID { - Nil = 0, - Srv, - CltMin, -} - -#[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)] -#[repr(u8)] -pub enum PktType { - Ctl = 0, - Orig, - Split, - Rel, -} - -#[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)] -#[repr(u8)] -pub enum CtlType { - Ack = 0, - SetPeerID, - Ping, - Disco, -} - -#[derive(Debug)] -pub struct Pkt { - pub unrel: bool, - pub chan: u8, - pub data: T, -} - -pub type InPkt = Result>, error::Error>; - -#[derive(Debug)] -struct Ack { - tx: watch::Sender, - rx: watch::Receiver, - data: Vec, -} - -#[derive(Debug)] -struct Chan { - acks: HashMap, - seqnum: u16, -} - -#[derive(Debug)] -struct RudpShare { - id: u16, - remote_id: RwLock, - chans: Vec>, - udp_tx: S, - close_tx: watch::Sender, - tasks: Mutex>, -} - -#[derive(Debug)] -pub struct RudpReceiver { - share: Arc>, - pkt_rx: mpsc::UnboundedReceiver, -} - -#[derive(Debug)] -pub struct RudpSender { - share: Arc>, -} - -macro_rules! impl_share { - ($T:ident) => { - impl $T { - pub async fn peer_id(&self) -> u16 { - self.share.id - } - - pub async fn is_server(&self) -> bool { - self.share.id == PeerID::Srv as u16 - } - - pub async fn close(self) { - self.share.close_tx.send(true).ok(); - - let mut tasks = self.share.tasks.lock().await; - while let Some(res) = tasks.join_next().await { - res.ok(); // TODO: handle error (?) - } - } - } - }; -} - -impl_share!(RudpReceiver); -impl_share!(RudpSender); - -impl RudpReceiver { - delegate! { - to self.pkt_rx { - pub async fn recv(&mut self) -> Option; - } - } -} - -#[derive(Debug)] -struct Split { - timestamp: Option, - chunks: Vec>>, - got: usize, -} - -struct RecvChan { - packets: Vec>>, // char ** 😛 - splits: HashMap, - seqnum: u16, - num: u8, -} - -struct RecvWorker { - share: Arc>, - close: watch::Receiver, - chans: Arc>>, - pkt_tx: mpsc::UnboundedSender, - udp_rx: R, -} - -mod prelude { - pub const PROTO_ID: u32 = 0x4f457403; - pub const UDP_PKT_SIZE: usize = 512; - pub const NUM_CHANS: usize = 3; - pub const REL_BUFFER: usize = 0x8000; - pub const INIT_SEQNUM: u16 = 65500; - pub const TIMEOUT: u64 = 30; - pub const PING_TIMEOUT: u64 = 5; - - pub use super::{ - client::{connect, Sender as Client}, - error::Error, - new::new, - CtlType, InPkt, PeerID, Pkt, PktType, RudpReceiver, RudpSender, UdpReceiver, UdpSender, - }; +pub use client::*; +pub use common::*; +pub use error::*; +use recv::*; +pub use send::*; +pub use share::*; +pub use ticker_mod::*; +mod ticker_mod { #[macro_export] macro_rules! ticker { ($duration:expr, $close:expr, $body:block) => { diff --git a/src/new.rs b/src/new.rs deleted file mode 100644 index b17f518..0000000 --- a/src/new.rs +++ /dev/null @@ -1,89 +0,0 @@ -use crate::{prelude::*, ticker, Chan, RecvWorker, RudpShare}; -use std::{collections::HashMap, io, sync::Arc, time::Duration}; -use tokio::{ - sync::{mpsc, watch, Mutex, RwLock}, - task::JoinSet, -}; - -pub async fn new( - id: u16, - remote_id: u16, - udp_tx: S, - udp_rx: R, -) -> io::Result<(RudpSender, RudpReceiver)> { - let (pkt_tx, pkt_rx) = mpsc::unbounded_channel(); - let (close_tx, close_rx) = watch::channel(false); - - let share = Arc::new(RudpShare { - id, - remote_id: RwLock::new(remote_id), - udp_tx, - close_tx, - chans: (0..NUM_CHANS) - .map(|_| { - Mutex::new(Chan { - acks: HashMap::new(), - seqnum: INIT_SEQNUM, - }) - }) - .collect(), - tasks: Mutex::new(JoinSet::new()), - }); - - let mut tasks = share.tasks.lock().await; - - let recv_share = Arc::clone(&share); - let recv_close = close_rx.clone(); - tasks - /*.build_task() - .name("recv")*/ - .spawn(async move { - let worker = RecvWorker::new(udp_rx, recv_share, recv_close, pkt_tx); - worker.run().await; - }); - - let resend_share = Arc::clone(&share); - let mut resend_close = close_rx.clone(); - tasks - /*.build_task() - .name("resend")*/ - .spawn(async move { - ticker!(Duration::from_millis(500), resend_close, { - for chan in resend_share.chans.iter() { - for (_, ack) in chan.lock().await.acks.iter() { - resend_share.send_raw(&ack.data).await.ok(); // TODO: handle error (?) - } - } - }); - }); - - let ping_share = Arc::clone(&share); - let mut ping_close = close_rx.clone(); - tasks - /*.build_task() - .name("ping")*/ - .spawn(async move { - ticker!(Duration::from_secs(PING_TIMEOUT), ping_close, { - ping_share - .send( - PktType::Ctl, - Pkt { - chan: 0, - unrel: false, - data: &[CtlType::Ping as u8], - }, - ) - .await - .ok(); - }); - }); - - drop(tasks); - - Ok(( - RudpSender { - share: Arc::clone(&share), - }, - RudpReceiver { share, pkt_rx }, - )) -} diff --git a/src/recv.rs b/src/recv.rs index a88426f..572b17e 100644 --- a/src/recv.rs +++ b/src/recv.rs @@ -1,4 +1,4 @@ -use crate::{prelude::*, ticker, RecvChan, RecvWorker, RudpShare, Split}; +use super::*; use async_recursion::async_recursion; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use std::{ @@ -17,6 +17,28 @@ fn to_seqnum(seqnum: u16) -> usize { type Result = std::result::Result; +#[derive(Debug)] +struct Split { + timestamp: Option, + chunks: Vec>>, + got: usize, +} + +struct RecvChan { + packets: Vec>>, // char ** 😛 + splits: HashMap, + seqnum: u16, + num: u8, +} + +pub(crate) struct RecvWorker { + share: Arc>, + close: watch::Receiver, + chans: Arc>>, + pkt_tx: mpsc::UnboundedSender, + udp_rx: R, +} + impl RecvWorker { pub fn new( udp_rx: R, diff --git a/src/send.rs b/src/send.rs index 0bbce47..e0c2fa3 100644 --- a/src/send.rs +++ b/src/send.rs @@ -1,9 +1,9 @@ -use crate::{prelude::*, Ack, RudpShare}; +use super::*; use byteorder::{BigEndian, WriteBytesExt}; use std::io::{self, Write}; use tokio::sync::watch; -type AckResult = io::Result>>; +pub type AckResult = io::Result>>; impl RudpSender { pub async fn send(&self, pkt: Pkt<&[u8]>) -> AckResult { diff --git a/src/share.rs b/src/share.rs new file mode 100644 index 0000000..e0d2d2b --- /dev/null +++ b/src/share.rs @@ -0,0 +1,112 @@ +use super::*; +use std::{collections::HashMap, io, sync::Arc, time::Duration}; +use tokio::{ + sync::{mpsc, watch, Mutex, RwLock}, + task::JoinSet, +}; + +#[derive(Debug)] +pub(crate) struct Ack { + pub(crate) tx: watch::Sender, + pub(crate) rx: watch::Receiver, + pub(crate) data: Vec, +} + +#[derive(Debug)] +pub(crate) struct Chan { + pub(crate) acks: HashMap, + pub(crate) seqnum: u16, +} + +#[derive(Debug)] +pub(crate) struct RudpShare { + pub(crate) id: u16, + pub(crate) remote_id: RwLock, + pub(crate) chans: Vec>, + pub(crate) udp_tx: S, + pub(crate) close_tx: watch::Sender, + pub(crate) tasks: Mutex>, +} + +pub async fn new( + id: u16, + remote_id: u16, + udp_tx: S, + udp_rx: R, +) -> io::Result<(RudpSender, RudpReceiver)> { + let (pkt_tx, pkt_rx) = mpsc::unbounded_channel(); + let (close_tx, close_rx) = watch::channel(false); + + let share = Arc::new(RudpShare { + id, + remote_id: RwLock::new(remote_id), + udp_tx, + close_tx, + chans: (0..NUM_CHANS) + .map(|_| { + Mutex::new(Chan { + acks: HashMap::new(), + seqnum: INIT_SEQNUM, + }) + }) + .collect(), + tasks: Mutex::new(JoinSet::new()), + }); + + let mut tasks = share.tasks.lock().await; + + let recv_share = Arc::clone(&share); + let recv_close = close_rx.clone(); + tasks + /*.build_task() + .name("recv")*/ + .spawn(async move { + let worker = RecvWorker::new(udp_rx, recv_share, recv_close, pkt_tx); + worker.run().await; + }); + + let resend_share = Arc::clone(&share); + let mut resend_close = close_rx.clone(); + tasks + /*.build_task() + .name("resend")*/ + .spawn(async move { + ticker!(Duration::from_millis(500), resend_close, { + for chan in resend_share.chans.iter() { + for (_, ack) in chan.lock().await.acks.iter() { + resend_share.send_raw(&ack.data).await.ok(); // TODO: handle error (?) + } + } + }); + }); + + let ping_share = Arc::clone(&share); + let mut ping_close = close_rx.clone(); + tasks + /*.build_task() + .name("ping")*/ + .spawn(async move { + ticker!(Duration::from_secs(PING_TIMEOUT), ping_close, { + ping_share + .send( + PktType::Ctl, + Pkt { + chan: 0, + unrel: false, + data: &[CtlType::Ping as u8], + }, + ) + .await + .ok(); + }); + }); + + drop(tasks); + + Ok(( + RudpSender { + share: Arc::clone(&share), + }, + RudpReceiver { share, pkt_rx }, + )) +}