]> git.lizzy.rs Git - mt_rudp.git/blob - src/main.rs
69c87976ac81ede6d6218d454e859b43559ec239
[mt_rudp.git] / src / main.rs
1 #![feature(cursor_remaining)]
2 #![feature(hash_drain_filter)]
3 #![feature(once_cell)]
4 mod client;
5 pub mod error;
6 mod new;
7 mod recv;
8 mod send;
9
10 use async_trait::async_trait;
11 use byteorder::{BigEndian, WriteBytesExt};
12 pub use client::{connect, Sender as Client};
13 pub use new::new;
14 use num_enum::TryFromPrimitive;
15 use pretty_hex::PrettyHex;
16 use std::{
17     collections::HashMap,
18     io::{self, Write},
19     ops,
20     sync::Arc,
21     time::Duration,
22 };
23 use tokio::{
24     sync::{mpsc, watch, Mutex, RwLock},
25     task::JoinSet,
26 };
27
28 pub const PROTO_ID: u32 = 0x4f457403;
29 pub const UDP_PKT_SIZE: usize = 512;
30 pub const NUM_CHANS: usize = 3;
31 pub const REL_BUFFER: usize = 0x8000;
32 pub const INIT_SEQNUM: u16 = 65500;
33 pub const TIMEOUT: u64 = 30;
34 pub const PING_TIMEOUT: u64 = 5;
35
36 mod ticker_mod {
37     #[macro_export]
38     macro_rules! ticker {
39                 ($duration:expr, $close:expr, $body:block) => {
40                         let mut interval = tokio::time::interval($duration);
41
42                         while tokio::select!{
43                                 _ = interval.tick() => true,
44                                 _ = $close.changed() => false,
45                         } $body
46                 };
47         }
48
49     //pub(crate) use ticker;
50 }
51
52 #[async_trait]
53 pub trait UdpSender: Send + Sync + 'static {
54     async fn send(&self, data: &[u8]) -> io::Result<()>;
55 }
56
57 #[async_trait]
58 pub trait UdpReceiver: Send + Sync + 'static {
59     async fn recv(&self) -> io::Result<Vec<u8>>;
60 }
61
62 #[derive(Debug, Copy, Clone, PartialEq)]
63 #[repr(u16)]
64 pub enum PeerID {
65     Nil = 0,
66     Srv,
67     CltMin,
68 }
69
70 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
71 #[repr(u8)]
72 pub enum PktType {
73     Ctl = 0,
74     Orig,
75     Split,
76     Rel,
77 }
78
79 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
80 #[repr(u8)]
81 pub enum CtlType {
82     Ack = 0,
83     SetPeerID,
84     Ping,
85     Disco,
86 }
87
88 #[derive(Debug)]
89 pub struct Pkt<T> {
90     unrel: bool,
91     chan: u8,
92     data: T,
93 }
94
95 pub type Error = error::Error;
96 pub type InPkt = Result<Pkt<Vec<u8>>, Error>;
97
98 #[derive(Debug)]
99 struct Ack {
100     tx: watch::Sender<bool>,
101     rx: watch::Receiver<bool>,
102     data: Vec<u8>,
103 }
104
105 #[derive(Debug)]
106 struct Chan {
107     acks: HashMap<u16, Ack>,
108     seqnum: u16,
109 }
110
111 #[derive(Debug)]
112 pub struct RudpShare<S: UdpSender> {
113     id: u16,
114     remote_id: RwLock<u16>,
115     chans: Vec<Mutex<Chan>>,
116     udp_tx: S,
117     close_tx: watch::Sender<bool>,
118     tasks: Mutex<JoinSet<()>>,
119 }
120
121 #[derive(Debug)]
122 pub struct RudpReceiver<S: UdpSender> {
123     share: Arc<RudpShare<S>>,
124     pkt_rx: mpsc::UnboundedReceiver<InPkt>,
125 }
126
127 #[derive(Debug)]
128 pub struct RudpSender<S: UdpSender> {
129     share: Arc<RudpShare<S>>,
130 }
131
132 macro_rules! impl_share {
133     ($T:ident) => {
134         impl<S: UdpSender> $T<S> {
135             pub async fn peer_id(&self) -> u16 {
136                 self.share.id
137             }
138
139             pub async fn is_server(&self) -> bool {
140                 self.share.id == PeerID::Srv as u16
141             }
142
143             pub async fn close(self) {
144                 self.share.close_tx.send(true).ok();
145
146                 let mut tasks = self.share.tasks.lock().await;
147                 while let Some(res) = tasks.join_next().await {
148                     res.ok(); // TODO: handle error (?)
149                 }
150             }
151         }
152     };
153 }
154
155 impl_share!(RudpReceiver);
156 impl_share!(RudpSender);
157
158 impl<S: UdpSender> ops::Deref for RudpReceiver<S> {
159     type Target = mpsc::UnboundedReceiver<InPkt>;
160
161     fn deref(&self) -> &Self::Target {
162         &self.pkt_rx
163     }
164 }
165
166 impl<S: UdpSender> ops::DerefMut for RudpReceiver<S> {
167     fn deref_mut(&mut self) -> &mut Self::Target {
168         &mut self.pkt_rx
169     }
170 }
171
172 async fn example(tx: &RudpSender<Client>, rx: &mut RudpReceiver<Client>) -> io::Result<()> {
173     // send hello packet
174     let mut mtpkt = vec![];
175     mtpkt.write_u16::<BigEndian>(2)?; // high level type
176     mtpkt.write_u8(29)?; // serialize ver
177     mtpkt.write_u16::<BigEndian>(0)?; // compression modes
178     mtpkt.write_u16::<BigEndian>(40)?; // MinProtoVer
179     mtpkt.write_u16::<BigEndian>(40)?; // MaxProtoVer
180     mtpkt.write_u16::<BigEndian>(6)?; // player name length
181     mtpkt.write(b"foobar")?; // player name
182
183     tx.send(Pkt {
184         unrel: true,
185         chan: 1,
186         data: &mtpkt,
187     })
188     .await?;
189
190     // handle incoming packets
191     while let Some(result) = rx.recv().await {
192         match result {
193             Ok(pkt) => {
194                 println!("{}", pkt.data.hex_dump());
195             }
196             Err(err) => eprintln!("Error: {}", err),
197         }
198     }
199
200     Ok(())
201 }
202
203 #[tokio::main]
204 async fn main() -> io::Result<()> {
205     let (tx, mut rx) = connect("127.0.0.1:30000").await?;
206
207     tokio::select! {
208         _ = tokio::signal::ctrl_c() => println!("canceled"),
209         res = example(&tx, &mut rx) => {
210             res?;
211             println!("disconnected");
212         }
213     }
214
215     // close either the receiver or the sender
216     // this shuts down associated tasks
217     rx.close().await;
218
219     Ok(())
220 }