1 use crate::schema::ConnectionMessage;
2 use async_std::net::SocketAddr;
3 use async_std::pin::Pin;
4 use bytes::{Buf, BytesMut};
5 use futures::task::{Context, Poll};
6 use futures::{AsyncRead, Stream};
9 use std::convert::TryInto;
11 pub use futures::SinkExt;
12 pub use futures::StreamExt;
13 use protobuf::well_known_types::Any;
15 const BUFFER_SIZE: usize = 8192;
/// Read side of a connection.
///
/// Holds the two socket addresses of the connection, the asynchronous byte
/// source, and any bytes left over from a read that could not yet be
/// deserialized. The `Stream` implementation in this file reads from
/// `read_stream` and yields the payload taken from each deserialized
/// `ConnectionMessage`.
17 pub struct ConnectionReader {
/// Local socket address; returned by `local_addr()`.
18 local_addr: SocketAddr,
/// Peer socket address; returned by `peer_addr()` and used in log output.
19 peer_addr: SocketAddr,
/// Pinned, boxed byte source that `poll_next` polls via `poll_read`.
20 read_stream: Pin<Box<dyn AsyncRead + Send + Sync>>,
/// Bytes from an earlier read that failed to deserialize; `poll_next` takes
/// them and joins them with the next read before parsing again.
21 pending_read: Option<BytesMut>,
25 impl ConnectionReader {
// NOTE(review): the constructor's `fn` line is not visible in this excerpt;
// the three lines below look like its parameters, mirroring the struct
// fields of the same names — confirm against the full file.
27 local_addr: SocketAddr,
28 peer_addr: SocketAddr,
29 read_stream: Pin<Box<dyn AsyncRead + Send + Sync>>,
/// Returns a copy of the local socket address stored on this reader.
40 pub fn local_addr(&self) -> SocketAddr {
// NOTE(review): `SocketAddr` is presumably `Copy` (async-std re-exports the
// std type), which would make `.clone()` redundant here — confirm.
41 self.local_addr.clone()
/// Returns a copy of the peer socket address stored on this reader.
44 pub fn peer_addr(&self) -> SocketAddr {
45 self.peer_addr.clone()
/// NOTE(review): the body is not visible in this excerpt; presumably reports
/// whether the underlying stream has been closed — confirm.
48 pub fn is_closed(&self) -> bool {
/// Crate-internal close hook: logs the peer address at trace level and
/// discards any buffered partial read. Any further teardown this method
/// performs is not visible in this excerpt.
52 pub(crate) fn close_stream(&mut self) {
53 trace!("Closing the stream for connection with {}", self.peer_addr);
// Take the pending bytes out of the option and drop them immediately.
54 self.pending_read.take();
/// `Stream` implementation: each poll reads bytes from `read_stream`, joins
/// them with any bytes left over from a previous poll, and tries to
/// deserialize a `ConnectionMessage`, yielding the payload taken from it.
59 impl Stream for ConnectionReader {
/// Polls the underlying reader once and attempts to produce one item.
///
/// Visible outcomes:
/// * `Poll::Pending` when the reader is not ready.
/// * `Poll::Ready(Some(payload))` when a message deserializes.
/// * `Poll::Ready(None)` when the reader returns an error, or on the
///   `else if` branch taken when no bytes are pending.
///
/// Several lines of this function are missing from the excerpt, so the
/// comments below describe only what the visible lines do.
62 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Fresh zero-filled scratch buffer of BUFFER_SIZE bytes, allocated on every poll.
63 let mut buffer = BytesMut::new();
64 buffer.resize(BUFFER_SIZE, 0);
66 trace!("Starting new read loop for {}", self.local_addr);
68 trace!("Reading from the stream");
// Re-borrow the pinned reader so `poll_read` can be called on it.
69 let stream = self.read_stream.as_mut();
71 match stream.poll_read(cx, &mut buffer) {
// Reader not ready; propagate Pending to the caller.
72 Poll::Pending => return Poll::Pending,
// Successful read. NOTE(review): the `if` condition guarding the trace
// below is not visible; presumably it checks `bytes_read > 0`, making the
// `else if` the "zero bytes read and nothing pending" end-of-stream case
// — confirm.
74 Poll::Ready(Ok(mut bytes_read)) => {
76 trace!("Read {} bytes from the network stream", bytes_read)
77 } else if self.pending_read.is_none() {
79 return Poll::Ready(None)
// Leftover bytes from an earlier poll: take them out of `self`, count them
// in `bytes_read`, and append the newly read buffer after them.
82 if let Some(mut pending_buf) = self.pending_read.take() {
83 trace!("Prepending broken data ({} bytes) encountered from earlier read of network stream", pending_buf.len());
84 bytes_read += pending_buf.len();
// NOTE(review): `buffer` is moved into `pending_buf` here; the line that
// reassigns `buffer` afterwards is not visible in this excerpt.
86 pending_buf.unsplit(buffer);
// NOTE(review): the `format!` argument is built even when the conversion
// succeeds; `unwrap_or_else(|_| panic!(..))` would defer that work.
90 let mut bytes_read_u64: u64 = bytes_read.try_into().expect(
91 format!("Conversion from usize ({}) to u64 failed", bytes_read).as_str(),
// Loop while the u64 counter says unprocessed bytes remain.
93 while bytes_read_u64 > 0 {
95 "{} bytes from network stream still unprocessed",
// Resize the buffer to `bytes_read` so the parser below sees only that many bytes.
99 buffer.resize(bytes_read, 0);
101 match ConnectionMessage::parse_from_bytes(buffer.as_ref()) {
// On success, the message's computed size is treated as the number of
// bytes consumed from the buffer.
103 let serialized_size = data.compute_size();
104 trace!("Deserialized message of size {} bytes", serialized_size);
// Drop the consumed bytes from the front of the buffer.
106 buffer.advance(serialized_size as usize);
108 let serialized_size_u64: u64 = serialized_size.try_into().expect(
110 "Conversion from usize ({}) to u64 failed",
115 bytes_read_u64 -= serialized_size_u64;
116 trace!("{} bytes still unprocessed", bytes_read_u64);
118 trace!("Sending deserialized message downstream");
// NOTE(review): this returns as soon as one message is parsed. No visible
// line stores the rest of `buffer` in `self.pending_read` first, so any
// bytes after this message appear to be dropped with the local buffer —
// confirm against the full file.
119 return Poll::Ready(Some(data.take_payload()));
124 "Could not deserialize data from the received bytes: {:#?}",
// Parse failed: keep the bytes for the next poll and start over with a
// fresh zero-filled buffer.
128 self.pending_read = Some(buffer);
129 buffer = BytesMut::new();
135 buffer.resize(BUFFER_SIZE, 0);
// Read error: the error value is bound to `_e`, and the visible lines end
// the stream with `None` without surfacing it to the consumer.
139 Poll::Ready(Err(_e)) => {
141 return Poll::Ready(None)