]> git.lizzy.rs Git - connect-rs.git/commitdiff
remove `block_on` in tls-listener
authorSachandhan Ganesh <sachan.ganesh@gmail.com>
Sat, 13 Feb 2021 09:00:12 +0000 (01:00 -0800)
committerSachandhan Ganesh <sachan.ganesh@gmail.com>
Sat, 13 Feb 2021 09:05:58 +0000 (01:05 -0800)
examples/tls-client/Cargo.toml
examples/tls-echo-server/Cargo.toml
src/lib.rs
src/tcp/client.rs
src/tcp/listener.rs
src/tls/client.rs
src/tls/listener.rs
src/writer.rs

index e6cf99bfc9d0a3dcd7ea2d18208b0653b94162d8..33013c451fec598acbb8586b580a205efe47c914 100644 (file)
@@ -12,4 +12,4 @@ async-std = { version = "1.6.2", features = ["attributes"] }
 env_logger = "0.7"
 log = "0.4"
 
-connect = { path = "../../" }
+connect = { path = "../../", features = ["tls"] }
index 820c0c6295ba33dbd84ad0788ce6f4a0de7fbad0..5f8478cbb5d27d79fc8ed47dcce9960437c0a761 100644 (file)
@@ -12,4 +12,4 @@ async-std = { version = "1.6.2", features = ["attributes"] }
 env_logger = "0.7"
 log = "0.4"
 
-connect = { path = "../../" }
+connect = { path = "../../", features = ["tls"] }
index 7ead0504a60aff7cc54088025aaab0624ef89ff1..033d389ad66c578eaf8973f478cbf931c73fb32f 100644 (file)
@@ -26,7 +26,7 @@ mod writer;
 
 pub use crate::protocol::{ConnectDatagram, DatagramEmptyError};
 pub use crate::reader::ConnectionReader;
-pub use crate::writer::{ConnectionWriter, ConnectionWriteError};
+pub use crate::writer::{ConnectionWriteError, ConnectionWriter};
 use async_std::{net::SocketAddr, pin::Pin};
 use futures::{AsyncRead, AsyncWrite};
 pub use futures::{SinkExt, StreamExt};
