1 #![feature(cursor_remaining)]
2 #![feature(hash_drain_filter)]
12 use async_trait::async_trait;
13 use num_enum::TryFromPrimitive;
15 cell::{Cell, OnceCell},
22 sync::{mpsc, watch, Mutex, RwLock},
27 pub trait UdpSender: Send + Sync + 'static {
28 async fn send(&self, data: &[u8]) -> io::Result<()>;
32 pub trait UdpReceiver: Send + Sync + 'static {
33 async fn recv(&self) -> io::Result<Vec<u8>>;
36 #[derive(Debug, Copy, Clone, PartialEq)]
44 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
53 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
69 pub type InPkt = Result<Pkt<Vec<u8>>, error::Error>; // one incoming item: a `Pkt<Vec<u8>>` on success or an `error::Error`; element type of the unbounded `pkt_tx`/`pkt_rx` channel
73 tx: watch::Sender<bool>,
74 rx: watch::Receiver<bool>,
80 acks: HashMap<u16, Ack>,
85 struct RudpShare<S: UdpSender> {
87 remote_id: RwLock<u16>,
88 chans: Vec<Mutex<Chan>>,
90 close_tx: watch::Sender<bool>,
91 tasks: Mutex<JoinSet<()>>,
95 pub struct RudpReceiver<S: UdpSender> {
96 share: Arc<RudpShare<S>>,
97 pkt_rx: mpsc::UnboundedReceiver<InPkt>,
101 pub struct RudpSender<S: UdpSender> {
102 share: Arc<RudpShare<S>>,
105 macro_rules! impl_share {
107 impl<S: UdpSender> $T<S> {
108 pub async fn peer_id(&self) -> u16 {
112 pub async fn is_server(&self) -> bool {
113 self.share.id == PeerID::Srv as u16
116 pub async fn close(self) {
117 self.share.close_tx.send(true).ok();
119 let mut tasks = self.share.tasks.lock().await;
120 while let Some(res) = tasks.join_next().await {
121 res.ok(); // TODO: handle error (?)
128 impl_share!(RudpReceiver); // expands the `impl_share!` impl block (incl. `peer_id`, `is_server`, `close`) for `RudpReceiver<S>`
129 impl_share!(RudpSender); // same expansion for `RudpSender<S>`, so both handle types expose the same methods
131 impl<S: UdpSender> ops::Deref for RudpReceiver<S> {
132 type Target = mpsc::UnboundedReceiver<InPkt>;
134 fn deref(&self) -> &Self::Target {
139 impl<S: UdpSender> ops::DerefMut for RudpReceiver<S> {
140 fn deref_mut(&mut self) -> &mut Self::Target {
147 timestamp: Option<Instant>,
148 chunks: Vec<OnceCell<Vec<u8>>>,
153 packets: Vec<Cell<Option<Vec<u8>>>>, // char ** 😛
154 splits: HashMap<u16, Split>,
159 struct RecvWorker<R: UdpReceiver, S: UdpSender> {
160 share: Arc<RudpShare<S>>,
161 close: watch::Receiver<bool>,
162 chans: Arc<Vec<Mutex<RecvChan>>>,
163 pkt_tx: mpsc::UnboundedSender<InPkt>,
168 pub const PROTO_ID: u32 = 0x4f457403; // protocol identifier; NOTE(review): presumably the magic number prefixed to every packet — confirm against the packet encoder
169 pub const UDP_PKT_SIZE: usize = 512; // NOTE(review): presumably the maximum datagram size in bytes — confirm
170 pub const NUM_CHANS: usize = 3; // NOTE(review): presumably the number of channels per connection (length of the `chans` vectors) — confirm
171 pub const REL_BUFFER: usize = 0x8000; // 0x8000 == 32768; NOTE(review): presumably the size of the reliable-packet buffer/window — confirm
172 pub const INIT_SEQNUM: u16 = 65500; // 35 below u16::MAX (65535); NOTE(review): presumably the first sequence number used, chosen so wrap-around is hit early — confirm
173 pub const TIMEOUT: u64 = 30; // NOTE(review): unit is not visible here, presumably seconds until a silent peer is dropped — confirm
174 pub const PING_TIMEOUT: u64 = 5; // NOTE(review): unit is not visible here, presumably seconds of idle time before a ping is sent — confirm
177 client::{connect, Sender as Client},
180 CtlType, InPkt, PeerID, Pkt, PktType, RudpReceiver, RudpSender, UdpReceiver, UdpSender,
184 macro_rules! ticker {
185 ($duration:expr, $close:expr, $body:block) => {
186 let mut interval = tokio::time::interval($duration);
188 while tokio::select!{
189 _ = interval.tick() => true,
190 _ = $close.changed() => false,