]> git.lizzy.rs Git - mt_rudp.git/blob - src/recv_worker.rs
fix
[mt_rudp.git] / src / recv_worker.rs
1 use crate::{error::Error, *};
2 use byteorder::{BigEndian, ReadBytesExt};
3 use std::{
4     cell::Cell,
5     io, result,
6     sync::{mpsc, Arc},
7 };
8
9 fn to_seqnum(seqnum: u16) -> usize {
10     (seqnum as usize) & (REL_BUFFER - 1)
11 }
12
13 struct RelChan {
14     packets: Vec<Cell<Option<Vec<u8>>>>, // in the good old days this used to be called char **
15     seqnum: u16,
16     num: u8,
17 }
18
19 type PktTx = mpsc::Sender<InPkt>;
20 type Result = result::Result<(), Error>;
21
22 pub struct RecvWorker<R: UdpReceiver, S: UdpSender> {
23     share: Arc<RudpShare<S>>,
24     pkt_tx: PktTx,
25     udp_rx: R,
26 }
27
28 impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
29     pub fn new(udp_rx: R, share: Arc<RudpShare<S>>, pkt_tx: PktTx) -> Self {
30         Self {
31             udp_rx,
32             share,
33             pkt_tx,
34         }
35     }
36
37     pub fn run(&self) {
38         let mut recv_chans = (0..NUM_CHANS as u8)
39             .map(|num| RelChan {
40                 num,
41                 packets: (0..REL_BUFFER).map(|_| Cell::new(None)).collect(),
42                 seqnum: INIT_SEQNUM,
43             })
44             .collect();
45
46         loop {
47             if let Err(e) = self.handle(self.recv_pkt(&mut recv_chans)) {
48                 if let Error::LocalDisco = e {
49                     self.share
50                         .send(
51                             PktType::Ctl,
52                             Pkt {
53                                 unrel: true,
54                                 chan: 0,
55                                 data: &[CtlType::Disco as u8],
56                             },
57                         )
58                         .ok();
59                 }
60                 break;
61             }
62         }
63     }
64
65     fn recv_pkt(&self, chans: &mut Vec<RelChan>) -> Result {
66         use Error::*;
67
68         // todo: reset timeout
69         let mut cursor = io::Cursor::new(self.udp_rx.recv()?);
70
71         let proto_id = cursor.read_u32::<BigEndian>()?;
72         if proto_id != PROTO_ID {
73             do yeet InvalidProtoId(proto_id);
74         }
75
76         let peer_id = cursor.read_u16::<BigEndian>()?;
77
78         let n_chan = cursor.read_u8()?;
79         let chan = chans
80             .get_mut(n_chan as usize)
81             .ok_or(InvalidChannel(n_chan))?;
82
83         self.process_pkt(cursor, chan)
84     }
85
86     fn process_pkt(&self, mut cursor: io::Cursor<Vec<u8>>, chan: &mut RelChan) -> Result {
87         use CtlType::*;
88         use Error::*;
89         use PktType::*;
90
91         match cursor.read_u8()?.try_into()? {
92             Ctl => match cursor.read_u8()?.try_into()? {
93                 Disco => return Err(RemoteDisco),
94                 _ => {}
95             },
96             Orig => {
97                 println!("Orig");
98
99                 self.pkt_tx.send(Ok(Pkt {
100                     chan: chan.num,
101                     unrel: true,
102                     data: cursor.remaining_slice().into(),
103                 }))?;
104             }
105             Split => {
106                 println!("Split");
107                 dbg!(cursor.remaining_slice());
108             }
109             Rel => {
110                 println!("Rel");
111
112                 let seqnum = cursor.read_u16::<BigEndian>()?;
113                 chan.packets[to_seqnum(seqnum)].set(Some(cursor.remaining_slice().into()));
114
115                 while let Some(pkt) = chan.packets[to_seqnum(chan.seqnum)].take() {
116                     self.handle(self.process_pkt(io::Cursor::new(pkt), chan))?;
117                     chan.seqnum = chan.seqnum.overflowing_add(1).0;
118                 }
119             }
120         }
121
122         Ok(())
123     }
124
125     fn handle(&self, res: Result) -> Result {
126         use Error::*;
127
128         match res {
129             Ok(v) => Ok(v),
130             Err(RemoteDisco) => Err(RemoteDisco),
131             Err(LocalDisco) => Err(LocalDisco),
132             Err(e) => Ok(self.pkt_tx.send(Err(e))?),
133         }
134     }
135 }