]> git.lizzy.rs Git - mt_rudp.git/blob - src/main.rs
clean shutdown; send reliables
[mt_rudp.git] / src / main.rs
1 #![feature(cursor_remaining)]
2 #![feature(hash_drain_filter)]
3 #![feature(once_cell)]
4 mod client;
5 pub mod error;
6 mod new;
7 mod recv;
8 mod send;
9
10 use async_trait::async_trait;
11 use byteorder::{BigEndian, WriteBytesExt};
12 pub use client::{connect, Sender as Client};
13 pub use new::new;
14 use num_enum::TryFromPrimitive;
15 use pretty_hex::PrettyHex;
16 use std::{
17     collections::HashMap,
18     io::{self, Write},
19     ops,
20     sync::Arc,
21     time::Duration,
22 };
23 use tokio::{
24     sync::{mpsc, watch, Mutex, RwLock},
25     task::JoinSet,
26 };
27
28 pub const PROTO_ID: u32 = 0x4f457403;
29 pub const UDP_PKT_SIZE: usize = 512;
30 pub const NUM_CHANS: usize = 3;
31 pub const REL_BUFFER: usize = 0x8000;
32 pub const INIT_SEQNUM: u16 = 65500;
33 pub const TIMEOUT: u64 = 30;
34
35 mod ticker_mod {
36     #[macro_export]
37     macro_rules! ticker {
38                 ($duration:expr, $close:expr, $body:block) => {
39                         let mut interval = tokio::time::interval($duration);
40
41                         while tokio::select!{
42                                 _ = interval.tick() => true,
43                                 _ = $close.changed() => false,
44                         } $body
45                 };
46         }
47
48     //pub(crate) use ticker;
49 }
50
51 #[async_trait]
52 pub trait UdpSender: Send + Sync + 'static {
53     async fn send(&self, data: &[u8]) -> io::Result<()>;
54 }
55
56 #[async_trait]
57 pub trait UdpReceiver: Send + Sync + 'static {
58     async fn recv(&self) -> io::Result<Vec<u8>>;
59 }
60
61 #[derive(Debug, Copy, Clone, PartialEq)]
62 #[repr(u16)]
63 pub enum PeerID {
64     Nil = 0,
65     Srv,
66     CltMin,
67 }
68
69 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
70 #[repr(u8)]
71 pub enum PktType {
72     Ctl = 0,
73     Orig,
74     Split,
75     Rel,
76 }
77
78 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
79 #[repr(u8)]
80 pub enum CtlType {
81     Ack = 0,
82     SetPeerID,
83     Ping,
84     Disco,
85 }
86
87 #[derive(Debug)]
88 pub struct Pkt<T> {
89     unrel: bool,
90     chan: u8,
91     data: T,
92 }
93
94 pub type Error = error::Error;
95 pub type InPkt = Result<Pkt<Vec<u8>>, Error>;
96
97 #[derive(Debug)]
98 struct Ack {
99     tx: watch::Sender<bool>,
100     rx: watch::Receiver<bool>,
101     data: Vec<u8>,
102 }
103
104 #[derive(Debug)]
105 struct Chan {
106     acks: HashMap<u16, Ack>,
107     seqnum: u16,
108 }
109
110 #[derive(Debug)]
111 pub struct RudpShare<S: UdpSender> {
112     id: u16,
113     remote_id: RwLock<u16>,
114     chans: Vec<Mutex<Chan>>,
115     udp_tx: S,
116     close_tx: watch::Sender<bool>,
117     tasks: Mutex<JoinSet<()>>,
118 }
119
120 #[derive(Debug)]
121 pub struct RudpReceiver<S: UdpSender> {
122     share: Arc<RudpShare<S>>,
123     pkt_rx: mpsc::UnboundedReceiver<InPkt>,
124 }
125
126 #[derive(Debug)]
127 pub struct RudpSender<S: UdpSender> {
128     share: Arc<RudpShare<S>>,
129 }
130
131 macro_rules! impl_share {
132     ($T:ident) => {
133         impl<S: UdpSender> $T<S> {
134             pub async fn peer_id(&self) -> u16 {
135                 self.share.id
136             }
137
138             pub async fn is_server(&self) -> bool {
139                 self.share.id == PeerID::Srv as u16
140             }
141
142             pub async fn close(self) {
143                 self.share.close_tx.send(true).ok();
144
145                 let mut tasks = self.share.tasks.lock().await;
146                 while let Some(res) = tasks.join_next().await {
147                     res.ok(); // TODO: handle error (?)
148                 }
149             }
150         }
151     };
152 }
153
154 impl_share!(RudpReceiver);
155 impl_share!(RudpSender);
156
157 impl<S: UdpSender> ops::Deref for RudpReceiver<S> {
158     type Target = mpsc::UnboundedReceiver<InPkt>;
159
160     fn deref(&self) -> &Self::Target {
161         &self.pkt_rx
162     }
163 }
164
165 impl<S: UdpSender> ops::DerefMut for RudpReceiver<S> {
166     fn deref_mut(&mut self) -> &mut Self::Target {
167         &mut self.pkt_rx
168     }
169 }
170
171 async fn example(tx: &RudpSender<Client>, rx: &mut RudpReceiver<Client>) -> io::Result<()> {
172     // send hello packet
173     let mut mtpkt = vec![];
174     mtpkt.write_u16::<BigEndian>(2)?; // high level type
175     mtpkt.write_u8(29)?; // serialize ver
176     mtpkt.write_u16::<BigEndian>(0)?; // compression modes
177     mtpkt.write_u16::<BigEndian>(40)?; // MinProtoVer
178     mtpkt.write_u16::<BigEndian>(40)?; // MaxProtoVer
179     mtpkt.write_u16::<BigEndian>(6)?; // player name length
180     mtpkt.write(b"foobar")?; // player name
181
182     tx.send(Pkt {
183         unrel: true,
184         chan: 1,
185         data: &mtpkt,
186     })
187     .await?;
188
189     // handle incoming packets
190     while let Some(result) = rx.recv().await {
191         match result {
192             Ok(pkt) => {
193                 println!("{}", pkt.data.hex_dump());
194             }
195             Err(err) => eprintln!("Error: {}", err),
196         }
197     }
198
199     Ok(())
200 }
201
202 #[tokio::main]
203 async fn main() -> io::Result<()> {
204     let (tx, mut rx) = connect("127.0.0.1:30000").await?;
205
206     tokio::select! {
207         _ = tokio::signal::ctrl_c() => println!("canceled"),
208         res = example(&tx, &mut rx) => {
209             res?;
210             println!("disconnected");
211         }
212     }
213
214     // close either the receiver or the sender
215     // this shuts down associated tasks
216     rx.close().await;
217
218     Ok(())
219 }