From: Lizzy Fleckenstein Date: Thu, 5 Jan 2023 13:55:57 +0000 (+0100) Subject: send acks X-Git-Url: https://git.lizzy.rs/?a=commitdiff_plain;h=d3b8019227137853406891e2aa84e0c8a9e3c31c;p=mt_rudp.git send acks --- diff --git a/src/main.rs b/src/main.rs index b6a6af6..1f0aca0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,7 +15,7 @@ use std::{ ops, sync::Arc, }; -use tokio::sync::{mpsc, Mutex, RwLock}; +use tokio::sync::{mpsc, watch, Mutex, RwLock}; pub const PROTO_ID: u32 = 0x4f457403; pub const UDP_PKT_SIZE: usize = 512; @@ -69,12 +69,13 @@ pub struct Pkt { pub type Error = error::Error; pub type InPkt = Result>, Error>; +type AckChan = (watch::Sender, watch::Receiver); #[derive(Debug)] pub struct RudpShare { pub id: u16, pub remote_id: RwLock, - pub ack_chans: Mutex>>, + pub ack_chans: Mutex>, udp_tx: S, } @@ -203,5 +204,7 @@ async fn main() -> io::Result<()> { } println!("disco"); + // close()ing rx is not needed because it has been consumed to the end + Ok(()) } diff --git a/src/recv_worker.rs b/src/recv_worker.rs index 316bb48..83b3273 100644 --- a/src/recv_worker.rs +++ b/src/recv_worker.rs @@ -135,7 +135,9 @@ impl RecvWorker { PktType::Ctl => match cursor.read_u8()?.try_into()? { CtlType::Ack => { let seqnum = cursor.read_u16::()?; - self.share.ack_chans.lock().await.remove(&seqnum); + if let Some((tx, _)) = self.share.ack_chans.lock().await.remove(&seqnum) { + tx.send(true).ok(); + } } CtlType::SetPeerID => { let mut id = self.share.remote_id.write().await; @@ -212,6 +214,21 @@ impl RecvWorker { let seqnum = cursor.read_u16::()?; chan.packets[to_seqnum(seqnum)].set(Some(cursor.remaining_slice().into())); + let mut ack_data = Vec::with_capacity(3); + ack_data.write_u8(CtlType::Ack as u8)?; + ack_data.write_u16::(seqnum)?; + + self.share + .send( + PktType::Ctl, + Pkt { + unrel: true, + chan: chan.num, + data: &ack_data, + }, + ) + .await?; + fn next_pkt(chan: &mut Chan) -> Option> { chan.packets[to_seqnum(chan.seqnum)].take() }