-use crate::schema::ConnectionMessage;
use async_std::net::SocketAddr;
use async_std::pin::Pin;
use bytes::{Buf, BytesMut};
use futures::task::{Context, Poll};
use futures::{AsyncRead, Stream};
use log::*;
-use protobuf::Message;
-use std::convert::TryInto;
+use crate::protocol::ConnectDatagram;
pub use futures::SinkExt;
pub use futures::StreamExt;
-use protobuf::well_known_types::Any;
+use std::io::Cursor;
/// A default buffer size to read in bytes and then deserialize as messages
const BUFFER_SIZE: usize = 8192;
}
impl Stream for ConnectionReader {
- type Item = Any;
+ type Item = ConnectDatagram;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut buffer = BytesMut::new();
buffer = pending_buf;
}
- let mut bytes_read_u64: u64 = bytes_read.try_into().expect(
- format!("Conversion from usize ({}) to u64 failed", bytes_read).as_str(),
- );
- while bytes_read_u64 > 0 {
- trace!(
- "{} bytes from network stream still unprocessed",
- bytes_read_u64
- );
+ while bytes_read > 0 {
+ trace!("{} bytes from network stream still unprocessed", bytes_read);
buffer.resize(bytes_read, 0);
- match ConnectionMessage::parse_from_bytes(buffer.as_ref()) {
- Ok(mut data) => {
- let serialized_size = data.compute_size();
- trace!("Deserialized message of size {} bytes", serialized_size);
-
- buffer.advance(serialized_size as usize);
-
- let serialized_size_u64: u64 = serialized_size.try_into().expect(
- format!(
- "Conversion from usize ({}) to u64 failed",
- serialized_size
- )
- .as_str(),
- );
- bytes_read_u64 -= serialized_size_u64;
- trace!("{} bytes still unprocessed", bytes_read_u64);
-
- trace!("Sending deserialized message downstream");
- return Poll::Ready(Some(data.take_payload()));
+ let mut cursor = Cursor::new(buffer.as_mut());
+ match ConnectDatagram::decode(&mut cursor) {
+ Ok(data) => {
+ return match data.version() {
+ _ => {
+ let serialized_size = data.size();
+ trace!(
+ "Deserialized message of size {} bytes",
+ serialized_size
+ );
+
+ buffer.advance(serialized_size);
+ bytes_read -= serialized_size;
+ trace!("{} bytes still unprocessed", bytes_read);
+
+ trace!("Sending deserialized message downstream");
+ Poll::Ready(Some(data))
+ }
+ }
}
Err(err) => {