]> git.lizzy.rs Git - mt_rudp.git/commitdiff
splits infrastructure
authorLizzy Fleckenstein <eliasfleckenstein@web.de>
Fri, 23 Dec 2022 14:31:10 +0000 (15:31 +0100)
committerLizzy Fleckenstein <eliasfleckenstein@web.de>
Fri, 23 Dec 2022 14:31:10 +0000 (15:31 +0100)
src/client.rs
src/main.rs
src/recv_worker.rs

index e4864882e8d500a79271cdcf657da6c479711b89..81c1bfb387fd5ac9ccdecbd08507545e8cbce4f8 100644 (file)
@@ -19,7 +19,7 @@ pub struct Receiver {
 impl UdpReceiver for Receiver {
     fn recv(&self) -> io::Result<Vec<u8>> {
         let mut buffer = Vec::new();
-        buffer.resize(crate::UDP_PKT_SIZE, 0);
+        buffer.resize(UDP_PKT_SIZE, 0);
 
         let len = self.sock.recv(&mut buffer)?;
         buffer.truncate(len);
@@ -28,11 +28,11 @@ impl UdpReceiver for Receiver {
     }
 }
 
-pub fn connect(addr: &str) -> io::Result<(crate::RudpSender<Sender>, crate::RudpReceiver<Sender>)> {
+pub fn connect(addr: &str) -> io::Result<(RudpSender<Sender>, RudpReceiver<Sender>)> {
     let sock = Arc::new(net::UdpSocket::bind("0.0.0.0:0")?);
     sock.connect(addr)?;
 
-    Ok(crate::new(
+    Ok(new(
         PeerID::Srv as u16,
         PeerID::Nil as u16,
         Sender {
index 2a5ff2f1f5f710c0b0c6f0b2425a4640a3b8bf84..aadb5ccd3fccea83724ff8debd81fbbddfc8a75e 100644 (file)
@@ -1,5 +1,6 @@
 #![feature(yeet_expr)]
 #![feature(cursor_remaining)]
+#![feature(hash_drain_filter)]
 mod client;
 pub mod error;
 mod recv_worker;
@@ -19,6 +20,7 @@ pub const UDP_PKT_SIZE: usize = 512;
 pub const NUM_CHANS: usize = 3;
 pub const REL_BUFFER: usize = 0x8000;
 pub const INIT_SEQNUM: u16 = 65500;
+pub const TIMEOUT: u64 = 30;
 
 pub trait UdpSender: Send + Sync + 'static {
     fn send(&self, data: Vec<u8>) -> io::Result<()>;
index 578cf2ecbe80328e2ad0c0f4ab7e0d1f00303949..2cd8197729df7ba760f3ed513d0b9ad98eae4a32 100644 (file)
@@ -2,25 +2,33 @@ use crate::{error::Error, *};
 use byteorder::{BigEndian, ReadBytesExt};
 use std::{
     cell::Cell,
+    collections::HashMap,
     io, result,
-    sync::{mpsc, Arc},
+    sync::{mpsc, Arc, Mutex, Weak},
+    thread, time,
 };
 
 fn to_seqnum(seqnum: u16) -> usize {
     (seqnum as usize) & (REL_BUFFER - 1)
 }
 
-struct RelChan {
+type PktTx = mpsc::Sender<InPkt>;
+type Result = result::Result<(), Error>;
+
+struct Split {
+    timestamp: time::Instant,
+}
+
+struct Chan {
     packets: Vec<Cell<Option<Vec<u8>>>>, // in the good old days this used to be called char **
+    splits: HashMap<u16, Split>,
     seqnum: u16,
     num: u8,
 }
 
-type PktTx = mpsc::Sender<InPkt>;
-type Result = result::Result<(), Error>;
-
 pub struct RecvWorker<R: UdpReceiver, S: UdpSender> {
     share: Arc<RudpShare<S>>,
+    chans: Arc<Vec<Mutex<Chan>>>,
     pkt_tx: PktTx,
     udp_rx: R,
 }
@@ -31,20 +39,41 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
             udp_rx,
             share,
             pkt_tx,
+            chans: Arc::new(
+                (0..NUM_CHANS as u8)
+                    .map(|num| {
+                        Mutex::new(Chan {
+                            num,
+                            packets: (0..REL_BUFFER).map(|_| Cell::new(None)).collect(),
+                            seqnum: INIT_SEQNUM,
+                            splits: HashMap::new(),
+                        })
+                    })
+                    .collect(),
+            ),
         }
     }
 
     pub fn run(&self) {
-        let mut recv_chans = (0..NUM_CHANS as u8)
-            .map(|num| RelChan {
-                num,
-                packets: (0..REL_BUFFER).map(|_| Cell::new(None)).collect(),
-                seqnum: INIT_SEQNUM,
-            })
-            .collect();
+        let cleanup_chans = Arc::downgrade(&self.chans);
+        thread::spawn(move || {
+            let timeout = time::Duration::from_secs(TIMEOUT);
+
+            while let Some(chans) = Weak::upgrade(&cleanup_chans) {
+                for chan in chans.iter() {
+                    let mut ch = chan.lock().unwrap();
+                    ch.splits = ch
+                        .splits
+                        .drain_filter(|_k, v| v.timestamp.elapsed() < timeout)
+                        .collect();
+                }
+
+                thread::sleep(timeout);
+            }
+        });
 
         loop {
-            if let Err(e) = self.handle(self.recv_pkt(&mut recv_chans)) {
+            if let Err(e) = self.handle(self.recv_pkt()) {
                 if let Error::LocalDisco = e {
                     self.share
                         .send(
@@ -62,7 +91,7 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
         }
     }
 
-    fn recv_pkt(&self, chans: &mut Vec<RelChan>) -> Result {
+    fn recv_pkt(&self) -> Result {
         use Error::*;
 
         // todo: reset timeout
@@ -76,14 +105,17 @@ impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
         let peer_id = cursor.read_u16::<BigEndian>()?;
 
         let n_chan = cursor.read_u8()?;
-        let chan = chans
-            .get_mut(n_chan as usize)
-            .ok_or(InvalidChannel(n_chan))?;
-
-        self.process_pkt(cursor, chan)
+        let mut chan = self
+            .chans
+            .get(n_chan as usize)
+            .ok_or(InvalidChannel(n_chan))?
+            .lock()
+            .unwrap();
+
+        self.process_pkt(cursor, &mut chan)
     }
 
-    fn process_pkt(&self, mut cursor: io::Cursor<Vec<u8>>, chan: &mut RelChan) -> Result {
+    fn process_pkt(&self, mut cursor: io::Cursor<Vec<u8>>, chan: &mut Chan) -> Result {
         use CtlType::*;
         use Error::*;
         use PktType::*;