From 0791790bac8fd431cbdfdbbf52f31371b6db884c Mon Sep 17 00:00:00 2001 From: Sachandhan Ganesh Date: Sat, 13 Feb 2021 00:28:35 -0800 Subject: [PATCH] remove `block_on` in tcp-listener --- Cargo.toml | 6 ++-- src/reader.rs | 4 +-- src/tcp/listener.rs | 76 +++++++++++++++++++++++++++------------------ 3 files changed, 52 insertions(+), 34 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2e7b1c9..3ce2e53 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,10 +27,12 @@ members = [ tls = ["async-tls", "rustls"] [dependencies] -anyhow = "1.0.31" +anyhow = "1.0" async-std = { version = "1.9.0", features = ["unstable"] } +async-stream = "0.3.0" bytes = "0.5.5" -futures = "0.3.8" +futures = "0.3" +futures-lite = "1.11" log = "0.4" async-tls = { version = "0.11.0", default-features = false, features = ["client", "server"], optional = true } diff --git a/src/reader.rs b/src/reader.rs index b046c84..b1b7248 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -1,14 +1,14 @@ +use crate::protocol::ConnectDatagram; use async_std::net::SocketAddr; use async_std::pin::Pin; use bytes::{Buf, BytesMut}; use futures::task::{Context, Poll}; use futures::{AsyncRead, Stream}; use log::*; +use std::io::Cursor; -use crate::protocol::ConnectDatagram; pub use futures::SinkExt; pub use futures::StreamExt; -use std::io::Cursor; /// A default buffer size to read in bytes and then deserialize as messages. const BUFFER_SIZE: usize = 8192; diff --git a/src/tcp/listener.rs b/src/tcp/listener.rs index 997975a..71a876e 100644 --- a/src/tcp/listener.rs +++ b/src/tcp/listener.rs @@ -1,8 +1,10 @@ use crate::Connection; -use async_std::net::{SocketAddr, TcpListener as AsyncListener, ToSocketAddrs}; +use async_std::net::{SocketAddr, TcpListener as AsyncListener, ToSocketAddrs, TcpStream}; use async_std::pin::Pin; use async_std::task::{Context, Poll}; -use futures::{Stream, StreamExt}; +use async_stream::stream; +use futures::Stream; +use futures_lite::StreamExt; use log::*; /// Listens on a bound socket for incoming TCP connections to be handled as independent @@ -25,7 +27,8 @@ use log::*; #[allow(dead_code)] pub struct TcpListener { local_addrs: SocketAddr, - listener: AsyncListener, + // listener: AsyncListener, + conn_stream: Pin>> + Send + Sync>>, } impl TcpListener { @@ -43,38 +46,47 @@ impl TcpListener { let listener = AsyncListener::bind(&ip_addrs).await?; info!("Started TCP server at {}", &ip_addrs); + let local_addrs = listener.local_addr()?; + + let stream = Box::pin(stream! { + loop { + yield listener.incoming().next().await; + } + }); + Ok(Self { - local_addrs: listener.local_addr()?, - listener, + local_addrs, + // listener, + conn_stream: stream, }) } - /// Creates a [`Connection`] for the next `accept`ed TCP connection at the bound socket. - /// - /// # Example - /// - /// Basic usage: - /// - /// ```ignore - /// let mut server = TcpListener::bind("127.0.0.1:3456").await?; - /// while let Ok(mut conn) = server.accept().await? { - /// // handle the connection - /// } - /// ``` - pub async fn accept(&self) -> anyhow::Result { - let (stream, ip_addr) = self.listener.accept().await?; - debug!("Received connection attempt from {}", ip_addr); - - Ok(Connection::from(stream)) - } + // /// Creates a [`Connection`] for the next `accept`ed TCP connection at the bound socket. + // /// + // /// # Example + // /// + // /// Basic usage: + // /// + // /// ```ignore + // /// let mut server = TcpListener::bind("127.0.0.1:3456").await?; + // /// while let Ok(mut conn) = server.accept().await? { + // /// // handle the connection + // /// } + // /// ``` + // pub async fn accept(&self) -> anyhow::Result { + // let (stream, ip_addr) = self.listener.accept().await?; + // debug!("Received connection attempt from {}", ip_addr); + // + // Ok(Connection::from(stream)) + // } } impl Stream for TcpListener { type Item = Connection; - fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - match futures::executor::block_on(self.listener.incoming().next()) { - Some(Ok(tcp_stream)) => { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.conn_stream.poll_next(cx) { + Poll::Ready(Some(Some(Ok(tcp_stream)))) => { let peer_addr = tcp_stream .peer_addr() .expect("Could not retrieve peer IP address"); @@ -83,15 +95,19 @@ impl Stream for TcpListener { Poll::Ready(Some(Connection::from(tcp_stream))) } - Some(Err(e)) => { + Poll::Ready(Some(Some(Err(err)))) => { error!( "Encountered error when trying to accept new connection {}", - e + err ); - Poll::Pending + Poll::Ready(None) } - None => Poll::Ready(None), + Poll::Ready(Some(None)) => Poll::Ready(None), + + Poll::Ready(None) => Poll::Ready(None), + + Poll::Pending => Poll::Pending, } } } -- 2.44.0