]> git.lizzy.rs Git - mt_rudp.git/blob - src/main.rs
finish receiver
[mt_rudp.git] / src / main.rs
1 #![feature(cursor_remaining)]
2 #![feature(hash_drain_filter)]
3 #![feature(once_cell)]
4 mod client;
5 pub mod error;
6 mod recv_worker;
7
8 use async_trait::async_trait;
9 use byteorder::{BigEndian, WriteBytesExt};
10 pub use client::{connect, Sender as Client};
11 use num_enum::TryFromPrimitive;
12 use std::{
13     collections::HashMap,
14     io::{self, Write},
15     ops,
16     sync::Arc,
17 };
18 use tokio::sync::{mpsc, Mutex, RwLock};
19
20 pub const PROTO_ID: u32 = 0x4f457403;
21 pub const UDP_PKT_SIZE: usize = 512;
22 pub const NUM_CHANS: usize = 3;
23 pub const REL_BUFFER: usize = 0x8000;
24 pub const INIT_SEQNUM: u16 = 65500;
25 pub const TIMEOUT: u64 = 30;
26
27 #[async_trait]
28 pub trait UdpSender: Send + Sync + 'static {
29     async fn send(&self, data: Vec<u8>) -> io::Result<()>;
30 }
31
32 #[async_trait]
33 pub trait UdpReceiver: Send + Sync + 'static {
34     async fn recv(&self) -> io::Result<Vec<u8>>;
35 }
36
37 #[derive(Debug, Copy, Clone, PartialEq)]
38 #[repr(u16)]
39 pub enum PeerID {
40     Nil = 0,
41     Srv,
42     CltMin,
43 }
44
45 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
46 #[repr(u8)]
47 pub enum PktType {
48     Ctl = 0,
49     Orig,
50     Split,
51     Rel,
52 }
53
54 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
55 #[repr(u8)]
56 pub enum CtlType {
57     Ack = 0,
58     SetPeerID,
59     Ping,
60     Disco,
61 }
62
63 #[derive(Debug)]
64 pub struct Pkt<T> {
65     unrel: bool,
66     chan: u8,
67     data: T,
68 }
69
70 pub type Error = error::Error;
71 pub type InPkt = Result<Pkt<Vec<u8>>, Error>;
72
73 #[derive(Debug)]
74 pub struct RudpShare<S: UdpSender> {
75     pub id: u16,
76     pub remote_id: RwLock<u16>,
77     pub ack_chans: Mutex<HashMap<u16, mpsc::Sender<()>>>,
78     udp_tx: S,
79 }
80
81 #[derive(Debug)]
82 pub struct RudpReceiver<S: UdpSender> {
83     share: Arc<RudpShare<S>>,
84     pkt_rx: mpsc::UnboundedReceiver<InPkt>,
85 }
86
87 #[derive(Debug)]
88 pub struct RudpSender<S: UdpSender> {
89     share: Arc<RudpShare<S>>,
90 }
91
92 impl<S: UdpSender> RudpShare<S> {
93     pub async fn send(&self, tp: PktType, pkt: Pkt<&[u8]>) -> io::Result<()> {
94         let mut buf = Vec::with_capacity(4 + 2 + 1 + 1 + pkt.data.len());
95         buf.write_u32::<BigEndian>(PROTO_ID)?;
96         buf.write_u16::<BigEndian>(*self.remote_id.read().await)?;
97         buf.write_u8(pkt.chan as u8)?;
98         buf.write_u8(tp as u8)?;
99         buf.write(pkt.data)?;
100
101         self.udp_tx.send(buf).await?;
102
103         Ok(())
104     }
105 }
106
107 impl<S: UdpSender> RudpSender<S> {
108     pub async fn send(&self, pkt: Pkt<&[u8]>) -> io::Result<()> {
109         self.share.send(PktType::Orig, pkt).await // TODO
110     }
111
112     pub async fn peer_id(&self) -> u16 {
113         self.share.id
114     }
115
116     pub async fn is_server(&self) -> bool {
117         self.share.id == PeerID::Srv as u16
118     }
119 }
120
121 impl<S: UdpSender> RudpReceiver<S> {
122     pub async fn peer_id(&self) -> u16 {
123         self.share.id
124     }
125
126     pub async fn is_server(&self) -> bool {
127         self.share.id == PeerID::Srv as u16
128     }
129 }
130
131 impl<S: UdpSender> ops::Deref for RudpReceiver<S> {
132     type Target = mpsc::UnboundedReceiver<InPkt>;
133
134     fn deref(&self) -> &Self::Target {
135         &self.pkt_rx
136     }
137 }
138
139 impl<S: UdpSender> ops::DerefMut for RudpReceiver<S> {
140     fn deref_mut(&mut self) -> &mut Self::Target {
141         &mut self.pkt_rx
142     }
143 }
144
145 pub fn new<S: UdpSender, R: UdpReceiver>(
146     id: u16,
147     remote_id: u16,
148     udp_tx: S,
149     udp_rx: R,
150 ) -> (RudpSender<S>, RudpReceiver<S>) {
151     let (pkt_tx, pkt_rx) = mpsc::unbounded_channel();
152
153     let share = Arc::new(RudpShare {
154         id,
155         remote_id: RwLock::new(remote_id),
156         udp_tx,
157         ack_chans: Mutex::new(HashMap::new()),
158     });
159     let recv_share = Arc::clone(&share);
160
161     tokio::spawn(async {
162         let worker = recv_worker::RecvWorker::new(udp_rx, recv_share, pkt_tx);
163         worker.run().await;
164     });
165
166     (
167         RudpSender {
168             share: Arc::clone(&share),
169         },
170         RudpReceiver { share, pkt_rx },
171     )
172 }
173
174 // connect
175
176 #[tokio::main]
177 async fn main() -> io::Result<()> {
178     let (tx, mut rx) = connect("127.0.0.1:30000").await?;
179
180     let mut mtpkt = vec![];
181     mtpkt.write_u16::<BigEndian>(2)?; // high level type
182     mtpkt.write_u8(29)?; // serialize ver
183     mtpkt.write_u16::<BigEndian>(0)?; // compression modes
184     mtpkt.write_u16::<BigEndian>(40)?; // MinProtoVer
185     mtpkt.write_u16::<BigEndian>(40)?; // MaxProtoVer
186     mtpkt.write_u16::<BigEndian>(3)?; // player name length
187     mtpkt.write(b"foo")?; // player name
188
189     tx.send(Pkt {
190         unrel: true,
191         chan: 1,
192         data: &mtpkt,
193     })
194     .await?;
195
196     while let Some(result) = rx.recv().await {
197         match result {
198             Ok(pkt) => {
199                 io::stdout().write(pkt.data.as_slice())?;
200             }
201             Err(err) => eprintln!("Error: {}", err),
202         }
203     }
204     println!("disco");
205
206     Ok(())
207 }