]> git.lizzy.rs Git - mt_rudp.git/blob - src/send.rs
Use channels
[mt_rudp.git] / src / send.rs
1 use super::*;
2 use byteorder::{BigEndian, WriteBytesExt};
3 use std::{
4     collections::HashMap,
5     io::{self, Write},
6     sync::Arc,
7 };
8 use tokio::sync::{watch, Mutex, RwLock};
9
10 pub type Ack = Option<watch::Receiver<bool>>;
11
12 #[derive(Debug)]
13 pub(crate) struct AckWait {
14     pub(crate) tx: watch::Sender<bool>,
15     pub(crate) rx: watch::Receiver<bool>,
16     pub(crate) data: Vec<u8>,
17 }
18
19 #[derive(Debug)]
20 pub(crate) struct Chan {
21     pub(crate) acks: HashMap<u16, AckWait>,
22     pub(crate) seqnum: u16,
23 }
24
25 #[derive(Debug)]
26 pub struct Sender<S: UdpSender> {
27     pub(crate) id: u16,
28     pub(crate) remote_id: RwLock<u16>,
29     pub(crate) chans: [Mutex<Chan>; NUM_CHANS],
30     udp: S,
31     close: watch::Sender<bool>,
32 }
33
34 impl<S: UdpSender> Sender<S> {
35     pub fn new(udp: S, close: watch::Sender<bool>, id: u16, remote_id: u16) -> Arc<Self> {
36         Arc::new(Self {
37             id,
38             remote_id: RwLock::new(remote_id),
39             udp,
40             close,
41             chans: std::array::from_fn(|_| {
42                 Mutex::new(Chan {
43                     acks: HashMap::new(),
44                     seqnum: INIT_SEQNUM,
45                 })
46             }),
47         })
48     }
49
50     pub async fn send_rudp(&self, pkt: Pkt<'_>) -> io::Result<Ack> {
51         self.send_rudp_type(PktType::Orig, pkt).await // TODO: splits
52     }
53
54     pub async fn send_rudp_type(&self, tp: PktType, pkt: Pkt<'_>) -> io::Result<Ack> {
55         let mut buf = Vec::with_capacity(4 + 2 + 1 + 1 + 2 + 1 + pkt.data.len());
56         buf.write_u32::<BigEndian>(PROTO_ID)?;
57         buf.write_u16::<BigEndian>(*self.remote_id.read().await)?;
58         buf.write_u8(pkt.chan)?;
59
60         let mut chan = self.chans[pkt.chan as usize].lock().await;
61         let seqnum = chan.seqnum;
62
63         if !pkt.unrel {
64             buf.write_u8(PktType::Rel as u8)?;
65             buf.write_u16::<BigEndian>(seqnum)?;
66         }
67
68         buf.write_u8(tp as u8)?;
69         buf.write_all(pkt.data.as_ref())?;
70
71         self.send_udp(&buf).await?;
72
73         if pkt.unrel {
74             Ok(None)
75         } else {
76             // TODO: reliable window
77             let (tx, rx) = watch::channel(false);
78             chan.acks.insert(
79                 seqnum,
80                 AckWait {
81                     tx,
82                     rx: rx.clone(),
83                     data: buf,
84                 },
85             );
86             chan.seqnum = chan.seqnum.overflowing_add(1).0;
87
88             Ok(Some(rx))
89         }
90     }
91
92     pub async fn send_udp(&self, data: &[u8]) -> io::Result<()> {
93         if data.len() > UDP_PKT_SIZE {
94             panic!("splitting packets is not implemented yet");
95         }
96
97         self.udp.send(data).await
98     }
99
100     pub async fn peer_id(&self) -> u16 {
101         self.id
102     }
103
104     pub async fn is_server(&self) -> bool {
105         self.id == PeerID::Srv as u16
106     }
107
108     pub fn close(&self) {
109         self.close.send(true).ok();
110     }
111 }