2 use async_recursion::async_recursion;
3 use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
10 time::{Duration, Instant},
14 time::{interval, sleep, Interval, Sleep},
// Maps a 16-bit sequence number to a slot index by masking it with `REL_BUFFER - 1`.
// The result is used to index `RecvChan::packets`, which is created with `REL_BUFFER` slots.
// NOTE(review): the mask only equals `seqnum % REL_BUFFER` if `REL_BUFFER` is a power of two
// — confirm at its definition (not visible in this excerpt).
17 fn to_seqnum(seqnum: u16) -> usize {
18 (seqnum as usize) & (REL_BUFFER - 1)
// File-local shorthand: a `std::result::Result` whose error type is fixed to `Error`.
21 type Result<T> = std::result::Result<T, Error>;
// (Fields of `Split`; the struct header is not visible in this excerpt.)
// `process_pkt` sets this to `Some(Instant::now())` when its `unrel` flag is true and to `None`
// otherwise; the cleanup branch of `run` compares its age against `TIMEOUT` seconds.
25 timestamp: Option<Instant>,
// One slot per chunk of the split packet: created with `chunk_count` empty slots and filled
// by `chunk_index` as chunks arrive.
26 chunks: Vec<Option<Vec<u8>>>,
// (Fields of `RecvChan`; the struct header is not visible in this excerpt.)
// Buffered packet payloads, indexed by `to_seqnum(seqnum)`; created with `REL_BUFFER` empty slots.
32 packets: Vec<Option<Vec<u8>>>, // char ** 😛
// Partially received split packets, keyed by the u16 sequence number read from the split header.
33 splits: HashMap<u16, Split>,
// State consumed by `Worker::run`, generic over a UDP sender `S` and a UDP receiver `R`.
// Only some of the fields are visible in this excerpt.
38 pub struct Worker<S: UdpSender, R: UdpReceiver> {
// Shared sender: used to send raw datagrams (`send_udp`) and to reach the per-channel `acks`
// maps and `remote_id`.
39 sender: Arc<Sender<S>>,
// Receive-side state, one `RecvChan` per channel (`NUM_CHANS` in total).
40 chans: [RecvChan; NUM_CHANS],
// Watch receiver; `run` has a branch that fires when `changed()` resolves.
42 close: watch::Receiver<bool>,
// Pinned sleep future; when it completes `run` reports `RemoteDisco(true)`.
// Presumably this is the timer `handle_pkt` pushes back with `.reset(...)` — the receiver of
// that call is not visible here.
46 timeout: Pin<Box<Sleep>>,
// Results are delivered here; every visible `send` is followed by `.ok()`, so a send error
// is ignored.
47 output: mpsc::UnboundedSender<Result<Pkt<'static>>>,
51 impl<S: UdpSender, R: UdpReceiver> Worker<S, R> {
// Constructor parameters (the `fn` line and some parameters are not visible in this excerpt).
54 close: watch::Receiver<bool>,
55 sender: Arc<Sender<S>>,
56 output: mpsc::UnboundedSender<Result<Pkt<'static>>>,
// Timer behind the resend branch of `run`, with a 500 ms period.
// NOTE(review): if this is tokio's `interval`, its first tick completes immediately — confirm
// that an initial tick right at start-up is acceptable for each of the three intervals.
63 resend: interval(Duration::from_millis(500)),
// Timer behind the ping branch of `run`, with a period of `PING_TIMEOUT` seconds.
64 ping: interval(Duration::from_secs(PING_TIMEOUT)),
// Timer behind the cleanup branch of `run`, with a period of `TIMEOUT` seconds.
65 cleanup: interval(Duration::from_secs(TIMEOUT)),
// One-shot sleep of `TIMEOUT` seconds, boxed and pinned so it can be polled by reference.
66 timeout: Box::pin(sleep(Duration::from_secs(TIMEOUT))),
// Builds the channel array by calling the closure once per index.
68 chans: std::array::from_fn(|_| RecvChan {
// `REL_BUFFER` slots, all initially `None`.
69 packets: (0..REL_BUFFER).map(|_| None).collect(),
// No split packets in progress yet.
71 splits: HashMap::new(),
// Drives the worker: consumes `self` and reacts to the close signal, the timeout, the three
// interval timers and incoming packets. The enclosing loop/`select!` lines are not visible in
// this excerpt, so the notes below describe each visible branch on its own.
76 pub async fn run(mut self) {
// Close signal. The result of `changed()` is discarded, so an `Err` takes this branch as well.
81 _ = self.close.changed() => {
// Payload of a control packet holding the single byte `CtlType::Disco`
// (how it is sent is not visible here).
88 data: Cow::Borrowed(&[CtlType::Disco as u8]),
// Report a locally initiated disconnect; a send error is ignored.
94 self.output.send(Err(LocalDisco)).ok();
// The timeout sleep completed. Polling through `&mut` keeps the future owned by `self`.
97 _ = &mut self.timeout => {
98 self.output.send(Err(RemoteDisco(true))).ok();
// Periodic cleanup of split packets.
101 _ = self.cleanup.tick() => {
102 let timeout = Duration::from_secs(TIMEOUT);
104 for chan in self.chans.iter_mut() {
// This predicate is true for entries whose timestamp is `None`, or is `Some` and at least
// `timeout` old; it is false for timestamped entries younger than `timeout`.
// NOTE(review): the call this closure is passed to is not visible. If it keeps the entries
// for which the predicate is true, fresh timestamped splits are dropped and expired ones
// are kept — confirm the intended polarity.
108 |_k, v| !matches!(v.timestamp, Some(t) if t.elapsed() < timeout),
// Periodic resend: every entry in each sender channel's `acks` map has its data sent again.
113 _ = self.resend.tick() => {
114 for chan in self.sender.chans.iter() {
// The guard returned by `chan.lock().await` is a temporary of the `for` expression, so the
// lock stays held for the whole inner loop, including across each `send_udp(...).await`.
115 for (_, ack) in chan.lock().await.acks.iter() {
// Best effort: a failed send is ignored.
116 self.sender.send_udp(&ack.data).await.ok();
// Periodic ping.
120 _ = self.ping.tick() => {
// Payload of a control packet holding the single byte `CtlType::Ping`.
127 data: Cow::Borrowed(&[CtlType::Ping as u8]),
// Incoming datagram (or receive error) from `self.input`.
133 pkt = self.input.recv() => {
// A failure while handling the packet is forwarded to `output` rather than returned.
134 if let Err(e) = self.handle_pkt(pkt).await {
135 self.output.send(Err(e)).ok();
// Validates the header of one received datagram and hands the rest to `process_pkt`.
// Visible header layout, read in order: u32 big-endian protocol id, u16 big-endian peer id,
// u8 channel number.
// Errors: a receive error in `pkt` and any read failure are propagated with `?`;
// `InvalidProtoId` if the protocol id differs from `PROTO_ID`; `InvalidChannel` if the
// channel number is not below `NUM_CHANS`.
142 async fn handle_pkt(&mut self, pkt: io::Result<Vec<u8>>) -> Result<()> {
// Propagates a receive error before any parsing happens.
145 let mut cursor = io::Cursor::new(pkt?);
// Moves a deadline to `TIMEOUT` seconds from now (the receiver of `.reset` is not visible;
// presumably `self.timeout`). This runs before the header is validated.
149 .reset(tokio::time::Instant::now() + Duration::from_secs(TIMEOUT));
151 let proto_id = cursor.read_u32::<BigEndian>()?;
152 if proto_id != PROTO_ID {
153 return Err(InvalidProtoId(proto_id));
// The peer id is read to advance the cursor but is otherwise unused here.
156 let _peer_id = cursor.read_u16::<BigEndian>()?;
158 let chan = cursor.read_u8()?;
// NOTE(review): `NUM_CHANS as u8` would truncate if `NUM_CHANS` exceeded 255 — confirm its value.
159 if chan >= NUM_CHANS as u8 {
160 return Err(InvalidChannel(chan));
// The `true` argument is presumably the `unrel` flag that `process_pkt` reads.
163 self.process_pkt(cursor, true, chan).await
// Decodes one packet body from `cursor` and acts on it according to its type byte.
// It calls itself for buffered packets further down. Several match-arm headers and parameters
// are not visible in this excerpt, so the notes below only describe lines that are shown.
167 async fn process_pkt(
169 mut cursor: io::Cursor<Vec<u8>>,
// Channel number as an index into `self.chans` / `self.sender.chans`.
175 let ch = chan as usize;
// The first byte selects the packet type; a failed read or conversion is propagated with `?`.
176 match cursor.read_u8()?.try_into()? {
// Control packets carry a second type byte.
177 PktType::Ctl => match cursor.read_u8()?.try_into()? {
// Acknowledgement handling (arm header not visible): the pending entry for this sequence
// number is removed from the sender channel's `acks` map and `true` is sent on its `tx`;
// a send error is ignored.
179 let seqnum = cursor.read_u16::<BigEndian>()?;
180 if let Some(ack) = self.sender.chans[ch].lock().await.acks.remove(&seqnum) {
181 ack.tx.send(true).ok();
184 CtlType::SetPeerID => {
// Take the write lock on the remote id for the check and the assignment below.
185 let mut id = self.sender.remote_id.write().await;
// The id may only be assigned while it still holds `PeerID::Nil`.
187 if *id != PeerID::Nil as u16 {
188 return Err(PeerIDAlreadySet);
191 *id = cursor.read_u16::<BigEndian>()?;
// (Arm header not visible; presumably the disconnect control type.)
196 return Err(RemoteDisco(false));
// Copies the unread remainder of the cursor into an owned buffer.
204 data: Cow::Owned(cursor.remaining_slice().into()),
// Split packet header: sequence number, total chunk count and this chunk's index, each a
// big-endian u16.
209 let seqnum = cursor.read_u16::<BigEndian>()?;
210 let chunk_count = cursor.read_u16::<BigEndian>()? as usize;
211 let chunk_index = cursor.read_u16::<BigEndian>()? as usize;
// Looks up the split entry, creating it with `chunk_count` empty slots on first sight.
// NOTE(review): the entry is created before `chunk_index` is validated below, so a header
// that then fails validation (e.g. `chunk_count == 0`) still leaves an entry behind —
// confirm that the cleanup branch of `run` removes it.
213 let mut split = self.chans[ch]
216 .or_insert_with(|| Split {
218 chunks: (0..chunk_count).map(|_| None).collect(),
// Every chunk of one split must announce the same chunk count as the first one seen.
222 if split.chunks.len() != chunk_count {
223 return Err(InvalidChunkCount(split.chunks.len(), chunk_count));
// Stores this chunk's payload; an out-of-range index yields `InvalidChunkIndex`.
// `replace` returns the slot's previous content (what is done with it is not visible here).
228 .get_mut(chunk_index)
229 .ok_or(InvalidChunkIndex(chunk_index, chunk_count))?
230 .replace(cursor.remaining_slice().into())
// Only splits received with `unrel` set carry a timestamp.
236 split.timestamp = if unrel { Some(Instant::now()) } else { None };
// Once `split.got` equals `chunk_count`, the entry is taken out of the map.
238 if split.got == chunk_count {
// NOTE(review): `unwrap` relies on the entry looked up above still being present.
239 let split = self.chans[ch].splits.remove(&seqnum).unwrap();
// Folds the chunks pairwise (closure body not visible; presumably concatenation).
249 .reduce(|mut a, mut b| {
// Sequence-numbered packet (arm header not visible): the payload is stored in the slot for
// its sequence number, replacing whatever that slot held.
// NOTE(review): no check that `seqnum` lies within the current receive window is visible
// before this store; a sequence number that maps to an occupied slot overwrites it.
260 let seqnum = cursor.read_u16::<BigEndian>()?;
261 self.chans[ch].packets[to_seqnum(seqnum)].replace(cursor.remaining_slice().into());
// Builds the 3-byte acknowledgement payload: the `CtlType::Ack` byte followed by the
// big-endian sequence number.
263 let mut ack_data = Vec::with_capacity(3);
264 ack_data.write_u8(CtlType::Ack as u8)?;
265 ack_data.write_u16::<BigEndian>(seqnum)?;
273 data: ack_data.into(),
// Takes the buffered packet at the channel's current `seqnum`, if one is present.
278 let next_pkt = |chan: &mut RecvChan| chan.packets[to_seqnum(chan.seqnum)].take();
// Processes buffered packets in sequence order until the next expected slot is empty.
279 while let Some(pkt) = next_pkt(&mut self.chans[ch]) {
// Buffered packets are processed with the flag `false`; an error is forwarded to `output`
// and the loop goes on to the next packet.
280 if let Err(e) = self.process_pkt(io::Cursor::new(pkt), false, chan).await {
281 self.output.send(Err(e)).ok();
// Advances the expected sequence number, wrapping around at the u16 limit.
284 self.chans[ch].seqnum = self.chans[ch].seqnum.overflowing_add(1).0;