-use crate::{error::Error, *};
+use crate::{prelude::*, ticker, RecvChan, RecvWorker, RudpShare, Split};
use async_recursion::async_recursion;
-use byteorder::{BigEndian, ReadBytesExt};
+use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::{
cell::{Cell, OnceCell},
collections::HashMap,
sync::Arc,
time::{Duration, Instant},
};
-use tokio::sync::{mpsc, Mutex};
+use tokio::sync::{mpsc, watch, Mutex};
fn to_seqnum(seqnum: u16) -> usize {
(seqnum as usize) & (REL_BUFFER - 1)
type Result<T> = std::result::Result<T, Error>;
-struct Split {
- timestamp: Option<Instant>,
- chunks: Vec<OnceCell<Vec<u8>>>,
- got: usize,
-}
-
-struct RecvChan {
- packets: Vec<Cell<Option<Vec<u8>>>>, // char ** 😛
- splits: HashMap<u16, Split>,
- seqnum: u16,
- num: u8,
-}
-
-pub struct RecvWorker<R: UdpReceiver, S: UdpSender> {
- share: Arc<RudpShare<S>>,
- close: watch::Receiver<bool>,
- chans: Arc<Vec<Mutex<RecvChan>>>,
- pkt_tx: mpsc::UnboundedSender<InPkt>,
- udp_rx: R,
-}
-
impl<R: UdpReceiver, S: UdpSender> RecvWorker<R, S> {
pub fn new(
udp_rx: R,
) -> Result<()> {
use Error::*;
- // TODO: reset timeout
let mut cursor = io::Cursor::new(tokio::select! {
pkt = self.udp_rx.recv() => pkt?,
_ = tokio::time::sleep_until(timeout.deadline()) => return Err(RemoteDisco(true)),