]> git.lizzy.rs Git - connect-rs.git/blobdiff - src/reader.rs
better handling of stream/sink closing
[connect-rs.git] / src / reader.rs
index 8dcb089c194f1883bb98708f9bd57500787e59fe..0452f6c59daf65e182f57c0445b24b10e43900e7 100644 (file)
@@ -15,10 +15,11 @@ use protobuf::well_known_types::Any;
 const BUFFER_SIZE: usize = 8192;
 
 pub struct ConnectionReader {
-    local_addr: SocketAddr,
-    peer_addr: SocketAddr,
-    read_stream: Box<dyn AsyncRead + Send + Sync + Unpin>,
+    local_addr:   SocketAddr,
+    peer_addr:    SocketAddr,
+    read_stream:  Box<dyn AsyncRead + Send + Sync + Unpin>,
     pending_read: Option<BytesMut>,
+    closed:       bool,
 }
 
 impl ConnectionReader {
@@ -32,6 +33,7 @@ impl ConnectionReader {
             peer_addr,
             read_stream,
             pending_read: None,
+            closed: false,
         }
     }
 
@@ -42,6 +44,10 @@ impl ConnectionReader {
     pub fn peer_addr(&self) -> SocketAddr {
         self.peer_addr.clone()
     }
+
+    pub fn is_closed(&self) -> bool {
+        self.closed
+    }
 }
 
 impl Stream for ConnectionReader {
@@ -58,6 +64,8 @@ impl Stream for ConnectionReader {
                 Ok(mut bytes_read) => {
                     if bytes_read > 0 {
                         trace!("Read {} bytes from the network stream", bytes_read)
+                    } else if bytes_read == 0 && self.pending_read.is_none() {
+                        return Poll::Pending;
                     }
 
                     if let Some(mut pending_buf) = self.pending_read.take() {
@@ -116,7 +124,13 @@ impl Stream for ConnectionReader {
                     buffer.resize(BUFFER_SIZE, 0);
                 }
 
-                Err(_err) => return Poll::Pending,
+                // Close the stream
+                Err(_err) => {
+                    trace!("Closing the stream");
+                    self.pending_read.take();
+                    self.closed = true;
+                    return Poll::Ready(None);
+                }
             }
         }
     }