]> git.lizzy.rs Git - mt_rudp.git/blob - src/send.rs
clippy
[mt_rudp.git] / src / send.rs
1 use crate::{prelude::*, Ack, RudpShare};
2 use byteorder::{BigEndian, WriteBytesExt};
3 use std::io::{self, Write};
4 use tokio::sync::watch;
5
6 type AckResult = io::Result<Option<watch::Receiver<bool>>>;
7
8 impl<S: UdpSender> RudpSender<S> {
9     pub async fn send(&self, pkt: Pkt<&[u8]>) -> AckResult {
10         self.share.send(PktType::Orig, pkt).await // TODO: splits
11     }
12 }
13
14 impl<S: UdpSender> RudpShare<S> {
15     #[allow(clippy::unused_io_amount)]
16     pub async fn send(&self, tp: PktType, pkt: Pkt<&[u8]>) -> AckResult {
17         let mut buf = Vec::with_capacity(4 + 2 + 1 + 1 + 2 + 1 + pkt.data.len());
18         buf.write_u32::<BigEndian>(PROTO_ID)?;
19         buf.write_u16::<BigEndian>(*self.remote_id.read().await)?;
20         buf.write_u8(pkt.chan)?;
21
22         let mut chan = self.chans[pkt.chan as usize].lock().await;
23         let seqnum = chan.seqnum;
24
25         if !pkt.unrel {
26             buf.write_u8(PktType::Rel as u8)?;
27             buf.write_u16::<BigEndian>(seqnum)?;
28         }
29
30         buf.write_u8(tp as u8)?;
31         buf.write(pkt.data)?;
32
33         self.send_raw(&buf).await?;
34
35         if pkt.unrel {
36             Ok(None)
37         } else {
38             // TODO: reliable window
39             let (tx, rx) = watch::channel(false);
40             chan.acks.insert(
41                 seqnum,
42                 Ack {
43                     tx,
44                     rx: rx.clone(),
45                     data: buf,
46                 },
47             );
48             chan.seqnum = chan.seqnum.overflowing_add(1).0;
49
50             Ok(Some(rx))
51         }
52     }
53
54     pub async fn send_raw(&self, data: &[u8]) -> io::Result<()> {
55         self.udp_tx.send(data).await
56     }
57 }