1 #![feature(cursor_remaining)]
2 #![feature(hash_drain_filter)]
12 use async_trait::async_trait;
13 use num_enum::TryFromPrimitive;
14 use std::{cell::OnceCell, collections::HashMap, io, ops, sync::Arc, time::Instant};
16 sync::{mpsc, watch, Mutex, RwLock},
21 pub trait UdpSender: Send + Sync + 'static {
22 async fn send(&self, data: &[u8]) -> io::Result<()>;
26 pub trait UdpReceiver: Send + Sync + 'static {
27 async fn recv(&self) -> io::Result<Vec<u8>>;
30 #[derive(Debug, Copy, Clone, PartialEq)]
38 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
47 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
/// Outcome of receiving one packet: `Ok` holds a `Pkt` whose payload is an owned
/// `Vec<u8>`, `Err` holds an `error::Error`.
///
/// This is the item type carried by the `mpsc::UnboundedReceiver` stored in
/// `RudpReceiver::pkt_rx` and the matching sender in `RecvWorker::pkt_tx`.
63 pub type InPkt = Result<Pkt<Vec<u8>>, error::Error>;
67 tx: watch::Sender<bool>,
68 rx: watch::Receiver<bool>,
74 acks: HashMap<u16, Ack>,
79 struct RudpShare<S: UdpSender> {
81 remote_id: RwLock<u16>,
82 chans: Vec<Mutex<Chan>>,
84 close_tx: watch::Sender<bool>,
85 tasks: Mutex<JoinSet<()>>,
89 pub struct RudpReceiver<S: UdpSender> {
90 share: Arc<RudpShare<S>>,
91 pkt_rx: mpsc::UnboundedReceiver<InPkt>,
95 pub struct RudpSender<S: UdpSender> {
96 share: Arc<RudpShare<S>>,
99 macro_rules! impl_share {
101 impl<S: UdpSender> $T<S> {
102 pub async fn peer_id(&self) -> u16 {
106 pub async fn is_server(&self) -> bool {
107 self.share.id == PeerID::Srv as u16
110 pub async fn close(self) {
111 self.share.close_tx.send(true).ok();
113 let mut tasks = self.share.tasks.lock().await;
114 while let Some(res) = tasks.join_next().await {
115 res.ok(); // TODO: handle error (?)
// Expand `impl_share!` for both connection halves so that `RudpReceiver` and
// `RudpSender` get the same inherent methods defined by the macro
// (`peer_id`, `is_server` and `close`), all backed by the shared `share` field.
122 impl_share!(RudpReceiver);
123 impl_share!(RudpSender);
125 impl<S: UdpSender> ops::Deref for RudpReceiver<S> {
126 type Target = mpsc::UnboundedReceiver<InPkt>;
128 fn deref(&self) -> &Self::Target {
133 impl<S: UdpSender> ops::DerefMut for RudpReceiver<S> {
134 fn deref_mut(&mut self) -> &mut Self::Target {
141 timestamp: Option<Instant>,
142 chunks: Vec<OnceCell<Vec<u8>>>,
147 packets: Vec<Option<Vec<u8>>>, // char ** 😛
148 splits: HashMap<u16, Split>,
153 struct RecvWorker<R: UdpReceiver, S: UdpSender> {
154 share: Arc<RudpShare<S>>,
155 close: watch::Receiver<bool>,
156 chans: Arc<Vec<Mutex<RecvChan>>>,
157 pkt_tx: mpsc::UnboundedSender<InPkt>,
// Protocol-wide tuning constants. Their use sites are not visible in this part
// of the file, so the notes below that go beyond the literal value are marked
// as assumptions to confirm.

/// Protocol identifier value.
/// NOTE(review): presumably the magic number that tags every datagram — confirm
/// against the packet encoder/decoder.
162 pub const PROTO_ID: u32 = 0x4f457403;
/// NOTE(review): presumably the maximum size of a single UDP packet, in bytes —
/// confirm against the send/split code.
163 pub const UDP_PKT_SIZE: usize = 512;
/// Number of channels.
/// NOTE(review): presumably the length of the per-connection `chans` vectors —
/// confirm where those vectors are built.
164 pub const NUM_CHANS: usize = 3;
/// Reliable-buffer size (`0x8000` = 32768).
/// NOTE(review): presumably the number of slots in the reliable receive window
/// (`RecvChan::packets`) — confirm.
165 pub const REL_BUFFER: usize = 0x8000;
/// Initial sequence number. The value sits 35 below `u16::MAX`, so a counter
/// starting here reaches the `u16` limit after only a few dozen increments.
/// NOTE(review): presumably chosen so wrap-around is exercised early — confirm.
166 pub const INIT_SEQNUM: u16 = 65500;
/// Connection timeout.
/// NOTE(review): unit is not visible here; presumably seconds — confirm at the
/// use site.
167 pub const TIMEOUT: u64 = 30;
/// Ping timeout/interval.
/// NOTE(review): unit is not visible here; presumably seconds — confirm at the
/// use site.
168 pub const PING_TIMEOUT: u64 = 5;
171 client::{connect, Sender as Client},
174 CtlType, InPkt, PeerID, Pkt, PktType, RudpReceiver, RudpSender, UdpReceiver, UdpSender,
178 macro_rules! ticker {
179 ($duration:expr, $close:expr, $body:block) => {
180 let mut interval = tokio::time::interval($duration);
182 while tokio::select!{
183 _ = interval.tick() => true,
184 _ = $close.changed() => false,