1 #![feature(cursor_remaining)]
2 #![feature(hash_drain_filter)]
12 use async_trait::async_trait;
13 use delegate::delegate;
14 use num_enum::TryFromPrimitive;
15 use std::{cell::OnceCell, collections::HashMap, io, sync::Arc, time::Instant};
17 sync::{mpsc, watch, Mutex, RwLock},
22 pub trait UdpSender: Send + Sync + 'static {
23 async fn send(&self, data: &[u8]) -> io::Result<()>;
27 pub trait UdpReceiver: Send + Sync + 'static {
28 async fn recv(&self) -> io::Result<Vec<u8>>;
31 #[derive(Debug, Copy, Clone, PartialEq)]
39 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
48 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
/// Result of receiving one packet: either a `Pkt` carrying an owned
/// `Vec<u8>` payload, or an `error::Error`.
64 pub type InPkt = Result<Pkt<Vec<u8>>, error::Error>;
68 tx: watch::Sender<bool>,
69 rx: watch::Receiver<bool>,
75 acks: HashMap<u16, Ack>,
80 struct RudpShare<S: UdpSender> {
82 remote_id: RwLock<u16>,
83 chans: Vec<Mutex<Chan>>,
85 close_tx: watch::Sender<bool>,
86 tasks: Mutex<JoinSet<()>>,
90 pub struct RudpReceiver<S: UdpSender> {
91 share: Arc<RudpShare<S>>,
92 pkt_rx: mpsc::UnboundedReceiver<InPkt>,
96 pub struct RudpSender<S: UdpSender> {
97 share: Arc<RudpShare<S>>,
100 macro_rules! impl_share {
102 impl<S: UdpSender> $T<S> {
103 pub async fn peer_id(&self) -> u16 {
107 pub async fn is_server(&self) -> bool {
108 self.share.id == PeerID::Srv as u16
111 pub async fn close(self) {
112 self.share.close_tx.send(true).ok();
114 let mut tasks = self.share.tasks.lock().await;
115 while let Some(res) = tasks.join_next().await {
116 res.ok(); // TODO: handle error (?)
// Expand `impl_share!` once per handle type so that `RudpReceiver` and
// `RudpSender` both get the same share-backed methods (`peer_id`,
// `is_server`, `close`).
123 impl_share!(RudpReceiver);
124 impl_share!(RudpSender);
126 impl<S: UdpSender> RudpReceiver<S> {
129 pub async fn recv(&mut self) -> Option<InPkt>;
136 timestamp: Option<Instant>,
137 chunks: Vec<OnceCell<Vec<u8>>>,
142 packets: Vec<Option<Vec<u8>>>, // char ** 😛
143 splits: HashMap<u16, Split>,
148 struct RecvWorker<R: UdpReceiver, S: UdpSender> {
149 share: Arc<RudpShare<S>>,
150 close: watch::Receiver<bool>,
151 chans: Arc<Vec<Mutex<RecvChan>>>,
152 pkt_tx: mpsc::UnboundedSender<InPkt>,
/// 32-bit protocol identifier.
/// NOTE(review): presumably written at the start of every datagram and
/// checked on receive — the usage is not visible here; confirm.
157 pub const PROTO_ID: u32 = 0x4f457403;
/// NOTE(review): presumably the maximum size, in bytes, of a single UDP
/// datagram (header included?) — confirm against the send/split code.
158 pub const UDP_PKT_SIZE: usize = 512;
/// Number of channels.
/// NOTE(review): presumably the length of the per-connection `chans`
/// vectors — confirm.
159 pub const NUM_CHANS: usize = 3;
/// 0x8000 == 32768, i.e. half of the `u16` value range.
/// NOTE(review): presumably the size of the reliable-packet window/buffer
/// indexed by sequence number — confirm.
160 pub const REL_BUFFER: usize = 0x8000;
/// Initial sequence number. 65500 is only 35 below `u16::MAX`, so a `u16`
/// counter starting here wraps around after a few dozen increments.
/// NOTE(review): presumably chosen to exercise wrap-around early — confirm.
161 pub const INIT_SEQNUM: u16 = 65500;
/// NOTE(review): unit is not visible here — presumably seconds without any
/// received packet before the peer is considered timed out; confirm.
162 pub const TIMEOUT: u64 = 30;
/// NOTE(review): unit is not visible here — presumably seconds between
/// keep-alive pings; confirm.
163 pub const PING_TIMEOUT: u64 = 5;
166 client::{connect, Sender as Client},
169 CtlType, InPkt, PeerID, Pkt, PktType, RudpReceiver, RudpSender, UdpReceiver, UdpSender,
173 macro_rules! ticker {
174 ($duration:expr, $close:expr, $body:block) => {
175 let mut interval = tokio::time::interval($duration);
177 while tokio::select!{
178 _ = interval.tick() => true,
179 _ = $close.changed() => false,