async-trait = "0.1.60"
byteorder = "1.4.3"
delegate = "0.9.0"
+drop_bomb = "0.1.5"
num_enum = "0.5.7"
thiserror = "1.0.38"
tokio = { version = "1.23.0", features = ["sync", "time", "net", "signal", "macros", "rt"] }
use byteorder::{BigEndian, WriteBytesExt};
-use mt_rudp::{Client, RudpReceiver, RudpSender};
+use mt_rudp::{RudpReceiver, RudpSender, ToSrv};
use pretty_hex::PrettyHex;
use std::io::{self, Write};
-async fn example(tx: &RudpSender<Client>, rx: &mut RudpReceiver<Client>) -> io::Result<()> {
+async fn example(tx: &RudpSender<ToSrv>, rx: &mut RudpReceiver<ToSrv>) -> io::Result<()> {
// send hello packet
let mut pkt = vec![];
pkt.write_u16::<BigEndian>(2)?; // high level type
-use crate::prelude::*;
+use super::*;
use async_trait::async_trait;
use std::{io, sync::Arc};
use tokio::net;
-pub struct Sender {
- sock: Arc<net::UdpSocket>,
-}
+#[derive(Debug)]
+pub struct ToSrv(Arc<net::UdpSocket>);
+
+#[derive(Debug)]
+pub struct FromSrv(Arc<net::UdpSocket>);
#[async_trait]
-impl UdpSender for Sender {
+impl UdpSender for ToSrv {
async fn send(&self, data: &[u8]) -> io::Result<()> {
- self.sock.send(data).await?;
+ self.0.send(data).await?;
Ok(())
}
}
-pub struct Receiver {
- sock: Arc<net::UdpSocket>,
-}
-
#[async_trait]
-impl UdpReceiver for Receiver {
+impl UdpReceiver for FromSrv {
async fn recv(&self) -> io::Result<Vec<u8>> {
let mut buffer = Vec::new();
buffer.resize(UDP_PKT_SIZE, 0);
- let len = self.sock.recv(&mut buffer).await?;
+ let len = self.0.recv(&mut buffer).await?;
buffer.truncate(len);
Ok(buffer)
}
}
-pub async fn connect(addr: &str) -> io::Result<(RudpSender<Sender>, RudpReceiver<Sender>)> {
+pub async fn connect(addr: &str) -> io::Result<(RudpSender<ToSrv>, RudpReceiver<ToSrv>)> {
let sock = Arc::new(net::UdpSocket::bind("0.0.0.0:0").await?);
sock.connect(addr).await?;
new(
PeerID::Srv as u16,
PeerID::Nil as u16,
- Sender {
- sock: Arc::clone(&sock),
- },
- Receiver { sock },
+ ToSrv(Arc::clone(&sock)),
+ FromSrv(sock),
)
.await
}
--- /dev/null
+use super::*;
+use async_trait::async_trait;
+use delegate::delegate;
+use num_enum::TryFromPrimitive;
+use std::{io, sync::Arc};
+use tokio::sync::mpsc;
+
+pub const PROTO_ID: u32 = 0x4f457403;
+pub const UDP_PKT_SIZE: usize = 512;
+pub const NUM_CHANS: usize = 3;
+pub const REL_BUFFER: usize = 0x8000;
+pub const INIT_SEQNUM: u16 = 65500;
+pub const TIMEOUT: u64 = 30;
+pub const PING_TIMEOUT: u64 = 5;
+
+#[async_trait]
+pub trait UdpSender: Send + Sync + 'static {
+ async fn send(&self, data: &[u8]) -> io::Result<()>;
+}
+
+#[async_trait]
+pub trait UdpReceiver: Send + Sync + 'static {
+ async fn recv(&self) -> io::Result<Vec<u8>>;
+}
+
+#[derive(Debug, Copy, Clone, PartialEq)]
+#[repr(u16)]
+pub enum PeerID {
+ Nil = 0,
+ Srv,
+ CltMin,
+}
+
+#[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
+#[repr(u8)]
+pub enum PktType {
+ Ctl = 0,
+ Orig,
+ Split,
+ Rel,
+}
+
+#[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
+#[repr(u8)]
+pub enum CtlType {
+ Ack = 0,
+ SetPeerID,
+ Ping,
+ Disco,
+}
+
+#[derive(Debug)]
+pub struct Pkt<T> {
+ pub unrel: bool,
+ pub chan: u8,
+ pub data: T,
+}
+
+pub type InPkt = Result<Pkt<Vec<u8>>, error::Error>;
+
+#[derive(Debug)]
+pub struct RudpReceiver<S: UdpSender> {
+ pub(crate) share: Arc<RudpShare<S>>,
+ pub(crate) pkt_rx: mpsc::UnboundedReceiver<InPkt>,
+}
+
+#[derive(Debug)]
+pub struct RudpSender<S: UdpSender> {
+ pub(crate) share: Arc<RudpShare<S>>,
+}
+
+macro_rules! impl_share {
+ ($T:ident) => {
+ impl<S: UdpSender> $T<S> {
+ pub async fn peer_id(&self) -> u16 {
+ self.share.id
+ }
+
+ pub async fn is_server(&self) -> bool {
+ self.share.id == PeerID::Srv as u16
+ }
+
+ pub async fn close(self) {
+ self.share.close_tx.send(true).ok();
+
+ let mut tasks = self.share.tasks.lock().await;
+ while let Some(res) = tasks.join_next().await {
+ res.ok(); // TODO: handle error (?)
+ }
+ }
+ }
+ };
+}
+
+impl_share!(RudpReceiver);
+impl_share!(RudpSender);
+
+impl<S: UdpSender> RudpReceiver<S> {
+ delegate! {
+ to self.pkt_rx {
+ pub async fn recv(&mut self) -> Option<InPkt>;
+ }
+ }
+}
-use crate::prelude::*;
+use super::*;
use num_enum::TryFromPrimitiveError;
use thiserror::Error;
use tokio::sync::mpsc::error::SendError;
#![feature(hash_drain_filter)]
#![feature(once_cell)]
mod client;
+mod common;
mod error;
-mod new;
mod recv;
mod send;
+mod share;
-pub use prelude::*;
-
-use async_trait::async_trait;
-use delegate::delegate;
-use num_enum::TryFromPrimitive;
-use std::{cell::OnceCell, collections::HashMap, io, sync::Arc, time::Instant};
-use tokio::{
- sync::{mpsc, watch, Mutex, RwLock},
- task::JoinSet,
-};
-
-#[async_trait]
-pub trait UdpSender: Send + Sync + 'static {
- async fn send(&self, data: &[u8]) -> io::Result<()>;
-}
-
-#[async_trait]
-pub trait UdpReceiver: Send + Sync + 'static {
- async fn recv(&self) -> io::Result<Vec<u8>>;
-}
-
-#[derive(Debug, Copy, Clone, PartialEq)]
-#[repr(u16)]
-pub enum PeerID {
- Nil = 0,
- Srv,
- CltMin,
-}
-
-#[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
-#[repr(u8)]
-pub enum PktType {
- Ctl = 0,
- Orig,
- Split,
- Rel,
-}
-
-#[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
-#[repr(u8)]
-pub enum CtlType {
- Ack = 0,
- SetPeerID,
- Ping,
- Disco,
-}
-
-#[derive(Debug)]
-pub struct Pkt<T> {
- pub unrel: bool,
- pub chan: u8,
- pub data: T,
-}
-
-pub type InPkt = Result<Pkt<Vec<u8>>, error::Error>;
-
-#[derive(Debug)]
-struct Ack {
- tx: watch::Sender<bool>,
- rx: watch::Receiver<bool>,
- data: Vec<u8>,
-}
-
-#[derive(Debug)]
-struct Chan {
- acks: HashMap<u16, Ack>,
- seqnum: u16,
-}
-
-#[derive(Debug)]
-struct RudpShare<S: UdpSender> {
- id: u16,
- remote_id: RwLock<u16>,
- chans: Vec<Mutex<Chan>>,
- udp_tx: S,
- close_tx: watch::Sender<bool>,
- tasks: Mutex<JoinSet<()>>,
-}
-
-#[derive(Debug)]
-pub struct RudpReceiver<S: UdpSender> {
- share: Arc<RudpShare<S>>,
- pkt_rx: mpsc::UnboundedReceiver<InPkt>,
-}
-
-#[derive(Debug)]
-pub struct RudpSender<S: UdpSender> {
- share: Arc<RudpShare<S>>,
-}
-
-macro_rules! impl_share {
- ($T:ident) => {
- impl<S: UdpSender> $T<S> {
- pub async fn peer_id(&self) -> u16 {
- self.share.id
- }
-
- pub async fn is_server(&self) -> bool {
- self.share.id == PeerID::Srv as u16
- }
-
- pub async fn close(self) {
- self.share.close_tx.send(true).ok();
-
- let mut tasks = self.share.tasks.lock().await;
- while let Some(res) = tasks.join_next().await {
- res.ok(); // TODO: handle error (?)
- }
- }
- }
- };
-}
-
-impl_share!(RudpReceiver);
-impl_share!(RudpSender);
-
-impl<S: UdpSender> RudpReceiver<S> {
- delegate! {
- to self.pkt_rx {
- pub async fn recv(&mut self) -> Option<InPkt>;
- }
- }
-}
-
-#[derive(Debug)]
-struct Split {
- timestamp: Option<Instant>,
- chunks: Vec<OnceCell<Vec<u8>>>,
- got: usize,
-}
-
-struct RecvChan {
- packets: Vec<Option<Vec<u8>>>, // char ** 😛
- splits: HashMap<u16, Split>,
- seqnum: u16,
- num: u8,
-}
-
-struct RecvWorker<R: UdpReceiver, S: UdpSender> {
- share: Arc<RudpShare<S>>,
- close: watch::Receiver<bool>,
- chans: Arc<Vec<Mutex<RecvChan>>>,
- pkt_tx: mpsc::UnboundedSender<InPkt>,
- udp_rx: R,
-}
-
-mod prelude {
- pub const PROTO_ID: u32 = 0x4f457403;
- pub const UDP_PKT_SIZE: usize = 512;
- pub const NUM_CHANS: usize = 3;
- pub const REL_BUFFER: usize = 0x8000;
- pub const INIT_SEQNUM: u16 = 65500;
- pub const TIMEOUT: u64 = 30;
- pub const PING_TIMEOUT: u64 = 5;
-
- pub use super::{
- client::{connect, Sender as Client},
- error::Error,
- new::new,
- CtlType, InPkt, PeerID, Pkt, PktType, RudpReceiver, RudpSender, UdpReceiver, UdpSender,
- };
+pub use client::*;
+pub use common::*;
+pub use error::*;
+use recv::*;
+pub use send::*;
+pub use share::*;
+pub use ticker_mod::*;
+mod ticker_mod {
#[macro_export]
macro_rules! ticker {
($duration:expr, $close:expr, $body:block) => {
+++ /dev/null
-use crate::{prelude::*, ticker, Chan, RecvWorker, RudpShare};
-use std::{collections::HashMap, io, sync::Arc, time::Duration};
-use tokio::{
- sync::{mpsc, watch, Mutex, RwLock},
- task::JoinSet,
-};
-
-pub async fn new<S: UdpSender, R: UdpReceiver>(
- id: u16,
- remote_id: u16,
- udp_tx: S,
- udp_rx: R,
-) -> io::Result<(RudpSender<S>, RudpReceiver<S>)> {
- let (pkt_tx, pkt_rx) = mpsc::unbounded_channel();
- let (close_tx, close_rx) = watch::channel(false);
-
- let share = Arc::new(RudpShare {
- id,
- remote_id: RwLock::new(remote_id),
- udp_tx,
- close_tx,
- chans: (0..NUM_CHANS)
- .map(|_| {
- Mutex::new(Chan {
- acks: HashMap::new(),
- seqnum: INIT_SEQNUM,
- })
- })
- .collect(),
- tasks: Mutex::new(JoinSet::new()),
- });
-
- let mut tasks = share.tasks.lock().await;
-
- let recv_share = Arc::clone(&share);
- let recv_close = close_rx.clone();
- tasks
- /*.build_task()
- .name("recv")*/
- .spawn(async move {
- let worker = RecvWorker::new(udp_rx, recv_share, recv_close, pkt_tx);
- worker.run().await;
- });
-
- let resend_share = Arc::clone(&share);
- let mut resend_close = close_rx.clone();
- tasks
- /*.build_task()
- .name("resend")*/
- .spawn(async move {
- ticker!(Duration::from_millis(500), resend_close, {
- for chan in resend_share.chans.iter() {
- for (_, ack) in chan.lock().await.acks.iter() {
- resend_share.send_raw(&ack.data).await.ok(); // TODO: handle error (?)
- }
- }
- });
- });
-
- let ping_share = Arc::clone(&share);
- let mut ping_close = close_rx.clone();
- tasks
- /*.build_task()
- .name("ping")*/
- .spawn(async move {
- ticker!(Duration::from_secs(PING_TIMEOUT), ping_close, {
- ping_share
- .send(
- PktType::Ctl,
- Pkt {
- chan: 0,
- unrel: false,
- data: &[CtlType::Ping as u8],
- },
- )
- .await
- .ok();
- });
- });
-
- drop(tasks);
-
- Ok((
- RudpSender {
- share: Arc::clone(&share),
- },
- RudpReceiver { share, pkt_rx },
- ))
-}
-use crate::{prelude::*, ticker, RecvChan, RecvWorker, RudpShare, Split};
+use super::*;
use async_recursion::async_recursion;
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::{
type Result<T> = std::result::Result<T, Error>;
+#[derive(Debug)]
+struct Split {
+ timestamp: Option<Instant>,
+ chunks: Vec<OnceCell<Vec<u8>>>,
+ got: usize,
+}
+
+struct RecvChan {
+ packets: Vec<Option<Vec<u8>>>, // char ** 😛
+ splits: HashMap<u16, Split>,
+ seqnum: u16,
+ num: u8,
+}
+
+pub(crate) struct RecvWorker<R: UdpReceiver, S: UdpSender> {
+ share: Arc<RudpShare<S>>,
+ close: watch::Receiver<bool>,
+ chans: Arc<Vec<Mutex<RecvChan>>>,
+ pkt_tx: mpsc::UnboundedSender<InPkt>,
+ udp_rx: R,
+}
+
impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
pub fn new(
udp_rx: R,
-use crate::{prelude::*, Ack, RudpShare};
+use super::*;
use byteorder::{BigEndian, WriteBytesExt};
use std::io::{self, Write};
use tokio::sync::watch;
-type AckResult = io::Result<Option<watch::Receiver<bool>>>;
+pub type AckResult = io::Result<Option<watch::Receiver<bool>>>;
impl<S: UdpSender> RudpSender<S> {
pub async fn send(&self, pkt: Pkt<&[u8]>) -> AckResult {
--- /dev/null
+use super::*;
+use std::{collections::HashMap, io, sync::Arc, time::Duration};
+use tokio::{
+ sync::{mpsc, watch, Mutex, RwLock},
+ task::JoinSet,
+};
+
+#[derive(Debug)]
+pub(crate) struct Ack {
+ pub(crate) tx: watch::Sender<bool>,
+ pub(crate) rx: watch::Receiver<bool>,
+ pub(crate) data: Vec<u8>,
+}
+
+#[derive(Debug)]
+pub(crate) struct Chan {
+ pub(crate) acks: HashMap<u16, Ack>,
+ pub(crate) seqnum: u16,
+}
+
+#[derive(Debug)]
+pub(crate) struct RudpShare<S: UdpSender> {
+ pub(crate) id: u16,
+ pub(crate) remote_id: RwLock<u16>,
+ pub(crate) chans: Vec<Mutex<Chan>>,
+ pub(crate) udp_tx: S,
+ pub(crate) close_tx: watch::Sender<bool>,
+ pub(crate) tasks: Mutex<JoinSet<()>>,
+}
+
+pub async fn new<S: UdpSender, R: UdpReceiver>(
+ id: u16,
+ remote_id: u16,
+ udp_tx: S,
+ udp_rx: R,
+) -> io::Result<(RudpSender<S>, RudpReceiver<S>)> {
+ let (pkt_tx, pkt_rx) = mpsc::unbounded_channel();
+ let (close_tx, close_rx) = watch::channel(false);
+
+ let share = Arc::new(RudpShare {
+ id,
+ remote_id: RwLock::new(remote_id),
+ udp_tx,
+ close_tx,
+ chans: (0..NUM_CHANS)
+ .map(|_| {
+ Mutex::new(Chan {
+ acks: HashMap::new(),
+ seqnum: INIT_SEQNUM,
+ })
+ })
+ .collect(),
+ tasks: Mutex::new(JoinSet::new()),
+ });
+
+ let mut tasks = share.tasks.lock().await;
+
+ let recv_share = Arc::clone(&share);
+ let recv_close = close_rx.clone();
+ tasks
+ /*.build_task()
+ .name("recv")*/
+ .spawn(async move {
+ let worker = RecvWorker::new(udp_rx, recv_share, recv_close, pkt_tx);
+ worker.run().await;
+ });
+
+ let resend_share = Arc::clone(&share);
+ let mut resend_close = close_rx.clone();
+ tasks
+ /*.build_task()
+ .name("resend")*/
+ .spawn(async move {
+ ticker!(Duration::from_millis(500), resend_close, {
+ for chan in resend_share.chans.iter() {
+ for (_, ack) in chan.lock().await.acks.iter() {
+ resend_share.send_raw(&ack.data).await.ok(); // TODO: handle error (?)
+ }
+ }
+ });
+ });
+
+ let ping_share = Arc::clone(&share);
+ let mut ping_close = close_rx.clone();
+ tasks
+ /*.build_task()
+ .name("ping")*/
+ .spawn(async move {
+ ticker!(Duration::from_secs(PING_TIMEOUT), ping_close, {
+ ping_share
+ .send(
+ PktType::Ctl,
+ Pkt {
+ chan: 0,
+ unrel: false,
+ data: &[CtlType::Ping as u8],
+ },
+ )
+ .await
+ .ok();
+ });
+ });
+
+ drop(tasks);
+
+ Ok((
+ RudpSender {
+ share: Arc::clone(&share),
+ },
+ RudpReceiver { share, pkt_rx },
+ ))
+}