1 #![feature(cursor_remaining)]
2 #![feature(hash_drain_filter)]
8 use async_trait::async_trait;
9 use byteorder::{BigEndian, WriteBytesExt};
10 pub use client::{connect, Sender as Client};
11 use num_enum::TryFromPrimitive;
18 use tokio::sync::{mpsc, watch, Mutex, RwLock};
20 pub const PROTO_ID: u32 = 0x4f457403;
21 pub const UDP_PKT_SIZE: usize = 512;
22 pub const NUM_CHANS: usize = 3;
23 pub const REL_BUFFER: usize = 0x8000;
24 pub const INIT_SEQNUM: u16 = 65500;
25 pub const TIMEOUT: u64 = 30;
28 pub trait UdpSender: Send + Sync + 'static {
29 async fn send(&self, data: Vec<u8>) -> io::Result<()>;
33 pub trait UdpReceiver: Send + Sync + 'static {
34 async fn recv(&self) -> io::Result<Vec<u8>>;
37 #[derive(Debug, Copy, Clone, PartialEq)]
45 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
54 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
70 pub type Error = error::Error;
71 pub type InPkt = Result<Pkt<Vec<u8>>, Error>;
72 type AckChan = (watch::Sender<bool>, watch::Receiver<bool>);
75 pub struct RudpShare<S: UdpSender> {
77 pub remote_id: RwLock<u16>,
78 pub ack_chans: Mutex<HashMap<u16, AckChan>>,
83 pub struct RudpReceiver<S: UdpSender> {
84 share: Arc<RudpShare<S>>,
85 pkt_rx: mpsc::UnboundedReceiver<InPkt>,
89 pub struct RudpSender<S: UdpSender> {
90 share: Arc<RudpShare<S>>,
93 impl<S: UdpSender> RudpShare<S> {
94 pub async fn send(&self, tp: PktType, pkt: Pkt<&[u8]>) -> io::Result<()> {
95 let mut buf = Vec::with_capacity(4 + 2 + 1 + 1 + pkt.data.len());
96 buf.write_u32::<BigEndian>(PROTO_ID)?;
97 buf.write_u16::<BigEndian>(*self.remote_id.read().await)?;
98 buf.write_u8(pkt.chan as u8)?;
99 buf.write_u8(tp as u8)?;
100 buf.write(pkt.data)?;
102 self.udp_tx.send(buf).await?;
108 impl<S: UdpSender> RudpSender<S> {
109 pub async fn send(&self, pkt: Pkt<&[u8]>) -> io::Result<()> {
110 self.share.send(PktType::Orig, pkt).await // TODO
113 pub async fn peer_id(&self) -> u16 {
117 pub async fn is_server(&self) -> bool {
118 self.share.id == PeerID::Srv as u16
122 impl<S: UdpSender> RudpReceiver<S> {
123 pub async fn peer_id(&self) -> u16 {
127 pub async fn is_server(&self) -> bool {
128 self.share.id == PeerID::Srv as u16
132 impl<S: UdpSender> ops::Deref for RudpReceiver<S> {
133 type Target = mpsc::UnboundedReceiver<InPkt>;
135 fn deref(&self) -> &Self::Target {
140 impl<S: UdpSender> ops::DerefMut for RudpReceiver<S> {
141 fn deref_mut(&mut self) -> &mut Self::Target {
146 pub fn new<S: UdpSender, R: UdpReceiver>(
151 ) -> (RudpSender<S>, RudpReceiver<S>) {
152 let (pkt_tx, pkt_rx) = mpsc::unbounded_channel();
154 let share = Arc::new(RudpShare {
156 remote_id: RwLock::new(remote_id),
158 ack_chans: Mutex::new(HashMap::new()),
160 let recv_share = Arc::clone(&share);
163 let worker = recv_worker::RecvWorker::new(udp_rx, recv_share, pkt_tx);
169 share: Arc::clone(&share),
171 RudpReceiver { share, pkt_rx },
178 async fn main() -> io::Result<()> {
179 let (tx, mut rx) = connect("127.0.0.1:30000").await?;
181 let mut mtpkt = vec![];
182 mtpkt.write_u16::<BigEndian>(2)?; // high level type
183 mtpkt.write_u8(29)?; // serialize ver
184 mtpkt.write_u16::<BigEndian>(0)?; // compression modes
185 mtpkt.write_u16::<BigEndian>(40)?; // MinProtoVer
186 mtpkt.write_u16::<BigEndian>(40)?; // MaxProtoVer
187 mtpkt.write_u16::<BigEndian>(3)?; // player name length
188 mtpkt.write(b"foo")?; // player name
197 while let Some(result) = rx.recv().await {
200 io::stdout().write(pkt.data.as_slice())?;
202 Err(err) => eprintln!("Error: {}", err),
207 // close()ing rx is not needed because it has been consumed to the end