]> git.lizzy.rs Git - connect-rs.git/blob - src/udp.rs
refactor to have datagram already serialized in memory
[connect-rs.git] / src / udp.rs
1 use crate::reader::BUFFER_SIZE;
2 use crate::{ConnectDatagram, Connection, ConnectionWriteError};
3 use async_std::net::UdpSocket;
4 use async_stream::stream;
5 use futures::sink::unfold;
6 use log::*;
7 use std::convert::TryFrom;
8 use std::sync::Arc;
9
10 impl TryFrom<UdpSocket> for Connection {
11     type Error = anyhow::Error;
12
13     fn try_from(socket: UdpSocket) -> Result<Self, Self::Error> {
14         let local_addr = socket.local_addr()?;
15         let peer_addr = socket.peer_addr()?;
16
17         let socket = Arc::new(socket);
18
19         let read_socket = socket.clone();
20         let reader = Box::pin(stream! {
21             let mut buffer = vec![0; BUFFER_SIZE];
22
23             while let Ok(bytes_read) = read_socket.recv(&mut buffer).await {
24                 if let Ok(datagram) = ConnectDatagram::from_bytes(&buffer[..bytes_read]) {
25                     yield datagram
26                 } else {
27                     warn!("Could not deserialize message from UDP message");
28                 }
29             }
30         });
31
32         let write_socket = socket;
33         let writer = Box::pin(unfold(0, move |_, datagram: ConnectDatagram| {
34             let socket = write_socket.clone();
35             async move {
36                 match socket.send(datagram.into_bytes().as_slice()).await {
37                     Ok(bytes_written) => Ok(bytes_written),
38
39                     Err(io_err) => Err(ConnectionWriteError::IoError(io_err)),
40                 }
41             }
42         }));
43
44         Ok(Self::join(local_addr, peer_addr, reader, writer))
45     }
46 }