2 use async_recursion::async_recursion;
3 use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
6 collections::{HashMap, VecDeque},
10 time::{Duration, Instant},
14 time::{interval, sleep, Interval, Sleep},
17 fn to_seqnum(seqnum: u16) -> usize {
18 (seqnum as usize) & (REL_BUFFER - 1)
21 type Result<T> = std::result::Result<T, Error>;
25 timestamp: Option<Instant>,
26 chunks: Vec<Option<Vec<u8>>>,
31 packets: Vec<Option<Vec<u8>>>, // char ** 😛
32 splits: HashMap<u16, Split>,
36 pub struct RudpReceiver<P: UdpPeer> {
37 pub(crate) share: Arc<RudpShare<P>>,
38 chans: [RecvChan; NUM_CHANS],
40 close: watch::Receiver<bool>,
45 timeout: Pin<Box<Sleep>>,
46 queue: VecDeque<Result<Pkt<'static>>>,
49 impl<P: UdpPeer> RudpReceiver<P> {
52 share: Arc<RudpShare<P>>,
53 close: watch::Receiver<bool>,
60 resend: interval(Duration::from_millis(500)),
61 ping: interval(Duration::from_secs(PING_TIMEOUT)),
62 cleanup: interval(Duration::from_secs(TIMEOUT)),
63 timeout: Box::pin(sleep(Duration::from_secs(TIMEOUT))),
64 chans: std::array::from_fn(|_| RecvChan {
65 packets: (0..REL_BUFFER).map(|_| None).collect(),
67 splits: HashMap::new(),
69 queue: VecDeque::new(),
73 fn handle_err(&mut self, res: Result<()>) -> Result<()> {
77 Err(RemoteDisco(_)) | Err(LocalDisco) => {
83 self.queue.push_back(Err(e));
89 pub async fn recv(&mut self) -> Option<Result<Pkt<'static>>> {
97 if let Some(x) = self.queue.pop_front() {
102 _ = self.close.changed() => {
104 return Some(Err(LocalDisco));
106 _ = self.cleanup.tick() => {
107 let timeout = Duration::from_secs(TIMEOUT);
109 for chan in self.chans.iter_mut() {
113 |_k, v| !matches!(v.timestamp, Some(t) if t.elapsed() < timeout),
118 _ = self.resend.tick() => {
119 for chan in self.share.chans.iter() {
120 for (_, ack) in chan.lock().await.acks.iter() {
121 self.share.send_raw(&ack.data).await.ok(); // TODO: handle error (?)
125 _ = self.ping.tick() => {
132 data: Cow::Borrowed(&[CtlType::Ping as u8]),
138 _ = &mut self.timeout => {
140 return Some(Err(RemoteDisco(true)));
142 pkt = self.udp.recv() => {
143 if let Err(e) = self.handle_pkt(pkt).await {
151 async fn handle_pkt(&mut self, pkt: io::Result<Vec<u8>>) -> Result<()> {
154 let mut cursor = io::Cursor::new(pkt?);
158 .reset(tokio::time::Instant::now() + Duration::from_secs(TIMEOUT));
160 let proto_id = cursor.read_u32::<BigEndian>()?;
161 if proto_id != PROTO_ID {
162 return Err(InvalidProtoId(proto_id));
165 let _peer_id = cursor.read_u16::<BigEndian>()?;
167 let chan = cursor.read_u8()?;
168 if chan >= NUM_CHANS as u8 {
169 return Err(InvalidChannel(chan));
172 let res = self.process_pkt(cursor, true, chan).await;
173 self.handle_err(res)?;
179 async fn process_pkt(
181 mut cursor: io::Cursor<Vec<u8>>,
187 let ch = chan as usize;
188 match cursor.read_u8()?.try_into()? {
189 PktType::Ctl => match cursor.read_u8()?.try_into()? {
193 let seqnum = cursor.read_u16::<BigEndian>()?;
194 if let Some(ack) = self.share.chans[ch].lock().await.acks.remove(&seqnum) {
195 ack.tx.send(true).ok();
198 CtlType::SetPeerID => {
199 // println!("SetPeerID");
201 let mut id = self.share.remote_id.write().await;
203 if *id != PeerID::Nil as u16 {
204 return Err(PeerIDAlreadySet);
207 *id = cursor.read_u16::<BigEndian>()?;
213 // println!("Disco");
214 return Err(RemoteDisco(false));
220 self.queue.push_back(Ok(Pkt {
223 data: Cow::Owned(cursor.remaining_slice().into()),
227 // println!("Split");
229 let seqnum = cursor.read_u16::<BigEndian>()?;
230 let chunk_index = cursor.read_u16::<BigEndian>()? as usize;
231 let chunk_count = cursor.read_u16::<BigEndian>()? as usize;
233 let mut split = self.chans[ch]
236 .or_insert_with(|| Split {
238 chunks: (0..chunk_count).map(|_| None).collect(),
242 if split.chunks.len() != chunk_count {
243 return Err(InvalidChunkCount(split.chunks.len(), chunk_count));
248 .get_mut(chunk_index)
249 .ok_or(InvalidChunkIndex(chunk_index, chunk_count))?
250 .replace(cursor.remaining_slice().into())
256 split.timestamp = if unrel { Some(Instant::now()) } else { None };
258 if split.got == chunk_count {
259 let split = self.chans[ch].splits.remove(&seqnum).unwrap();
261 self.queue.push_back(Ok(Pkt {
268 .reduce(|mut a, mut b| {
280 let seqnum = cursor.read_u16::<BigEndian>()?;
281 self.chans[ch].packets[to_seqnum(seqnum)].replace(cursor.remaining_slice().into());
283 let mut ack_data = Vec::with_capacity(3);
284 ack_data.write_u8(CtlType::Ack as u8)?;
285 ack_data.write_u16::<BigEndian>(seqnum)?;
293 data: ack_data.into(),
298 let next_pkt = |chan: &mut RecvChan| chan.packets[to_seqnum(chan.seqnum)].take();
299 while let Some(pkt) = next_pkt(&mut self.chans[ch]) {
300 let res = self.process_pkt(io::Cursor::new(pkt), false, chan).await;
301 self.handle_err(res)?;
302 self.chans[ch].seqnum = self.chans[ch].seqnum.overflowing_add(1).0;