]> git.lizzy.rs Git - mt_rudp.git/blob - src/lib.rs
Get rid of Cell usage
[mt_rudp.git] / src / lib.rs
1 #![feature(cursor_remaining)]
2 #![feature(hash_drain_filter)]
3 #![feature(once_cell)]
4 mod client;
5 mod error;
6 mod new;
7 mod recv;
8 mod send;
9
10 pub use prelude::*;
11
12 use async_trait::async_trait;
13 use num_enum::TryFromPrimitive;
14 use std::{cell::OnceCell, collections::HashMap, io, ops, sync::Arc, time::Instant};
15 use tokio::{
16     sync::{mpsc, watch, Mutex, RwLock},
17     task::JoinSet,
18 };
19
/// Sending half of the datagram transport that the RUDP layer writes to.
///
/// Implementors must be `Send + Sync + 'static`; the shared connection state
/// (`RudpShare`) stores one value of this type.
#[async_trait]
pub trait UdpSender: Send + Sync + 'static {
    /// Sends the bytes in `data`, resolving to an I/O error if the transport fails.
    // NOTE(review): presumably one call corresponds to exactly one UDP datagram —
    // confirm against the implementors.
    async fn send(&self, data: &[u8]) -> io::Result<()>;
}
24
/// Receiving half of the datagram transport that the RUDP layer reads from.
///
/// Implementors must be `Send + Sync + 'static`; the receive worker
/// (`RecvWorker`) stores one value of this type.
#[async_trait]
pub trait UdpReceiver: Send + Sync + 'static {
    /// Waits for incoming data and returns it as an owned byte buffer, or an
    /// I/O error if the transport fails.
    // NOTE(review): presumably each returned buffer is one UDP datagram —
    // confirm against the implementors.
    async fn recv(&self) -> io::Result<Vec<u8>>;
}
29
/// Well-known peer identifiers, stored as `u16` discriminants.
///
/// Cast a variant with `as u16` to obtain the numeric ID. The type is `Eq`
/// and `Hash`, so it can be used as a key in hash-based collections.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[repr(u16)]
pub enum PeerID {
    /// Numeric ID `0`.
    // NOTE(review): presumably means "no peer ID assigned yet" — confirm.
    Nil = 0,
    /// Numeric ID `1`; a connection whose local ID equals this value is
    /// reported as the server by `is_server`.
    Srv,
    /// Numeric ID `2`.
    // NOTE(review): presumably the lowest ID handed out to clients — confirm.
    CltMin,
}
37
38 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
39 #[repr(u8)]
40 pub enum PktType {
41     Ctl = 0,
42     Orig,
43     Split,
44     Rel,
45 }
46
47 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
48 #[repr(u8)]
49 pub enum CtlType {
50     Ack = 0,
51     SetPeerID,
52     Ping,
53     Disco,
54 }
55
/// A packet together with its delivery metadata, generic over the payload
/// type `T`.
///
/// `Clone`, `PartialEq` and `Eq` are available whenever `T` provides them,
/// so packets can be duplicated and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Pkt<T> {
    /// Delivery flag.
    // NOTE(review): presumably `true` means "send/received unreliably" — confirm.
    pub unrel: bool,
    /// Channel number the packet belongs to.
    pub chan: u8,
    /// The payload.
    pub data: T,
}
62
/// Item type carried by the receiver's packet channel: either a packet with
/// an owned byte payload or an `error::Error`.
pub type InPkt = Result<Pkt<Vec<u8>>, error::Error>;
64
/// Bookkeeping entry stored per sequence number in `Chan::acks`.
// NOTE(review): presumably tracks a reliable packet that is still waiting for
// its acknowledgement — confirm against the send/recv modules.
#[derive(Debug)]
struct Ack {
    /// Sending side of a boolean watch channel.
    // NOTE(review): presumably set to `true` once the ack arrives — confirm.
    tx: watch::Sender<bool>,
    /// Receiving side of the same kind of boolean watch channel.
    rx: watch::Receiver<bool>,
    /// Raw bytes associated with this entry.
    // NOTE(review): presumably the packet kept for retransmission — confirm.
    data: Vec<u8>,
}
71
/// Per-channel state kept in `RudpShare::chans`.
#[derive(Debug)]
struct Chan {
    /// `Ack` entries keyed by a `u16`.
    // NOTE(review): the key is presumably the packet's sequence number — confirm.
    acks: HashMap<u16, Ack>,
    /// Sequence number counter for this channel.
    // NOTE(review): presumably the next sequence number to use when sending — confirm.
    seqnum: u16,
}
77
/// Connection state shared (behind an `Arc`) by `RudpSender`, `RudpReceiver`
/// and the receive worker.
#[derive(Debug)]
struct RudpShare<S: UdpSender> {
    /// Local peer ID; returned by `peer_id` and compared against
    /// `PeerID::Srv` by `is_server`.
    id: u16,
    /// Peer ID of the remote end, behind an async read-write lock.
    remote_id: RwLock<u16>,
    /// Per-channel state, each entry behind its own async mutex.
    chans: Vec<Mutex<Chan>>,
    /// Transport used to send raw data.
    udp_tx: S,
    /// Watch channel on which `close` publishes `true`.
    close_tx: watch::Sender<bool>,
    /// Tasks belonging to this connection; `close` joins all of them.
    tasks: Mutex<JoinSet<()>>,
}
87
/// Receiving half of a connection.
///
/// Dereferences to the inner unbounded channel receiver, so the channel's own
/// methods can be called on it directly to obtain `InPkt` items.
#[derive(Debug)]
pub struct RudpReceiver<S: UdpSender> {
    /// State shared with the sending half.
    share: Arc<RudpShare<S>>,
    /// Channel on which incoming packets (or errors) are delivered.
    pkt_rx: mpsc::UnboundedReceiver<InPkt>,
}
93
/// Sending half of a connection; a thin handle around the shared state.
#[derive(Debug)]
pub struct RudpSender<S: UdpSender> {
    /// State shared with the receiving half.
    share: Arc<RudpShare<S>>,
}
98
99 macro_rules! impl_share {
100     ($T:ident) => {
101         impl<S: UdpSender> $T<S> {
102             pub async fn peer_id(&self) -> u16 {
103                 self.share.id
104             }
105
106             pub async fn is_server(&self) -> bool {
107                 self.share.id == PeerID::Srv as u16
108             }
109
110             pub async fn close(self) {
111                 self.share.close_tx.send(true).ok();
112
113                 let mut tasks = self.share.tasks.lock().await;
114                 while let Some(res) = tasks.join_next().await {
115                     res.ok(); // TODO: handle error (?)
116                 }
117             }
118         }
119     };
120 }
121
122 impl_share!(RudpReceiver);
123 impl_share!(RudpSender);
124
125 impl<S: UdpSender> ops::Deref for RudpReceiver<S> {
126     type Target = mpsc::UnboundedReceiver<InPkt>;
127
128     fn deref(&self) -> &Self::Target {
129         &self.pkt_rx
130     }
131 }
132
133 impl<S: UdpSender> ops::DerefMut for RudpReceiver<S> {
134     fn deref_mut(&mut self) -> &mut Self::Target {
135         &mut self.pkt_rx
136     }
137 }
138
/// State of one split packet, stored per `u16` key in `RecvChan::splits`.
// NOTE(review): presumably a packet that arrives in several chunks and is
// being reassembled — confirm against the recv module.
#[derive(Debug)]
struct Split {
    /// Optional point in time associated with this split.
    // NOTE(review): presumably used to expire incomplete splits — confirm.
    timestamp: Option<Instant>,
    /// One write-once slot per chunk.
    chunks: Vec<OnceCell<Vec<u8>>>,
    /// Counter of chunks.
    // NOTE(review): presumably the number of slots already filled — confirm.
    got: usize,
}
145
/// Receive-side state of a single channel; the receive worker keeps one per
/// channel, each behind its own mutex.
struct RecvChan {
    /// Optional byte buffers, one slot per packet ("char **" 😛).
    // NOTE(review): presumably a buffer of packets that arrived out of order — confirm.
    packets: Vec<Option<Vec<u8>>>,
    /// Split packets keyed by a `u16`.
    // NOTE(review): the key is presumably the split's sequence number — confirm.
    splits: HashMap<u16, Split>,
    /// Sequence number counter for this channel.
    // NOTE(review): presumably the next sequence number expected — confirm.
    seqnum: u16,
    /// Channel number.
    num: u8,
}
152
/// State owned by the receiving task of a connection.
struct RecvWorker<R: UdpReceiver, S: UdpSender> {
    /// Connection state shared with the sender and receiver handles.
    share: Arc<RudpShare<S>>,
    /// Receiving side of a boolean watch channel.
    // NOTE(review): presumably subscribed to `RudpShare::close_tx` — confirm in the constructor.
    close: watch::Receiver<bool>,
    /// Receive-side state of every channel, each behind its own async mutex.
    chans: Arc<Vec<Mutex<RecvChan>>>,
    /// Sending side of the channel that delivers `InPkt` items.
    // NOTE(review): presumably paired with `RudpReceiver::pkt_rx` — confirm in the constructor.
    pkt_tx: mpsc::UnboundedSender<InPkt>,
    /// Transport from which raw data is read.
    udp_rx: R,
}
160
161 mod prelude {
162     pub const PROTO_ID: u32 = 0x4f457403;
163     pub const UDP_PKT_SIZE: usize = 512;
164     pub const NUM_CHANS: usize = 3;
165     pub const REL_BUFFER: usize = 0x8000;
166     pub const INIT_SEQNUM: u16 = 65500;
167     pub const TIMEOUT: u64 = 30;
168     pub const PING_TIMEOUT: u64 = 5;
169
170     pub use super::{
171         client::{connect, Sender as Client},
172         error::Error,
173         new::new,
174         CtlType, InPkt, PeerID, Pkt, PktType, RudpReceiver, RudpSender, UdpReceiver, UdpSender,
175     };
176
177     #[macro_export]
178     macro_rules! ticker {
179                 ($duration:expr, $close:expr, $body:block) => {
180                         let mut interval = tokio::time::interval($duration);
181
182                         while tokio::select!{
183                                 _ = interval.tick() => true,
184                                 _ = $close.changed() => false,
185                         } $body
186                 };
187         }
188 }