-use crate::protocol::ConnectDatagram;
+use crate::SIZE_PREFIX_BYTE_SIZE;
+use crate::{protocol::ConnectDatagram, DATAGRAM_HEADER_BYTE_SIZE};
use async_std::net::SocketAddr;
use async_std::pin::Pin;
use bytes::BytesMut;
pub use futures::{SinkExt, StreamExt};
/// A default buffer size to read in bytes and then deserialize as messages.
-const BUFFER_SIZE: usize = 8192;
+pub(crate) const BUFFER_SIZE: usize = 8192;
/// An interface to read messages from the network connection.
///
let mut data_buf = pending_buf;
let pending_buf = data_buf.split_off(size);
- let datagram = ConnectDatagram::decode(data_buf.to_vec()).expect(
+ let datagram = ConnectDatagram::from_bytes_without_prefix(
+ data_buf.as_ref(),
+ )
+ .expect(
"could not construct ConnectDatagram from bytes despite explicit check",
);
- trace!("deserialized message of size {} bytes", datagram.size());
+ trace!(
+ "deserialized message of size {} bytes",
+ datagram.serialized_size()
+ );
return match datagram.version() {
// do some special work based on version number if necessary
_ => {
- if pending_buf.len() >= std::mem::size_of::<u32>() {
+ if pending_buf.len() >= DATAGRAM_HEADER_BYTE_SIZE {
trace!("can deserialize size of next datagram from remaining {} pending bytes", pending_buf.len());
let mut size_buf = pending_buf;
- let pending_buf =
- size_buf.split_off(std::mem::size_of::<u32>());
+ let pending_buf = size_buf.split_off(SIZE_PREFIX_BYTE_SIZE);
+
let size = u32::from_be_bytes(
size_buf
.to_vec()
.expect("could not parse bytes into u32"),
) as usize;
+ trace!("removed size of next datagram from pending bytes ({}), leaving {} pending bytes remaining", size, pending_buf.len());
self.pending_datagram.replace(size);
self.pending_read.replace(pending_buf);
} else {
);
pending_buf.extend_from_slice(&buffer[0..bytes_read]);
- if self.pending_datagram.is_none()
- && pending_buf.len() >= std::mem::size_of::<u32>()
+ if self.pending_datagram.is_none() && pending_buf.len() >= SIZE_PREFIX_BYTE_SIZE
{
trace!(
"can deserialize size of next datagram from remaining {} pending bytes",
pending_buf.len()
);
let mut size_buf = pending_buf;
- let pending_buf = size_buf.split_off(std::mem::size_of::<u32>());
+ let pending_buf = size_buf.split_off(SIZE_PREFIX_BYTE_SIZE);
+
let size = u32::from_be_bytes(
size_buf
.to_vec()
.expect("could not parse bytes into u32"),
) as usize;
+ trace!("removed size of next datagram from pending bytes ({}), leaving {} pending bytes remaining", size, pending_buf.len());
self.pending_datagram.replace(size);
self.pending_read.replace(pending_buf);
} else {