1 #![feature(cursor_remaining)]
2 #![feature(hash_drain_filter)]
10 use async_trait::async_trait;
11 use byteorder::{BigEndian, WriteBytesExt};
12 pub use client::{connect, Sender as Client};
14 use num_enum::TryFromPrimitive;
15 use pretty_hex::PrettyHex;
24 sync::{mpsc, watch, Mutex, RwLock},
// Protocol-level constants. Their use sites are not visible in this excerpt,
// so the notes below are assumptions to be confirmed against the callers.

// Protocol identifier (presumably written into packet headers -- TODO confirm).
28 pub const PROTO_ID: u32 = 0x4f457403;
// UDP packet size limit (presumably in bytes -- TODO confirm).
29 pub const UDP_PKT_SIZE: usize = 512;
// Number of channels (presumably the length of `RudpShare::chans` -- TODO confirm).
30 pub const NUM_CHANS: usize = 3;
// 0x8000 == 32768; presumably the size of the buffer for reliable packets -- TODO confirm.
31 pub const REL_BUFFER: usize = 0x8000;
// Initial sequence number. NOTE(review): 65500 is close to u16::MAX (65535),
// presumably so that wrap-around is exercised early -- confirm intent.
32 pub const INIT_SEQNUM: u16 = 65500;
// Timeout value; the unit is not visible here (presumably seconds -- TODO confirm).
33 pub const TIMEOUT: u64 = 30;
// Macro arm taking an interval duration, a close-signal expression and a body block.
// NOTE(review): the enclosing `macro_rules!` header is not visible in this excerpt;
// presumably this is the `ticker` macro -- confirm.
38 ($duration:expr, $close:expr, $body:block) => {
// Periodic timer built from `$duration`.
39 let mut interval = tokio::time::interval($duration);
// The two arms below are presumably part of a `tokio::select!` (header not visible):
// an interval tick evaluates to `true` ...
42 _ = interval.tick() => true,
// ... and a change reported by `$close` evaluates to `false`.
43 _ = $close.changed() => false,
48 //pub(crate) use ticker;
// Sending side of a UDP transport. Implementors must be `Send + Sync + 'static`.
52 pub trait UdpSender: Send + Sync + 'static {
// Sends the bytes in `data`; failures are reported as `io::Error`.
// NOTE(review): presumably one call corresponds to one datagram -- confirm with implementors.
53 async fn send(&self, data: &[u8]) -> io::Result<()>;
// Receiving side of a UDP transport. Implementors must be `Send + Sync + 'static`.
57 pub trait UdpReceiver: Send + Sync + 'static {
// Waits for incoming data and returns it as an owned byte vector;
// failures are reported as `io::Error`.
58 async fn recv(&self) -> io::Result<Vec<u8>>;
61 #[derive(Debug, Copy, Clone, PartialEq)]
69 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
78 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
// Public alias for the error type defined in the `error` module.
94 pub type Error = error::Error;
// Item type of the receive queue: either a packet whose data is a `Vec<u8>`, or an `Error`.
95 pub type InPkt = Result<Pkt<Vec<u8>>, Error>;
99 tx: watch::Sender<bool>,
100 rx: watch::Receiver<bool>,
106 acks: HashMap<u16, Ack>,
// State shared by the `RudpSender` and `RudpReceiver` handles; each of them holds an
// `Arc<RudpShare<S>>`. Some fields of this struct are not visible in this excerpt.
111 pub struct RudpShare<S: UdpSender> {
// ID of the remote peer, guarded by an `RwLock`.
113 remote_id: RwLock<u16>,
// One mutex-guarded `Chan` per channel (presumably `NUM_CHANS` entries -- TODO confirm).
114 chans: Vec<Mutex<Chan>>,
// Sending half of a watch channel; `close()` sends `true` on it.
116 close_tx: watch::Sender<bool>,
// Set of tasks that `close()` joins one by one.
117 tasks: Mutex<JoinSet<()>>,
// Receiving handle of a connection.
121 pub struct RudpReceiver<S: UdpSender> {
// State shared with the matching `RudpSender`.
122 share: Arc<RudpShare<S>>,
// Unbounded queue of incoming packets or errors (presumably what the
// `Deref`/`DerefMut` impls expose -- their bodies are not visible here).
123 pkt_rx: mpsc::UnboundedReceiver<InPkt>,
// Sending handle of a connection.
127 pub struct RudpSender<S: UdpSender> {
// State shared with the matching `RudpReceiver`.
128 share: Arc<RudpShare<S>>,
// Generates an inherent `impl` with the methods common to both handle types.
// `$T` is the handle type; it must have a `share` field (see the `self.share` uses below).
131 macro_rules! impl_share {
133 impl<S: UdpSender> $T<S> {
// Returns a peer ID as `u16` (the body is not visible in this excerpt).
134 pub async fn peer_id(&self) -> u16 {
// True when this endpoint's own ID equals `PeerID::Srv`.
138 pub async fn is_server(&self) -> bool {
139 self.share.id == PeerID::Srv as u16
// Consumes the handle, signals shutdown and waits for the tasks in `share.tasks`.
142 pub async fn close(self) {
// Publish `true` on the close channel; the result of `send` is deliberately discarded.
143 self.share.close_tx.send(true).ok();
// Hold the task-set lock while draining it.
145 let mut tasks = self.share.tasks.lock().await;
// `join_next` yields `None` once the set is empty; join errors are currently dropped.
146 while let Some(res) = tasks.join_next().await {
147 res.ok(); // TODO: handle error (?)
// Give both handle types the shared methods (`peer_id`, `is_server`, `close`).
154 impl_share!(RudpReceiver);
155 impl_share!(RudpSender);
// Lets a `&RudpReceiver` be used as a `&mpsc::UnboundedReceiver<InPkt>`.
157 impl<S: UdpSender> ops::Deref for RudpReceiver<S> {
158 type Target = mpsc::UnboundedReceiver<InPkt>;
// Body not visible in this excerpt (presumably returns `&self.pkt_rx` -- confirm).
160 fn deref(&self) -> &Self::Target {
// Mutable counterpart of the `Deref` impl; this is what allows `rx.recv()` to be
// called directly on a `&mut RudpReceiver`.
165 impl<S: UdpSender> ops::DerefMut for RudpReceiver<S> {
// Body not visible in this excerpt (presumably returns `&mut self.pkt_rx` -- confirm).
166 fn deref_mut(&mut self) -> &mut Self::Target {
// Example session: assembles a hand-written packet, then prints a hex dump of every
// incoming packet (or the error) until the receive queue yields `None`.
// What is done with the assembled packet is not visible in this excerpt.
171 async fn example(tx: &RudpSender<Client>, rx: &mut RudpReceiver<Client>) -> io::Result<()> {
// Payload built field by field; multi-byte integers are written big-endian.
173 let mut mtpkt = vec![];
174 mtpkt.write_u16::<BigEndian>(2)?; // high level type
175 mtpkt.write_u8(29)?; // serialize ver
176 mtpkt.write_u16::<BigEndian>(0)?; // compression modes
177 mtpkt.write_u16::<BigEndian>(40)?; // MinProtoVer
178 mtpkt.write_u16::<BigEndian>(40)?; // MaxProtoVer
// 6 is the byte length of the name written on the next line.
179 mtpkt.write_u16::<BigEndian>(6)?; // player name length
// NOTE(review): `write` returns the number of bytes written, which is ignored here;
// `write_all` is the usual choice when the whole buffer must be written.
180 mtpkt.write(b"foobar")?; // player name
189 // handle incoming packets
// The loop ends when `recv` returns `None`.
190 while let Some(result) = rx.recv().await {
// Successful packets are hex-dumped to stdout ...
193 println!("{}", pkt.data.hex_dump());
// ... and errors are reported on stderr without stopping the loop.
195 Err(err) => eprintln!("Error: {}", err),
203 async fn main() -> io::Result<()> {
204 let (tx, mut rx) = connect("127.0.0.1:30000").await?;
207 _ = tokio::signal::ctrl_c() => println!("canceled"),
208 res = example(&tx, &mut rx) => {
210 println!("disconnected");
214 // close either the receiver or the sender
215 // this shuts down associated tasks