1 use crate::{error::Error, CtlType, InPkt, Pkt, PktType, RudpShare, UdpReceiver, UdpSender};
2 use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
3 use num_enum::{TryFromPrimitive, TryFromPrimitiveError};
/// Maps a 16-bit sequence number to a slot index by masking it with
/// `crate::REL_BUFFER - 1`.
///
/// NOTE(review): the mask only behaves like `seqnum % REL_BUFFER` if
/// `REL_BUFFER` is a power of two — confirm against its definition.
10 fn to_seqnum(seqnum: u16) -> usize {
11 (seqnum as usize) & (crate::REL_BUFFER - 1)
15 packets: Vec<Cell<Option<Vec<u8>>>>, // in the good old days this used to be called char **
// Sending half of the `mpsc` channel on which `InPkt` values are delivered.
// The worker sends both `Ok(Pkt { .. })` and `Err(e)` through it.
20 type PktTx = mpsc::Sender<InPkt>;
// Module-local shorthand: success carries no value, failure is the crate's `Error`.
21 type Result = result::Result<(), Error>;
/// Receive-side worker, generic over the UDP receive half `R` and the UDP
/// send half `S`.
///
/// Its methods read datagrams via `self.udp_rx` and deliver results via
/// `self.pkt_tx`, in addition to the shared state held in `share`.
23 pub struct RecvWorker<R: UdpReceiver, S: UdpSender> {
// Reference-counted handle to the `RudpShare` state, parameterised by the sender type.
24 share: Arc<RudpShare<S>>,
29 impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
30 pub fn new(udp_rx: R, share: Arc<RudpShare<S>>, pkt_tx: PktTx) -> Self {
39 let mut recv_chans = (0..crate::NUM_CHANS as u8)
42 packets: (0..crate::REL_BUFFER).map(|_| Cell::new(None)).collect(),
43 seqnum: crate::INIT_SEQNUM,
48 if let Err(e) = self.handle(self.recv_pkt(&mut recv_chans)) {
49 if let Error::LocalDisco = e {
56 data: &[CtlType::Disco as u8],
/// Receives one datagram from `self.udp_rx`, parses its header and hands the
/// remainder to `process_pkt` for the addressed channel.
///
/// Header fields are read in this order: a big-endian `u32` protocol id, a
/// big-endian `u16` peer id, and a `u8` channel number.
///
/// # Errors
///
/// * Any error from `udp_rx.recv()` or from reading the header is propagated via `?`.
/// * `InvalidProtoId(proto_id)` if the protocol id differs from `crate::PROTO_ID`.
/// * `InvalidChannel(n_chan)` if the channel lookup yields `None`.
/// * Whatever `process_pkt` returns.
66 fn recv_pkt(&self, chans: &mut Vec<RelChan>) -> Result {
69 // todo: reset timeout
// Wrap the received bytes in a cursor so the header can be consumed field by field.
70 let mut cursor = io::Cursor::new(self.udp_rx.recv()?);
72 let proto_id = cursor.read_u32::<BigEndian>()?;
73 if proto_id != crate::PROTO_ID {
// `do yeet` is the nightly-only yeet-expression syntax: return early with this error.
74 do yeet InvalidProtoId(proto_id);
// NOTE(review): confirm where `peer_id` is consumed after being read.
77 let peer_id = cursor.read_u16::<BigEndian>()?;
79 let n_chan = cursor.read_u8()?;
// Checked lookup: an out-of-range channel number becomes `InvalidChannel` instead of a panic.
// NOTE(review): presumably applied to `chans` — confirm the receiver of `.get_mut`.
81 .get_mut(n_chan as usize)
82 .ok_or(InvalidChannel(n_chan))?;
// The cursor is now positioned just past the header.
84 self.process_pkt(cursor, chan)
/// Interprets the packet body left in `cursor`, dispatching on a leading type
/// byte that is converted with `try_into`.
///
/// Behaviour of the visible branches:
/// * `Ctl`: reads a second byte as the control type; `Disco` returns
///   `Err(RemoteDisco)`.
/// * A branch that sends `Ok(Pkt { .. })` on `self.pkt_tx` with the remaining
///   bytes as `data`.
/// * A branch that only dumps the remaining bytes with `dbg!`.
/// * A branch that reads a big-endian `u16` sequence number, stores the
///   remaining bytes in that number's slot of `chan.packets`, then processes
///   every consecutively buffered packet starting at `chan.seqnum`.
///
/// # Errors
///
/// Read and conversion failures are propagated via `?`; errors from nested
/// packets are first passed through `handle`.
87 fn process_pkt(&self, mut cursor: io::Cursor<Vec<u8>>, chan: &mut RelChan) -> Result {
// First byte selects the packet type; an unknown value fails the `try_into` conversion.
92 match cursor.read_u8()?.try_into()? {
// Control packets carry a one-byte control type.
93 Ctl => match cursor.read_u8()?.try_into()? {
// The remote side announced a disconnect; surface it to the caller as an error.
94 Disco => return Err(RemoteDisco),
// Deliver the payload to the consumer side of the packet channel.
100 self.pkt_tx.send(Ok(Pkt {
// Everything after the bytes consumed so far is copied into the packet's data.
103 data: cursor.remaining_slice().into(),
// NOTE(review): `dbg!` prints to stderr — looks like a debugging leftover / unimplemented branch; confirm.
108 dbg!(cursor.remaining_slice());
113 let seqnum = cursor.read_u16::<BigEndian>()?;
// Buffer the payload in the slot derived from its sequence number.
// NOTE(review): no visible check of how far `seqnum` is from `chan.seqnum`; sequence
// numbers that differ by a multiple of the buffer size share a slot — confirm whether
// a window check is needed.
114 chan.packets[to_seqnum(seqnum)].set(Some(cursor.remaining_slice().into()));
// Drain in order: `take()` empties each slot, and the loop stops at the first gap.
116 while let Some(pkt) = chan.packets[to_seqnum(chan.seqnum)].take() {
// Buffered payloads are themselves packet bodies, so they are processed recursively;
// `handle` decides which nested errors abort and which are forwarded.
117 self.handle(self.process_pkt(io::Cursor::new(pkt), chan))?;
// Advance the expected sequence number, wrapping around at `u16::MAX`.
118 chan.seqnum = chan.seqnum.overflowing_add(1).0;
/// Filters the outcome of a receive/process step.
///
/// Disconnect errors (`RemoteDisco`, `LocalDisco`) are returned to the caller
/// unchanged. Any other error is sent to the packet channel as `Err(e)`, and
/// the call then returns `Ok(())` unless that send itself fails, in which case
/// the send failure is propagated via `?`.
126 fn handle(&self, res: Result) -> Result {
// Disconnects are passed straight through to the caller.
131 Err(RemoteDisco) => Err(RemoteDisco),
132 Err(LocalDisco) => Err(LocalDisco),
// Every other error is reported to the packet consumer instead of being returned.
133 Err(e) => Ok(self.pkt_tx.send(Err(e))?),