2 use byteorder::{BigEndian, WriteBytesExt};
8 use tokio::sync::{watch, Mutex, RwLock};
/// Optional `watch` receiver of a `bool`, returned (inside `io::Result`) by
/// the `send_rudp*` methods.
// NOTE(review): presumably `Some(rx)` is handed out for reliable packets and
// the value flips to `true` once the peer acknowledges them — confirm against
// the receive side, which is not part of this excerpt.
10 pub type Ack = Option<watch::Receiver<bool>>;
/// Bookkeeping entry stored per outstanding packet in `Chan::acks`.
// NOTE(review): the roles below are inferred from field names and types only;
// the code that fills and consumes these fields is not visible here — confirm.
13 pub(crate) struct AckWait {
/// Sending half of a `watch` channel carrying a `bool`.
14 pub(crate) tx: watch::Sender<bool>,
/// Receiving half of a `watch` channel carrying a `bool`.
15 pub(crate) rx: watch::Receiver<bool>,
/// Raw bytes associated with this entry (presumably the encoded packet kept
/// for retransmission — TODO confirm).
16 pub(crate) data: Vec<u8>,
/// Per-channel send state; one instance lives behind each `Mutex` in
/// `Sender::chans`.
20 pub(crate) struct Chan {
/// `AckWait` entries keyed by a `u16` (presumably the packet's sequence
/// number — TODO confirm against the insertion site).
21 pub(crate) acks: HashMap<u16, AckWait>,
/// Sequence number written into the next packet sent on this channel;
/// `send_rudp_type` advances it by one, wrapping around on overflow.
22 pub(crate) seqnum: u16,
/// Sending side of a connection, generic over the underlying `UdpSender`.
// NOTE(review): fields on lines not shown in this excerpt (e.g. the `udp` and
// `id` fields used by the methods) are not documented here.
26 pub struct Sender<S: UdpSender> {
/// Peer id written into the header of every packet built by
/// `send_rudp_type`; guarded by an async `RwLock`.
28 pub(crate) remote_id: RwLock<u16>,
/// One independently locked `Chan` per channel, indexed by `pkt.chan`.
29 pub(crate) chans: [Mutex<Chan>; NUM_CHANS],
/// Watch sender on which `close()` publishes `true`.
31 close: watch::Sender<bool>,
34 impl<S: UdpSender> Sender<S> {
/// Builds a `Sender` from the given transport, close-notification sender and
/// ids, and returns it as an `Arc<Self>`.
// NOTE(review): only part of the initialiser is visible in this excerpt; how
// `udp`, `close` and `id` are stored, and the initial contents of each
// `Chan`, cannot be seen from here.
35 pub fn new(udp: S, close: watch::Sender<bool>, id: u16, remote_id: u16) -> Arc<Self> {
// The initial remote id is wrapped in an async `RwLock`.
38 remote_id: RwLock::new(remote_id),
// Build the fixed-size channel array element by element.
41 chans: std::array::from_fn(|_| {
/// Sends `pkt` by delegating to `send_rudp_type` with `PktType::Orig`.
///
/// Returns whatever `send_rudp_type` returns. Splitting of large packets is
/// not handled yet (see the TODO below).
50 pub async fn send_rudp(&self, pkt: Pkt<'_>) -> io::Result<Ack> {
51 self.send_rudp_type(PktType::Orig, pkt).await // TODO: splits
/// Encodes `pkt` with packet type `tp` into a single datagram and sends it
/// through `send_udp`.
///
/// Visible wire layout, all integers big-endian: `PROTO_ID` (u32), remote id
/// (u16), channel (u8), `PktType::Rel` (u8), sequence number (u16), `tp` (u8),
/// then the payload bytes.
///
/// # Errors
/// Propagates any `io::Error` from writing into the buffer or from `send_udp`.
// NOTE(review): several lines of this function are not visible in this
// excerpt. The reliable header (`PktType::Rel` + seqnum) and the ack
// bookkeeping may sit inside conditionals that are not shown, and the return
// expression is not visible — confirm before relying on this description.
54 pub async fn send_rudp_type(&self, tp: PktType, pkt: Pkt<'_>) -> io::Result<Ack> {
// Capacity = proto id (4) + remote id (2) + channel (1) + rel type (1)
// + seqnum (2) + packet type (1) + payload.
55 let mut buf = Vec::with_capacity(4 + 2 + 1 + 1 + 2 + 1 + pkt.data.len());
56 buf.write_u32::<BigEndian>(PROTO_ID)?;
57 buf.write_u16::<BigEndian>(*self.remote_id.read().await)?;
58 buf.write_u8(pkt.chan)?;
// Lock this packet's channel; `pkt.chan` indexes `chans` directly, so a
// value >= NUM_CHANS would panic here.
60 let mut chan = self.chans[pkt.chan as usize].lock().await;
61 let seqnum = chan.seqnum;
// Reliable wrapper: type byte followed by the channel's current seqnum.
64 buf.write_u8(PktType::Rel as u8)?;
65 buf.write_u16::<BigEndian>(seqnum)?;
// Inner packet type, then the payload.
68 buf.write_u8(tp as u8)?;
69 buf.write_all(pkt.data.as_ref())?;
71 self.send_udp(&buf).await?;
76 // TODO: reliable window
// Watch channel starting at `false` (presumably stored in an `AckWait` and
// its receiver returned to the caller — those lines are not visible; confirm).
77 let (tx, rx) = watch::channel(false);
// Advance the sequence number, wrapping at u16::MAX; equivalent to
// `wrapping_add(1)`.
86 chan.seqnum = chan.seqnum.overflowing_add(1).0;
/// Sends an already-encoded datagram through the underlying `udp` sender.
///
/// # Panics
/// Panics if `data` is longer than `UDP_PKT_SIZE`, because packet splitting
/// is not implemented.
///
/// # Errors
/// Returns the `io::Error` reported by `self.udp.send`.
// NOTE(review): the oversize case panics although the function already
// returns `io::Result`; consider returning an error instead so a too-large
// payload cannot bring down the caller.
92 pub async fn send_udp(&self, data: &[u8]) -> io::Result<()> {
93 if data.len() > UDP_PKT_SIZE {
94 panic!("splitting packets is not implemented yet");
97 self.udp.send(data).await
/// Asynchronously returns a `u16` peer id.
// NOTE(review): the body is not visible in this excerpt; presumably it reads
// one of the id fields of `Sender` — confirm which one.
100 pub async fn peer_id(&self) -> u16 {
/// Returns `true` when this sender's own `id` equals `PeerID::Srv`.
104 pub async fn is_server(&self) -> bool {
105 self.id == PeerID::Srv as u16
/// Publishes `true` on the `close` watch channel.
///
/// The send result is discarded with `.ok()`, so a failed send is
/// deliberately ignored.
108 pub fn close(&self) {
109 self.close.send(true).ok();