]> git.lizzy.rs Git - connect-rs.git/blobdiff - src/reader.rs
remove dependency on protobuf and introduce basic custom wire format
[connect-rs.git] / src / reader.rs
index 428f6d19902f4c7afc614e8b1f66704d39a117e0..39b1048f896fca459f491d4b566cf74c4acd11ad 100644 (file)
@@ -1,16 +1,14 @@
-use crate::schema::ConnectionMessage;
 use async_std::net::SocketAddr;
 use async_std::pin::Pin;
 use bytes::{Buf, BytesMut};
 use futures::task::{Context, Poll};
 use futures::{AsyncRead, Stream};
 use log::*;
-use protobuf::Message;
-use std::convert::TryInto;
 
+use crate::protocol::ConnectDatagram;
 pub use futures::SinkExt;
 pub use futures::StreamExt;
-use protobuf::well_known_types::Any;
+use std::io::Cursor;
 
 /// A default buffer size to read in bytes and then deserialize as messages
 const BUFFER_SIZE: usize = 8192;
@@ -81,7 +79,7 @@ impl ConnectionReader {
 }
 
 impl Stream for ConnectionReader {
-    type Item = Any;
+    type Item = ConnectDatagram;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let mut buffer = BytesMut::new();
@@ -111,36 +109,30 @@ impl Stream for ConnectionReader {
                         buffer = pending_buf;
                     }
 
-                    let mut bytes_read_u64: u64 = bytes_read.try_into().expect(
-                        format!("Conversion from usize ({}) to u64 failed", bytes_read).as_str(),
-                    );
-                    while bytes_read_u64 > 0 {
-                        trace!(
-                            "{} bytes from network stream still unprocessed",
-                            bytes_read_u64
-                        );
+                    while bytes_read > 0 {
+                        trace!("{} bytes from network stream still unprocessed", bytes_read);
 
                         buffer.resize(bytes_read, 0);
 
-                        match ConnectionMessage::parse_from_bytes(buffer.as_ref()) {
-                            Ok(mut data) => {
-                                let serialized_size = data.compute_size();
-                                trace!("Deserialized message of size {} bytes", serialized_size);
-
-                                buffer.advance(serialized_size as usize);
-
-                                let serialized_size_u64: u64 = serialized_size.try_into().expect(
-                                    format!(
-                                        "Conversion from usize ({}) to u64 failed",
-                                        serialized_size
-                                    )
-                                    .as_str(),
-                                );
-                                bytes_read_u64 -= serialized_size_u64;
-                                trace!("{} bytes still unprocessed", bytes_read_u64);
-
-                                trace!("Sending deserialized message downstream");
-                                return Poll::Ready(Some(data.take_payload()));
+                        let mut cursor = Cursor::new(buffer.as_mut());
+                        match ConnectDatagram::decode(&mut cursor) {
+                            Ok(data) => {
+                                return match data.version() {
+                                    _ => {
+                                        let serialized_size = data.size();
+                                        trace!(
+                                            "Deserialized message of size {} bytes",
+                                            serialized_size
+                                        );
+
+                                        buffer.advance(serialized_size);
+                                        bytes_read -= serialized_size;
+                                        trace!("{} bytes still unprocessed", bytes_read);
+
+                                        trace!("Sending deserialized message downstream");
+                                        Poll::Ready(Some(data))
+                                    }
+                                }
                             }
 
                             Err(err) => {