From: Lizzy Fleckenstein Date: Thu, 22 Dec 2022 18:01:35 +0000 (+0100) Subject: works X-Git-Url: https://git.lizzy.rs/?a=commitdiff_plain;h=209fcdbebf6354fe3246938e46dac516c6205630;p=mt_rudp.git works --- diff --git a/src/main.rs b/src/main.rs index b097046..61ca983 100644 --- a/src/main.rs +++ b/src/main.rs @@ -63,7 +63,7 @@ impl From> for Error { } impl From> for Error { - fn from(err: mpsc::SendError) -> Self { + fn from(_err: mpsc::SendError) -> Self { Self::LocalHangup } } @@ -85,11 +85,41 @@ impl fmt::Display for Error { } #[derive(Debug)] -struct Channel {} +struct Channel { + num: u8, +} + +type RelPkt = Cell>>; + +struct RecvChannel<'a> { + packets: Vec, // used to be called char ** + seqnum: u16, + main: &'a Channel, +} pub type PktResult = Result>, Error>; type PktSender = mpsc::Sender; +trait HandleError { + fn handle(&self, res: Result<(), Error>) -> bool; +} + +impl HandleError for PktSender { + fn handle(&self, res: Result<(), Error>) -> bool { + if let Err(err) = res { + if !self.send(Err(err)).is_ok() { + return false; + } + } + + true + } +} + +fn to_seqnum(seqnum: u16) -> usize { + (seqnum as usize) & (REL_BUFFER - 1) +} + #[derive(Debug)] struct ConnInner { sock: net::UdpSocket, @@ -98,56 +128,47 @@ struct ConnInner { chans: Vec, } -struct RecvChannel { - packets: Vec>>>, // used to be called char ** - seqnum: u16, - num: u8, -} +impl ConnInner { + pub fn send(&self, pkt: Pkt<&[u8]>) -> io::Result<()> { + let mut buf = Vec::with_capacity(4 + 2 + 1 + 1 + pkt.data.len()); + buf.write_u32::(PROTO_ID)?; + buf.write_u16::(self.remote_id)?; + buf.write_u8(pkt.chan as u8)?; + buf.write_u8(PktType::Orig as u8)?; + buf.write(pkt.data)?; -struct ConnReceiver { - tx: PktSender, - inner: Arc, - chans: Vec, - inbox: [u8; UDP_PKT_SIZE], -} + self.sock.send(&buf)?; -impl ConnReceiver { - pub fn run(inner: Arc, tx: PktSender) { - Self { - tx, - inner, - chans: (0..NUM_CHANNELS as u8) - .map(|num| RecvChannel { - num, - packets: (0..REL_BUFFER).map(|_| Cell::new(None)).collect(), - seqnum: INIT_SEQNUM, - }) - .collect(), - inbox: [0; UDP_PKT_SIZE], - } - .do_loop(); + Ok(()) } - fn handle_err(&self, res: Result<(), Error>) -> bool { - if let Err(err) = res { - if !self.tx.send(Err(err)).is_ok() { - return false; - } - } - - true - } + fn recv_loop(&self, tx: PktSender) { + let mut inbox = [0; UDP_PKT_SIZE]; - fn do_loop(&mut self) { - while self.handle_err(self.recv_pkt()) {} + let mut recv_chans = self + .chans + .iter() + .map(|main| RecvChannel { + main, + packets: (0..REL_BUFFER).map(|_| Cell::new(None)).collect(), + seqnum: INIT_SEQNUM, + }) + .collect(); + + while tx.handle(self.recv_pkt(&mut inbox, &mut recv_chans, &tx)) {} } - fn recv_pkt(&mut self) -> Result<(), Error> { + fn recv_pkt( + &self, + buffer: &mut [u8], + chans: &mut Vec, + tx: &PktSender, + ) -> Result<(), Error> { use Error::*; // todo: reset timeout - let len = self.inner.sock.recv(&mut self.inbox)?; - let mut cursor = io::Cursor::new(&self.inbox[..len]); + let len = self.sock.recv(buffer)?; + let mut cursor = io::Cursor::new(&buffer[..len]); let proto_id = cursor.read_u32::()?; if proto_id != PROTO_ID { @@ -156,39 +177,45 @@ impl ConnReceiver { let peer_id = cursor.read_u16::()?; - let n_channel = cursor.read_u8()?; - let mut channel = self - .chans - .get(n_channel as usize) - .ok_or(InvalidChannel(n_channel))?; + let n_chan = cursor.read_u8()?; + let chan = chans + .get_mut(n_chan as usize) + .ok_or(InvalidChannel(n_chan))?; - self.process_pkt(cursor, channel) + self.process_pkt(cursor, chan, tx) } - fn process_pkt(&self, mut cursor: io::Cursor<&[u8]>, chan: &RecvChannel) -> Result<(), Error> { + fn process_pkt( + &self, + mut cursor: io::Cursor<&[u8]>, + chan: &mut RecvChannel, + tx: &PktSender, + ) -> Result<(), Error> { use PktType::*; match cursor.read_u8()?.try_into()? { Ctl => { + dbg!("Ctl"); dbg!(cursor.remaining_slice()); } Orig => { - self.tx.send(Ok(Pkt { - chan: chan.num, + tx.send(Ok(Pkt { + chan: chan.main.num, unrel: true, data: cursor.remaining_slice().into(), }))?; } Split => { + dbg!("Split"); dbg!(cursor.remaining_slice()); } Rel => { let seqnum = cursor.read_u16::()?; - chan.packets[seqnum as usize].set(Some(cursor.remaining_slice().into())); + chan.packets[to_seqnum(seqnum)].set(Some(cursor.remaining_slice().into())); - while let Some(pkt) = chan.packets[chan.seqnum as usize].take() { - self.handle_err(self.process_pkt(io::Cursor::new(&pkt), chan)); - chan.seqnum.overflowing_add(1); + while let Some(pkt) = chan.packets[to_seqnum(chan.seqnum)].take() { + tx.handle(self.process_pkt(io::Cursor::new(&pkt), chan, tx)); + chan.seqnum = chan.seqnum.overflowing_add(1).0; } } } @@ -197,21 +224,6 @@ impl ConnReceiver { } } -impl ConnInner { - pub fn send(&self, pkt: Pkt<&[u8]>) -> io::Result<()> { - let mut buf = Vec::with_capacity(4 + 2 + 1 + 1 + pkt.data.len()); - buf.write_u32::(PROTO_ID)?; - buf.write_u16::(self.remote_id)?; - buf.write_u8(pkt.chan as u8)?; - buf.write_u8(PktType::Orig as u8)?; - buf.write(pkt.data)?; - - self.sock.send(&buf)?; - - Ok(()) - } -} - #[derive(Debug)] pub struct Conn { inner: Arc, @@ -226,14 +238,14 @@ impl Conn { sock: net::UdpSocket::bind("0.0.0.0:0")?, id: PeerID::Srv as u16, remote_id: PeerID::Nil as u16, - chans: (0..NUM_CHANNELS).map(|_| Channel {}).collect(), + chans: (0..NUM_CHANNELS as u8).map(|num| Channel { num }).collect(), }); inner.sock.connect(addr)?; let recv_inner = Arc::clone(&inner); thread::spawn(move || { - ConnReceiver::run(recv_inner, tx); + recv_inner.recv_loop(tx); }); Ok(Conn { inner, rx })