}
impl From<mpsc::SendError<PktResult>> for Error {
- fn from(err: mpsc::SendError<PktResult>) -> Self {
+ fn from(_err: mpsc::SendError<PktResult>) -> Self {
Self::LocalHangup
}
}
}
#[derive(Debug)]
-struct Channel {}
+struct Channel {
+ num: u8,
+}
+
+type RelPkt = Cell<Option<Vec<u8>>>;
+
+struct RecvChannel<'a> {
+ packets: Vec<RelPkt>, // used to be called char **
+ seqnum: u16,
+ main: &'a Channel,
+}
pub type PktResult = Result<Pkt<Vec<u8>>, Error>;
type PktSender = mpsc::Sender<PktResult>;
+trait HandleError {
+ fn handle(&self, res: Result<(), Error>) -> bool;
+}
+
+impl HandleError for PktSender {
+ fn handle(&self, res: Result<(), Error>) -> bool {
+ if let Err(err) = res {
+ if !self.send(Err(err)).is_ok() {
+ return false;
+ }
+ }
+
+ true
+ }
+}
+
+fn to_seqnum(seqnum: u16) -> usize {
+ (seqnum as usize) & (REL_BUFFER - 1)
+}
+
#[derive(Debug)]
struct ConnInner {
sock: net::UdpSocket,
chans: Vec<Channel>,
}
-struct RecvChannel {
- packets: Vec<Cell<Option<Vec<u8>>>>, // used to be called char **
- seqnum: u16,
- num: u8,
-}
+impl ConnInner {
+ pub fn send(&self, pkt: Pkt<&[u8]>) -> io::Result<()> {
+ let mut buf = Vec::with_capacity(4 + 2 + 1 + 1 + pkt.data.len());
+ buf.write_u32::<BigEndian>(PROTO_ID)?;
+ buf.write_u16::<BigEndian>(self.remote_id)?;
+ buf.write_u8(pkt.chan as u8)?;
+ buf.write_u8(PktType::Orig as u8)?;
+ buf.write(pkt.data)?;
-struct ConnReceiver {
- tx: PktSender,
- inner: Arc<ConnInner>,
- chans: Vec<RecvChannel>,
- inbox: [u8; UDP_PKT_SIZE],
-}
+ self.sock.send(&buf)?;
-impl ConnReceiver {
- pub fn run(inner: Arc<ConnInner>, tx: PktSender) {
- Self {
- tx,
- inner,
- chans: (0..NUM_CHANNELS as u8)
- .map(|num| RecvChannel {
- num,
- packets: (0..REL_BUFFER).map(|_| Cell::new(None)).collect(),
- seqnum: INIT_SEQNUM,
- })
- .collect(),
- inbox: [0; UDP_PKT_SIZE],
- }
- .do_loop();
+ Ok(())
}
- fn handle_err(&self, res: Result<(), Error>) -> bool {
- if let Err(err) = res {
- if !self.tx.send(Err(err)).is_ok() {
- return false;
- }
- }
-
- true
- }
+ fn recv_loop(&self, tx: PktSender) {
+ let mut inbox = [0; UDP_PKT_SIZE];
- fn do_loop(&mut self) {
- while self.handle_err(self.recv_pkt()) {}
+ let mut recv_chans = self
+ .chans
+ .iter()
+ .map(|main| RecvChannel {
+ main,
+ packets: (0..REL_BUFFER).map(|_| Cell::new(None)).collect(),
+ seqnum: INIT_SEQNUM,
+ })
+ .collect();
+
+ while tx.handle(self.recv_pkt(&mut inbox, &mut recv_chans, &tx)) {}
}
- fn recv_pkt(&mut self) -> Result<(), Error> {
+ fn recv_pkt(
+ &self,
+ buffer: &mut [u8],
+ chans: &mut Vec<RecvChannel>,
+ tx: &PktSender,
+ ) -> Result<(), Error> {
use Error::*;
// todo: reset timeout
- let len = self.inner.sock.recv(&mut self.inbox)?;
- let mut cursor = io::Cursor::new(&self.inbox[..len]);
+ let len = self.sock.recv(buffer)?;
+ let mut cursor = io::Cursor::new(&buffer[..len]);
let proto_id = cursor.read_u32::<BigEndian>()?;
if proto_id != PROTO_ID {
let peer_id = cursor.read_u16::<BigEndian>()?;
- let n_channel = cursor.read_u8()?;
- let mut channel = self
- .chans
- .get(n_channel as usize)
- .ok_or(InvalidChannel(n_channel))?;
+ let n_chan = cursor.read_u8()?;
+ let chan = chans
+ .get_mut(n_chan as usize)
+ .ok_or(InvalidChannel(n_chan))?;
- self.process_pkt(cursor, channel)
+ self.process_pkt(cursor, chan, tx)
}
- fn process_pkt(&self, mut cursor: io::Cursor<&[u8]>, chan: &RecvChannel) -> Result<(), Error> {
+ fn process_pkt(
+ &self,
+ mut cursor: io::Cursor<&[u8]>,
+ chan: &mut RecvChannel,
+ tx: &PktSender,
+ ) -> Result<(), Error> {
use PktType::*;
match cursor.read_u8()?.try_into()? {
Ctl => {
+ dbg!("Ctl");
dbg!(cursor.remaining_slice());
}
Orig => {
- self.tx.send(Ok(Pkt {
- chan: chan.num,
+ tx.send(Ok(Pkt {
+ chan: chan.main.num,
unrel: true,
data: cursor.remaining_slice().into(),
}))?;
}
Split => {
+ dbg!("Split");
dbg!(cursor.remaining_slice());
}
Rel => {
let seqnum = cursor.read_u16::<BigEndian>()?;
- chan.packets[seqnum as usize].set(Some(cursor.remaining_slice().into()));
+ chan.packets[to_seqnum(seqnum)].set(Some(cursor.remaining_slice().into()));
- while let Some(pkt) = chan.packets[chan.seqnum as usize].take() {
- self.handle_err(self.process_pkt(io::Cursor::new(&pkt), chan));
- chan.seqnum.overflowing_add(1);
+ while let Some(pkt) = chan.packets[to_seqnum(chan.seqnum)].take() {
+ tx.handle(self.process_pkt(io::Cursor::new(&pkt), chan, tx));
+ chan.seqnum = chan.seqnum.overflowing_add(1).0;
}
}
}
}
}
-impl ConnInner {
- pub fn send(&self, pkt: Pkt<&[u8]>) -> io::Result<()> {
- let mut buf = Vec::with_capacity(4 + 2 + 1 + 1 + pkt.data.len());
- buf.write_u32::<BigEndian>(PROTO_ID)?;
- buf.write_u16::<BigEndian>(self.remote_id)?;
- buf.write_u8(pkt.chan as u8)?;
- buf.write_u8(PktType::Orig as u8)?;
- buf.write(pkt.data)?;
-
- self.sock.send(&buf)?;
-
- Ok(())
- }
-}
-
#[derive(Debug)]
pub struct Conn {
inner: Arc<ConnInner>,
sock: net::UdpSocket::bind("0.0.0.0:0")?,
id: PeerID::Srv as u16,
remote_id: PeerID::Nil as u16,
- chans: (0..NUM_CHANNELS).map(|_| Channel {}).collect(),
+ chans: (0..NUM_CHANNELS as u8).map(|num| Channel { num }).collect(),
});
inner.sock.connect(addr)?;
let recv_inner = Arc::clone(&inner);
thread::spawn(move || {
- ConnReceiver::run(recv_inner, tx);
+ recv_inner.recv_loop(tx);
});
Ok(Conn { inner, rx })