]> git.lizzy.rs Git - mt_rudp.git/blob - src/recv_worker.rs
files
[mt_rudp.git] / src / recv_worker.rs
1 use crate::{error::Error, CtlType, InPkt, Pkt, PktType, RudpShare, UdpReceiver, UdpSender};
2 use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
3 use num_enum::{TryFromPrimitive, TryFromPrimitiveError};
4 use std::{
5     cell::Cell,
6     io, result,
7     sync::{mpsc, Arc},
8 };
9
10 fn to_seqnum(seqnum: u16) -> usize {
11     (seqnum as usize) & (crate::REL_BUFFER - 1)
12 }
13
14 struct RelChan {
15     packets: Vec<Cell<Option<Vec<u8>>>>, // in the good old days this used to be called char **
16     seqnum: u16,
17     num: u8,
18 }
19
20 type PktTx = mpsc::Sender<InPkt>;
21 type Result = result::Result<(), Error>;
22
23 pub struct RecvWorker<R: UdpReceiver, S: UdpSender> {
24     share: Arc<RudpShare<S>>,
25     pkt_tx: PktTx,
26     udp_rx: R,
27 }
28
29 impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
30     pub fn new(udp_rx: R, share: Arc<RudpShare<S>>, pkt_tx: PktTx) -> Self {
31         Self {
32             udp_rx,
33             share,
34             pkt_tx,
35         }
36     }
37
38     pub fn run(&self) {
39         let mut recv_chans = (0..crate::NUM_CHANS as u8)
40             .map(|num| RelChan {
41                 num,
42                 packets: (0..crate::REL_BUFFER).map(|_| Cell::new(None)).collect(),
43                 seqnum: crate::INIT_SEQNUM,
44             })
45             .collect();
46
47         loop {
48             if let Err(e) = self.handle(self.recv_pkt(&mut recv_chans)) {
49                 if let Error::LocalDisco = e {
50                     self.share
51                         .send(
52                             PktType::Ctl,
53                             Pkt {
54                                 unrel: true,
55                                 chan: 0,
56                                 data: &[CtlType::Disco as u8],
57                             },
58                         )
59                         .ok();
60                 }
61                 break;
62             }
63         }
64     }
65
66     fn recv_pkt(&self, chans: &mut Vec<RelChan>) -> Result {
67         use Error::*;
68
69         // todo: reset timeout
70         let mut cursor = io::Cursor::new(self.udp_rx.recv()?);
71
72         let proto_id = cursor.read_u32::<BigEndian>()?;
73         if proto_id != crate::PROTO_ID {
74             do yeet InvalidProtoId(proto_id);
75         }
76
77         let peer_id = cursor.read_u16::<BigEndian>()?;
78
79         let n_chan = cursor.read_u8()?;
80         let chan = chans
81             .get_mut(n_chan as usize)
82             .ok_or(InvalidChannel(n_chan))?;
83
84         self.process_pkt(cursor, chan)
85     }
86
87     fn process_pkt(&self, mut cursor: io::Cursor<Vec<u8>>, chan: &mut RelChan) -> Result {
88         use CtlType::*;
89         use Error::*;
90         use PktType::*;
91
92         match cursor.read_u8()?.try_into()? {
93             Ctl => match cursor.read_u8()?.try_into()? {
94                 Disco => return Err(RemoteDisco),
95                 _ => {}
96             },
97             Orig => {
98                 println!("Orig");
99
100                 self.pkt_tx.send(Ok(Pkt {
101                     chan: chan.num,
102                     unrel: true,
103                     data: cursor.remaining_slice().into(),
104                 }))?;
105             }
106             Split => {
107                 println!("Split");
108                 dbg!(cursor.remaining_slice());
109             }
110             Rel => {
111                 println!("Rel");
112
113                 let seqnum = cursor.read_u16::<BigEndian>()?;
114                 chan.packets[to_seqnum(seqnum)].set(Some(cursor.remaining_slice().into()));
115
116                 while let Some(pkt) = chan.packets[to_seqnum(chan.seqnum)].take() {
117                     self.handle(self.process_pkt(io::Cursor::new(pkt), chan))?;
118                     chan.seqnum = chan.seqnum.overflowing_add(1).0;
119                 }
120             }
121         }
122
123         Ok(())
124     }
125
126     fn handle(&self, res: Result) -> Result {
127         use Error::*;
128
129         match res {
130             Ok(v) => Ok(v),
131             Err(RemoteDisco) => Err(RemoteDisco),
132             Err(LocalDisco) => Err(LocalDisco),
133             Err(e) => Ok(self.pkt_tx.send(Err(e))?),
134         }
135     }
136 }