use async_std::pin::Pin;
use bytes::{Buf, BytesMut};
use futures::task::{Context, Poll};
-use futures::{AsyncRead, AsyncReadExt, Stream};
+use futures::{AsyncRead, Stream};
use log::*;
use protobuf::Message;
use std::convert::TryInto;
pub struct ConnectionReader {
local_addr: SocketAddr,
peer_addr: SocketAddr,
- read_stream: Box<dyn AsyncRead + Send + Sync + Unpin>,
+ read_stream: Pin<Box<dyn AsyncRead + Send + Sync>>,
pending_read: Option<BytesMut>,
closed: bool,
}
pub fn new(
local_addr: SocketAddr,
peer_addr: SocketAddr,
- read_stream: Box<dyn AsyncRead + Send + Sync + Unpin>,
+ read_stream: Pin<Box<dyn AsyncRead + Send + Sync>>,
) -> Self {
Self {
local_addr,
pub fn is_closed(&self) -> bool {
self.closed
}
+
+ pub(crate) fn close_stream(&mut self) {
+ trace!("Closing the stream for connection with {}", self.peer_addr);
+ self.pending_read.take();
+ self.closed = true;
+ }
}
impl Stream for ConnectionReader {
type Item = Any;
- fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut buffer = BytesMut::new();
buffer.resize(BUFFER_SIZE, 0);
trace!("Starting new read loop for {}", self.local_addr);
loop {
trace!("Reading from the stream");
- match futures::executor::block_on(self.read_stream.read(&mut buffer)) {
- Ok(mut bytes_read) => {
+ let stream = self.read_stream.as_mut();
+
+ match stream.poll_read(cx, &mut buffer) {
+ Poll::Pending => return Poll::Pending,
+
+ Poll::Ready(Ok(mut bytes_read)) => {
if bytes_read > 0 {
trace!("Read {} bytes from the network stream", bytes_read)
- } else if bytes_read == 0 && self.pending_read.is_none() {
- return Poll::Pending;
+ } else if self.pending_read.is_none() {
+ self.close_stream();
+ return Poll::Ready(None)
}
if let Some(mut pending_buf) = self.pending_read.take() {
}
// Close the stream
- Err(_err) => {
- trace!("Closing the stream");
- self.pending_read.take();
- self.closed = true;
- return Poll::Ready(None);
+ Poll::Ready(Err(_e)) => {
+ self.close_stream();
+ return Poll::Ready(None)
}
}
}