]> git.lizzy.rs Git - connect-rs.git/commitdiff
bring back Stream impl with block_on in poll_next for functionality
authorSachandhan Ganesh <sachan.ganesh@gmail.com>
Thu, 21 Jan 2021 02:21:45 +0000 (18:21 -0800)
committerSachandhan Ganesh <sachan.ganesh@gmail.com>
Thu, 21 Jan 2021 02:21:45 +0000 (18:21 -0800)
examples/tcp-echo-server/src/main.rs
examples/tls-echo-server/src/main.rs
src/tcp/server.rs
src/tls/server.rs

index 1c6f588ca710edd3f8904f653145a9f9210a8336..f480928a872910c376f2216b2ef85a8da2b2bcba 100644 (file)
@@ -23,11 +23,11 @@ async fn main() -> anyhow::Result<()> {
     };
 
     // create a server
-    let server = TcpServer::new(ip_address).await?;
+    let mut server = TcpServer::new(ip_address).await?;
 
     // handle server connections
     // wait for a connection to come in and be accepted
-    while let Ok(mut conn) = server.accept().await {
+    while let Some(mut conn) = server.next().await {
         info!("Handling connection from {}", conn.peer_addr());
 
         task::spawn(async move {
index 32ebf972f939d3c11678997125854d3c5c05487b..ef0cfef86330bf11a4ec6770f4f65ef458179d75 100644 (file)
@@ -30,13 +30,13 @@ async fn main() -> anyhow::Result<()> {
         .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
 
     // create a server
-    let server = TlsServer::new(ip_address, config.into()).await?;
+    let mut server = TlsServer::new(ip_address, config.into()).await?;
 
     // handle server connections
     // wait for a connection to come in and be accepted
     loop {
-        match server.accept().await {
-            Ok(Some(mut conn)) => {
+        match server.next().await {
+            Some(mut conn) => {
                 info!("Handling connection from {}", conn.peer_addr());
 
                 task::spawn(async move {
@@ -62,10 +62,7 @@ async fn main() -> anyhow::Result<()> {
                 });
             }
 
-            Ok(None) => (),
-
-            Err(e) => {
-                error!("Encountered error when accepting connection: {}", e);
+            None => {
                 break
             }
         }
index 4d22e7b4c37b7ca7141834f18e9b442ae35125ab..d7b0269d6ed9d3e07763aaf333d70bf336adeabd 100644 (file)
@@ -1,5 +1,8 @@
 use crate::Connection;
 use async_std::net::{SocketAddr, TcpListener, ToSocketAddrs};
+use async_std::pin::Pin;
+use async_std::task::{Context, Poll};
+use futures::{Stream, StreamExt};
 use log::*;
 
 #[allow(dead_code)]
@@ -26,3 +29,25 @@ impl TcpServer {
         Ok(Connection::from(stream))
     }
 }
+
+impl Stream for TcpServer {
+    type Item = Connection;
+
+    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        match futures::executor::block_on(self.listener.incoming().next()) {
+            Some(Ok(tcp_stream)) => {
+                let peer_addr = tcp_stream.peer_addr().expect("Could not retrieve peer IP address");
+                debug!("Received connection attempt from {}", peer_addr);
+
+                Poll::Ready(Some(Connection::from(tcp_stream)))
+            },
+
+            Some(Err(e)) => {
+                error!("Encountered error when trying to accept new connection {}", e);
+                Poll::Pending
+            }
+
+            None => Poll::Ready(None)
+        }
+    }
+}
index 2011730fa410e40001aa88c4f0d814ce6741244f..a2351f7347b447e1b02938126387f892f88113fa 100644 (file)
@@ -1,7 +1,10 @@
 use crate::tls::TlsConnectionMetadata;
 use crate::Connection;
-use async_std::net::*;
+use async_std::net::{SocketAddr, TcpListener, ToSocketAddrs};
+use async_std::pin::Pin;
+use async_std::task::{Context, Poll};
 use async_tls::TlsAcceptor;
+use futures::{Stream, StreamExt};
 use log::*;
 
 #[allow(dead_code)]
@@ -26,20 +29,60 @@ impl TlsServer {
         })
     }
 
-    pub async fn accept(&self) -> anyhow::Result<Option<Connection>> {
+    pub async fn accept(&self) -> anyhow::Result<Connection> {
         let (tcp_stream, peer_addr) = self.listener.accept().await?;
         debug!("Received connection attempt from {}", peer_addr);
 
-        if let Ok(tls_stream) = self.acceptor.accept(tcp_stream).await {
-            debug!("Completed TLS handshake with {}", peer_addr);
-            Ok(Some(Connection::from(TlsConnectionMetadata::Server {
-                local_addr: self.local_addrs.clone(),
-                peer_addr,
-                stream: tls_stream,
-            })))
-        } else {
-            warn!("Could not encrypt connection with TLS from {}", peer_addr);
-            Ok(None)
+        match self.acceptor.accept(tcp_stream).await {
+            Ok(tls_stream) => {
+                debug!("Completed TLS handshake with {}", peer_addr);
+                Ok(Connection::from(TlsConnectionMetadata::Server {
+                    local_addr: self.local_addrs.clone(),
+                    peer_addr,
+                    stream: tls_stream,
+                }))
+            }
+
+            Err(e) => {
+                warn!("Could not encrypt connection with TLS from {}", peer_addr);
+                Err(anyhow::Error::new(e))
+            }
+        }
+    }
+}
+
+impl Stream for TlsServer {
+    type Item = Connection;
+
+    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        match futures::executor::block_on(self.listener.incoming().next()) {
+            Some(Ok(tcp_stream)) => {
+                let peer_addr = tcp_stream.peer_addr().expect("Could not retrieve peer IP address");
+                debug!("Received connection attempt from {}", peer_addr);
+
+                match futures::executor::block_on(self.acceptor.accept(tcp_stream)) {
+                    Ok(tls_stream) => {
+                        debug!("Completed TLS handshake with {}", peer_addr);
+                        Poll::Ready(Some(Connection::from(TlsConnectionMetadata::Server {
+                            local_addr: self.local_addrs.clone(),
+                            peer_addr,
+                            stream: tls_stream,
+                        })))
+                    }
+
+                    Err(_e) => {
+                        warn!("Could not encrypt connection with TLS from {}", peer_addr);
+                        Poll::Pending
+                    }
+                }
+            },
+
+            Some(Err(e)) => {
+                error!("Encountered error when trying to accept new connection {}", e);
+                Poll::Pending
+            }
+
+            None => Poll::Ready(None)
         }
     }
 }