1 use crate::{error::Error, *};
2 use byteorder::{BigEndian, ReadBytesExt};
10 use tokio::sync::{mpsc, Mutex};
12 fn to_seqnum(seqnum: u16) -> usize {
13 (seqnum as usize) & (REL_BUFFER - 1)
16 type Result = std::result::Result<(), Error>;
19 timestamp: time::Instant,
23 packets: Vec<Cell<Option<Vec<u8>>>>, // in the good old days this used to be called char **
24 splits: HashMap<u16, Split>,
29 pub struct RecvWorker<R: UdpReceiver, S: UdpSender> {
30 share: Arc<RudpShare<S>>,
31 chans: Arc<Vec<Mutex<Chan>>>,
32 pkt_tx: mpsc::UnboundedSender<InPkt>,
36 impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
37 pub fn new(udp_rx: R, share: Arc<RudpShare<S>>, pkt_tx: mpsc::UnboundedSender<InPkt>) -> Self {
47 packets: (0..REL_BUFFER).map(|_| Cell::new(None)).collect(),
49 splits: HashMap::new(),
57 pub async fn run(&self) {
58 let cleanup_chans = Arc::downgrade(&self.chans);
59 tokio::spawn(async move {
60 let timeout = time::Duration::from_secs(TIMEOUT);
61 let mut interval = tokio::time::interval(timeout);
63 while let Some(chans) = Weak::upgrade(&cleanup_chans) {
64 for chan in chans.iter() {
65 let mut ch = chan.lock().await;
68 .drain_filter(|_k, v| v.timestamp.elapsed() < timeout)
72 interval.tick().await;
77 if let Err(e) = self.handle(self.recv_pkt().await) {
78 if let Error::LocalDisco = e {
85 data: &[CtlType::Disco as u8],
96 async fn recv_pkt(&self) -> Result {
99 // todo: reset timeout
100 let mut cursor = io::Cursor::new(self.udp_rx.recv().await?);
102 let proto_id = cursor.read_u32::<BigEndian>()?;
103 if proto_id != PROTO_ID {
104 do yeet InvalidProtoId(proto_id);
107 let peer_id = cursor.read_u16::<BigEndian>()?;
109 let n_chan = cursor.read_u8()?;
112 .get(n_chan as usize)
113 .ok_or(InvalidChannel(n_chan))?
117 self.process_pkt(cursor, &mut chan)
120 fn process_pkt(&self, mut cursor: io::Cursor<Vec<u8>>, chan: &mut Chan) -> Result {
125 match cursor.read_u8()?.try_into()? {
126 Ctl => match cursor.read_u8()?.try_into()? {
127 Disco => return Err(RemoteDisco),
133 self.pkt_tx.send(Ok(Pkt {
136 data: cursor.remaining_slice().into(),
141 dbg!(cursor.remaining_slice());
146 let seqnum = cursor.read_u16::<BigEndian>()?;
147 chan.packets[to_seqnum(seqnum)].set(Some(cursor.remaining_slice().into()));
149 while let Some(pkt) = chan.packets[to_seqnum(chan.seqnum)].take() {
150 self.handle(self.process_pkt(io::Cursor::new(pkt), chan))?;
151 chan.seqnum = chan.seqnum.overflowing_add(1).0;
159 fn handle(&self, res: Result) -> Result {
164 Err(RemoteDisco) => Err(RemoteDisco),
165 Err(LocalDisco) => Err(LocalDisco),
166 Err(e) => Ok(self.pkt_tx.send(Err(e))?),