]> git.lizzy.rs Git - mt_rudp.git/commitdiff
works
authorLizzy Fleckenstein <eliasfleckenstein@web.de>
Thu, 22 Dec 2022 18:01:35 +0000 (19:01 +0100)
committerLizzy Fleckenstein <eliasfleckenstein@web.de>
Thu, 22 Dec 2022 18:01:35 +0000 (19:01 +0100)
src/main.rs

index b097046c58f220fa756ce7ff647552618104185a..61ca9830234e009dcc171c3e4cae41b37e560442 100644 (file)
@@ -63,7 +63,7 @@ impl From<TryFromPrimitiveError<PktType>> for Error {
 }
 
 impl From<mpsc::SendError<PktResult>> for Error {
-    fn from(err: mpsc::SendError<PktResult>) -> Self {
+    fn from(_err: mpsc::SendError<PktResult>) -> Self {
         Self::LocalHangup
     }
 }
@@ -85,11 +85,41 @@ impl fmt::Display for Error {
 }
 
 #[derive(Debug)]
-struct Channel {}
+struct Channel {
+    num: u8,
+}
+
+type RelPkt = Cell<Option<Vec<u8>>>;
+
+struct RecvChannel<'a> {
+    packets: Vec<RelPkt>, // used to be called char **
+    seqnum: u16,
+    main: &'a Channel,
+}
 
 pub type PktResult = Result<Pkt<Vec<u8>>, Error>;
 type PktSender = mpsc::Sender<PktResult>;
 
