]> git.lizzy.rs Git - connect-rs.git/blobdiff - src/reader.rs
refactor to have datagram already serialized in memory
[connect-rs.git] / src / reader.rs
index 1e9c03155e6ef63e6268fd35ddc6f2aed0086048..0b716c022826b63822ff8a10ea75ca6d222b1705 100644 (file)
@@ -1,4 +1,5 @@
-use crate::protocol::ConnectDatagram;
+use crate::SIZE_PREFIX_BYTE_SIZE;
+use crate::{protocol::ConnectDatagram, DATAGRAM_HEADER_BYTE_SIZE};
 use async_std::net::SocketAddr;
 use async_std::pin::Pin;
 use bytes::BytesMut;
@@ -10,7 +11,7 @@ use std::convert::TryInto;
 pub use futures::{SinkExt, StreamExt};
 
 /// A default buffer size to read in bytes and then deserialize as messages.
-const BUFFER_SIZE: usize = 8192;
+pub(crate) const BUFFER_SIZE: usize = 8192;
 
 /// An interface to read messages from the network connection.
 ///
@@ -98,20 +99,26 @@ impl Stream for ConnectionReader {
                         let mut data_buf = pending_buf;
                         let pending_buf = data_buf.split_off(size);
 
-                        let datagram = ConnectDatagram::decode(data_buf.to_vec()).expect(
+                        let datagram = ConnectDatagram::from_bytes_without_prefix(
+                            data_buf.as_ref(),
+                        )
+                        .expect(
                             "could not construct ConnectDatagram from bytes despite explicit check",
                         );
 
-                        trace!("deserialized message of size {} bytes", datagram.size());
+                        trace!(
+                            "deserialized message of size {} bytes",
+                            datagram.serialized_size()
+                        );
                         return match datagram.version() {
                             // do some special work based on version number if necessary
                             _ => {
-                                if pending_buf.len() >= std::mem::size_of::<u32>() {
+                                if pending_buf.len() >= DATAGRAM_HEADER_BYTE_SIZE {
                                     trace!("can deserialize size of next datagram from remaining {} pending bytes", pending_buf.len());
 
                                     let mut size_buf = pending_buf;
-                                    let pending_buf =
-                                        size_buf.split_off(std::mem::size_of::<u32>());
+                                    let pending_buf = size_buf.split_off(SIZE_PREFIX_BYTE_SIZE);
+
                                     let size = u32::from_be_bytes(
                                         size_buf
                                             .to_vec()
@@ -120,6 +127,7 @@ impl Stream for ConnectionReader {
                                             .expect("could not parse bytes into u32"),
                                     ) as usize;
 
+                                    trace!("removed size of next datagram from pending bytes ({}), leaving {} pending bytes remaining", size, pending_buf.len());
                                     self.pending_datagram.replace(size);
                                     self.pending_read.replace(pending_buf);
                                 } else {
@@ -176,15 +184,15 @@ impl Stream for ConnectionReader {
                     );
                     pending_buf.extend_from_slice(&buffer[0..bytes_read]);
 
-                    if self.pending_datagram.is_none()
-                        && pending_buf.len() >= std::mem::size_of::<u32>()
+                    if self.pending_datagram.is_none() && pending_buf.len() >= SIZE_PREFIX_BYTE_SIZE
                     {
                         trace!(
                             "can deserialize size of next datagram from remaining {} pending bytes",
                             pending_buf.len()
                         );
                         let mut size_buf = pending_buf;
-                        let pending_buf = size_buf.split_off(std::mem::size_of::<u32>());
+                        let pending_buf = size_buf.split_off(SIZE_PREFIX_BYTE_SIZE);
+
                         let size = u32::from_be_bytes(
                             size_buf
                                 .to_vec()
@@ -193,6 +201,7 @@ impl Stream for ConnectionReader {
                                 .expect("could not parse bytes into u32"),
                         ) as usize;
 
+                        trace!("removed size of next datagram from pending bytes ({}), leaving {} pending bytes remaining", size, pending_buf.len());
                         self.pending_datagram.replace(size);
                         self.pending_read.replace(pending_buf);
                     } else {