From 788859f5c032e9469ef30235b73c7d4630e17441 Mon Sep 17 00:00:00 2001 From: Sachandhan Ganesh Date: Sat, 13 Feb 2021 01:00:12 -0800 Subject: [PATCH] remove `block_on` in tls-listener --- examples/tls-client/Cargo.toml | 2 +- examples/tls-echo-server/Cargo.toml | 2 +- src/lib.rs | 2 +- src/tcp/client.rs | 3 + src/tcp/listener.rs | 10 +- src/tls/client.rs | 6 +- src/tls/listener.rs | 169 ++++++++++++++++------------ src/writer.rs | 4 +- 8 files changed, 119 insertions(+), 79 deletions(-) diff --git a/examples/tls-client/Cargo.toml b/examples/tls-client/Cargo.toml index e6cf99b..33013c4 100644 --- a/examples/tls-client/Cargo.toml +++ b/examples/tls-client/Cargo.toml @@ -12,4 +12,4 @@ async-std = { version = "1.6.2", features = ["attributes"] } env_logger = "0.7" log = "0.4" -connect = { path = "../../" } +connect = { path = "../../", features = ["tls"] } diff --git a/examples/tls-echo-server/Cargo.toml b/examples/tls-echo-server/Cargo.toml index 820c0c6..5f8478c 100644 --- a/examples/tls-echo-server/Cargo.toml +++ b/examples/tls-echo-server/Cargo.toml @@ -12,4 +12,4 @@ async-std = { version = "1.6.2", features = ["attributes"] } env_logger = "0.7" log = "0.4" -connect = { path = "../../" } +connect = { path = "../../", features = ["tls"] } diff --git a/src/lib.rs b/src/lib.rs index 7ead050..033d389 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,7 +26,7 @@ mod writer; pub use crate::protocol::{ConnectDatagram, DatagramEmptyError}; pub use crate::reader::ConnectionReader; -pub use crate::writer::{ConnectionWriter, ConnectionWriteError}; +pub use crate::writer::{ConnectionWriteError, ConnectionWriter}; use async_std::{net::SocketAddr, pin::Pin}; use futures::{AsyncRead, AsyncWrite}; pub use futures::{SinkExt, StreamExt}; diff --git a/src/tcp/client.rs b/src/tcp/client.rs index a02f379..d7f8ff8 100644 --- a/src/tcp/client.rs +++ b/src/tcp/client.rs @@ -8,6 +8,9 @@ impl Connection { /// /// # Example /// + /// Please see the [tcp-client](https://github.com/sachanganesh/connect-rs/blob/main/examples/tcp-client/src/main.rs) + /// example program for a more thorough showcase. + /// /// Basic usage: /// /// ```ignore diff --git a/src/tcp/listener.rs b/src/tcp/listener.rs index 71a876e..de0537a 100644 --- a/src/tcp/listener.rs +++ b/src/tcp/listener.rs @@ -1,5 +1,5 @@ use crate::Connection; -use async_std::net::{SocketAddr, TcpListener as AsyncListener, ToSocketAddrs, TcpStream}; +use async_std::net::{SocketAddr, TcpListener as AsyncListener, TcpStream, ToSocketAddrs}; use async_std::pin::Pin; use async_std::task::{Context, Poll}; use async_stream::stream; @@ -14,6 +14,9 @@ use log::*; /// /// # Example /// +/// Please see the [tcp-echo-server](https://github.com/sachanganesh/connect-rs/blob/main/examples/tcp-echo-server/src/main.rs) +/// example program for a more thorough showcase. +/// /// Basic usage: /// /// ```ignore @@ -28,7 +31,8 @@ use log::*; pub struct TcpListener { local_addrs: SocketAddr, // listener: AsyncListener, - conn_stream: Pin>> + Send + Sync>>, + conn_stream: + Pin>> + Send + Sync>>, } impl TcpListener { @@ -100,7 +104,7 @@ impl Stream for TcpListener { "Encountered error when trying to accept new connection {}", err ); - Poll::Ready(None) + Poll::Pending } Poll::Ready(Some(None)) => Poll::Ready(None), diff --git a/src/tls/client.rs b/src/tls/client.rs index 33a6297..6285ed1 100644 --- a/src/tls/client.rs +++ b/src/tls/client.rs @@ -12,14 +12,14 @@ impl Connection { /// /// # Example /// + /// Please see the [tls-client](https://github.com/sachanganesh/connect-rs/blob/main/examples/tls-client/src/main.rs) + /// example program for a more thorough showcase. + /// /// Basic usage: /// /// ```ignore /// let mut conn = Connection::tls_client("127.0.0.1:3456", "localhost", client_config.into()).await?; /// ``` - /// - /// Please see the [tls-client](https://github.com/sachanganesh/connect-rs/blob/main/examples/tls-client/src/main.rs) - /// example program for a more thorough showcase. pub async fn tls_client( ip_addrs: A, domain: &str, diff --git a/src/tls/listener.rs b/src/tls/listener.rs index 047dcae..4378baa 100644 --- a/src/tls/listener.rs +++ b/src/tls/listener.rs @@ -1,10 +1,12 @@ use crate::tls::TlsConnectionMetadata; use crate::Connection; -use async_std::net::{SocketAddr, TcpListener, ToSocketAddrs}; +use async_std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs}; use async_std::pin::Pin; use async_std::task::{Context, Poll}; -use async_tls::TlsAcceptor; -use futures::{Stream, StreamExt}; +use async_stream::stream; +use async_tls::{server::TlsStream, TlsAcceptor}; +use futures::Stream; +use futures_lite::StreamExt; use log::*; /// Listens on a bound socket for incoming TLS connections to be handled as independent @@ -14,6 +16,9 @@ use log::*; /// /// # Example /// +/// Please see the [tls-echo-server](https://github.com/sachanganesh/connect-rs/blob/main/examples/tls-echo-server/src/main.rs) +/// example program for a more thorough showcase. +/// /// Basic usage: /// /// ```ignore @@ -27,8 +32,18 @@ use log::*; #[allow(dead_code)] pub struct TlsListener { local_addrs: SocketAddr, - listener: TcpListener, - acceptor: TlsAcceptor, + conn_stream: Pin< + Box< + dyn Stream< + Item = Option< + Option<(SocketAddr, Result, std::io::Error>)>, + >, + > + Send + + Sync, + >, + >, + // listener: TcpListener, + // acceptor: TlsAcceptor, } impl TlsListener { @@ -46,87 +61,103 @@ impl TlsListener { ip_addrs: A, acceptor: TlsAcceptor, ) -> anyhow::Result { - let listener = TcpListener::bind(ip_addrs).await?; - info!("Started TLS server at {}", listener.local_addr()?); + let listener = TcpListener::bind(&ip_addrs).await?; + info!("Started TLS server at {}", &ip_addrs); - Ok(Self { - local_addrs: listener.local_addr()?, - listener, - acceptor, - }) - } + let local_addrs = listener.local_addr()?; - /// Creates a [`Connection`] for the next `accept`ed TCP connection at the bound socket. - /// - /// # Example - /// - /// Basic usage: - /// - /// ```ignore - /// let mut server = TlsListener::bind("127.0.0.1:3456", config.into()).await?; - /// while let Some(mut conn) = server.next().await { - /// // do something with connection - /// } - /// ``` - pub async fn accept(&self) -> anyhow::Result { - let (tcp_stream, peer_addr) = self.listener.accept().await?; - debug!("Received connection attempt from {}", peer_addr); + let stream = Box::pin(stream! { + loop { + yield match listener.incoming().next().await { + Some(Ok(tcp_stream)) => { + let peer_addr = tcp_stream + .peer_addr() + .expect("Could not retrieve peer IP address"); + debug!("Received connection attempt from {}", peer_addr); - match self.acceptor.accept(tcp_stream).await { - Ok(tls_stream) => { - debug!("Completed TLS handshake with {}", peer_addr); - Ok(Connection::from(TlsConnectionMetadata::Listener { - local_addr: self.local_addrs.clone(), - peer_addr, - stream: tls_stream, - })) - } + Some(Some((peer_addr, acceptor.accept(tcp_stream).await))) + } - Err(e) => { - warn!("Could not encrypt connection with TLS from {}", peer_addr); - Err(anyhow::Error::new(e)) + Some(Err(err)) => { + error!( + "Encountered error when trying to accept new connection {}", + err + ); + Some(None) + } + + None => None, + } } - } + }); + + Ok(Self { + local_addrs, + conn_stream: stream, + // listener, + // acceptor, + }) } + + // /// Creates a [`Connection`] for the next `accept`ed TCP connection at the bound socket. + // /// + // /// # Example + // /// + // /// Basic usage: + // /// + // /// ```ignore + // /// let mut server = TlsListener::bind("127.0.0.1:3456", config.into()).await?; + // /// while let Some(mut conn) = server.next().await { + // /// // do something with connection + // /// } + // /// ``` + // pub async fn accept(&self) -> anyhow::Result { + // let (tcp_stream, peer_addr) = self.listener.accept().await?; + // debug!("Received connection attempt from {}", peer_addr); + // + // match self.acceptor.accept(tcp_stream).await { + // Ok(tls_stream) => { + // debug!("Completed TLS handshake with {}", peer_addr); + // Ok(Connection::from(TlsConnectionMetadata::Listener { + // local_addr: self.local_addrs.clone(), + // peer_addr, + // stream: tls_stream, + // })) + // } + // + // Err(e) => { + // warn!("Could not encrypt connection with TLS from {}", peer_addr); + // Err(anyhow::Error::new(e)) + // } + // } + // } } impl Stream for TlsListener { type Item = Connection; - fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - match futures::executor::block_on(self.listener.incoming().next()) { - Some(Ok(tcp_stream)) => { - let peer_addr = tcp_stream - .peer_addr() - .expect("Could not retrieve peer IP address"); - debug!("Received connection attempt from {}", peer_addr); - - match futures::executor::block_on(self.acceptor.accept(tcp_stream)) { - Ok(tls_stream) => { - debug!("Completed TLS handshake with {}", peer_addr); - Poll::Ready(Some(Connection::from(TlsConnectionMetadata::Listener { - local_addr: self.local_addrs.clone(), - peer_addr, - stream: tls_stream, - }))) - } - - Err(_e) => { - warn!("Could not encrypt connection with TLS from {}", peer_addr); - Poll::Pending - } - } + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.conn_stream.poll_next(cx) { + Poll::Ready(Some(Some(Some((peer_addr, Ok(tls_stream)))))) => { + debug!("Completed TLS handshake with {}", peer_addr); + Poll::Ready(Some(Connection::from(TlsConnectionMetadata::Listener { + local_addr: self.local_addrs.clone(), + peer_addr, + stream: tls_stream, + }))) } - Some(Err(e)) => { - error!( - "Encountered error when trying to accept new connection {}", - e + Poll::Ready(Some(Some(Some((peer_addr, Err(err)))))) => { + warn!( + "Could not encrypt connection with TLS from {}: {}", + peer_addr, err ); Poll::Pending } - None => Poll::Ready(None), + Poll::Pending => Poll::Pending, + + _ => Poll::Ready(None), } } } diff --git a/src/writer.rs b/src/writer.rs index c65a444..9b48c02 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -27,7 +27,9 @@ impl Error for ConnectionWriteError {} impl std::fmt::Display for ConnectionWriteError { fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { match self { - ConnectionWriteError::ConnectionClosed => formatter.write_str("cannot send message when connection is closed"), + ConnectionWriteError::ConnectionClosed => { + formatter.write_str("cannot send message when connection is closed") + } ConnectionWriteError::IoError(err) => std::fmt::Display::fmt(&err, formatter), } } -- 2.44.0