#![feature(yeet_expr)]
#![feature(cursor_remaining)]
#![feature(hash_drain_filter)]
+#![feature(async_fn_in_trait)]
mod client;
pub mod error;
mod recv_worker;
use byteorder::{BigEndian, WriteBytesExt};
pub use client::{connect, Sender as Client};
use num_enum::TryFromPrimitive;
+use std::future::Future;
use std::{
io::{self, Write},
ops,
- sync::{mpsc, Arc},
- thread,
+ sync::Arc,
};
+use tokio::sync::mpsc;
pub const PROTO_ID: u32 = 0x4f457403;
pub const UDP_PKT_SIZE: usize = 512;
pub const TIMEOUT: u64 = 30;
pub trait UdpSender: Send + Sync + 'static {
- fn send(&self, data: Vec<u8>) -> io::Result<()>;
+ async fn send(&self, data: Vec<u8>) -> io::Result<()>;
}
pub trait UdpReceiver: Send + Sync + 'static {
- fn recv(&self) -> io::Result<Vec<u8>>;
+ async fn recv(&self) -> io::Result<Vec<u8>>;
}
#[derive(Debug, Copy, Clone)]
#[derive(Debug)]
pub struct RudpReceiver<S: UdpSender> {
share: Arc<RudpShare<S>>,
- pkt_rx: mpsc::Receiver<InPkt>,
+ pkt_rx: mpsc::UnboundedReceiver<InPkt>,
}
#[derive(Debug)]
}
impl<S: UdpSender> RudpShare<S> {
- pub fn send(&self, tp: PktType, pkt: Pkt<&[u8]>) -> io::Result<()> {
+ pub async fn send(&self, tp: PktType, pkt: Pkt<&[u8]>) -> io::Result<()> {
let mut buf = Vec::with_capacity(4 + 2 + 1 + 1 + pkt.data.len());
buf.write_u32::<BigEndian>(PROTO_ID)?;
buf.write_u16::<BigEndian>(self.remote_id)?;
buf.write_u8(tp as u8)?;
buf.write(pkt.data)?;
- self.udp_tx.send(buf)?;
+ self.udp_tx.send(buf).await?;
Ok(())
}
}
impl<S: UdpSender> RudpSender<S> {
- pub fn send(&self, pkt: Pkt<&[u8]>) -> io::Result<()> {
- self.share.send(PktType::Orig, pkt) // TODO
+ pub async fn send(&self, pkt: Pkt<&[u8]>) -> io::Result<()> {
+ self.share.send(PktType::Orig, pkt).await // TODO
}
}
impl<S: UdpSender> ops::Deref for RudpReceiver<S> {
- type Target = mpsc::Receiver<InPkt>;
+ type Target = mpsc::UnboundedReceiver<InPkt>;
fn deref(&self) -> &Self::Target {
&self.pkt_rx
udp_tx: S,
udp_rx: R,
) -> (RudpSender<S>, RudpReceiver<S>) {
- let (pkt_tx, pkt_rx) = mpsc::channel();
+ let (pkt_tx, pkt_rx) = mpsc::unbounded_channel();
let share = Arc::new(RudpShare {
id,
});
let recv_share = Arc::clone(&share);
- thread::spawn(|| {
- recv_worker::RecvWorker::new(udp_rx, recv_share, pkt_tx).run();
- });
+ tokio::spawn(async { recv_worker::RecvWorker::new(udp_rx, recv_share, pkt_tx).await });
(
RudpSender {
// connect
-fn main() -> io::Result<()> {
+#[tokio::main]
+async fn main() -> io::Result<()> {
//println!("{}", x.deep_size_of());
- let (tx, rx) = connect("127.0.0.1:30000")?;
+ let (tx, rx) = connect("127.0.0.1:30000").await?;
let mut mtpkt = vec![];
mtpkt.write_u16::<BigEndian>(2)?; // high level type
unrel: true,
chan: 1,
data: &mtpkt,
- })?;
+ })
+ .await?;
- while let Ok(result) = rx.recv() {
+ while let Some(result) = rx.recv().await {
match result {
Ok(pkt) => {
io::stdout().write(pkt.data.as_slice())?;