PeerIDAlreadySet,
InvalidChunkIndex(usize, usize),
InvalidChunkCount(usize, usize),
- RemoteDisco,
+ RemoteDisco(bool),
LocalDisco,
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
use Error::*;
- write!(f, "RUDP error: ")?;
+ write!(f, "rudp: ")?;
match self {
IoError(err) => write!(f, "IO error: {}", err),
PeerIDAlreadySet => write!(f, "peer ID already set"),
InvalidChunkIndex(i, n) => write!(f, "chunk index {i} bigger than chunk count {n}"),
InvalidChunkCount(o, n) => write!(f, "chunk count changed from {o} to {n}"),
- RemoteDisco => write!(f, "remote disconnected"),
+ RemoteDisco(to) => write!(
+ f,
+ "remote disconnected{}",
+ if *to { " (timeout)" } else { "" }
+ ),
LocalDisco => write!(f, "local disconnected"),
}
}
.name("ping")*/
.spawn(async move {
ticker!(Duration::from_secs(PING_TIMEOUT), ping_close, {
- let pkt: Pkt<&[u8]> = Pkt {
- chan: 0,
- unrel: false,
- data: &[CtlType::Ping as u8],
- };
-
- ping_share.send(PktType::Ctl, pkt).await.ok();
+ ping_share
+ .send(
+ PktType::Ctl,
+ Pkt {
+ chan: 0,
+ unrel: false,
+ data: &[CtlType::Ping as u8],
+ },
+ )
+ .await
+ .ok();
});
});
cell::{Cell, OnceCell},
collections::HashMap,
io,
+ pin::Pin,
sync::Arc,
time::{Duration, Instant},
};
}
pub async fn run(&self) {
+ use Error::*;
+
let cleanup_chans = Arc::clone(&self.chans);
let mut cleanup_close = self.close.clone();
self.share
});
let mut close = self.close.clone();
+ let timeout = tokio::time::sleep(Duration::from_secs(TIMEOUT));
+ tokio::pin!(timeout);
+
loop {
- if let Err(e) = self.handle(self.recv_pkt(&mut close).await) {
- if let Error::LocalDisco = e {
- self.share
- .send(
- PktType::Ctl,
- Pkt {
- unrel: true,
- chan: 0,
- data: &[CtlType::Disco as u8],
- },
- )
- .await
- .ok();
+ if let Err(e) = self.handle(self.recv_pkt(&mut close, timeout.as_mut()).await) {
+ // TODO: figure out whether this is a good idea
+ if let RemoteDisco(to) = e {
+ self.pkt_tx.send(Err(RemoteDisco(to))).ok();
}
+
+ match e {
+ // anon5's mt notifies the peer on timeout, C++ MT does not
+ LocalDisco /*| RemoteDisco(true)*/ => drop(
+ self.share
+ .send(
+ PktType::Ctl,
+ Pkt {
+ unrel: true,
+ chan: 0,
+ data: &[CtlType::Disco as u8],
+ },
+ )
+ .await
+ .ok(),
+ ),
+ _ => {}
+ }
+
break;
}
}
}
- async fn recv_pkt(&self, close: &mut watch::Receiver<bool>) -> Result<()> {
+ async fn recv_pkt(
+ &self,
+ close: &mut watch::Receiver<bool>,
+ timeout: Pin<&mut tokio::time::Sleep>,
+ ) -> Result<()> {
use Error::*;
// TODO: reset timeout
let mut cursor = io::Cursor::new(tokio::select! {
pkt = self.udp_rx.recv() => pkt?,
+ _ = tokio::time::sleep_until(timeout.deadline()) => return Err(RemoteDisco(true)),
_ = close.changed() => return Err(LocalDisco),
});
- println!("recv");
+ timeout.reset(tokio::time::Instant::now() + Duration::from_secs(TIMEOUT));
let proto_id = cursor.read_u32::<BigEndian>()?;
if proto_id != PROTO_ID {
}
CtlType::Disco => {
println!("Disco");
- return Err(RemoteDisco);
+ return Err(RemoteDisco(false));
}
},
PktType::Orig => {
match res {
Ok(v) => Ok(v),
- Err(RemoteDisco) => Err(RemoteDisco),
+ Err(RemoteDisco(to)) => Err(RemoteDisco(to)),
Err(LocalDisco) => Err(LocalDisco),
Err(e) => Ok(self.pkt_tx.send(Err(e))?),
}