1 use crate::{error::Error, *};
2 use byteorder::{BigEndian, ReadBytesExt};
/// Maps a 16-bit sequence number to a slot index of a channel's packet buffer.
///
/// The index is the sequence number masked with `REL_BUFFER - 1`.
// NOTE(review): the mask only behaves like `% REL_BUFFER` when `REL_BUFFER` is a power of two;
// `REL_BUFFER` is defined outside this excerpt — confirm.
9 fn to_seqnum(seqnum: u16) -> usize {
10 (seqnum as usize) & (REL_BUFFER - 1)
/// Buffer slots indexed via `to_seqnum`; a slot holds the raw bytes of a received packet until
/// `process_pkt` takes it out again (`None` means the slot is empty).
14 packets: Vec<Cell<Option<Vec<u8>>>>, // in the good old days this used to be called char **
/// Sending half of the channel through which decoded packets (`Ok`) and non-fatal errors (`Err`)
/// are handed on; see the `pkt_tx.send` calls in `process_pkt` and `handle`.
19 type PktTx = mpsc::Sender<InPkt>;
/// Module-local result alias: no success value, failures are the crate's `Error`.
/// Note that this shadows the prelude's `Result` inside this module.
20 type Result = result::Result<(), Error>;
/// Receive-side worker, generic over a UDP receiver `R` and a UDP sender `S`.
///
/// Its methods read datagrams from `udp_rx`, decode them and forward the results through
/// `pkt_tx`.
22 pub struct RecvWorker<R: UdpReceiver, S: UdpSender> {
/// Reference-counted handle to the `RudpShare` state.
// NOTE(review): what `RudpShare` contains and who else holds it is not visible in this excerpt.
23 share: Arc<RudpShare<S>>,
28 impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
/// Creates a worker from a UDP receiver, the shared `RudpShare` handle and the sender on which
/// received packets and errors are emitted.
29 pub fn new(udp_rx: R, share: Arc<RudpShare<S>>, pkt_tx: PktTx) -> Self {
// Per-channel receive state, built from the channel indices `0..NUM_CHANS`.
38 let mut recv_chans = (0..NUM_CHANS as u8)
// Every one of the `REL_BUFFER` buffer slots starts out empty.
41 packets: (0..REL_BUFFER).map(|_| Cell::new(None)).collect(),
// Receive one packet and pass the outcome through `handle`. An error only reaches this branch
// if `handle` returned it (a disconnect) or if forwarding an error through `pkt_tx` failed.
47 if let Err(e) = self.handle(self.recv_pkt(&mut recv_chans)) {
// A locally initiated disconnect is singled out here.
48 if let Error::LocalDisco = e {
// Payload made of the single `Disco` control-type byte.
// NOTE(review): presumably this announces the disconnect to the peer — the enclosing
// expression is outside this excerpt, confirm.
55 data: &[CtlType::Disco as u8],
/// Receives one datagram, checks its header and passes the remainder to `process_pkt`.
///
/// Header fields are read big-endian in this order: `u32` protocol id, `u16` peer id,
/// `u8` channel number.
///
/// # Errors
/// Propagates failures of the UDP receiver and of the header reads (e.g. a datagram that is too
/// short), returns `InvalidProtoId` when the protocol id differs from `PROTO_ID`, and
/// `InvalidChannel` when the channel lookup via `get_mut` finds no entry.
65 fn recv_pkt(&self, chans: &mut Vec<RelChan>) -> Result {
68 // todo: reset timeout
// The cursor owns the received bytes so the fields below can be read sequentially.
69 let mut cursor = io::Cursor::new(self.udp_rx.recv()?);
71 let proto_id = cursor.read_u32::<BigEndian>()?;
72 if proto_id != PROTO_ID {
// `do yeet` is the unstable `yeet_expr` syntax: return early with this error.
73 do yeet InvalidProtoId(proto_id);
// NOTE(review): `peer_id` is read here, which advances the cursor; whether it is validated is
// not visible in this excerpt.
76 let peer_id = cursor.read_u16::<BigEndian>()?;
78 let n_chan = cursor.read_u8()?;
// A channel number without an entry becomes an error instead of an indexing panic.
80 .get_mut(n_chan as usize)
81 .ok_or(InvalidChannel(n_chan))?;
83 self.process_pkt(cursor, chan)
/// Decodes one packet body from `cursor` for the channel state `chan`.
///
/// The first byte selects the packet type. Depending on it, the packet is treated as a control
/// message, forwarded through `pkt_tx`, or stored by sequence number so that buffered packets
/// can be processed in order. Calls itself recursively for buffered payloads.
///
/// # Errors
/// Returns `RemoteDisco` for a `Disco` control message; read and conversion failures are
/// propagated with `?`.
86 fn process_pkt(&self, mut cursor: io::Cursor<Vec<u8>>, chan: &mut RelChan) -> Result {
// Packet type byte; a value with no matching type fails the `try_into` and is propagated.
91 match cursor.read_u8()?.try_into()? {
// Control packet: the next byte is the control type.
92 Ctl => match cursor.read_u8()?.try_into()? {
// The peer announced a disconnect.
93 Disco => return Err(RemoteDisco),
// Forward the payload (everything not yet consumed from the cursor) to the packet consumer.
99 self.pkt_tx.send(Ok(Pkt {
102 data: cursor.remaining_slice().into(),
// NOTE(review): `dbg!` prints the remaining bytes to stderr — looks like leftover debugging
// output for an arm that is not handled yet; confirm and remove before release.
107 dbg!(cursor.remaining_slice());
// Sequence number of this packet, big-endian `u16`.
112 let seqnum = cursor.read_u16::<BigEndian>()?;
// Park the rest of the packet in the slot belonging to its sequence number.
// NOTE(review): the slot is overwritten without checking its previous content or how far
// `seqnum` is from `chan.seqnum` — confirm this is intended.
113 chan.packets[to_seqnum(seqnum)].set(Some(cursor.remaining_slice().into()));
// Drain consecutively buffered packets starting at `chan.seqnum` (presumably the next expected
// sequence number); `take` empties each slot as it goes and the loop stops at the first gap.
115 while let Some(pkt) = chan.packets[to_seqnum(chan.seqnum)].take() {
// A buffered payload is itself a packet body. Going through `handle` means only the errors it
// returns (disconnects, failed channel send) abort the loop.
116 self.handle(self.process_pkt(io::Cursor::new(pkt), chan))?;
// Wrapping increment: `.0` keeps the sum and drops the overflow flag.
117 chan.seqnum = chan.seqnum.overflowing_add(1).0;
/// Filters the outcome of receiving or processing a packet.
///
/// Disconnect errors (`RemoteDisco`, `LocalDisco`) are returned to the caller unchanged. Any
/// other error is sent through `pkt_tx` and counts as handled.
125 fn handle(&self, res: Result) -> Result {
130 Err(RemoteDisco) => Err(RemoteDisco),
131 Err(LocalDisco) => Err(LocalDisco),
// Non-fatal error: report it to the packet consumer. The `?` only fails when the send itself
// fails, and that failure is converted into this function's error type.
132 Err(e) => Ok(self.pkt_tx.send(Err(e))?),