From: Lizzy Fleckenstein Date: Thu, 22 Dec 2022 17:34:51 +0000 (+0100) Subject: b X-Git-Url: https://git.lizzy.rs/?a=commitdiff_plain;h=7058b16aeb0f269be55c10840d71eee632189879;p=mt_rudp.git b --- diff --git a/src/main.rs b/src/main.rs index bda3f38..b097046 100644 --- a/src/main.rs +++ b/src/main.rs @@ -87,13 +87,6 @@ impl fmt::Display for Error { #[derive(Debug)] struct Channel {} -#[derive(Debug)] -struct RecvChannel<'a> { - packets: Vec>>, // used to be called char ** - seqnum: u16, - chan: &'a Channel, -} - pub type PktResult = Result>, Error>; type PktSender = mpsc::Sender; @@ -105,50 +98,56 @@ struct ConnInner { chans: Vec, } -impl ConnInner { - pub fn send(&self, pkt: Pkt<&[u8]>) -> io::Result<()> { - let mut buf = Vec::with_capacity(4 + 2 + 1 + 1 + pkt.data.len()); - buf.write_u32::(PROTO_ID)?; - buf.write_u16::(self.remote_id)?; - buf.write_u8(pkt.chan as u8)?; - buf.write_u8(PktType::Orig as u8)?; - buf.write(pkt.data)?; +struct RecvChannel { + packets: Vec>>>, // used to be called char ** + seqnum: u16, + num: u8, +} - self.sock.send(&buf)?; +struct ConnReceiver { + tx: PktSender, + inner: Arc, + chans: Vec, + inbox: [u8; UDP_PKT_SIZE], +} - Ok(()) +impl ConnReceiver { + pub fn run(inner: Arc, tx: PktSender) { + Self { + tx, + inner, + chans: (0..NUM_CHANNELS as u8) + .map(|num| RecvChannel { + num, + packets: (0..REL_BUFFER).map(|_| Cell::new(None)).collect(), + seqnum: INIT_SEQNUM, + }) + .collect(), + inbox: [0; UDP_PKT_SIZE], + } + .do_loop(); } - fn recv_loop(&self, tx: PktSender) { - let mut inbox = [0; UDP_PKT_SIZE]; - - let mut recv_chans = self.channels.map(|chan| RecvChannel { - chan, - packets: (0..REL_BUFFER).map(|_| Cell::new(None)), - seqnum: INIT_SEQNUM, - }); - - loop { - if let Err(err) = self.recv_pkt(&mut inbox, &mut recv_chans, &tx) { - if !tx.send(Err(err)).is_ok() { - break; - } + fn handle_err(&self, res: Result<(), Error>) -> bool { + if let Err(err) = res { + if !self.tx.send(Err(err)).is_ok() { + return false; } } + + true + } + + fn do_loop(&mut self) { + while self.handle_err(self.recv_pkt()) {} } - fn recv_pkt( - &self, - buffer: &mut [u8], - chans: &mut Vec, - tx: &PktSender, - ) -> Result<(), Error> { + fn recv_pkt(&mut self) -> Result<(), Error> { use Error::*; - use PktType::*; // todo: reset timeout - let len = self.sock.recv(buffer)?; - let mut cursor = io::Cursor::new(&buffer[..len]); + let len = self.inner.sock.recv(&mut self.inbox)?; + let mut cursor = io::Cursor::new(&self.inbox[..len]); let proto_id = cursor.read_u32::()?; if proto_id != PROTO_ID { @@ -160,24 +159,22 @@ impl ConnInner { let n_channel = cursor.read_u8()?; let mut channel = self .chans - .get_mut(n_channel as usize) + .get(n_channel as usize) .ok_or(InvalidChannel(n_channel))?; - self.process_pkt(cursor, channel); + self.process_pkt(cursor, channel) } - fn process_pkt( - &self, - mut cursor: io::Cursor<&[u8]>, - chan: &mut RecvChannel, - ) -> Result<(), Error> { + fn process_pkt(&self, mut cursor: io::Cursor<&[u8]>, chan: &RecvChannel) -> Result<(), Error> { + use PktType::*; + match cursor.read_u8()?.try_into()? { Ctl => { dbg!(cursor.remaining_slice()); } Orig => { - tx.send(Ok(Pkt { - chan: n_channel, + self.tx.send(Ok(Pkt { + chan: chan.num, unrel: true, data: cursor.remaining_slice().into(), }))?; @@ -187,10 +184,10 @@ impl ConnInner { } Rel => { let seqnum = cursor.read_u16::()?; - chan.packets[seqnum].set(cursor.remaining_slice().into()); + chan.packets[seqnum as usize].set(Some(cursor.remaining_slice().into())); - while Some(pkt) = chan.packets[chan.seqnum].take() { - self.process_pkt(io::Cursor::new(&pkt), chan)?; + while let Some(pkt) = chan.packets[chan.seqnum as usize].take() { + self.handle_err(self.process_pkt(io::Cursor::new(&pkt), chan)); chan.seqnum.overflowing_add(1); } } @@ -200,6 +197,21 @@ impl ConnInner { } } +impl ConnInner { + pub fn send(&self, pkt: Pkt<&[u8]>) -> io::Result<()> { + let mut buf = Vec::with_capacity(4 + 2 + 1 + 1 + pkt.data.len()); + buf.write_u32::(PROTO_ID)?; + buf.write_u16::(self.remote_id)?; + buf.write_u8(pkt.chan as u8)?; + buf.write_u8(PktType::Orig as u8)?; + buf.write(pkt.data)?; + + self.sock.send(&buf)?; + + Ok(()) + } +} + #[derive(Debug)] pub struct Conn { inner: Arc, @@ -221,7 +233,7 @@ impl Conn { let recv_inner = Arc::clone(&inner); thread::spawn(move || { - recv_inner.recv_loop(tx); + ConnReceiver::run(recv_inner, tx); }); Ok(Conn { inner, rx })