]> git.lizzy.rs Git - mt_rudp.git/blob - src/lib.rs
f0a91fe955a0f754a394ec075fbd4565f1f4d903
[mt_rudp.git] / src / lib.rs
1 #![feature(cursor_remaining)]
2 #![feature(hash_drain_filter)]
3 #![feature(once_cell)]
4 mod client;
5 mod error;
6 mod new;
7 mod recv;
8 mod send;
9
10 pub use prelude::*;
11
12 use async_trait::async_trait;
13 use num_enum::TryFromPrimitive;
14 use std::{
15     cell::{Cell, OnceCell},
16     collections::HashMap,
17     io, ops,
18     sync::Arc,
19     time::Instant,
20 };
21 use tokio::{
22     sync::{mpsc, watch, Mutex, RwLock},
23     task::JoinSet,
24 };
25
26 #[async_trait]
27 pub trait UdpSender: Send + Sync + 'static {
28     async fn send(&self, data: &[u8]) -> io::Result<()>;
29 }
30
31 #[async_trait]
32 pub trait UdpReceiver: Send + Sync + 'static {
33     async fn recv(&self) -> io::Result<Vec<u8>>;
34 }
35
36 #[derive(Debug, Copy, Clone, PartialEq)]
37 #[repr(u16)]
38 pub enum PeerID {
39     Nil = 0,
40     Srv,
41     CltMin,
42 }
43
44 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
45 #[repr(u8)]
46 pub enum PktType {
47     Ctl = 0,
48     Orig,
49     Split,
50     Rel,
51 }
52
53 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
54 #[repr(u8)]
55 pub enum CtlType {
56     Ack = 0,
57     SetPeerID,
58     Ping,
59     Disco,
60 }
61
62 #[derive(Debug)]
63 pub struct Pkt<T> {
64     pub unrel: bool,
65     pub chan: u8,
66     pub data: T,
67 }
68
69 pub type InPkt = Result<Pkt<Vec<u8>>, error::Error>;
70
71 #[derive(Debug)]
72 struct Ack {
73     tx: watch::Sender<bool>,
74     rx: watch::Receiver<bool>,
75     data: Vec<u8>,
76 }
77
78 #[derive(Debug)]
79 struct Chan {
80     acks: HashMap<u16, Ack>,
81     seqnum: u16,
82 }
83
84 #[derive(Debug)]
85 struct RudpShare<S: UdpSender> {
86     id: u16,
87     remote_id: RwLock<u16>,
88     chans: Vec<Mutex<Chan>>,
89     udp_tx: S,
90     close_tx: watch::Sender<bool>,
91     tasks: Mutex<JoinSet<()>>,
92 }
93
94 #[derive(Debug)]
95 pub struct RudpReceiver<S: UdpSender> {
96     share: Arc<RudpShare<S>>,
97     pkt_rx: mpsc::UnboundedReceiver<InPkt>,
98 }
99
100 #[derive(Debug)]
101 pub struct RudpSender<S: UdpSender> {
102     share: Arc<RudpShare<S>>,
103 }
104
105 macro_rules! impl_share {
106     ($T:ident) => {
107         impl<S: UdpSender> $T<S> {
108             pub async fn peer_id(&self) -> u16 {
109                 self.share.id
110             }
111
112             pub async fn is_server(&self) -> bool {
113                 self.share.id == PeerID::Srv as u16
114             }
115
116             pub async fn close(self) {
117                 self.share.close_tx.send(true).ok();
118
119                 let mut tasks = self.share.tasks.lock().await;
120                 while let Some(res) = tasks.join_next().await {
121                     res.ok(); // TODO: handle error (?)
122                 }
123             }
124         }
125     };
126 }
127
128 impl_share!(RudpReceiver);
129 impl_share!(RudpSender);
130
131 impl<S: UdpSender> ops::Deref for RudpReceiver<S> {
132     type Target = mpsc::UnboundedReceiver<InPkt>;
133
134     fn deref(&self) -> &Self::Target {
135         &self.pkt_rx
136     }
137 }
138
139 impl<S: UdpSender> ops::DerefMut for RudpReceiver<S> {
140     fn deref_mut(&mut self) -> &mut Self::Target {
141         &mut self.pkt_rx
142     }
143 }
144
145 #[derive(Debug)]
146 struct Split {
147     timestamp: Option<Instant>,
148     chunks: Vec<OnceCell<Vec<u8>>>,
149     got: usize,
150 }
151
152 struct RecvChan {
153     packets: Vec<Cell<Option<Vec<u8>>>>, // char ** ðŸ˜›
154     splits: HashMap<u16, Split>,
155     seqnum: u16,
156     num: u8,
157 }
158
159 struct RecvWorker<R: UdpReceiver, S: UdpSender> {
160     share: Arc<RudpShare<S>>,
161     close: watch::Receiver<bool>,
162     chans: Arc<Vec<Mutex<RecvChan>>>,
163     pkt_tx: mpsc::UnboundedSender<InPkt>,
164     udp_rx: R,
165 }
166
167 mod prelude {
168     pub const PROTO_ID: u32 = 0x4f457403;
169     pub const UDP_PKT_SIZE: usize = 512;
170     pub const NUM_CHANS: usize = 3;
171     pub const REL_BUFFER: usize = 0x8000;
172     pub const INIT_SEQNUM: u16 = 65500;
173     pub const TIMEOUT: u64 = 30;
174     pub const PING_TIMEOUT: u64 = 5;
175
176     pub use super::{
177         client::{connect, Sender as Client},
178         error::Error,
179         new::new,
180         CtlType, InPkt, PeerID, Pkt, PktType, RudpReceiver, RudpSender, UdpReceiver, UdpSender,
181     };
182
183     #[macro_export]
184     macro_rules! ticker {
185                 ($duration:expr, $close:expr, $body:block) => {
186                         let mut interval = tokio::time::interval($duration);
187
188                         while tokio::select!{
189                                 _ = interval.tick() => true,
190                                 _ = $close.changed() => false,
191                         } $body
192                 };
193         }
194 }