]> git.lizzy.rs Git - mt_rudp.git/blob - src/main.rs
implement splits
[mt_rudp.git] / src / main.rs
1 #![feature(cursor_remaining)]
2 #![feature(hash_drain_filter)]
3 #![feature(once_cell)]
4 mod client;
5 pub mod error;
6 mod recv_worker;
7
8 use async_trait::async_trait;
9 use byteorder::{BigEndian, WriteBytesExt};
10 pub use client::{connect, Sender as Client};
11 use num_enum::TryFromPrimitive;
12 use std::{
13     io::{self, Write},
14     ops,
15     sync::Arc,
16 };
17 use tokio::sync::{mpsc, RwLock};
18
19 pub const PROTO_ID: u32 = 0x4f457403;
20 pub const UDP_PKT_SIZE: usize = 512;
21 pub const NUM_CHANS: usize = 3;
22 pub const REL_BUFFER: usize = 0x8000;
23 pub const INIT_SEQNUM: u16 = 65500;
24 pub const TIMEOUT: u64 = 30;
25
26 #[async_trait]
27 pub trait UdpSender: Send + Sync + 'static {
28     async fn send(&self, data: Vec<u8>) -> io::Result<()>;
29 }
30
31 #[async_trait]
32 pub trait UdpReceiver: Send + Sync + 'static {
33     async fn recv(&self) -> io::Result<Vec<u8>>;
34 }
35
36 #[derive(Debug, Copy, Clone, PartialEq)]
37 #[repr(u16)]
38 pub enum PeerID {
39     Nil = 0,
40     Srv,
41     CltMin,
42 }
43
44 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
45 #[repr(u8)]
46 pub enum PktType {
47     Ctl = 0,
48     Orig,
49     Split,
50     Rel,
51 }
52
53 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
54 #[repr(u8)]
55 pub enum CtlType {
56     Ack = 0,
57     SetPeerID,
58     Ping,
59     Disco,
60 }
61
62 #[derive(Debug)]
63 pub struct Pkt<T> {
64     unrel: bool,
65     chan: u8,
66     data: T,
67 }
68
69 pub type Error = error::Error;
70 pub type InPkt = Result<Pkt<Vec<u8>>, Error>;
71
72 #[derive(Debug)]
73 pub struct AckChan;
74
75 #[derive(Debug)]
76 pub struct RudpShare<S: UdpSender> {
77     pub id: u16,
78     pub remote_id: RwLock<u16>,
79     pub chans: Vec<AckChan>,
80     udp_tx: S,
81 }
82
83 #[derive(Debug)]
84 pub struct RudpReceiver<S: UdpSender> {
85     share: Arc<RudpShare<S>>,
86     pkt_rx: mpsc::UnboundedReceiver<InPkt>,
87 }
88
89 #[derive(Debug)]
90 pub struct RudpSender<S: UdpSender> {
91     share: Arc<RudpShare<S>>,
92 }
93
94 impl<S: UdpSender> RudpShare<S> {
95     pub async fn send(&self, tp: PktType, pkt: Pkt<&[u8]>) -> io::Result<()> {
96         let mut buf = Vec::with_capacity(4 + 2 + 1 + 1 + pkt.data.len());
97         buf.write_u32::<BigEndian>(PROTO_ID)?;
98         buf.write_u16::<BigEndian>(*self.remote_id.read().await)?;
99         buf.write_u8(pkt.chan as u8)?;
100         buf.write_u8(tp as u8)?;
101         buf.write(pkt.data)?;
102
103         self.udp_tx.send(buf).await?;
104
105         Ok(())
106     }
107 }
108
109 impl<S: UdpSender> RudpSender<S> {
110     pub async fn send(&self, pkt: Pkt<&[u8]>) -> io::Result<()> {
111         self.share.send(PktType::Orig, pkt).await // TODO
112     }
113
114     pub async fn peer_id(&self) -> u16 {
115         self.share.id
116     }
117
118     pub async fn is_server(&self) -> bool {
119         self.share.id == PeerID::Srv as u16
120     }
121 }
122
123 impl<S: UdpSender> RudpReceiver<S> {
124     pub async fn peer_id(&self) -> u16 {
125         self.share.id
126     }
127
128     pub async fn is_server(&self) -> bool {
129         self.share.id == PeerID::Srv as u16
130     }
131 }
132
133 impl<S: UdpSender> ops::Deref for RudpReceiver<S> {
134     type Target = mpsc::UnboundedReceiver<InPkt>;
135
136     fn deref(&self) -> &Self::Target {
137         &self.pkt_rx
138     }
139 }
140
141 impl<S: UdpSender> ops::DerefMut for RudpReceiver<S> {
142     fn deref_mut(&mut self) -> &mut Self::Target {
143         &mut self.pkt_rx
144     }
145 }
146
147 pub fn new<S: UdpSender, R: UdpReceiver>(
148     id: u16,
149     remote_id: u16,
150     udp_tx: S,
151     udp_rx: R,
152 ) -> (RudpSender<S>, RudpReceiver<S>) {
153     let (pkt_tx, pkt_rx) = mpsc::unbounded_channel();
154
155     let share = Arc::new(RudpShare {
156         id,
157         remote_id: RwLock::new(remote_id),
158         udp_tx,
159         chans: (0..NUM_CHANS).map(|_| AckChan).collect(),
160     });
161     let recv_share = Arc::clone(&share);
162
163     tokio::spawn(async {
164         let worker = recv_worker::RecvWorker::new(udp_rx, recv_share, pkt_tx);
165         worker.run().await;
166     });
167
168     (
169         RudpSender {
170             share: Arc::clone(&share),
171         },
172         RudpReceiver { share, pkt_rx },
173     )
174 }
175
176 // connect
177
178 #[tokio::main]
179 async fn main() -> io::Result<()> {
180     let (tx, mut rx) = connect("127.0.0.1:30000").await?;
181
182     let mut mtpkt = vec![];
183     mtpkt.write_u16::<BigEndian>(2)?; // high level type
184     mtpkt.write_u8(29)?; // serialize ver
185     mtpkt.write_u16::<BigEndian>(0)?; // compression modes
186     mtpkt.write_u16::<BigEndian>(40)?; // MinProtoVer
187     mtpkt.write_u16::<BigEndian>(40)?; // MaxProtoVer
188     mtpkt.write_u16::<BigEndian>(3)?; // player name length
189     mtpkt.write(b"foo")?; // player name
190
191     tx.send(Pkt {
192         unrel: true,
193         chan: 1,
194         data: &mtpkt,
195     })
196     .await?;
197
198     while let Some(result) = rx.recv().await {
199         match result {
200             Ok(pkt) => {
201                 io::stdout().write(pkt.data.as_slice())?;
202             }
203             Err(err) => eprintln!("Error: {}", err),
204         }
205     }
206     println!("disco");
207
208     Ok(())
209 }