]> git.lizzy.rs Git - connect-rs.git/commitdiff
remove `block_on` in tcp-listener
authorSachandhan Ganesh <sachan.ganesh@gmail.com>
Sat, 13 Feb 2021 08:28:35 +0000 (00:28 -0800)
committerSachandhan Ganesh <sachan.ganesh@gmail.com>
Sat, 13 Feb 2021 08:28:35 +0000 (00:28 -0800)
Cargo.toml
src/reader.rs
src/tcp/listener.rs

index 2e7b1c984d87062dd70d1bd20966b562f88532fc..3ce2e536bc3c04dbbbd3d38859f290bf4d154e52 100644 (file)
@@ -27,10 +27,12 @@ members = [
 tls = ["async-tls", "rustls"]
 
 [dependencies]
-anyhow = "1.0.31"
+anyhow = "1.0"
 async-std = { version = "1.9.0", features = ["unstable"] }
+async-stream = "0.3.0"
 bytes = "0.5.5"
-futures = "0.3.8"
+futures = "0.3"
+futures-lite = "1.11"
 log = "0.4"
 
 async-tls = { version = "0.11.0", default-features = false, features = ["client", "server"], optional = true }
index b046c84bdd1e5429ada023bb14fac654ebc78b00..b1b7248c0e68179826177cb0468f8893570f9d5c 100644 (file)
@@ -1,14 +1,14 @@
+use crate::protocol::ConnectDatagram;
 use async_std::net::SocketAddr;
 use async_std::pin::Pin;
 use bytes::{Buf, BytesMut};
 use futures::task::{Context, Poll};
 use futures::{AsyncRead, Stream};
 use log::*;
+use std::io::Cursor;
 
-use crate::protocol::ConnectDatagram;
 pub use futures::SinkExt;
 pub use futures::StreamExt;
-use std::io::Cursor;
 
 /// A default buffer size to read in bytes and then deserialize as messages.
 const BUFFER_SIZE: usize = 8192;
index 997975a4c93c30d4436d028e11af168e9e0a263a..71a876e6a98d04c5272e92bd5fcfb7b2617bd11c 100644 (file)
@@ -1,8 +1,10 @@
 use crate::Connection;
-use async_std::net::{SocketAddr, TcpListener as AsyncListener, ToSocketAddrs};
+use async_std::net::{SocketAddr, TcpListener as AsyncListener, ToSocketAddrs, TcpStream};
 use async_std::pin::Pin;
 use async_std::task::{Context, Poll};
-use futures::{Stream, StreamExt};
+use async_stream::stream;
+use futures::Stream;
+use futures_lite::StreamExt;
 use log::*;
 
 /// Listens on a bound socket for incoming TCP connections to be handled as independent
@@ -25,7 +27,8 @@ use log::*;
 #[allow(dead_code)]
 pub struct TcpListener {
     local_addrs: SocketAddr,
-    listener: AsyncListener,
+    // listener: AsyncListener,
+    conn_stream: Pin<Box<dyn Stream<Item = Option<Result<TcpStream, std::io::Error>>> + Send + Sync>>,
 }
 
 impl TcpListener {
@@ -43,38 +46,47 @@ impl TcpListener {
         let listener = AsyncListener::bind(&ip_addrs).await?;
         info!("Started TCP server at {}", &ip_addrs);
 
+        let local_addrs = listener.local_addr()?;
+
+        let stream = Box::pin(stream! {
+            loop {
+                yield listener.incoming().next().await;
+            }
+        });
+
         Ok(Self {
-            local_addrs: listener.local_addr()?,
-            listener,
+            local_addrs,
+            // listener,
+            conn_stream: stream,
         })
     }
 
-    /// Creates a [`Connection`] for the next `accept`ed TCP connection at the bound socket.
-    ///
-    /// # Example
-    ///
-    /// Basic usage:
-    ///
-    /// ```ignore
-    /// let mut server = TcpListener::bind("127.0.0.1:3456").await?;
-    /// while let Ok(mut conn) = server.accept().await? {
-    ///     // handle the connection
-    /// }
-    /// ```
-    pub async fn accept(&self) -> anyhow::Result<Connection> {
-        let (stream, ip_addr) = self.listener.accept().await?;
-        debug!("Received connection attempt from {}", ip_addr);
-
-        Ok(Connection::from(stream))
-    }
+    // /// Creates a [`Connection`] for the next `accept`ed TCP connection at the bound socket.
+    // ///
+    // /// # Example
+    // ///
+    // /// Basic usage:
+    // ///
+    // /// ```ignore
+    // /// let mut server = TcpListener::bind("127.0.0.1:3456").await?;
+    // /// while let Ok(mut conn) = server.accept().await? {
+    // ///     // handle the connection
+    // /// }
+    // /// ```
+    // pub async fn accept(&self) -> anyhow::Result<Connection> {
+    //     let (stream, ip_addr) = self.listener.accept().await?;
+    //     debug!("Received connection attempt from {}", ip_addr);
+    //
+    //     Ok(Connection::from(stream))
+    // }
 }
 
 impl Stream for TcpListener {
     type Item = Connection;
 
-    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        match futures::executor::block_on(self.listener.incoming().next()) {
-            Some(Ok(tcp_stream)) => {
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        match self.conn_stream.poll_next(cx) {
+            Poll::Ready(Some(Some(Ok(tcp_stream)))) => {
                 let peer_addr = tcp_stream
                     .peer_addr()
                     .expect("Could not retrieve peer IP address");
@@ -83,15 +95,19 @@ impl Stream for TcpListener {
                 Poll::Ready(Some(Connection::from(tcp_stream)))
             }
 
-            Some(Err(e)) => {
+            Poll::Ready(Some(Some(Err(err)))) => {
                 error!(
                     "Encountered error when trying to accept new connection {}",
-                    e
+                    err
                 );
-                Poll::Pending
+                Poll::Ready(None)
             }
 
-            None => Poll::Ready(None),
+            Poll::Ready(Some(None)) => Poll::Ready(None),
+
+            Poll::Ready(None) => Poll::Ready(None),
+
+            Poll::Pending => Poll::Pending,
         }
     }
 }