const BUFFER_SIZE: usize = 8192;
pub struct ConnectionReader {
- local_addr: SocketAddr,
- peer_addr: SocketAddr,
- read_stream: Box<dyn AsyncRead + Send + Sync + Unpin>,
+ local_addr: SocketAddr,
+ peer_addr: SocketAddr,
+ read_stream: Box<dyn AsyncRead + Send + Sync + Unpin>,
pending_read: Option<BytesMut>,
+ closed: bool,
}
impl ConnectionReader {
peer_addr,
read_stream,
pending_read: None,
+ closed: false,
}
}
pub fn peer_addr(&self) -> SocketAddr {
self.peer_addr.clone()
}
+
+ pub fn is_closed(&self) -> bool {
+ self.closed
+ }
}
impl Stream for ConnectionReader {
Ok(mut bytes_read) => {
if bytes_read > 0 {
trace!("Read {} bytes from the network stream", bytes_read)
+ } else if bytes_read == 0 && self.pending_read.is_none() {
+ return Poll::Pending;
}
if let Some(mut pending_buf) = self.pending_read.take() {
buffer.resize(BUFFER_SIZE, 0);
}
- Err(_err) => return Poll::Pending,
+ // Close the stream
+ Err(_err) => {
+ trace!("Closing the stream");
+ self.pending_read.take();
+ self.closed = true;
+ return Poll::Ready(None);
+ }
}
}
}