+trait HandleError {
+    fn handle(&self, res: Result<(), Error>) -> bool;
+}
+
+impl HandleError for PktSender {
+    fn handle(&self, res: Result<(), Error>) -> bool {
+        if let Err(err) = res {
+            if !self.send(Err(err)).is_ok() {
+                return false;
+            }
+        }
+
+        true
+    }
+}
+
+fn to_seqnum(seqnum: u16) -> usize {
+    (seqnum as usize) & (REL_BUFFER - 1)
+}
+
 #[derive(Debug)]
 struct ConnInner {
     sock: net::UdpSocket,
@@ -98,56 +128,47 @@ struct ConnInner {
     chans: Vec<Channel>,
 }
 
-struct RecvChannel {
-    packets: Vec<Cell<Option<Vec<u8>>>>, // used to be called char **
-    seqnum: u16,
-    num: u8,
-}
+impl ConnInner {
+    pub fn send(&self, pkt: Pkt<&[u8]>) -> io::Result<()> {
+        let mut buf = Vec::with_capacity(4 + 2 + 1 + 1 + pkt.data.len());
+        buf.write_u32::<BigEndian>(PROTO_ID)?;
+        buf.write_u16::<BigEndian>(self.remote_id)?;
+        buf.write_u8(pkt.chan as u8)?;
+        buf.write_u8(PktType::Orig as u8)?;
+        buf.write(pkt.data)?;
 
-struct ConnReceiver {
-    tx: PktSender,
-    inner: Arc<ConnInner>,
-    chans: Vec<RecvChannel>,
-    inbox: [u8; UDP_PKT_SIZE],
-}
+        self.sock.send(&buf)?;
 
-impl ConnReceiver {
-    pub fn run(inner: Arc<ConnInner>, tx: PktSender) {
-        Self {
-            tx,
-            inner,
-            chans: (0..NUM_CHANNELS as u8)
-                .map(|num| RecvChannel {
-                    num,
-                    packets: (0..REL_BUFFER).map(|_| Cell::new(None)).collect(),
-                    seqnum: INIT_SEQNUM,
-                })
-                .collect(),
-            inbox: [0; UDP_PKT_SIZE],
-        }
-        .do_loop();
+        Ok(())
     }
 
-    fn handle_err(&self, res: Result<(), Error>) -> bool {
-        if let Err(err) = res {
-            if !self.tx.send(Err(err)).is_ok() {
-                return false;
-            }
-        }
-
-        true
-    }
+    fn recv_loop(&self, tx: PktSender) {
+        let mut inbox = [0; UDP_PKT_SIZE];
 
-    fn do_loop(&mut self) {
-        while self.handle_err(self.recv_pkt()) {}
+        let mut recv_chans = self
+            .chans
+            .iter()
+            .map(|main| RecvChannel {
+                main,
+                packets: (0..REL_BUFFER).map(|_| Cell::new(None)).collect(),
+                seqnum: INIT_SEQNUM,
+            })
+            .collect();
+
+        while tx.handle(self.recv_pkt(&mut inbox, &mut recv_chans, &tx)) {}
     }
 
-    fn recv_pkt(&mut self) -> Result<(), Error> {
+    fn recv_pkt(
+        &self,
+        buffer: &mut [u8],
+        chans: &mut Vec<RecvChannel>,
+        tx: &PktSender,
+    ) -> Result<(), Error> {
         use Error::*;
 
         // todo: reset timeout
-        let len = self.inner.sock.recv(&mut self.inbox)?;
-        let mut cursor = io::Cursor::new(&self.inbox[..len]);
+        let len = self.sock.recv(buffer)?;
+        let mut cursor = io::Cursor::new(&buffer[..len]);
 
         let proto_id = cursor.read_u32::<BigEndian>()?;
         if proto_id != PROTO_ID {
@@ -156,39 +177,45 @@ impl ConnReceiver {
 
         let peer_id = cursor.read_u16::<BigEndian>()?;
 
-        let n_channel = cursor.read_u8()?;
-        let mut channel = self
-            .chans
-            .get(n_channel as usize)
-            .ok_or(InvalidChannel(n_channel))?;
+        let n_chan = cursor.read_u8()?;
+        let chan = chans
+            .get_mut(n_chan as usize)
+            .ok_or(InvalidChannel(n_chan))?;
 
-        self.process_pkt(cursor, channel)
+        self.process_pkt(cursor, chan, tx)
     }
 
-    fn process_pkt(&self, mut cursor: io::Cursor<&[u8]>, chan: &RecvChannel) -> Result<(), Error> {
+    fn process_pkt(
+        &self,
+        mut cursor: io::Cursor<&[u8]>,
+        chan: &mut RecvChannel,
+        tx: &PktSender,
+    ) -> Result<(), Error> {
         use PktType::*;
 
         match cursor.read_u8()?.try_into()? {
             Ctl => {
+                dbg!("Ctl");
                 dbg!(cursor.remaining_slice());
             }
             Orig => {
-                self.tx.send(Ok(Pkt {
-                    chan: chan.num,
+                tx.send(Ok(Pkt {
+                    chan: chan.main.num,
                     unrel: true,
                     data: cursor.remaining_slice().into(),
                 }))?;
             }
             Split => {
+                dbg!("Split");
                 dbg!(cursor.remaining_slice());
             }
             Rel => {
                 let seqnum = cursor.read_u16::<BigEndian>()?;
-                chan.packets[seqnum as usize].set(Some(cursor.remaining_slice().into()));
+                chan.packets[to_seqnum(seqnum)].set(Some(cursor.remaining_slice().into()));
 
-                while let Some(pkt) = chan.packets[chan.seqnum as usize].take() {
-                    self.handle_err(self.process_pkt(io::Cursor::new(&pkt), chan));
-                    chan.seqnum.overflowing_add(1);
+                while let Some(pkt) = chan.packets[to_seqnum(chan.seqnum)].take() {
+                    tx.handle(self.process_pkt(io::Cursor::new(&pkt), chan, tx));
+                    chan.seqnum = chan.seqnum.overflowing_add(1).0;
                 }
             }
         }
@@ -197,21 +224,6 @@ impl ConnReceiver {
     }
 }
 
-impl ConnInner {
-    pub fn send(&self, pkt: Pkt<&[u8]>) -> io::Result<()> {
-        let mut buf = Vec::with_capacity(4 + 2 + 1 + 1 + pkt.data.len());
-        buf.write_u32::<BigEndian>(PROTO_ID)?;
-        buf.write_u16::<BigEndian>(self.remote_id)?;
-        buf.write_u8(pkt.chan as u8)?;
-        buf.write_u8(PktType::Orig as u8)?;
-        buf.write(pkt.data)?;
-
-        self.sock.send(&buf)?;
-
-        Ok(())
-    }
-}
-
 #[derive(Debug)]
 pub struct Conn {
     inner: Arc<ConnInner>,
@@ -226,14 +238,14 @@ impl Conn {
             sock: net::UdpSocket::bind("0.0.0.0:0")?,
             id: PeerID::Srv as u16,
             remote_id: PeerID::Nil as u16,
-            chans: (0..NUM_CHANNELS).map(|_| Channel {}).collect(),
+            chans: (0..NUM_CHANNELS as u8).map(|num| Channel { num }).collect(),
         });
 
         inner.sock.connect(addr)?;
 
         let recv_inner = Arc::clone(&inner);
         thread::spawn(move || {
-            ConnReceiver::run(recv_inner, tx);
+            recv_inner.recv_loop(tx);
         });
 
         Ok(Conn { inner, rx })