]> git.lizzy.rs Git - mt_rudp.git/commitdiff
send acks
authorLizzy Fleckenstein <eliasfleckenstein@web.de>
Thu, 5 Jan 2023 13:55:57 +0000 (14:55 +0100)
committerLizzy Fleckenstein <eliasfleckenstein@web.de>
Thu, 5 Jan 2023 14:14:34 +0000 (15:14 +0100)
src/main.rs
src/recv_worker.rs

index b6a6af630bfa238b8b461b1502c7a2b3e3014949..1f0aca01f1a53a87b617be8cfc1cff2a0be890d6 100644 (file)
@@ -15,7 +15,7 @@ use std::{
     ops,
     sync::Arc,
 };
-use tokio::sync::{mpsc, Mutex, RwLock};
+use tokio::sync::{mpsc, watch, Mutex, RwLock};
 
 pub const PROTO_ID: u32 = 0x4f457403;
 pub const UDP_PKT_SIZE: usize = 512;
@@ -69,12 +69,13 @@ pub struct Pkt<T> {
 
 pub type Error = error::Error;
 pub type InPkt = Result<Pkt<Vec<u8>>, Error>;
+type AckChan = (watch::Sender<bool>, watch::Receiver<bool>);
 
 #[derive(Debug)]
 pub struct RudpShare<S: UdpSender> {
     pub id: u16,
     pub remote_id: RwLock<u16>,
-    pub ack_chans: Mutex<HashMap<u16, mpsc::Sender<()>>>,
+    pub ack_chans: Mutex<HashMap<u16, AckChan>>,
     udp_tx: S,
 }
 
@@ -203,5 +204,7 @@ async fn main() -> io::Result<()> {
     }
     println!("disco");
 
+    // close()ing rx is not needed because it has been consumed to the end
+
     Ok(())
 }
index 316bb48e59f19e229d28341b47f551b50e222d0e..83b32737e137c7fb1c985a636eb9b0dd988ddcf3 100644 (file)
@@ -135,7 +135,9 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
             PktType::Ctl => match cursor.read_u8()?.try_into()? {
                 CtlType::Ack => {
                     let seqnum = cursor.read_u16::<BigEndian>()?;
-                    self.share.ack_chans.lock().await.remove(&seqnum);
+                    if let Some((tx, _)) = self.share.ack_chans.lock().await.remove(&seqnum) {
+                        tx.send(true).ok();
+                    }
                 }
                 CtlType::SetPeerID => {
                     let mut id = self.share.remote_id.write().await;
@@ -212,6 +214,21 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
                 let seqnum = cursor.read_u16::<BigEndian>()?;
                 chan.packets[to_seqnum(seqnum)].set(Some(cursor.remaining_slice().into()));
 
+                let mut ack_data = Vec::with_capacity(3);
+                ack_data.write_u8(CtlType::Ack as u8)?;
+                ack_data.write_u16::<BigEndian>(seqnum)?;
+
+                self.share
+                    .send(
+                        PktType::Ctl,
+                        Pkt {
+                            unrel: true,
+                            chan: chan.num,
+                            data: &ack_data,
+                        },
+                    )
+                    .await?;
+
                 fn next_pkt(chan: &mut Chan) -> Option<Vec<u8>> {
                     chan.packets[to_seqnum(chan.seqnum)].take()
                 }