]> git.lizzy.rs Git - connect-rs.git/blobdiff - src/tcp/listener.rs
remove `block_on` in tcp-listener
[connect-rs.git] / src / tcp / listener.rs
index 997975a4c93c30d4436d028e11af168e9e0a263a..71a876e6a98d04c5272e92bd5fcfb7b2617bd11c 100644 (file)
@@ -1,8 +1,10 @@
 use crate::Connection;
-use async_std::net::{SocketAddr, TcpListener as AsyncListener, ToSocketAddrs};
+use async_std::net::{SocketAddr, TcpListener as AsyncListener, ToSocketAddrs, TcpStream};
 use async_std::pin::Pin;
 use async_std::task::{Context, Poll};
-use futures::{Stream, StreamExt};
+use async_stream::stream;
+use futures::Stream;
+use futures_lite::StreamExt;
 use log::*;
 
 /// Listens on a bound socket for incoming TCP connections to be handled as independent
@@ -25,7 +27,8 @@ use log::*;
 #[allow(dead_code)]
 pub struct TcpListener {
     local_addrs: SocketAddr,
-    listener: AsyncListener,
+    // listener: AsyncListener,
+    conn_stream: Pin<Box<dyn Stream<Item = Option<Result<TcpStream, std::io::Error>>> + Send + Sync>>,
 }
 
 impl TcpListener {
@@ -43,38 +46,47 @@ impl TcpListener {
         let listener = AsyncListener::bind(&ip_addrs).await?;
         info!("Started TCP server at {}", &ip_addrs);
 
+        let local_addrs = listener.local_addr()?;
+
+        let stream = Box::pin(stream! {
+            loop {
+                yield listener.incoming().next().await;
+            }
+        });
+
         Ok(Self {
-            local_addrs: listener.local_addr()?,
-            listener,
+            local_addrs,
+            // listener,
+            conn_stream: stream,
         })
     }
 
-    /// Creates a [`Connection`] for the next `accept`ed TCP connection at the bound socket.
-    ///
-    /// # Example
-    ///
-    /// Basic usage:
-    ///
-    /// ```ignore
-    /// let mut server = TcpListener::bind("127.0.0.1:3456").await?;
-    /// while let Ok(mut conn) = server.accept().await? {
-    ///     // handle the connection
-    /// }
-    /// ```
-    pub async fn accept(&self) -> anyhow::Result<Connection> {
-        let (stream, ip_addr) = self.listener.accept().await?;
-        debug!("Received connection attempt from {}", ip_addr);
-
-        Ok(Connection::from(stream))
-    }
+    // /// Creates a [`Connection`] for the next `accept`ed TCP connection at the bound socket.
+    // ///
+    // /// # Example
+    // ///
+    // /// Basic usage:
+    // ///
+    // /// ```ignore
+    // /// let mut server = TcpListener::bind("127.0.0.1:3456").await?;
+    // /// while let Ok(mut conn) = server.accept().await? {
+    // ///     // handle the connection
+    // /// }
+    // /// ```
+    // pub async fn accept(&self) -> anyhow::Result<Connection> {
+    //     let (stream, ip_addr) = self.listener.accept().await?;
+    //     debug!("Received connection attempt from {}", ip_addr);
+    //
+    //     Ok(Connection::from(stream))
+    // }
 }
 
 impl Stream for TcpListener {
     type Item = Connection;
 
-    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        match futures::executor::block_on(self.listener.incoming().next()) {
-            Some(Ok(tcp_stream)) => {
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        match self.conn_stream.poll_next(cx) {
+            Poll::Ready(Some(Some(Ok(tcp_stream)))) => {
                 let peer_addr = tcp_stream
                     .peer_addr()
                     .expect("Could not retrieve peer IP address");
@@ -83,15 +95,19 @@ impl Stream for TcpListener {
                 Poll::Ready(Some(Connection::from(tcp_stream)))
             }
 
-            Some(Err(e)) => {
+            Poll::Ready(Some(Some(Err(err)))) => {
                 error!(
                     "Encountered error when trying to accept new connection {}",
-                    e
+                    err
                 );
-                Poll::Pending
+                Poll::Ready(None)
             }
 
-            None => Poll::Ready(None),
+            Poll::Ready(Some(None)) => Poll::Ready(None),
+
+            Poll::Ready(None) => Poll::Ready(None),
+
+            Poll::Pending => Poll::Pending,
         }
     }
 }