2 use byteorder::{BigEndian, WriteBytesExt};
9 use tokio::sync::{watch, Mutex, RwLock};
/// Handle returned (inside `io::Result`) by `send_rudp` and `send_rudp_type`:
/// an optional `watch` receiver over a `bool`.
///
/// NOTE(review): presumably `Some` only for reliable packets, with the flag
/// flipping to `true` once the peer acknowledges — confirm against the receive path.
11 pub type Ack = Option<watch::Receiver<bool>>;
/// Per-entry state stored in `Chan::acks`: both halves of a `watch` channel
/// over a `bool`, plus a byte buffer.
14 pub(crate) struct AckWait {
// Sending half of the `bool` watch channel.
15 pub(crate) tx: watch::Sender<bool>,
// Receiving half of the same kind of channel (same type as the inner value of `Ack`).
16 pub(crate) rx: watch::Receiver<bool>,
// NOTE(review): presumably the serialized packet bytes kept for retransmission — confirm.
17 pub(crate) data: Vec<u8>,
/// Mutable state of a single channel; `Sender` keeps one of these per channel
/// behind a `Mutex`.
21 pub(crate) struct Chan {
// `AckWait` entries keyed by a `u16`.
// NOTE(review): presumably the key is the reliable sequence number — confirm.
22 pub(crate) acks: HashMap<u16, AckWait>,
// Sequence number written into the reliable header by `send_rudp_type`,
// which also advances it with wrap-around.
23 pub(crate) seqnum: u16,
// Sequence number used for split packets; starts at `INIT_SEQNUM` and is
// advanced with wrap-around by `send_rudp`.
24 pub(crate) splits_seqnum: u16,
/// Sending side built on top of a `UdpSender` implementation `S`.
///
/// Holds the remote peer id, the per-channel state and the close notifier
/// that the send methods and `close` operate on.
28 pub struct Sender<S: UdpSender> {
// Id written into the header of every packet built by `send_rudp_type`.
30 pub(crate) remote_id: RwLock<u16>,
// One lock-protected `Chan` per channel, indexed by `pkt.chan`.
31 pub(crate) chans: [Mutex<Chan>; NUM_CHANS],
// Watch sender on which `close()` publishes `true`.
33 close: watch::Sender<bool>,
36 impl<S: UdpSender> Sender<S> {
/// Creates a new `Sender` and returns it wrapped in an `Arc`.
///
/// * `udp` - underlying `UdpSender` implementation.
/// * `close` - watch sender; `close()` later sends `true` on `self.close`.
/// * `id` - this side's id (`is_server` compares `self.id` with `PeerID::Srv`).
/// * `remote_id` - initial remote peer id, stored behind an `RwLock`.
37 pub fn new(udp: S, close: watch::Sender<bool>, id: u16, remote_id: u16) -> Arc<Self> {
40 remote_id: RwLock::new(remote_id),
// Build the fixed-size array of channels, one element per index.
43 chans: std::array::from_fn(|_| {
// Split sequence numbers start at `INIT_SEQNUM`.
47 splits_seqnum: INIT_SEQNUM,
/// Sends `pkt`, splitting it into chunks when it is larger than `UDP_PKT_SIZE`.
///
/// A packet that fits is passed on as `PktType::Orig` via `send_rudp_type`.
///
/// # Errors
///
/// Returns an `io::Error` of kind `Other` ("too many chunks") when the chunk
/// count cannot be converted to a `u16`, and propagates errors from
/// `send_rudp_type`.
53 pub async fn send_rudp(&self, pkt: Pkt<'_>) -> io::Result<Ack> {
54 if pkt.size() > UDP_PKT_SIZE {
// Payload room per chunk: the UDP size limit minus the packet header and the
// 1 + 2 + 2 + 2 bytes of split header (type, seqnum, count, index) that
// `send_rudp_type` writes for chunks.
57 .chunks(UDP_PKT_SIZE - (pkt.header_size() + 1 + 2 + 2 + 2));
58 let num_chunks: u16 = chunks
61 .map_err(|_| io::Error::new(io::ErrorKind::Other, "too many chunks"))?;
// Take the channel's current split sequence number and advance it,
// wrapping around on overflow.
64 let mut chan = self.chans[pkt.chan as usize].lock().await;
65 let sn = chan.splits_seqnum;
66 chan.splits_seqnum = chan.splits_seqnum.overflowing_add(1).0;
71 for (i, ch) in chunks.enumerate() {
// Chunk descriptor: (split seqnum, total chunk count, index of this chunk).
// NOTE(review): `i as u16` presumably cannot truncate because the chunk
// count was already checked to fit in a `u16` — confirm.
74 Some((seqnum, num_chunks, i as u16)),
// The chunk borrows its bytes instead of copying them.
78 data: Cow::Borrowed(ch),
// Not oversized: send the packet as-is, with no chunk descriptor.
86 self.send_rudp_type(PktType::Orig, None, pkt).await
/// Serializes `pkt` with its protocol headers into one buffer and sends it
/// through `send_udp`.
///
/// `chunk`, when `Some((seqnum, count, index))`, adds a split header
/// describing which piece of a split packet this is.
///
/// # Errors
///
/// Propagates `io::Error`s from writing into the buffer and from `send_udp`.
90 pub async fn send_rudp_type(
93 chunk: Option<(u16, u16, u16)>,
95 ) -> io::Result<Ack> {
// Reserve room for the packet plus the 7-byte split header when chunking.
97 Vec::with_capacity(pkt.size() + if chunk.is_some() { 1 + 2 + 2 + 2 } else { 0 });
// Base header, big-endian: protocol id (u32), remote peer id (u16), channel (u8).
99 buf.write_u32::<BigEndian>(PROTO_ID)?;
100 buf.write_u16::<BigEndian>(*self.remote_id.read().await)?;
101 buf.write_u8(pkt.chan)?;
103 let mut chan = self.chans[pkt.chan as usize].lock().await;
104 let seqnum = chan.seqnum;
// Reliable header: type byte followed by the channel's current sequence number.
107 buf.write_u8(PktType::Rel as u8)?;
108 buf.write_u16::<BigEndian>(seqnum)?;
// Split header: type byte, then split seqnum, chunk count and chunk index
// as big-endian u16 values. This `seqnum` shadows the channel one above.
111 if let Some((seqnum, count, index)) = chunk {
112 buf.write_u8(PktType::Split as u8)?;
113 buf.write_u16::<BigEndian>(seqnum)?;
114 buf.write_u16::<BigEndian>(count)?;
115 buf.write_u16::<BigEndian>(index)?;
// Type byte `tp`, followed by the payload bytes.
117 buf.write_u8(tp as u8)?;
120 buf.write_all(pkt.data.as_ref())?;
122 self.send_udp(&buf).await?;
127 // TODO: reliable window
// New watch channel starting at `false`.
// NOTE(review): presumably stored in `chan.acks` and handed back as the `Ack` — confirm.
128 let (tx, rx) = watch::channel(false);
// Advance the channel's sequence number, wrapping around on overflow.
137 chan.seqnum = chan.seqnum.overflowing_add(1).0;
/// Sends raw bytes through the underlying `udp` sender after checking their
/// length against `UDP_PKT_SIZE`.
///
/// # Errors
///
/// Returns whatever `self.udp.send` returns.
/// NOTE(review): the oversize branch presumably returns an `io::Error`
/// carrying the message below — confirm.
143 pub async fn send_udp(&self, data: &[u8]) -> io::Result<()> {
144 if data.len() > UDP_PKT_SIZE {
146 "attempted to send a packet with len {} > {UDP_PKT_SIZE}",
151 self.udp.send(data).await
/// Returns a peer id as a `u16`.
///
/// NOTE(review): presumably reads `remote_id` under its `RwLock` (hence
/// `async`) — confirm.
154 pub async fn peer_id(&self) -> u16 {
/// Returns `true` when this sender's own id equals `PeerID::Srv`.
158 pub async fn is_server(&self) -> bool {
159 self.id == PeerID::Srv as u16
/// Publishes `true` on the `close` watch channel.
162 pub fn close(&self) {
// `.ok()` discards the `Result` of `send`, so a failed send is ignored.
163 self.close.send(true).ok();