[dependencies]
byteorder = "1.4.3"
num_enum = "0.5.7"
+tokio = { version = "1.23.0", features = ["full"] }
use crate::*;
-use std::{io, net, sync::Arc};
+use std::{io, sync::Arc};
+use tokio::net;
pub struct Sender {
sock: Arc<net::UdpSocket>,
}
impl UdpSender for Sender {
- fn send(&self, data: Vec<u8>) -> io::Result<()> {
- self.sock.send(&data)?;
+ async fn send(&self, data: Vec<u8>) -> io::Result<()> {
+ self.sock.send(&data).await?;
Ok(())
}
}
}
impl UdpReceiver for Receiver {
- fn recv(&self) -> io::Result<Vec<u8>> {
+ async fn recv(&self) -> io::Result<Vec<u8>> {
let mut buffer = Vec::new();
buffer.resize(UDP_PKT_SIZE, 0);
- let len = self.sock.recv(&mut buffer)?;
+ let len = self.sock.recv(&mut buffer).await?;
buffer.truncate(len);
Ok(buffer)
}
}
-pub fn connect(addr: &str) -> io::Result<(RudpSender<Sender>, RudpReceiver<Sender>)> {
- let sock = Arc::new(net::UdpSocket::bind("0.0.0.0:0")?);
- sock.connect(addr)?;
+pub async fn connect(addr: &str) -> io::Result<(RudpSender<Sender>, RudpReceiver<Sender>)> {
+ let sock = Arc::new(net::UdpSocket::bind("0.0.0.0:0").await?);
+ sock.connect(addr).await?;
Ok(new(
PeerID::Srv as u16,
use crate::{CtlType, InPkt, PktType};
use num_enum::TryFromPrimitiveError;
-use std::{fmt, io, sync::mpsc};
+use std::{fmt, io};
+use tokio::sync::mpsc::error::SendError;
#[derive(Debug)]
pub enum Error {
}
}
-impl From<mpsc::SendError<InPkt>> for Error {
- fn from(_err: mpsc::SendError<InPkt>) -> Self {
+impl From<SendError<InPkt>> for Error {
+ fn from(_err: SendError<InPkt>) -> Self {
Self::LocalDisco // technically not a disconnect but a local drop
}
}
#![feature(yeet_expr)]
#![feature(cursor_remaining)]
#![feature(hash_drain_filter)]
+#![feature(async_fn_in_trait)]
mod client;
pub mod error;
mod recv_worker;
use byteorder::{BigEndian, WriteBytesExt};
pub use client::{connect, Sender as Client};
use num_enum::TryFromPrimitive;
+use std::future::Future;
use std::{
io::{self, Write},
ops,
- sync::{mpsc, Arc},
- thread,
+ sync::Arc,
};
+use tokio::sync::mpsc;
pub const PROTO_ID: u32 = 0x4f457403;
pub const UDP_PKT_SIZE: usize = 512;
pub const TIMEOUT: u64 = 30;
pub trait UdpSender: Send + Sync + 'static {
- fn send(&self, data: Vec<u8>) -> io::Result<()>;
+ async fn send(&self, data: Vec<u8>) -> io::Result<()>;
}
pub trait UdpReceiver: Send + Sync + 'static {
- fn recv(&self) -> io::Result<Vec<u8>>;
+ async fn recv(&self) -> io::Result<Vec<u8>>;
}
#[derive(Debug, Copy, Clone)]
#[derive(Debug)]
pub struct RudpReceiver<S: UdpSender> {
share: Arc<RudpShare<S>>,
- pkt_rx: mpsc::Receiver<InPkt>,
+ pkt_rx: mpsc::UnboundedReceiver<InPkt>,
}
#[derive(Debug)]
}
impl<S: UdpSender> RudpShare<S> {
- pub fn send(&self, tp: PktType, pkt: Pkt<&[u8]>) -> io::Result<()> {
+ pub async fn send(&self, tp: PktType, pkt: Pkt<&[u8]>) -> io::Result<()> {
let mut buf = Vec::with_capacity(4 + 2 + 1 + 1 + pkt.data.len());
buf.write_u32::<BigEndian>(PROTO_ID)?;
buf.write_u16::<BigEndian>(self.remote_id)?;
buf.write_u8(tp as u8)?;
buf.write(pkt.data)?;
- self.udp_tx.send(buf)?;
+ self.udp_tx.send(buf).await?;
Ok(())
}
}
impl<S: UdpSender> RudpSender<S> {
- pub fn send(&self, pkt: Pkt<&[u8]>) -> io::Result<()> {
- self.share.send(PktType::Orig, pkt) // TODO
+ pub async fn send(&self, pkt: Pkt<&[u8]>) -> io::Result<()> {
+ self.share.send(PktType::Orig, pkt).await // TODO
}
}
impl<S: UdpSender> ops::Deref for RudpReceiver<S> {
- type Target = mpsc::Receiver<InPkt>;
+ type Target = mpsc::UnboundedReceiver<InPkt>;
fn deref(&self) -> &Self::Target {
&self.pkt_rx
udp_tx: S,
udp_rx: R,
) -> (RudpSender<S>, RudpReceiver<S>) {
- let (pkt_tx, pkt_rx) = mpsc::channel();
+ let (pkt_tx, pkt_rx) = mpsc::unbounded_channel();
let share = Arc::new(RudpShare {
id,
});
let recv_share = Arc::clone(&share);
- thread::spawn(|| {
- recv_worker::RecvWorker::new(udp_rx, recv_share, pkt_tx).run();
- });
+ tokio::spawn(async { recv_worker::RecvWorker::new(udp_rx, recv_share, pkt_tx).await });
(
RudpSender {
// connect
-fn main() -> io::Result<()> {
+#[tokio::main]
+async fn main() -> io::Result<()> {
//println!("{}", x.deep_size_of());
- let (tx, rx) = connect("127.0.0.1:30000")?;
+ let (tx, rx) = connect("127.0.0.1:30000").await?;
let mut mtpkt = vec![];
mtpkt.write_u16::<BigEndian>(2)?; // high level type
unrel: true,
chan: 1,
data: &mtpkt,
- })?;
+ })
+ .await?;
- while let Ok(result) = rx.recv() {
+ while let Some(result) = rx.recv().await {
match result {
Ok(pkt) => {
io::stdout().write(pkt.data.as_slice())?;
use std::{
cell::Cell,
collections::HashMap,
- io, result,
- sync::{mpsc, Arc, Mutex, Weak},
- thread, time,
+ io,
+ sync::{Arc, Weak},
+ time,
};
+use tokio::sync::{mpsc, Mutex};
fn to_seqnum(seqnum: u16) -> usize {
(seqnum as usize) & (REL_BUFFER - 1)
}
-type PktTx = mpsc::Sender<InPkt>;
-type Result = result::Result<(), Error>;
+type Result = std::result::Result<(), Error>;
struct Split {
timestamp: time::Instant,
pub struct RecvWorker<R: UdpReceiver, S: UdpSender> {
share: Arc<RudpShare<S>>,
chans: Arc<Vec<Mutex<Chan>>>,
- pkt_tx: PktTx,
+ pkt_tx: mpsc::UnboundedSender<InPkt>,
udp_rx: R,
}
impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
- pub fn new(udp_rx: R, share: Arc<RudpShare<S>>, pkt_tx: PktTx) -> Self {
+ pub async fn new(udp_rx: R, share: Arc<RudpShare<S>>, pkt_tx: mpsc::UnboundedSender<InPkt>) {
Self {
udp_rx,
share,
.collect(),
),
}
+ .run()
+ .await
}
- pub fn run(&self) {
+ pub async fn run(&self) {
let cleanup_chans = Arc::downgrade(&self.chans);
- thread::spawn(move || {
+ tokio::spawn(async move {
let timeout = time::Duration::from_secs(TIMEOUT);
+ let mut interval = tokio::time::interval(timeout);
while let Some(chans) = Weak::upgrade(&cleanup_chans) {
for chan in chans.iter() {
- let mut ch = chan.lock().unwrap();
+ let mut ch = chan.lock().await;
ch.splits = ch
.splits
.drain_filter(|_k, v| v.timestamp.elapsed() < timeout)
.collect();
}
- thread::sleep(timeout);
+ interval.tick().await;
}
});
loop {
- if let Err(e) = self.handle(self.recv_pkt()) {
+ if let Err(e) = self.handle(self.recv_pkt().await) {
if let Error::LocalDisco = e {
self.share
.send(
data: &[CtlType::Disco as u8],
},
)
+ .await
.ok();
}
break;
}
}
- fn recv_pkt(&self) -> Result {
+ async fn recv_pkt(&self) -> Result {
use Error::*;
// todo: reset timeout
- let mut cursor = io::Cursor::new(self.udp_rx.recv()?);
+ let mut cursor = io::Cursor::new(self.udp_rx.recv().await?);
let proto_id = cursor.read_u32::<BigEndian>()?;
if proto_id != PROTO_ID {
.get(n_chan as usize)
.ok_or(InvalidChannel(n_chan))?
.lock()
- .unwrap();
+ .await;
self.process_pkt(cursor, &mut chan)
}