]> git.lizzy.rs Git - mt_rudp.git/commitdiff
timeouts
authorLizzy Fleckenstein <eliasfleckenstein@web.de>
Fri, 6 Jan 2023 18:48:23 +0000 (19:48 +0100)
committerLizzy Fleckenstein <eliasfleckenstein@web.de>
Fri, 6 Jan 2023 18:48:23 +0000 (19:48 +0100)
src/error.rs
src/main.rs
src/new.rs
src/recv.rs
src/send.rs

index f434804e1e6fb9315801ae2b56b13fbdbbd346a0..7853f5950946f0516a9c6b0f428596e174b39356 100644 (file)
@@ -13,7 +13,7 @@ pub enum Error {
     PeerIDAlreadySet,
     InvalidChunkIndex(usize, usize),
     InvalidChunkCount(usize, usize),
-    RemoteDisco,
+    RemoteDisco(bool),
     LocalDisco,
 }
 
@@ -44,7 +44,7 @@ impl From<SendError<InPkt>> for Error {
 impl fmt::Display for Error {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         use Error::*;
-        write!(f, "RUDP error: ")?;
+        write!(f, "rudp: ")?;
 
         match self {
             IoError(err) => write!(f, "IO error: {}", err),
@@ -55,7 +55,11 @@ impl fmt::Display for Error {
             PeerIDAlreadySet => write!(f, "peer ID already set"),
             InvalidChunkIndex(i, n) => write!(f, "chunk index {i} bigger than chunk count {n}"),
             InvalidChunkCount(o, n) => write!(f, "chunk count changed from {o} to {n}"),
-            RemoteDisco => write!(f, "remote disconnected"),
+            RemoteDisco(to) => write!(
+                f,
+                "remote disconnected{}",
+                if *to { " (timeout)" } else { "" }
+            ),
             LocalDisco => write!(f, "local disconnected"),
         }
     }
index 69c87976ac81ede6d6218d454e859b43559ec239..9a85183bb742d7b5db871c1bd77a6be54e961c63 100644 (file)
@@ -186,7 +186,6 @@ async fn example(tx: &RudpSender<Client>, rx: &mut RudpReceiver<Client>) -> io::
         data: &mtpkt,
     })
     .await?;
-
     // handle incoming packets
     while let Some(result) = rx.recv().await {
         match result {
index 6d4987e45870dd4b8d4664e81253c3865c3af431..9f820a9f8536d61beb1315aecfeb38de9bca13bf 100644 (file)
@@ -59,13 +59,17 @@ pub async fn new<S: UdpSender, R: UdpReceiver>(
         .name("ping")*/
         .spawn(async move {
             ticker!(Duration::from_secs(PING_TIMEOUT), ping_close, {
-                let pkt: Pkt<&[u8]> = Pkt {
-                    chan: 0,
-                    unrel: false,
-                    data: &[CtlType::Ping as u8],
-                };
-
-                ping_share.send(PktType::Ctl, pkt).await.ok();
+                ping_share
+                    .send(
+                        PktType::Ctl,
+                        Pkt {
+                            chan: 0,
+                            unrel: false,
+                            data: &[CtlType::Ping as u8],
+                        },
+                    )
+                    .await
+                    .ok();
             });
         });
 
index 15811f2ab548b138b4bdf74ef24e45fe82da1e04..2fabe3ade59587d6564e2ec0ca122aa13e0f72a1 100644 (file)
@@ -5,6 +5,7 @@ use std::{
     cell::{Cell, OnceCell},
     collections::HashMap,
     io,
+    pin::Pin,
     sync::Arc,
     time::{Duration, Instant},
 };
@@ -65,6 +66,8 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
     }
 
     pub async fn run(&self) {
+        use Error::*;
+
         let cleanup_chans = Arc::clone(&self.chans);
         let mut cleanup_close = self.close.clone();
         self.share
@@ -90,36 +93,54 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
             });
 
         let mut close = self.close.clone();
+        let timeout = tokio::time::sleep(Duration::from_secs(TIMEOUT));
+        tokio::pin!(timeout);
+
         loop {
-            if let Err(e) = self.handle(self.recv_pkt(&mut close).await) {
-                if let Error::LocalDisco = e {
-                    self.share
-                        .send(
-                            PktType::Ctl,
-                            Pkt {
-                                unrel: true,
-                                chan: 0,
-                                data: &[CtlType::Disco as u8],
-                            },
-                        )
-                        .await
-                        .ok();
+            if let Err(e) = self.handle(self.recv_pkt(&mut close, timeout.as_mut()).await) {
+                // TODO: figure out whether this is a good idea
+                if let RemoteDisco(to) = e {
+                    self.pkt_tx.send(Err(RemoteDisco(to))).ok();
                 }
+
+                match e {
+                                       // anon5's mt notifies the peer on timeout, C++ MT does not
+                                       LocalDisco /*| RemoteDisco(true)*/ => drop(
+                                               self.share
+                                                       .send(
+                                                               PktType::Ctl,
+                                                               Pkt {
+                                                                       unrel: true,
+                                                                       chan: 0,
+                                                                       data: &[CtlType::Disco as u8],
+                                                               },
+                                                       )
+                                                       .await
+                                                       .ok(),
+                                       ),
+                                       _ => {}
+                               }
+
                 break;
             }
         }
     }
 
-    async fn recv_pkt(&self, close: &mut watch::Receiver<bool>) -> Result<()> {
+    async fn recv_pkt(
+        &self,
+        close: &mut watch::Receiver<bool>,
+        timeout: Pin<&mut tokio::time::Sleep>,
+    ) -> Result<()> {
         use Error::*;
 
         // TODO: reset timeout
         let mut cursor = io::Cursor::new(tokio::select! {
             pkt = self.udp_rx.recv() => pkt?,
+            _ = tokio::time::sleep_until(timeout.deadline()) => return Err(RemoteDisco(true)),
             _ = close.changed() => return Err(LocalDisco),
         });
 
-        println!("recv");
+        timeout.reset(tokio::time::Instant::now() + Duration::from_secs(TIMEOUT));
 
         let proto_id = cursor.read_u32::<BigEndian>()?;
         if proto_id != PROTO_ID {
@@ -179,7 +200,7 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
                 }
                 CtlType::Disco => {
                     println!("Disco");
-                    return Err(RemoteDisco);
+                    return Err(RemoteDisco(false));
                 }
             },
             PktType::Orig => {
@@ -275,7 +296,7 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
 
         match res {
             Ok(v) => Ok(v),
-            Err(RemoteDisco) => Err(RemoteDisco),
+            Err(RemoteDisco(to)) => Err(RemoteDisco(to)),
             Err(LocalDisco) => Err(LocalDisco),
             Err(e) => Ok(self.pkt_tx.send(Err(e))?),
         }
index 89c15c7eb11e9cc015f1490bf09f091612fdec24..20308e6758fc3dbeaceac22ccfd805cb0cbf2c1f 100644 (file)
@@ -42,7 +42,7 @@ impl<S: UdpSender> RudpShare<S> {
                     data: buf,
                 },
             );
-            chan.seqnum += 1;
+            chan.seqnum = chan.seqnum.overflowing_add(1).0;
 
             Ok(Some(rx))
         }