ops,
sync::Arc,
};
-use tokio::sync::{mpsc, Mutex, RwLock};
+use tokio::sync::{mpsc, watch, Mutex, RwLock};
pub const PROTO_ID: u32 = 0x4f457403;
pub const UDP_PKT_SIZE: usize = 512;
pub type Error = error::Error;
pub type InPkt = Result<Pkt<Vec<u8>>, Error>;
+type AckChan = (watch::Sender<bool>, watch::Receiver<bool>);
#[derive(Debug)]
pub struct RudpShare<S: UdpSender> {
pub id: u16,
pub remote_id: RwLock<u16>,
- pub ack_chans: Mutex<HashMap<u16, mpsc::Sender<()>>>,
+ pub ack_chans: Mutex<HashMap<u16, AckChan>>,
udp_tx: S,
}
}
println!("disco");
+ // close()ing rx is not needed because it has been consumed to the end
+
Ok(())
}
PktType::Ctl => match cursor.read_u8()?.try_into()? {
CtlType::Ack => {
let seqnum = cursor.read_u16::<BigEndian>()?;
- self.share.ack_chans.lock().await.remove(&seqnum);
+ if let Some((tx, _)) = self.share.ack_chans.lock().await.remove(&seqnum) {
+ tx.send(true).ok();
+ }
}
CtlType::SetPeerID => {
let mut id = self.share.remote_id.write().await;
let seqnum = cursor.read_u16::<BigEndian>()?;
chan.packets[to_seqnum(seqnum)].set(Some(cursor.remaining_slice().into()));
+ let mut ack_data = Vec::with_capacity(3);
+ ack_data.write_u8(CtlType::Ack as u8)?;
+ ack_data.write_u16::<BigEndian>(seqnum)?;
+
+ self.share
+ .send(
+ PktType::Ctl,
+ Pkt {
+ unrel: true,
+ chan: chan.num,
+ data: &ack_data,
+ },
+ )
+ .await?;
+
fn next_pkt(chan: &mut Chan) -> Option<Vec<u8>> {
chan.packets[to_seqnum(chan.seqnum)].take()
}