From 12bfebc06ed29fabbc4a4357e314b8fbde1b552d Mon Sep 17 00:00:00 2001 From: Lizzy Fleckenstein Date: Fri, 6 Jan 2023 19:48:23 +0100 Subject: [PATCH] timeouts --- src/error.rs | 10 +++++++--- src/main.rs | 1 - src/new.rs | 18 ++++++++++------- src/recv.rs | 55 ++++++++++++++++++++++++++++++++++++---------------- src/send.rs | 2 +- 5 files changed, 57 insertions(+), 29 deletions(-) diff --git a/src/error.rs b/src/error.rs index f434804..7853f59 100644 --- a/src/error.rs +++ b/src/error.rs @@ -13,7 +13,7 @@ pub enum Error { PeerIDAlreadySet, InvalidChunkIndex(usize, usize), InvalidChunkCount(usize, usize), - RemoteDisco, + RemoteDisco(bool), LocalDisco, } @@ -44,7 +44,7 @@ impl From> for Error { impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { use Error::*; - write!(f, "RUDP error: ")?; + write!(f, "rudp: ")?; match self { IoError(err) => write!(f, "IO error: {}", err), @@ -55,7 +55,11 @@ impl fmt::Display for Error { PeerIDAlreadySet => write!(f, "peer ID already set"), InvalidChunkIndex(i, n) => write!(f, "chunk index {i} bigger than chunk count {n}"), InvalidChunkCount(o, n) => write!(f, "chunk count changed from {o} to {n}"), - RemoteDisco => write!(f, "remote disconnected"), + RemoteDisco(to) => write!( + f, + "remote disconnected{}", + if *to { " (timeout)" } else { "" } + ), LocalDisco => write!(f, "local disconnected"), } } diff --git a/src/main.rs b/src/main.rs index 69c8797..9a85183 100644 --- a/src/main.rs +++ b/src/main.rs @@ -186,7 +186,6 @@ async fn example(tx: &RudpSender, rx: &mut RudpReceiver) -> io:: data: &mtpkt, }) .await?; - // handle incoming packets while let Some(result) = rx.recv().await { match result { diff --git a/src/new.rs b/src/new.rs index 6d4987e..9f820a9 100644 --- a/src/new.rs +++ b/src/new.rs @@ -59,13 +59,17 @@ pub async fn new( .name("ping")*/ .spawn(async move { ticker!(Duration::from_secs(PING_TIMEOUT), ping_close, { - let pkt: Pkt<&[u8]> = Pkt { - chan: 0, - unrel: false, - data: &[CtlType::Ping as u8], - }; - - ping_share.send(PktType::Ctl, pkt).await.ok(); + ping_share + .send( + PktType::Ctl, + Pkt { + chan: 0, + unrel: false, + data: &[CtlType::Ping as u8], + }, + ) + .await + .ok(); }); }); diff --git a/src/recv.rs b/src/recv.rs index 15811f2..2fabe3a 100644 --- a/src/recv.rs +++ b/src/recv.rs @@ -5,6 +5,7 @@ use std::{ cell::{Cell, OnceCell}, collections::HashMap, io, + pin::Pin, sync::Arc, time::{Duration, Instant}, }; @@ -65,6 +66,8 @@ impl RecvWorker { } pub async fn run(&self) { + use Error::*; + let cleanup_chans = Arc::clone(&self.chans); let mut cleanup_close = self.close.clone(); self.share @@ -90,36 +93,54 @@ impl RecvWorker { }); let mut close = self.close.clone(); + let timeout = tokio::time::sleep(Duration::from_secs(TIMEOUT)); + tokio::pin!(timeout); + loop { - if let Err(e) = self.handle(self.recv_pkt(&mut close).await) { - if let Error::LocalDisco = e { - self.share - .send( - PktType::Ctl, - Pkt { - unrel: true, - chan: 0, - data: &[CtlType::Disco as u8], - }, - ) - .await - .ok(); + if let Err(e) = self.handle(self.recv_pkt(&mut close, timeout.as_mut()).await) { + // TODO: figure out whether this is a good idea + if let RemoteDisco(to) = e { + self.pkt_tx.send(Err(RemoteDisco(to))).ok(); } + + match e { + // anon5's mt notifies the peer on timeout, C++ MT does not + LocalDisco /*| RemoteDisco(true)*/ => drop( + self.share + .send( + PktType::Ctl, + Pkt { + unrel: true, + chan: 0, + data: &[CtlType::Disco as u8], + }, + ) + .await + .ok(), + ), + _ => {} + } + break; } } } - async fn recv_pkt(&self, close: &mut watch::Receiver) -> Result<()> { + async fn recv_pkt( + &self, + close: &mut watch::Receiver, + timeout: Pin<&mut tokio::time::Sleep>, + ) -> Result<()> { use Error::*; // TODO: reset timeout let mut cursor = io::Cursor::new(tokio::select! { pkt = self.udp_rx.recv() => pkt?, + _ = tokio::time::sleep_until(timeout.deadline()) => return Err(RemoteDisco(true)), _ = close.changed() => return Err(LocalDisco), }); - println!("recv"); + timeout.reset(tokio::time::Instant::now() + Duration::from_secs(TIMEOUT)); let proto_id = cursor.read_u32::()?; if proto_id != PROTO_ID { @@ -179,7 +200,7 @@ impl RecvWorker { } CtlType::Disco => { println!("Disco"); - return Err(RemoteDisco); + return Err(RemoteDisco(false)); } }, PktType::Orig => { @@ -275,7 +296,7 @@ impl RecvWorker { match res { Ok(v) => Ok(v), - Err(RemoteDisco) => Err(RemoteDisco), + Err(RemoteDisco(to)) => Err(RemoteDisco(to)), Err(LocalDisco) => Err(LocalDisco), Err(e) => Ok(self.pkt_tx.send(Err(e))?), } diff --git a/src/send.rs b/src/send.rs index 89c15c7..20308e6 100644 --- a/src/send.rs +++ b/src/send.rs @@ -42,7 +42,7 @@ impl RudpShare { data: buf, }, ); - chan.seqnum += 1; + chan.seqnum = chan.seqnum.overflowing_add(1).0; Ok(Some(rx)) } -- 2.44.0