index a02f3796cf9c8f8a9ae9e539f2bb2d98e71a75db..d7f8ff8f6d9432f07158b6622ac0102fc694a5a5 100644 (file)
@@ -8,6 +8,9 @@ impl Connection {
     ///
     /// # Example
     ///
+    /// Please see the [tcp-client](https://github.com/sachanganesh/connect-rs/blob/main/examples/tcp-client/src/main.rs)
+    /// example program for a more thorough showcase.
+    ///
     /// Basic usage:
     ///
     /// ```ignore
index 71a876e6a98d04c5272e92bd5fcfb7b2617bd11c..de0537a8e864b5905973de9ebd3ad5717a8f607d 100644 (file)
@@ -1,5 +1,5 @@
 use crate::Connection;
-use async_std::net::{SocketAddr, TcpListener as AsyncListener, ToSocketAddrs, TcpStream};
+use async_std::net::{SocketAddr, TcpListener as AsyncListener, TcpStream, ToSocketAddrs};
 use async_std::pin::Pin;
 use async_std::task::{Context, Poll};
 use async_stream::stream;
@@ -14,6 +14,9 @@ use log::*;
 ///
 /// # Example
 ///
+/// Please see the [tcp-echo-server](https://github.com/sachanganesh/connect-rs/blob/main/examples/tcp-echo-server/src/main.rs)
+/// example program for a more thorough showcase.
+///
 /// Basic usage:
 ///
 /// ```ignore
@@ -28,7 +31,8 @@ use log::*;
 pub struct TcpListener {
     local_addrs: SocketAddr,
     // listener: AsyncListener,
-    conn_stream: Pin<Box<dyn Stream<Item = Option<Result<TcpStream, std::io::Error>>> + Send + Sync>>,
+    conn_stream:
+        Pin<Box<dyn Stream<Item = Option<Result<TcpStream, std::io::Error>>> + Send + Sync>>,
 }
 
 impl TcpListener {
@@ -100,7 +104,7 @@ impl Stream for TcpListener {
                     "Encountered error when trying to accept new connection {}",
                     err
                 );
-                Poll::Ready(None)
+                Poll::Pending
             }
 
             Poll::Ready(Some(None)) => Poll::Ready(None),
index 33a62977d842b96f81ad711d677413f1ffde058b..6285ed1b8d32f58bc9312d2c883f5f72e29db15a 100644 (file)
@@ -12,14 +12,14 @@ impl Connection {
     ///
     /// # Example
     ///
+    /// Please see the [tls-client](https://github.com/sachanganesh/connect-rs/blob/main/examples/tls-client/src/main.rs)
+    /// example program for a more thorough showcase.
+    ///
     /// Basic usage:
     ///
     /// ```ignore
     /// let mut conn = Connection::tls_client("127.0.0.1:3456", "localhost", client_config.into()).await?;
     /// ```
-    ///
-    /// Please see the [tls-client](https://github.com/sachanganesh/connect-rs/blob/main/examples/tls-client/src/main.rs)
-    /// example program for a more thorough showcase.
     pub async fn tls_client<A: ToSocketAddrs + std::fmt::Display>(
         ip_addrs: A,
         domain: &str,
index 047dcae88dbbe301aea273d921c18d3a7b2b432f..4378baa368db9f50e61696533ff4fb2d45a14e51 100644 (file)
@@ -1,10 +1,12 @@
 use crate::tls::TlsConnectionMetadata;
 use crate::Connection;
-use async_std::net::{SocketAddr, TcpListener, ToSocketAddrs};
+use async_std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
 use async_std::pin::Pin;
 use async_std::task::{Context, Poll};
-use async_tls::TlsAcceptor;
-use futures::{Stream, StreamExt};
+use async_stream::stream;
+use async_tls::{server::TlsStream, TlsAcceptor};
+use futures::Stream;
+use futures_lite::StreamExt;
 use log::*;
 
 /// Listens on a bound socket for incoming TLS connections to be handled as independent
@@ -14,6 +16,9 @@ use log::*;
 ///
 /// # Example
 ///
+/// Please see the [tls-echo-server](https://github.com/sachanganesh/connect-rs/blob/main/examples/tls-echo-server/src/main.rs)
+/// example program for a more thorough showcase.
+///
 /// Basic usage:
 ///
 /// ```ignore
@@ -27,8 +32,18 @@ use log::*;
 #[allow(dead_code)]
 pub struct TlsListener {
     local_addrs: SocketAddr,
-    listener: TcpListener,
-    acceptor: TlsAcceptor,
+    conn_stream: Pin<
+        Box<
+            dyn Stream<
+                    Item = Option<
+                        Option<(SocketAddr, Result<TlsStream<TcpStream>, std::io::Error>)>,
+                    >,
+                > + Send
+                + Sync,
+        >,
+    >,
+    // listener: TcpListener,
+    // acceptor: TlsAcceptor,
 }
 
 impl TlsListener {
@@ -46,87 +61,103 @@ impl TlsListener {
         ip_addrs: A,
         acceptor: TlsAcceptor,
     ) -> anyhow::Result<Self> {
-        let listener = TcpListener::bind(ip_addrs).await?;
-        info!("Started TLS server at {}", listener.local_addr()?);
+        let listener = TcpListener::bind(&ip_addrs).await?;
+        info!("Started TLS server at {}", &ip_addrs);
 
-        Ok(Self {
-            local_addrs: listener.local_addr()?,
-            listener,
-            acceptor,
-        })
-    }
+        let local_addrs = listener.local_addr()?;
 
-    /// Creates a [`Connection`] for the next `accept`ed TCP connection at the bound socket.
-    ///
-    /// # Example
-    ///
-    /// Basic usage:
-    ///
-    /// ```ignore
-    /// let mut server = TlsListener::bind("127.0.0.1:3456", config.into()).await?;
-    /// while let Some(mut conn) = server.next().await {
-    ///     // do something with connection
-    /// }
-    /// ```
-    pub async fn accept(&self) -> anyhow::Result<Connection> {
-        let (tcp_stream, peer_addr) = self.listener.accept().await?;
-        debug!("Received connection attempt from {}", peer_addr);
+        let stream = Box::pin(stream! {
+            loop {
+                yield match listener.incoming().next().await {
+                    Some(Ok(tcp_stream)) => {
+                        let peer_addr = tcp_stream
+                            .peer_addr()
+                            .expect("Could not retrieve peer IP address");
+                        debug!("Received connection attempt from {}", peer_addr);
 
-        match self.acceptor.accept(tcp_stream).await {
-            Ok(tls_stream) => {
-                debug!("Completed TLS handshake with {}", peer_addr);
-                Ok(Connection::from(TlsConnectionMetadata::Listener {
-                    local_addr: self.local_addrs.clone(),
-                    peer_addr,
-                    stream: tls_stream,
-                }))
-            }
+                        Some(Some((peer_addr, acceptor.accept(tcp_stream).await)))
+                    }
 
-            Err(e) => {
-                warn!("Could not encrypt connection with TLS from {}", peer_addr);
-                Err(anyhow::Error::new(e))
+                    Some(Err(err)) => {
+                        error!(
+                            "Encountered error when trying to accept new connection {}",
+                            err
+                        );
+                        Some(None)
+                    }
+
+                    None => None,
+                }
             }
-        }
+        });
+
+        Ok(Self {
+            local_addrs,
+            conn_stream: stream,
+            // listener,
+            // acceptor,
+        })
     }
+
+    // /// Creates a [`Connection`] for the next `accept`ed TCP connection at the bound socket.
+    // ///
+    // /// # Example
+    // ///
+    // /// Basic usage:
+    // ///
+    // /// ```ignore
+    // /// let mut server = TlsListener::bind("127.0.0.1:3456", config.into()).await?;
+    // /// while let Some(mut conn) = server.next().await {
+    // ///     // do something with connection
+    // /// }
+    // /// ```
+    // pub async fn accept(&self) -> anyhow::Result<Connection> {
+    //     let (tcp_stream, peer_addr) = self.listener.accept().await?;
+    //     debug!("Received connection attempt from {}", peer_addr);
+    //
+    //     match self.acceptor.accept(tcp_stream).await {
+    //         Ok(tls_stream) => {
+    //             debug!("Completed TLS handshake with {}", peer_addr);
+    //             Ok(Connection::from(TlsConnectionMetadata::Listener {
+    //                 local_addr: self.local_addrs.clone(),
+    //                 peer_addr,
+    //                 stream: tls_stream,
+    //             }))
+    //         }
+    //
+    //         Err(e) => {
+    //             warn!("Could not encrypt connection with TLS from {}", peer_addr);
+    //             Err(anyhow::Error::new(e))
+    //         }
+    //     }
+    // }
 }
 
 impl Stream for TlsListener {
     type Item = Connection;
 
-    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        match futures::executor::block_on(self.listener.incoming().next()) {
-            Some(Ok(tcp_stream)) => {
-                let peer_addr = tcp_stream
-                    .peer_addr()
-                    .expect("Could not retrieve peer IP address");
-                debug!("Received connection attempt from {}", peer_addr);
-
-                match futures::executor::block_on(self.acceptor.accept(tcp_stream)) {
-                    Ok(tls_stream) => {
-                        debug!("Completed TLS handshake with {}", peer_addr);
-                        Poll::Ready(Some(Connection::from(TlsConnectionMetadata::Listener {
-                            local_addr: self.local_addrs.clone(),
-                            peer_addr,
-                            stream: tls_stream,
-                        })))
-                    }
-
-                    Err(_e) => {
-                        warn!("Could not encrypt connection with TLS from {}", peer_addr);
-                        Poll::Pending
-                    }
-                }
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        match self.conn_stream.poll_next(cx) {
+            Poll::Ready(Some(Some(Some((peer_addr, Ok(tls_stream)))))) => {
+                debug!("Completed TLS handshake with {}", peer_addr);
+                Poll::Ready(Some(Connection::from(TlsConnectionMetadata::Listener {
+                    local_addr: self.local_addrs.clone(),
+                    peer_addr,
+                    stream: tls_stream,
+                })))
             }
 
-            Some(Err(e)) => {
-                error!(
-                    "Encountered error when trying to accept new connection {}",
-                    e
+            Poll::Ready(Some(Some(Some((peer_addr, Err(err)))))) => {
+                warn!(
+                    "Could not encrypt connection with TLS from {}: {}",
+                    peer_addr, err
                 );
                 Poll::Pending
             }
 
-            None => Poll::Ready(None),
+            Poll::Pending => Poll::Pending,
+
+            _ => Poll::Ready(None),
         }
     }
 }
index c65a4441cfc0b922ae7977de879493bcb52a2593..9b48c02899f7b9d73d1db13ffb693a9dfa9147b8 100644 (file)
@@ -27,7 +27,9 @@ impl Error for ConnectionWriteError {}
 impl std::fmt::Display for ConnectionWriteError {
     fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
         match self {
-            ConnectionWriteError::ConnectionClosed => formatter.write_str("cannot send message when connection is closed"),
+            ConnectionWriteError::ConnectionClosed => {
+                formatter.write_str("cannot send message when connection is closed")
+            }
             ConnectionWriteError::IoError(err) => std::fmt::Display::fmt(&err, formatter),
         }
     }