2 use async_recursion::async_recursion;
3 use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
11 time::{Duration, Instant},
13 use tokio::sync::{mpsc, watch, Mutex};
15 fn to_seqnum(seqnum: u16) -> usize {
16 (seqnum as usize) & (REL_BUFFER - 1)
19 type Result<T> = std::result::Result<T, Error>;
23 timestamp: Option<Instant>,
24 chunks: Vec<OnceCell<Vec<u8>>>,
29 packets: Vec<Option<Vec<u8>>>, // char ** 😛
30 splits: HashMap<u16, Split>,
35 pub(crate) struct RecvWorker<R: UdpReceiver, S: UdpSender> {
36 share: Arc<RudpShare<S>>,
37 close: watch::Receiver<bool>,
38 chans: Arc<Vec<Mutex<RecvChan>>>,
39 pkt_tx: mpsc::UnboundedSender<InPkt>,
43 impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
46 share: Arc<RudpShare<S>>,
47 close: watch::Receiver<bool>,
48 pkt_tx: mpsc::UnboundedSender<InPkt>,
60 packets: (0..REL_BUFFER).map(|_| None).collect(),
62 splits: HashMap::new(),
70 pub async fn run(&self) {
73 let cleanup_chans = Arc::clone(&self.chans);
74 let mut cleanup_close = self.close.clone();
80 .name("cleanup_splits")*/
82 let timeout = Duration::from_secs(TIMEOUT);
84 ticker!(timeout, cleanup_close, {
85 for chan_mtx in cleanup_chans.iter() {
86 let mut chan = chan_mtx.lock().await;
90 |_k, v| !matches!(v.timestamp, Some(t) if t.elapsed() < timeout),
97 let mut close = self.close.clone();
98 let timeout = tokio::time::sleep(Duration::from_secs(TIMEOUT));
102 if let Err(e) = self.handle(self.recv_pkt(&mut close, timeout.as_mut()).await) {
103 // TODO: figure out whether this is a good idea
104 if let RemoteDisco(to) = e {
105 self.pkt_tx.send(Err(RemoteDisco(to))).ok();
108 #[allow(clippy::single_match)]
110 // anon5's mt notifies the peer on timeout, C++ MT does not
111 LocalDisco /*| RemoteDisco(true)*/ => drop(
118 data: Cow::Borrowed(&[CtlType::Disco as u8]),
134 close: &mut watch::Receiver<bool>,
135 timeout: Pin<&mut tokio::time::Sleep>,
139 let mut cursor = io::Cursor::new(tokio::select! {
140 pkt = self.udp_rx.recv() => pkt?,
141 _ = tokio::time::sleep_until(timeout.deadline()) => return Err(RemoteDisco(true)),
142 _ = close.changed() => return Err(LocalDisco),
145 timeout.reset(tokio::time::Instant::now() + Duration::from_secs(TIMEOUT));
147 let proto_id = cursor.read_u32::<BigEndian>()?;
148 if proto_id != PROTO_ID {
149 return Err(InvalidProtoId(proto_id));
152 let _peer_id = cursor.read_u16::<BigEndian>()?;
154 let n_chan = cursor.read_u8()?;
157 .get(n_chan as usize)
158 .ok_or(InvalidChannel(n_chan))?
162 self.process_pkt(cursor, true, &mut chan).await
166 async fn process_pkt(
168 mut cursor: io::Cursor<Vec<u8>>,
174 match cursor.read_u8()?.try_into()? {
175 PktType::Ctl => match cursor.read_u8()?.try_into()? {
179 let seqnum = cursor.read_u16::<BigEndian>()?;
180 if let Some(ack) = self.share.chans[chan.num as usize]
186 ack.tx.send(true).ok();
189 CtlType::SetPeerID => {
190 // println!("SetPeerID");
192 let mut id = self.share.remote_id.write().await;
194 if *id != PeerID::Nil as u16 {
195 return Err(PeerIDAlreadySet);
198 *id = cursor.read_u16::<BigEndian>()?;
204 // println!("Disco");
205 return Err(RemoteDisco(false));
211 self.pkt_tx.send(Ok(Pkt {
214 data: Cow::Owned(cursor.remaining_slice().into()),
218 // println!("Split");
220 let seqnum = cursor.read_u16::<BigEndian>()?;
221 let chunk_index = cursor.read_u16::<BigEndian>()? as usize;
222 let chunk_count = cursor.read_u16::<BigEndian>()? as usize;
224 let mut split = chan.splits.entry(seqnum).or_insert_with(|| Split {
226 chunks: (0..chunk_count).map(|_| OnceCell::new()).collect(),
230 if split.chunks.len() != chunk_count {
231 return Err(InvalidChunkCount(split.chunks.len(), chunk_count));
237 .ok_or(InvalidChunkIndex(chunk_index, chunk_count))?
238 .set(cursor.remaining_slice().into())
244 split.timestamp = if unrel { Some(Instant::now()) } else { None };
246 if split.got == chunk_count {
247 self.pkt_tx.send(Ok(Pkt {
253 .flat_map(|chunk| chunk.get().unwrap().iter())
258 chan.splits.remove(&seqnum);
264 let seqnum = cursor.read_u16::<BigEndian>()?;
265 chan.packets[to_seqnum(seqnum)].replace(cursor.remaining_slice().into());
267 let mut ack_data = Vec::with_capacity(3);
268 ack_data.write_u8(CtlType::Ack as u8)?;
269 ack_data.write_u16::<BigEndian>(seqnum)?;
277 data: Cow::Borrowed(&ack_data),
282 fn next_pkt(chan: &mut RecvChan) -> Option<Vec<u8>> {
283 chan.packets[to_seqnum(chan.seqnum)].take()
286 while let Some(pkt) = next_pkt(chan) {
287 self.handle(self.process_pkt(io::Cursor::new(pkt), false, chan).await)?;
288 chan.seqnum = chan.seqnum.overflowing_add(1).0;
296 fn handle(&self, res: Result<()>) -> Result<()> {
301 Err(RemoteDisco(to)) => Err(RemoteDisco(to)),
302 Err(LocalDisco) => Err(LocalDisco),
303 Err(e) => Ok(self.pkt_tx.send(Err(e))?),