2 use async_recursion::async_recursion;
3 use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
10 time::{Duration, Instant},
14 time::{interval, sleep, Interval, Sleep},
/// Maps a 16-bit sequence number to a `usize` index by masking it with `REL_BUFFER - 1`.
// NOTE(review): the mask equals `seqnum % REL_BUFFER` only if `REL_BUFFER` is a
// power of two; its definition is not visible here — confirm.
17 fn to_seqnum(seqnum: u16) -> usize {
18 (seqnum as usize) & (REL_BUFFER - 1)
/// Shorthand for `std::result::Result` with the error type fixed to `Error`.
21 type Result<T> = std::result::Result<T, Error>;
// NOTE(review): the enclosing struct header is not visible here; these are
// presumably fields of `Split` (constructed in `process_pkt`) — confirm.
// Set to `Some(Instant::now())` when a chunk is processed with `unrel == true`,
// otherwise `None`; inspected by the periodic cleanup in `run`.
25 timestamp: Option<Instant>,
// One slot per expected chunk: created as all `None`, filled by chunk index.
26 chunks: Vec<Option<Vec<u8>>>,
// NOTE(review): the enclosing struct header is not visible here; these are
// presumably fields of `RecvChan` (constructed in `Worker::new`) — confirm.
// Buffered packet payloads, one optional slot per index produced by `to_seqnum`;
// initialised with `REL_BUFFER` empty slots.
32 packets: Vec<Option<Vec<u8>>>, // char ** 😛
// Partially received split packets, keyed by their `u16` sequence number.
33 splits: HashMap<u16, Split>,
/// Receive-side worker for one connection, generic over the UDP sender `S` and
/// receiver `R`. It is driven by `run`.
// NOTE(review): only some of the fields are visible in this view.
38 pub struct Worker<S: UdpSender, R: UdpReceiver> {
// Shared sending half; used to re-send unacknowledged packets and to update
// pending acks and the remote peer id.
39 sender: Arc<Sender<S>>,
// Per-channel receive state, `NUM_CHANS` entries.
40 chans: [RecvChan; NUM_CHANS],
// Watch channel observed in `run`; a change triggers the local disconnect path.
42 close: watch::Receiver<bool>,
// Pinned sleep future; when it completes, `run` reports `RemoteDisco(true)`.
46 timeout: Pin<Box<Sleep>>,
// Channel on which errors (and presumably received packets — TODO confirm) are
// delivered to the consumer.
47 output: mpsc::UnboundedSender<Result<Pkt<'static>>>,
51 impl<S: UdpSender, R: UdpReceiver> Worker<S, R> {
// NOTE(review): the constructor's `fn` line is not visible in this view; the
// lines below are its parameters and the struct literal it builds.
54 close: watch::Receiver<bool>,
55 sender: Arc<Sender<S>>,
56 output: mpsc::UnboundedSender<Result<Pkt<'static>>>,
// Re-send tick: every 500 ms.
63 resend: interval(Duration::from_millis(500)),
// Ping tick: every `PING_TIMEOUT` seconds.
64 ping: interval(Duration::from_secs(PING_TIMEOUT)),
// Split cleanup tick: every `TIMEOUT` seconds.
65 cleanup: interval(Duration::from_secs(TIMEOUT)),
// Sleep that starts out `TIMEOUT` seconds in the future.
66 timeout: Box::pin(sleep(Duration::from_secs(TIMEOUT))),
// Every channel starts with `REL_BUFFER` empty packet slots and no pending splits.
68 chans: std::array::from_fn(|_| RecvChan {
69 packets: (0..REL_BUFFER).map(|_| None).collect(),
71 splits: HashMap::new(),
/// Consumes the worker and services its event sources: the close signal, the
/// timeout sleep, the cleanup/resend/ping intervals and incoming packets.
// NOTE(review): the surrounding loop / `select!` scaffolding is not visible in
// this view; the arms below are presumably `tokio::select!` branches — confirm.
76 pub async fn run(mut self) {
// Close signal changed: a control packet whose payload is the single byte
// `CtlType::Disco` is built, and `Err(LocalDisco)` is reported on `output`.
81 _ = self.close.changed() => {
89 data: Cow::Borrowed(&[CtlType::Disco as u8]),
// `.ok()` discards the send result, so a dropped receiver is ignored.
95 self.output.send(Err(LocalDisco)).ok();
// Timeout sleep completed: report `RemoteDisco(true)` on `output`.
98 _ = &mut self.timeout => {
99 self.output.send(Err(RemoteDisco(true))).ok();
// Periodic pass over every channel's pending splits.
102 _ = self.cleanup.tick() => {
103 let timeout = Duration::from_secs(TIMEOUT);
105 for chan in self.chans.iter_mut() {
// The predicate is true for splits whose timestamp is `None` or at least
// `timeout` old, and false for splits with a timestamp younger than `timeout`.
// NOTE(review): the call this closure is passed to is not visible here —
// confirm that it discards the timed-out entries rather than the fresh ones.
109 |_k, v| !matches!(v.timestamp, Some(t) if t.elapsed() < timeout),
// Re-send tick: every entry still present in a sender channel's `acks` map is
// sent again; send errors are discarded by `.ok()`.
114 _ = self.resend.tick() => {
115 for chan in self.sender.chans.iter() {
// The channel lock is held while the pending entries are re-sent.
116 for (_, ack) in chan.lock().await.acks.iter() {
117 self.sender.send_udp(&ack.data).await.ok();
// Ping tick: a control packet whose payload is the single byte `CtlType::Ping`
// is built.
121 _ = self.ping.tick() => {
129 data: Cow::Borrowed(&[CtlType::Ping as u8]),
// Incoming datagram (or receive error): hand it to `handle_pkt` and forward
// any error to `output`.
135 pkt = self.input.recv() => {
136 if let Err(e) = self.handle_pkt(pkt).await {
137 self.output.send(Err(e)).ok();
/// Validates the header of one received datagram and dispatches the rest to
/// `process_pkt`.
///
/// # Errors
/// Propagates the receive error carried in `pkt` and any read error from a
/// truncated header; returns `InvalidProtoId` or `InvalidChannel` when the
/// corresponding header field is rejected.
144 async fn handle_pkt(&mut self, pkt: io::Result<Vec<u8>>) -> Result<()> {
// `pkt?` returns early on a receive error, before anything else is touched.
147 let mut cursor = io::Cursor::new(pkt?);
// Push a deadline out to `TIMEOUT` seconds from now.
// NOTE(review): the receiver of `.reset` is not visible here; presumably
// `self.timeout` — confirm.
151 .reset(tokio::time::Instant::now() + Duration::from_secs(TIMEOUT));
// Header field 1: big-endian u32 protocol id, which must equal `PROTO_ID`.
153 let proto_id = cursor.read_u32::<BigEndian>()?;
154 if proto_id != PROTO_ID {
155 return Err(InvalidProtoId(proto_id));
// Header field 2: big-endian u16 peer id; read to advance the cursor, value unused.
158 let _peer_id = cursor.read_u16::<BigEndian>()?;
// Header field 3: channel number, which must be below `NUM_CHANS`.
160 let chan = cursor.read_u8()?;
161 if chan >= NUM_CHANS as u8 {
162 return Err(InvalidChannel(chan));
// The remaining bytes are processed with the second argument set to `true`
// (presumably the `unrel` flag of `process_pkt` — confirm).
165 self.process_pkt(cursor, true, chan).await
/// Decodes one packet body from `cursor` and acts on it according to its type
/// byte. Buffered packets are fed back into this function (it calls itself).
// NOTE(review): many lines of this function, including most match-arm headers
// and part of the parameter list, are not visible in this view.
169 async fn process_pkt(
171 mut cursor: io::Cursor<Vec<u8>>,
177 let ch = chan as usize;
// The first byte selects the packet type; the conversion fails on unknown values.
178 match cursor.read_u8()?.try_into()? {
// Control packets carry a second type byte.
179 PktType::Ctl => match cursor.read_u8()?.try_into()? {
// Acknowledgement: remove the matching pending entry from the sender channel
// and signal it with `true`; a failed signal is ignored.
181 let seqnum = cursor.read_u16::<BigEndian>()?;
182 if let Some(ack) = self.sender.chans[ch].lock().await.acks.remove(&seqnum) {
183 ack.tx.send(true).ok();
// The remote peer id may be assigned only while it is still `PeerID::Nil`.
186 CtlType::SetPeerID => {
187 let mut id = self.sender.remote_id.write().await;
189 if *id != PeerID::Nil as u16 {
190 return Err(PeerIDAlreadySet);
193 *id = cursor.read_u16::<BigEndian>()?;
// Reported as an error with the flag `false` (the timeout path in `run` uses `true`).
// NOTE(review): presumably the `CtlType::Disco` arm — header not visible.
198 return Err(RemoteDisco(false));
// The remaining bytes are copied into an owned buffer used as packet data.
// NOTE(review): presumably the plain payload arm, delivered on `output` — confirm.
206 data: Cow::Owned(cursor.remaining_slice().into()),
// Split packet header: sequence number, total chunk count, index of this chunk.
211 let seqnum = cursor.read_u16::<BigEndian>()?;
212 let chunk_count = cursor.read_u16::<BigEndian>()? as usize;
213 let chunk_index = cursor.read_u16::<BigEndian>()? as usize;
// Look up the pending split, creating it with `chunk_count` empty slots if absent.
// NOTE(review): presumably via `splits.entry(seqnum)` — lines not visible.
215 let mut split = self.chans[ch]
218 .or_insert_with(|| Split {
220 chunks: (0..chunk_count).map(|_| None).collect(),
// Every chunk of a split must announce the chunk count the split was created with.
224 if split.chunks.len() != chunk_count {
225 return Err(InvalidChunkCount(split.chunks.len(), chunk_count));
// Store the chunk payload in its slot; an out-of-range index is an error.
230 .get_mut(chunk_index)
231 .ok_or(InvalidChunkIndex(chunk_index, chunk_count))?
232 .replace(cursor.remaining_slice().into())
// Only splits processed with `unrel == true` get a timestamp.
238 split.timestamp = if unrel { Some(Instant::now()) } else { None };
// All chunks are present: take the split out of the map for reassembly.
240 if split.got == chunk_count {
// The entry was obtained from this map above, so the `unwrap` is expected to hold.
241 let split = self.chans[ch].splits.remove(&seqnum).unwrap();
// NOTE(review): presumably concatenates the chunks pairwise — body not visible.
251 .reduce(|mut a, mut b| {
// Sequenced packet: buffer the payload in the slot selected by `to_seqnum`.
262 let seqnum = cursor.read_u16::<BigEndian>()?;
// NOTE(review): no window check is visible before this store; sequence numbers
// that differ by a multiple of the mask range share a slot — confirm intended.
263 self.chans[ch].packets[to_seqnum(seqnum)].replace(cursor.remaining_slice().into());
// NOTE(review): looks like a leftover debug print to stdout on every such
// packet — consider removing or routing through a logger.
265 println!("{seqnum}");
// Build the 3-byte acknowledgement payload: `CtlType::Ack` byte + big-endian seqnum.
267 let mut ack_data = Vec::with_capacity(3);
268 ack_data.write_u8(CtlType::Ack as u8)?;
269 ack_data.write_u16::<BigEndian>(seqnum)?;
278 data: ack_data.into(),
// Drain buffered packets in order, starting at the channel's current `seqnum`:
// `take()` empties the slot, and the loop stops at the first empty slot.
283 let next_pkt = |chan: &mut RecvChan| chan.packets[to_seqnum(chan.seqnum)].take();
284 while let Some(pkt) = next_pkt(&mut self.chans[ch]) {
// Buffered packets are processed with the flag `false`; an error is reported
// on `output` rather than returned.
285 if let Err(e) = self.process_pkt(io::Cursor::new(pkt), false, chan).await {
286 self.output.send(Err(e)).ok();
// Advance the channel's sequence number by one, wrapping on overflow.
289 self.chans[ch].seqnum = self.chans[ch].seqnum.overflowing_add(1).0;