2 use async_recursion::async_recursion;
3 use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
10 time::{Duration, Instant},
14 time::{interval, sleep, Interval, Sleep},
/// Maps a 16-bit sequence number to a buffer index by masking it with
/// `REL_BUFFER - 1`.
///
/// NOTE(review): the mask only behaves like `seqnum % REL_BUFFER` if
/// `REL_BUFFER` is a power of two — confirm at its definition.
17 fn to_seqnum(seqnum: u16) -> usize {
18 (seqnum as usize) & (REL_BUFFER - 1)
/// Local `Result` alias whose error type is fixed to `Error`.
21 type Result<T> = std::result::Result<T, Error>;
/// Time of the most recent chunk when the split was updated with `unrel`
/// set, `None` otherwise. Only entries holding `Some` can be removed by the
/// periodic cleanup in `run`.
25 timestamp: Option<Instant>,
/// One slot per expected chunk; a slot is `None` until that chunk arrives.
26 chunks: Vec<Option<Vec<u8>>>,
/// Buffered payloads of reliable packets: `REL_BUFFER` slots, indexed by
/// `to_seqnum(seqnum)`; a slot is emptied again when the packet is processed.
32 packets: Vec<Option<Vec<u8>>>, // char ** 😛
/// Partially received split packets, keyed by their 16-bit sequence number.
33 splits: HashMap<u16, Split>,
/// Receive-side worker: owns the per-channel receive state and the timers
/// that `run` multiplexes over.
38 pub struct Worker<S: UdpSender, R: UdpReceiver> {
/// Shared sending half; used for acks, resends and reading/writing the
/// remote peer id.
39 sender: Arc<Sender<S>>,
/// Receive state, one entry per channel.
40 chans: [RecvChan; NUM_CHANS],
/// Watch channel; a change on it makes `run` report `LocalDisco`.
42 close: watch::Receiver<bool>,
/// Sleep future; when it completes, `run` reports `RemoteDisco(true)`.
46 timeout: Pin<Box<Sleep>>,
/// Sink for decoded packets and for errors raised while handling input.
47 output: mpsc::UnboundedSender<Result<Pkt<'static>>>,
51 impl<S: UdpSender, R: UdpReceiver> Worker<S, R> {
// NOTE(review): the next three lines look like parameters of a constructor
// whose signature line is not part of this excerpt — confirm.
54 close: watch::Receiver<bool>,
55 sender: Arc<Sender<S>>,
56 output: mpsc::UnboundedSender<Result<Pkt<'static>>>,
// Timers polled by `run`: resend every 500 ms, ping every PING_TIMEOUT
// seconds, cleanup every TIMEOUT seconds.
63 resend: interval(Duration::from_millis(500)),
64 ping: interval(Duration::from_secs(PING_TIMEOUT)),
65 cleanup: interval(Duration::from_secs(TIMEOUT)),
// One-shot deadline, initially TIMEOUT seconds from construction.
66 timeout: Box::pin(sleep(Duration::from_secs(TIMEOUT))),
// Every channel starts with REL_BUFFER empty packet slots and no pending
// splits.
68 chans: std::array::from_fn(|_| RecvChan {
69 packets: (0..REL_BUFFER).map(|_| None).collect(),
71 splits: HashMap::new(),
/// Drives the worker's event loop, consuming `self`.
///
/// The visible branches react to: a change on the `close` watch channel, the
/// `timeout` sleep completing, ticks of the `cleanup`, `resend` and `ping`
/// intervals, and input arriving on `self.input`. Errors are forwarded to
/// `output`; a failed send on `output` is deliberately ignored (`.ok()`).
76 pub async fn run(mut self) {
// Local close request: a packet whose payload is the single
// `CtlType::Disco` byte is built, and `LocalDisco` is reported.
81 _ = self.close.changed() => {
89 data: Cow::Borrowed(&[CtlType::Disco as u8]),
95 self.output.send(Err(LocalDisco)).ok();
// The timeout sleep completed: report a remote disconnect flagged `true`.
98 _ = &mut self.timeout => {
99 self.output.send(Err(RemoteDisco(true))).ok();
102 _ = self.cleanup.tick() => {
103 let timeout = Duration::from_secs(TIMEOUT);
105 for chan in self.chans.iter_mut() {
// Drop splits whose timestamp is at least `timeout` old. Splits without a
// timestamp (`None`) are always kept. The comparison used to be `<`, which
// discarded fresh splits and retained expired ones indefinitely.
106 chan.splits.retain(|_, v| !matches!(v.timestamp, Some(t) if t.elapsed() >= timeout));
// Resend tick: retransmit the data of every entry still present in each
// sender channel's `acks` map. Send errors are ignored. The channel lock is
// held while the sends are awaited.
109 _ = self.resend.tick() => {
110 for chan in self.sender.chans.iter() {
111 for (_, ack) in chan.lock().await.acks.iter() {
112 self.sender.send_udp(&ack.data).await.ok();
// Ping tick: a packet whose payload is the single `CtlType::Ping` byte is
// built.
116 _ = self.ping.tick() => {
124 data: Cow::Borrowed(&[CtlType::Ping as u8]),
// Incoming input: handle it and forward any error to `output`.
130 pkt = self.input.recv() => {
131 if let Err(e) = self.handle_pkt(pkt).await {
132 self.output.send(Err(e)).ok();
/// Validates the header of one received datagram and passes the remainder to
/// `process_pkt`.
///
/// Header layout as read here (all big-endian): `u32` protocol id, `u16` peer
/// id (read but unused), `u8` channel.
///
/// # Errors
/// Propagates the I/O error carried by `pkt` and any read error from a
/// truncated header; returns `InvalidProtoId` when the protocol id differs
/// from `PROTO_ID` and `InvalidChannel` when the channel is `>= NUM_CHANS`.
139 async fn handle_pkt(&mut self, pkt: io::Result<Vec<u8>>) -> Result<()> {
142 let mut cursor = io::Cursor::new(pkt?);
// Moves a deadline to TIMEOUT seconds from now.
// NOTE(review): the receiver of `.reset` is on a line not shown here —
// presumably `self.timeout`; confirm.
146 .reset(tokio::time::Instant::now() + Duration::from_secs(TIMEOUT));
148 let proto_id = cursor.read_u32::<BigEndian>()?;
149 if proto_id != PROTO_ID {
150 return Err(InvalidProtoId(proto_id));
// The sender's peer id is consumed to advance the cursor but not checked.
153 let _peer_id = cursor.read_u16::<BigEndian>()?;
155 let chan = cursor.read_u8()?;
156 if chan >= NUM_CHANS as u8 {
157 return Err(InvalidChannel(chan));
// Second argument `true`: see `process_pkt` for how that flag is used.
160 self.process_pkt(cursor, true, chan).await
/// Dispatches one packet body according to its leading type byte.
///
/// `cursor` is positioned at the type byte; `chan` selects the per-channel
/// state in `self.chans` and `self.sender.chans`. Reliable packets are parked
/// in the channel's packet buffer, acknowledged, and then drained in sequence
/// order by re-entering this function on each stored payload.
///
/// NOTE(review): the flag `unrel` used below is declared on a line not shown
/// here; `handle_pkt` passes `true` and the recursive call passes `false` in
/// the second position — presumably that is `unrel`; confirm.
///
/// # Errors
/// Propagates read errors from truncated packets and failed type conversions;
/// returns `PeerIDAlreadySet`, `RemoteDisco(false)`, `InvalidChunkCount` or
/// `InvalidChunkIndex` from the visible branches.
164 async fn process_pkt(
166 mut cursor: io::Cursor<Vec<u8>>,
172 let ch = chan as usize;
173 match cursor.read_u8()?.try_into()? {
174 PktType::Ctl => match cursor.read_u8()?.try_into()? {
// An acknowledged seqnum: drop the pending entry so it is no longer resent
// and notify its waiter with `true` (a closed waiter is ignored).
176 let seqnum = cursor.read_u16::<BigEndian>()?;
177 if let Some(ack) = self.sender.chans[ch].lock().await.acks.remove(&seqnum) {
178 ack.tx.send(true).ok();
181 CtlType::SetPeerID => {
182 let mut id = self.sender.remote_id.write().await;
// The remote id may only be assigned while it is still `PeerID::Nil`.
184 if *id != PeerID::Nil as u16 {
185 return Err(PeerIDAlreadySet);
188 *id = cursor.read_u16::<BigEndian>()?;
193 return Err(RemoteDisco(false));
// The rest of the cursor is copied into an owned payload.
201 data: Cow::Owned(cursor.remaining_slice().into()),
// Split packet header: sequence number, total chunk count, chunk index.
206 let seqnum = cursor.read_u16::<BigEndian>()?;
207 let chunk_count = cursor.read_u16::<BigEndian>()? as usize;
208 let chunk_index = cursor.read_u16::<BigEndian>()? as usize;
210 let mut split = self.chans[ch]
213 .or_insert_with(|| Split {
215 chunks: (0..chunk_count).map(|_| None).collect(),
// Every chunk of one split must announce the same chunk count.
219 if split.chunks.len() != chunk_count {
220 return Err(InvalidChunkCount(split.chunks.len(), chunk_count));
225 .get_mut(chunk_index)
226 .ok_or(InvalidChunkIndex(chunk_index, chunk_count))?
227 .replace(cursor.remaining_slice().into())
// Only splits updated with `unrel` set carry a timestamp, so only those can
// be removed by the cleanup tick in `run`.
233 split.timestamp = if unrel { Some(Instant::now()) } else { None };
// All chunks received: take the split out of the map for reassembly.
235 if split.got == chunk_count {
236 let split = self.chans[ch].splits.remove(&seqnum).unwrap();
246 .reduce(|mut a, mut b| {
// Reliable packet: park the payload in the slot for its sequence number.
// NOTE(review): nothing visible checks that `seqnum` lies inside the current
// receive window; a late duplicate of an already-processed packet would be
// stored again and replayed once `chan.seqnum` reaches that slot after
// wrapping. Confirm whether lines not shown guard against this.
257 let seqnum = cursor.read_u16::<BigEndian>()?;
258 self.chans[ch].packets[to_seqnum(seqnum)].replace(cursor.remaining_slice().into());
// Acknowledgement payload: `CtlType::Ack` byte followed by the big-endian
// sequence number (3 bytes total).
262 let mut ack_data = Vec::with_capacity(3);
263 ack_data.write_u8(CtlType::Ack as u8)?;
264 ack_data.write_u16::<BigEndian>(seqnum)?;
273 data: ack_data.into(),
// Drain buffered packets in order, starting at the channel's expected
// sequence number; stop at the first empty slot. Errors from a drained packet
// are reported on `output` and do not abort the drain.
278 let next_pkt = |chan: &mut RecvChan| chan.packets[to_seqnum(chan.seqnum)].take();
279 while let Some(pkt) = next_pkt(&mut self.chans[ch]) {
280 if let Err(e) = self.process_pkt(io::Cursor::new(pkt), false, chan).await {
281 self.output.send(Err(e)).ok();
// Sequence numbers wrap around at `u16::MAX`.
284 self.chans[ch].seqnum = self.chans[ch].seqnum.wrapping_add(1);