1 #![feature(cursor_remaining)]
2 #![feature(hash_drain_filter)]
10 use async_trait::async_trait;
11 use byteorder::{BigEndian, WriteBytesExt};
12 pub use client::{connect, Sender as Client};
14 use num_enum::TryFromPrimitive;
15 use pretty_hex::PrettyHex;
24 sync::{mpsc, watch, Mutex, RwLock},
// Protocol-level tunables. The code that consumes these values is not part
// of this excerpt, so the notes below are assumptions drawn from the names
// and must be confirmed against the users of each constant.

/// 32-bit protocol identifier.
/// NOTE(review): presumably the magic number carried by every packet — confirm.
28 pub const PROTO_ID: u32 = 0x4f457403;
/// NOTE(review): presumably the largest UDP datagram handled, in bytes — confirm.
29 pub const UDP_PKT_SIZE: usize = 512;
/// NOTE(review): presumably the number of channels per connection
/// (`RudpShare::chans` is a `Vec<Mutex<Chan>>`) — confirm it is sized from this.
30 pub const NUM_CHANS: usize = 3;
/// 0x8000 = 32768.
/// NOTE(review): presumably the size of the reliable-delivery window — confirm.
31 pub const REL_BUFFER: usize = 0x8000;
/// 65500 is 35 below `u16::MAX`.
/// NOTE(review): presumably the first sequence number of a connection, in
/// which case wrap-around is reached after only a few dozen packets — confirm.
32 pub const INIT_SEQNUM: u16 = 65500;
/// NOTE(review): presumably the connection timeout, unit presumably seconds — confirm.
33 pub const TIMEOUT: u64 = 30;
/// NOTE(review): presumably the idle time before a ping is sent, unit
/// presumably seconds — confirm.
34 pub const PING_TIMEOUT: u64 = 5;
39 ($duration:expr, $close:expr, $body:block) => {
40 let mut interval = tokio::time::interval($duration);
43 _ = interval.tick() => true,
44 _ = $close.changed() => false,
49 //pub(crate) use ticker;
/// Outgoing side of the underlying transport.
///
/// `RudpShare`, `RudpReceiver` and `RudpSender` are all generic over an
/// implementor of this trait. Implementors must be `Send + Sync + 'static`.
53 pub trait UdpSender: Send + Sync + 'static {
/// Asynchronously sends `data`; failures are reported as `io::Error`.
/// NOTE(review): presumably `data` is sent as a single datagram — confirm
/// against the implementations.
54 async fn send(&self, data: &[u8]) -> io::Result<()>;
/// Incoming side of the underlying transport.
///
/// Implementors must be `Send + Sync + 'static`.
58 pub trait UdpReceiver: Send + Sync + 'static {
/// Asynchronously receives data and returns it as an owned `Vec<u8>`;
/// failures are reported as `io::Error`.
/// NOTE(review): presumably each call yields exactly one datagram — confirm.
59 async fn recv(&self) -> io::Result<Vec<u8>>;
62 #[derive(Debug, Copy, Clone, PartialEq)]
70 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
79 #[derive(Debug, Copy, Clone, PartialEq, TryFromPrimitive)]
/// Public alias for `error::Error`.
95 pub type Error = error::Error;
/// One received item: either a `Pkt` parameterised over `Vec<u8>` or an
/// `Error`. This is the element type of the `mpsc::UnboundedReceiver` held
/// by `RudpReceiver`.
96 pub type InPkt = Result<Pkt<Vec<u8>>, Error>;
100 tx: watch::Sender<bool>,
101 rx: watch::Receiver<bool>,
107 acks: HashMap<u16, Ack>,
/// State shared by both halves of a connection; `RudpReceiver` and
/// `RudpSender` each hold it behind an `Arc`.
112 pub struct RudpShare<S: UdpSender> {
/// NOTE(review): presumably the peer ID of the remote end — confirm.
114 remote_id: RwLock<u16>,
/// Channel state, each entry guarded by its own mutex.
115 chans: Vec<Mutex<Chan>>,
/// Close signal; `close()` sends `true` on it.
117 close_tx: watch::Sender<bool>,
/// Tasks tracked in a `JoinSet`; `close()` awaits every one of them.
118 tasks: Mutex<JoinSet<()>>,
/// Connection handle that owns the incoming-packet channel.
122 pub struct RudpReceiver<S: UdpSender> {
/// Shared connection state.
123 share: Arc<RudpShare<S>>,
/// Receiving end of the unbounded channel that yields `InPkt` items.
124 pkt_rx: mpsc::UnboundedReceiver<InPkt>,
/// Connection handle holding only the shared state.
/// NOTE(review): presumably the sending half of the connection — confirm
/// against its `impl` blocks, which are not part of this excerpt.
128 pub struct RudpSender<S: UdpSender> {
/// Shared connection state.
129 share: Arc<RudpShare<S>>,
// Generates the inherent methods that are identical for every connection
// handle. `$T` must be a type generic over `S: UdpSender` that has a `share`
// field giving access to the shared state.
132 macro_rules! impl_share {
134 impl<S: UdpSender> $T<S> {
/// Returns a peer ID as `u16`.
/// NOTE(review): the body is not visible in this excerpt — confirm which ID.
135 pub async fn peer_id(&self) -> u16 {
/// Returns `true` when the shared `id` equals `PeerID::Srv`.
139 pub async fn is_server(&self) -> bool {
140 self.share.id == PeerID::Srv as u16
/// Closes the connection: signals `true` on the close channel, then waits
/// for every task in the shared `JoinSet` to finish. Consumes the handle.
143 pub async fn close(self) {
// The result of the send is discarded on purpose via `.ok()`.
144 self.share.close_tx.send(true).ok();
// The task lock stays held for the whole join loop below.
146 let mut tasks = self.share.tasks.lock().await;
// `join_next` yields `None` once the set is empty, which ends the loop.
147 while let Some(res) = tasks.join_next().await {
// Join errors are currently dropped without being reported.
148 res.ok(); // TODO: handle error (?)
// Give both connection handles the same `peer_id`, `is_server` and `close`
// methods.
155 impl_share!(RudpReceiver);
156 impl_share!(RudpSender);
// Lets the methods of the inner packet channel be called directly on a
// `RudpReceiver`.
158 impl<S: UdpSender> ops::Deref for RudpReceiver<S> {
// Same type as the `pkt_rx` field.
159 type Target = mpsc::UnboundedReceiver<InPkt>;
// NOTE(review): body not visible in this excerpt — presumably returns
// `&self.pkt_rx`; confirm.
161 fn deref(&self) -> &Self::Target {
// Mutable counterpart of the `Deref` impl; needed for channel methods that
// take `&mut self`, such as `recv()`.
166 impl<S: UdpSender> ops::DerefMut for RudpReceiver<S> {
// NOTE(review): body not visible in this excerpt — presumably returns
// `&mut self.pkt_rx`; confirm.
167 fn deref_mut(&mut self) -> &mut Self::Target {
// Demo session: assembles one hand-encoded packet in `mtpkt`, then reads
// from `rx` until the channel yields `None`, hex-dumping packets and printing
// errors to stderr.
// NOTE(review): the statement that actually sends `mtpkt` through `tx` is
// not visible in this excerpt.
172 async fn example(tx: &RudpSender<Client>, rx: &mut RudpReceiver<Client>) -> io::Result<()> {
// All multi-byte fields below are written big-endian.
174 let mut mtpkt = vec![];
175 mtpkt.write_u16::<BigEndian>(2)?; // high level type
176 mtpkt.write_u8(29)?; // serialize ver
177 mtpkt.write_u16::<BigEndian>(0)?; // compression modes
178 mtpkt.write_u16::<BigEndian>(40)?; // MinProtoVer
179 mtpkt.write_u16::<BigEndian>(40)?; // MaxProtoVer
// The length prefix (6) must match the byte length of the name written next.
180 mtpkt.write_u16::<BigEndian>(6)?; // player name length
// NOTE(review): `Write::write` is allowed to write fewer bytes than given and
// its returned count is ignored here; `write_all` expresses the intent. With
// a `Vec<u8>` target the whole slice is appended, so this is not a live bug.
181 mtpkt.write(b"foobar")?; // player name
190 // handle incoming packets
// `recv()` is reached through the receiver's `DerefMut` to the inner channel.
191 while let Some(result) = rx.recv().await {
194 println!("{}", pkt.data.hex_dump());
// Receive errors are only reported on stderr.
196 Err(err) => eprintln!("Error: {}", err),
// Entry point of the demo: connects to a server on 127.0.0.1:30000 and runs
// `example` until it returns or Ctrl-C is received.
204 async fn main() -> io::Result<()> {
// A failed connect is propagated to the caller via `?`.
205 let (tx, mut rx) = connect("127.0.0.1:30000").await?;
// NOTE(review): the two lines below look like `tokio::select!` arms (the
// macro line itself is not visible here): whichever future completes first
// runs its handler.
208 _ = tokio::signal::ctrl_c() => println!("canceled"),
209 res = example(&tx, &mut rx) => {
211 println!("disconnected");
215 // close either the receiver or the sender
216 // this shuts down associated tasks