]> git.lizzy.rs Git - mt_rudp.git/blob - src/lib.rs
76f0311dca127c322aea84bc89b5ae5ae9f42dd8
[mt_rudp.git] / src / lib.rs
1 #![feature(cursor_remaining)]
2 #![feature(hash_drain_filter)]
3 #![feature(once_cell)]
4 mod client;
5 mod error;
6 mod new;
7 mod recv;
8 mod send;
9
10 pub use prelude::*;
11
12 use async_trait::async_trait;
13 use delegate::delegate;
14 use num_enum::TryFromPrimitive;
15 use std::{cell::OnceCell, collections::HashMap, io, sync::Arc, time::Instant};
16 use tokio::{
17     sync::{mpsc, watch, Mutex, RwLock},
18     task::JoinSet,
19 };
20
21 #[async_trait]
22 pub trait UdpSender: Send + Sync + 'static {
23     async fn send(&self, data: &[u8]) -> io::Result<()>;
24 }
25
26 #[async_trait]
27 pub trait UdpReceiver: Send + Sync + 'static {
28     async fn recv(&self) -> io::Result<Vec<u8>>;
29 }
30
31 #[derive(Debug, Copy, Clone, PartialEq)]
32 #[repr(u16)]
33 pub enum PeerID {
34     Nil = 0,
35     Srv,
36     CltMin,
37 }
38
39 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
40 #[repr(u8)]
41 pub enum PktType {
42     Ctl = 0,
43     Orig,
44     Split,
45     Rel,
46 }
47
48 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
49 #[repr(u8)]
50 pub enum CtlType {
51     Ack = 0,
52     SetPeerID,
53     Ping,
54     Disco,
55 }
56
57 #[derive(Debug)]
58 pub struct Pkt<T> {
59     pub unrel: bool,
60     pub chan: u8,
61     pub data: T,
62 }
63
64 pub type InPkt = Result<Pkt<Vec<u8>>, error::Error>;
65
66 #[derive(Debug)]
67 struct Ack {
68     tx: watch::Sender<bool>,
69     rx: watch::Receiver<bool>,
70     data: Vec<u8>,
71 }
72
73 #[derive(Debug)]
74 struct Chan {
75     acks: HashMap<u16, Ack>,
76     seqnum: u16,
77 }
78
79 #[derive(Debug)]
80 struct RudpShare<S: UdpSender> {
81     id: u16,
82     remote_id: RwLock<u16>,
83     chans: Vec<Mutex<Chan>>,
84     udp_tx: S,
85     close_tx: watch::Sender<bool>,
86     tasks: Mutex<JoinSet<()>>,
87 }
88
89 #[derive(Debug)]
90 pub struct RudpReceiver<S: UdpSender> {
91     share: Arc<RudpShare<S>>,
92     pkt_rx: mpsc::UnboundedReceiver<InPkt>,
93 }
94
95 #[derive(Debug)]
96 pub struct RudpSender<S: UdpSender> {
97     share: Arc<RudpShare<S>>,
98 }
99
100 macro_rules! impl_share {
101     ($T:ident) => {
102         impl<S: UdpSender> $T<S> {
103             pub async fn peer_id(&self) -> u16 {
104                 self.share.id
105             }
106
107             pub async fn is_server(&self) -> bool {
108                 self.share.id == PeerID::Srv as u16
109             }
110
111             pub async fn close(self) {
112                 self.share.close_tx.send(true).ok();
113
114                 let mut tasks = self.share.tasks.lock().await;
115                 while let Some(res) = tasks.join_next().await {
116                     res.ok(); // TODO: handle error (?)
117                 }
118             }
119         }
120     };
121 }
122
123 impl_share!(RudpReceiver);
124 impl_share!(RudpSender);
125
126 impl<S: UdpSender> RudpReceiver<S> {
127     delegate! {
128         to self.pkt_rx {
129             pub async fn recv(&mut self) -> Option<InPkt>;
130         }
131     }
132 }
133
134 #[derive(Debug)]
135 struct Split {
136     timestamp: Option<Instant>,
137     chunks: Vec<OnceCell<Vec<u8>>>,
138     got: usize,
139 }
140
141 struct RecvChan {
142     packets: Vec<Option<Vec<u8>>>, // char ** ðŸ˜›
143     splits: HashMap<u16, Split>,
144     seqnum: u16,
145     num: u8,
146 }
147
148 struct RecvWorker<R: UdpReceiver, S: UdpSender> {
149     share: Arc<RudpShare<S>>,
150     close: watch::Receiver<bool>,
151     chans: Arc<Vec<Mutex<RecvChan>>>,
152     pkt_tx: mpsc::UnboundedSender<InPkt>,
153     udp_rx: R,
154 }
155
156 mod prelude {
157     pub const PROTO_ID: u32 = 0x4f457403;
158     pub const UDP_PKT_SIZE: usize = 512;
159     pub const NUM_CHANS: usize = 3;
160     pub const REL_BUFFER: usize = 0x8000;
161     pub const INIT_SEQNUM: u16 = 65500;
162     pub const TIMEOUT: u64 = 30;
163     pub const PING_TIMEOUT: u64 = 5;
164
165     pub use super::{
166         client::{connect, Sender as Client},
167         error::Error,
168         new::new,
169         CtlType, InPkt, PeerID, Pkt, PktType, RudpReceiver, RudpSender, UdpReceiver, UdpSender,
170     };
171
172     #[macro_export]
173     macro_rules! ticker {
174                 ($duration:expr, $close:expr, $body:block) => {
175                         let mut interval = tokio::time::interval($duration);
176
177                         while tokio::select!{
178                                 _ = interval.tick() => true,
179                                 _ = $close.changed() => false,
180                         } $body
181                 };
182         }
183 }