2 use async_recursion::async_recursion;
3 use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
10 time::{Duration, Instant},
14 time::{interval, sleep, Interval, Sleep},
17 fn to_seqnum(seqnum: u16) -> usize {
18 (seqnum as usize) & (REL_BUFFER - 1)
21 type Result<T> = std::result::Result<T, Error>;
25 timestamp: Option<Instant>,
26 chunks: Vec<Option<Vec<u8>>>,
32 packets: Vec<Option<Vec<u8>>>, // char ** 😛
33 splits: HashMap<u16, Split>,
38 pub struct Worker<S: UdpSender, R: UdpReceiver> {
39 sender: Arc<Sender<S>>,
40 chans: [RecvChan; NUM_CHANS],
42 close: watch::Receiver<bool>,
46 timeout: Pin<Box<Sleep>>,
47 output: mpsc::UnboundedSender<Result<Pkt<'static>>>,
50 impl<S: UdpSender, R: UdpReceiver> Worker<S, R> {
53 close: watch::Receiver<bool>,
54 sender: Arc<Sender<S>>,
55 output: mpsc::UnboundedSender<Result<Pkt<'static>>>,
62 resend: interval(Duration::from_millis(500)),
63 ping: interval(Duration::from_secs(PING_TIMEOUT)),
64 cleanup: interval(Duration::from_secs(TIMEOUT)),
65 timeout: Box::pin(sleep(Duration::from_secs(TIMEOUT))),
66 chans: std::array::from_fn(|_| RecvChan {
67 packets: (0..REL_BUFFER).map(|_| None).collect(),
69 splits: HashMap::new(),
74 pub async fn run(mut self) {
79 _ = self.close.changed() => {
86 data: Cow::Borrowed(&[CtlType::Disco as u8]),
92 self.output.send(Err(LocalDisco)).ok();
95 _ = &mut self.timeout => {
96 self.output.send(Err(RemoteDisco(true))).ok();
99 _ = self.cleanup.tick() => {
100 let timeout = Duration::from_secs(TIMEOUT);
102 for chan in self.chans.iter_mut() {
106 |_k, v| !matches!(v.timestamp, Some(t) if t.elapsed() < timeout),
111 _ = self.resend.tick() => {
112 for chan in self.sender.chans.iter() {
113 for (_, ack) in chan.lock().await.acks.iter() {
114 self.sender.send_udp(&ack.data).await.ok();
118 _ = self.ping.tick() => {
125 data: Cow::Borrowed(&[CtlType::Ping as u8]),
131 pkt = self.input.recv() => {
132 if let Err(e) = self.handle_pkt(pkt).await {
133 self.output.send(Err(e)).ok();
140 async fn handle_pkt(&mut self, pkt: io::Result<Vec<u8>>) -> Result<()> {
143 let mut cursor = io::Cursor::new(pkt?);
147 .reset(tokio::time::Instant::now() + Duration::from_secs(TIMEOUT));
149 let proto_id = cursor.read_u32::<BigEndian>()?;
150 if proto_id != PROTO_ID {
151 return Err(InvalidProtoId(proto_id));
154 let _peer_id = cursor.read_u16::<BigEndian>()?;
156 let chan = cursor.read_u8()?;
157 if chan >= NUM_CHANS as u8 {
158 return Err(InvalidChannel(chan));
161 self.process_pkt(cursor, true, chan).await
165 async fn process_pkt(
167 mut cursor: io::Cursor<Vec<u8>>,
173 let ch = chan as usize;
174 match cursor.read_u8()?.try_into()? {
175 PktType::Ctl => match cursor.read_u8()?.try_into()? {
177 let seqnum = cursor.read_u16::<BigEndian>()?;
178 if let Some(ack) = self.sender.chans[ch].lock().await.acks.remove(&seqnum) {
179 ack.tx.send(true).ok();
182 CtlType::SetPeerID => {
183 let mut id = self.sender.remote_id.write().await;
185 if *id != PeerID::Nil as u16 {
186 return Err(PeerIDAlreadySet);
189 *id = cursor.read_u16::<BigEndian>()?;
193 return Err(RemoteDisco(false));
201 data: Cow::Owned(cursor.remaining_slice().into()),
206 let seqnum = cursor.read_u16::<BigEndian>()?;
207 let chunk_count = cursor.read_u16::<BigEndian>()? as usize;
208 let chunk_index = cursor.read_u16::<BigEndian>()? as usize;
210 let mut split = self.chans[ch]
213 .or_insert_with(|| Split {
215 chunks: (0..chunk_count).map(|_| None).collect(),
219 if split.chunks.len() != chunk_count {
220 return Err(InvalidChunkCount(split.chunks.len(), chunk_count));
225 .get_mut(chunk_index)
226 .ok_or(InvalidChunkIndex(chunk_index, chunk_count))?
227 .replace(cursor.remaining_slice().into())
233 split.timestamp = if unrel { Some(Instant::now()) } else { None };
235 if split.got == chunk_count {
236 let split = self.chans[ch].splits.remove(&seqnum).unwrap();
246 .reduce(|mut a, mut b| {
257 let seqnum = cursor.read_u16::<BigEndian>()?;
258 self.chans[ch].packets[to_seqnum(seqnum)].replace(cursor.remaining_slice().into());
260 let mut ack_data = Vec::with_capacity(3);
261 ack_data.write_u8(CtlType::Ack as u8)?;
262 ack_data.write_u16::<BigEndian>(seqnum)?;
270 data: ack_data.into(),
275 let next_pkt = |chan: &mut RecvChan| chan.packets[to_seqnum(chan.seqnum)].take();
276 while let Some(pkt) = next_pkt(&mut self.chans[ch]) {
277 if let Err(e) = self.process_pkt(io::Cursor::new(pkt), false, chan).await {
278 self.output.send(Err(e)).ok();
281 self.chans[ch].seqnum = self.chans[ch].seqnum.overflowing_add(1).0;