]> git.lizzy.rs Git - connect-rs.git/commitdiff
renamed server to listener and add thorough documentation
authorSachandhan Ganesh <sachan.ganesh@gmail.com>
Thu, 4 Feb 2021 08:42:01 +0000 (00:42 -0800)
committerSachandhan Ganesh <sachan.ganesh@gmail.com>
Thu, 4 Feb 2021 08:42:01 +0000 (00:42 -0800)
18 files changed:
README.md
examples/tcp-client/README.md
examples/tcp-echo-server/README.md
examples/tcp-echo-server/src/main.rs
examples/tls-client/README.md
examples/tls-echo-server/README.md
examples/tls-echo-server/src/main.rs
src/lib.rs
src/reader.rs
src/tcp/client.rs
src/tcp/listener.rs [new file with mode: 0644]
src/tcp/mod.rs
src/tcp/server.rs [deleted file]
src/tls/client.rs
src/tls/listener.rs [new file with mode: 0644]
src/tls/mod.rs
src/tls/server.rs [deleted file]
src/writer.rs

index 4f6cbe0593baa5d584a0f0f7499453192f625d61..abed80675ebca1dd4f1300a496ca3245e59144e5 100644 (file)
--- a/README.md
+++ b/README.md
@@ -16,3 +16,6 @@ By using a message queue, crate users can focus on sending and receiving message
 | SCTP Server                                          |               |
 | DTLS-SCTP Client                                     |               |
 | DTLS-SCTP Server                                     |               |
+
+
+## Why Protobuf?
\ No newline at end of file
index b25cbba03d6995166f6651474f311063cff90fef..1afa04b3bd45fe8d000748aac8bbe2d13ee6f90d 100644 (file)
@@ -1,4 +1,4 @@
-# seam-channel tcp-client example
+# connect tcp-client example
 
 This example program will:
 
index 72695aefdb419ac086ef35bba8ff056ca2d2bf13..c13289c312a180cbaf987afddad3a2c3e88bc523 100644 (file)
@@ -1,8 +1,8 @@
-# seam-channel tcp-echo-server example
+# connect tcp-echo-server example
 
 This example program will:
 
-1. Bind to an IP address
+1. Bind to an IP address and port
 2. Accept any number of TCP connections
 3. Handle each connection by:
     1. Waiting for `String` messages to be received
index f480928a872910c376f2216b2ef85a8da2b2bcba..4d7bfded05aef2e4583715afe7bc3f57b0c05812 100644 (file)
@@ -2,7 +2,7 @@ mod schema;
 
 use crate::schema::hello_world::HelloWorld;
 use async_std::task;
-use connect::tcp::TcpServer;
+use connect::tcp::TcpListener;
 use connect::{SinkExt, StreamExt};
 use log::*;
 use std::env;
@@ -23,7 +23,7 @@ async fn main() -> anyhow::Result<()> {
     };
 
     // create a server
-    let mut server = TcpServer::new(ip_address).await?;
+    let mut server = TcpListener::bind(ip_address).await?;
 
     // handle server connections
     // wait for a connection to come in and be accepted
index a55c0b83e7672c6c8e87b750ca1069336cc1f703..8b44f9caffd936edd1049533de37b63c95029512 100644 (file)
@@ -1,4 +1,4 @@
-# seam-channel tls-client example
+# connect tls-client example
 
 This example program will:
 
index eeafd03012e993d557b50ad765faa3317d3d0ef7..3d02137f5fca55f38c65f2a388c9cdae8abe19e0 100644 (file)
@@ -1,8 +1,8 @@
-# seam-channel tls-echo-server example
+# connect tls-echo-server example
 
 This example program will:
 
-1. Bind to an IP address
+1. Bind to an IP address and port
 2. Accept any number of secure TLS connections
 3. Handle each connection by:
     1. Waiting for `String` messages to be received
index 74326afc863602d5a19978d30d120a2bbe12fd56..bf9d476f2579363c4b35d231e22c0a89c0f9a94e 100644 (file)
@@ -4,7 +4,7 @@ use crate::schema::hello_world::HelloWorld;
 use async_std::{io, task};
 use connect::tls::rustls::internal::pemfile::{certs, rsa_private_keys};
 use connect::tls::rustls::{NoClientAuth, ServerConfig};
-use connect::tls::TlsServer;
+use connect::tls::TlsListener;
 use connect::{SinkExt, StreamExt};
 use log::*;
 use std::env;
@@ -30,40 +30,34 @@ async fn main() -> anyhow::Result<()> {
         .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
 
     // create a server
-    let mut server = TlsServer::new(ip_address, config.into()).await?;
+    let mut server = TlsListener::bind(ip_address, config.into()).await?;
 
     // handle server connections
     // wait for a connection to come in and be accepted
-    loop {
-        match server.next().await {
-            Some(mut conn) => {
-                info!("Handling connection from {}", conn.peer_addr());
-
-                task::spawn(async move {
-                    while let Some(msg) = conn.reader().next().await {
-                        if msg.is::<HelloWorld>() {
-                            if let Ok(Some(contents)) = msg.unpack::<HelloWorld>() {
-                                info!(
-                                    "Received a message \"{}\" from {}",
-                                    contents.get_message(),
-                                    conn.peer_addr()
-                                );
-
-                                conn.writer()
-                                    .send(contents)
-                                    .await
-                                    .expect("Could not send message back to source connection");
-                                info!("Sent message back to original sender");
-                            }
-                        } else {
-                            error!("Received a message of unknown type")
-                        }
+    while let Some(mut conn) = server.next().await {
+        info!("Handling connection from {}", conn.peer_addr());
+
+        task::spawn(async move {
+            while let Some(msg) = conn.reader().next().await {
+                if msg.is::<HelloWorld>() {
+                    if let Ok(Some(contents)) = msg.unpack::<HelloWorld>() {
+                        info!(
+                            "Received a message \"{}\" from {}",
+                            contents.get_message(),
+                            conn.peer_addr()
+                        );
+
+                        conn.writer()
+                            .send(contents)
+                            .await
+                            .expect("Could not send message back to source connection");
+                        info!("Sent message back to original sender");
                     }
-                });
+                } else {
+                    error!("Received a message of unknown type")
+                }
             }
-
-            None => break,
-        }
+        });
     }
 
     Ok(())
index 8a71a3760c734782b8d4e2244229a06d3e75fd33..85500272bcff76f5c719b6facc65ce334089f120 100644 (file)
@@ -1,3 +1,23 @@
+//! This crate provides a reliable, fault-tolerant, and brokerless message-queue abstraction over
+//! asynchronous network streams.
+//!
+//! # Why?
+//! When building networked applications, developers shouldn't have to focus on repeatedly solving
+//! the problem of reliable, fault-tolerant message delivery over byte-streams. By using a message
+//! queue abstraction, crate users can focus on core application logic and leave the low-level
+//! networking and message-queue guarantees to the abstraction.
+//!
+//! # Protobuf
+//! This crate relies on the use of [Protocol Buffers](https://developers.google.com/protocol-buffers)
+//! due to it being widely adopted and industry-proven. All messages are Protobuf messages that
+//! are packed into a Protobuf `Any` type and then sent over the wire. Message recipients must
+//! decide what Protobuf message type it is, and correspondingly unpack the `Any` into a particular
+//! message type.
+//!
+//! # Examples
+//! Please use the [examples](https://github.com/sachanganesh/connect-rs/tree/main/examples)
+//! provided to help understand crate usage.
+
 mod reader;
 pub(crate) mod schema;
 pub mod tcp;
@@ -10,6 +30,8 @@ use async_std::{net::SocketAddr, pin::Pin};
 use futures::{AsyncRead, AsyncWrite};
 pub use futures::{SinkExt, StreamExt};
 
+/// Wrapper around a [`ConnectionReader`] and [`ConnectionWriter`] to read and write on a network
+/// connection
 pub struct Connection {
     local_addr: SocketAddr,
     peer_addr: SocketAddr,
@@ -33,18 +55,25 @@ impl Connection {
         }
     }
 
+    /// Get the local IP address and port
     pub fn local_addr(&self) -> SocketAddr {
         self.local_addr.clone()
     }
 
+    /// Get the peer IP address and port
     pub fn peer_addr(&self) -> SocketAddr {
         self.peer_addr.clone()
     }
 
+    /// Consume the [`Connection`] to split into separate [`ConnectionReader`] and
+    /// [`ConnectionWriter`] halves
+    ///
+    /// [`Connection`]s are split when reading and writing must be concurrent operations.
     pub fn split(self) -> (ConnectionReader, ConnectionWriter) {
         (self.reader, self.writer)
     }
 
+    /// Re-wrap the [`ConnectionReader`] and [`ConnectionWriter`] halves into a [`Connection`]
     pub fn join(reader: ConnectionReader, writer: ConnectionWriter) -> Self {
         Self {
             local_addr: reader.local_addr(),
@@ -54,14 +83,17 @@ impl Connection {
         }
     }
 
+    /// Get mutable access to the underlying [`ConnectionReader`]
     pub fn reader(&mut self) -> &mut ConnectionReader {
         &mut self.reader
     }
 
+    /// Get mutable access to the underlying [`ConnectionWriter`]
     pub fn writer(&mut self) -> &mut ConnectionWriter {
         &mut self.writer
     }
 
+    /// Close the connection by closing both the reading and writing halves
     pub async fn close(self) -> SocketAddr {
         let peer_addr = self.peer_addr();
         let (reader, writer) = self.split();
index c859dd510d9afe9a7811a87654146b9383a4297b..428f6d19902f4c7afc614e8b1f66704d39a117e0 100644 (file)
@@ -12,8 +12,27 @@ pub use futures::SinkExt;
 pub use futures::StreamExt;
 use protobuf::well_known_types::Any;
 
+/// A default buffer size to read in bytes and then deserialize as messages
 const BUFFER_SIZE: usize = 8192;
 
+/// An interface to read messages from the network connection
+///
+/// Implements the [`Stream`] trait to asynchronously read messages from the network connection.
+///
+/// # Example
+///
+/// Basic usage:
+///
+/// ```ignore
+/// while let Some(msg) = reader.next().await {
+///   // handle the received message
+/// }
+/// ```
+///
+/// Please see the [tcp-client](https://github.com/sachanganesh/connect-rs/blob/main/examples/tcp-client/)
+/// example program or other client example programs for a more thorough showcase.
+///
+
 pub struct ConnectionReader {
     local_addr: SocketAddr,
     peer_addr: SocketAddr,
@@ -23,6 +42,8 @@ pub struct ConnectionReader {
 }
 
 impl ConnectionReader {
+    /// Creates a new [`ConnectionReader`] from an [`AsyncRead`] trait object and the local and peer
+    /// socket metadata
     pub fn new(
         local_addr: SocketAddr,
         peer_addr: SocketAddr,
@@ -37,14 +58,17 @@ impl ConnectionReader {
         }
     }
 
+    /// Get the local IP address and port
     pub fn local_addr(&self) -> SocketAddr {
         self.local_addr.clone()
     }
 
+    /// Get the peer IP address and port
     pub fn peer_addr(&self) -> SocketAddr {
         self.peer_addr.clone()
     }
 
+    /// Check if the [`Stream`] of messages from the network is closed
     pub fn is_closed(&self) -> bool {
         self.closed
     }
index 221825da27399ad7ab477a13be1a979b7179abcf..28ae2a3a54f2462ed530650d2cf9ac48ae032efb 100644 (file)
@@ -4,6 +4,15 @@ use crate::Connection;
 use async_std::net::{TcpStream, ToSocketAddrs};
 
 impl Connection {
+    /// Creates a [`Connection`] that uses a TCP transport
+    ///
+    /// # Example
+    ///
+    /// Basic usage:
+    ///
+    /// ```ignore
+    /// let mut conn = Connection::tcp_client("127.0.0.1:3456").await?;
+    /// ```
     pub async fn tcp_client<A: ToSocketAddrs + std::fmt::Display>(
         ip_addrs: A,
     ) -> anyhow::Result<Self> {
@@ -16,6 +25,7 @@ impl Connection {
 }
 
 impl From<TcpStream> for Connection {
+    /// Creates a [`Connection`] using a TCP transport from an async [`TcpStream`].
     fn from(stream: TcpStream) -> Self {
         let write_stream = stream.clone();
 
diff --git a/src/tcp/listener.rs b/src/tcp/listener.rs
new file mode 100644 (file)
index 0000000..997975a
--- /dev/null
@@ -0,0 +1,97 @@
+use crate::Connection;
+use async_std::net::{SocketAddr, TcpListener as AsyncListener, ToSocketAddrs};
+use async_std::pin::Pin;
+use async_std::task::{Context, Poll};
+use futures::{Stream, StreamExt};
+use log::*;
+
+/// Listens on a bound socket for incoming TCP connections to be handled as independent
+/// [`Connection`]s.
+///
+/// Implements the [`Stream`] trait to asynchronously accept incoming TCP connections.
+///
+/// # Example
+///
+/// Basic usage:
+///
+/// ```ignore
+/// let mut server = TcpListener::bind(ip_address).await?;
+///
+/// // wait for a connection to come in and be accepted
+/// while let Some(mut conn) = server.next().await {
+///     // do something with connection
+/// }
+/// ```
+#[allow(dead_code)]
+pub struct TcpListener {
+    local_addrs: SocketAddr,
+    listener: AsyncListener,
+}
+
+impl TcpListener {
+    /// Creates a [`TcpListener`] by binding to an IP address and port and listens for incoming TCP
+    /// connections.
+    ///
+    /// # Example
+    ///
+    /// Basic usage:
+    ///
+    /// ```ignore
+    /// let mut server = TcpListener::bind("127.0.0.1:3456").await?;
+    /// ```
+    pub async fn bind<A: ToSocketAddrs + std::fmt::Display>(ip_addrs: A) -> anyhow::Result<Self> {
+        let listener = AsyncListener::bind(&ip_addrs).await?;
+        info!("Started TCP server at {}", &ip_addrs);
+
+        Ok(Self {
+            local_addrs: listener.local_addr()?,
+            listener,
+        })
+    }
+
+    /// Creates a [`Connection`] for the next `accept`ed TCP connection at the bound socket.
+    ///
+    /// # Example
+    ///
+    /// Basic usage:
+    ///
+    /// ```ignore
+    /// let mut server = TcpListener::bind("127.0.0.1:3456").await?;
+    /// while let Ok(mut conn) = server.accept().await? {
+    ///     // handle the connection
+    /// }
+    /// ```
+    pub async fn accept(&self) -> anyhow::Result<Connection> {
+        let (stream, ip_addr) = self.listener.accept().await?;
+        debug!("Received connection attempt from {}", ip_addr);
+
+        Ok(Connection::from(stream))
+    }
+}
+
+impl Stream for TcpListener {
+    type Item = Connection;
+
+    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        match futures::executor::block_on(self.listener.incoming().next()) {
+            Some(Ok(tcp_stream)) => {
+                let peer_addr = tcp_stream
+                    .peer_addr()
+                    .expect("Could not retrieve peer IP address");
+                debug!("Received connection attempt from {}", peer_addr);
+
+                Poll::Ready(Some(Connection::from(tcp_stream)))
+            }
+
+            Some(Err(e)) => {
+                error!(
+                    "Encountered error when trying to accept new connection {}",
+                    e
+                );
+                Poll::Pending
+            }
+
+            None => Poll::Ready(None),
+        }
+    }
+}
index fbc24b29449e141e9f9522f6c285330cc93e4ac9..654f1ca3112dbc372ba7bb2f1f5adb07f92d3c1d 100644 (file)
@@ -1,5 +1,15 @@
+//! TCP transport client and listener implementations.
+//!
+//! <br/>
+//!
+//! This module primarily exposes the TCP client implementation over a [`Connection`] type and the
+//! TCP listener implementation as [`TcpListener`].
+
+#[allow(unused_imports)]
+pub(crate) use crate::Connection;
+
 pub(crate) mod client;
-pub(crate) mod server;
+pub(crate) mod listener;
 
 pub use client::*;
-pub use server::*;
+pub use listener::*;
diff --git a/src/tcp/server.rs b/src/tcp/server.rs
deleted file mode 100644 (file)
index 00c766e..0000000
+++ /dev/null
@@ -1,58 +0,0 @@
-use crate::Connection;
-use async_std::net::{SocketAddr, TcpListener, ToSocketAddrs};
-use async_std::pin::Pin;
-use async_std::task::{Context, Poll};
-use futures::{Stream, StreamExt};
-use log::*;
-
-#[allow(dead_code)]
-pub struct TcpServer {
-    local_addrs: SocketAddr,
-    listener: TcpListener,
-}
-
-impl TcpServer {
-    pub async fn new<A: ToSocketAddrs + std::fmt::Display>(ip_addrs: A) -> anyhow::Result<Self> {
-        let listener = TcpListener::bind(&ip_addrs).await?;
-        info!("Started TCP server at {}", &ip_addrs);
-
-        Ok(Self {
-            local_addrs: listener.local_addr()?,
-            listener,
-        })
-    }
-
-    pub async fn accept(&self) -> anyhow::Result<Connection> {
-        let (stream, ip_addr) = self.listener.accept().await?;
-        debug!("Received connection attempt from {}", ip_addr);
-
-        Ok(Connection::from(stream))
-    }
-}
-
-impl Stream for TcpServer {
-    type Item = Connection;
-
-    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        match futures::executor::block_on(self.listener.incoming().next()) {
-            Some(Ok(tcp_stream)) => {
-                let peer_addr = tcp_stream
-                    .peer_addr()
-                    .expect("Could not retrieve peer IP address");
-                debug!("Received connection attempt from {}", peer_addr);
-
-                Poll::Ready(Some(Connection::from(tcp_stream)))
-            }
-
-            Some(Err(e)) => {
-                error!(
-                    "Encountered error when trying to accept new connection {}",
-                    e
-                );
-                Poll::Pending
-            }
-
-            None => Poll::Ready(None),
-        }
-    }
-}
index 60acdad99aad7ad990b43f539ffd782542371364..1fa8d08fce43b90eb0276b5d8d9c936732c63307 100644 (file)
@@ -1,26 +1,25 @@
+use async_std::net::{TcpStream, ToSocketAddrs};
+use async_tls::client;
 use async_tls::TlsConnector;
+use futures::AsyncReadExt;
 use log::*;
 
+use crate::tls::TlsConnectionMetadata;
 use crate::Connection;
-use async_std::net::{SocketAddr, TcpStream, ToSocketAddrs};
-use async_tls::client;
-use async_tls::server;
-use futures::AsyncReadExt;
-
-pub enum TlsConnectionMetadata {
-    Client {
-        local_addr: SocketAddr,
-        peer_addr: SocketAddr,
-        stream: client::TlsStream<TcpStream>,
-    },
-    Server {
-        local_addr: SocketAddr,
-        peer_addr: SocketAddr,
-        stream: server::TlsStream<TcpStream>,
-    },
-}
 
 impl Connection {
+    /// Creates a [`Connection`] that uses a TLS transport
+    ///
+    /// # Example
+    ///
+    /// Basic usage:
+    ///
+    /// ```ignore
+    /// let mut conn = Connection::tls_client("127.0.0.1:3456", "localhost", client_config.into()).await?;
+    /// ```
+    ///
+    /// Please see the [tls-client](https://github.com/sachanganesh/connect-rs/blob/main/examples/tls-client/src/main.rs)
+    /// example program for a more thorough showcase.
     pub async fn tls_client<A: ToSocketAddrs + std::fmt::Display>(
         ip_addrs: A,
         domain: &str,
@@ -46,6 +45,7 @@ impl Connection {
 }
 
 impl From<TlsConnectionMetadata> for Connection {
+    /// Creates a [`Connection`] using a TLS transport from [`TlsConnectionMetadata`].
     fn from(metadata: TlsConnectionMetadata) -> Self {
         match metadata {
             TlsConnectionMetadata::Client {
@@ -63,7 +63,7 @@ impl From<TlsConnectionMetadata> for Connection {
                 )
             }
 
-            TlsConnectionMetadata::Server {
+            TlsConnectionMetadata::Listener {
                 local_addr,
                 peer_addr,
                 stream,
diff --git a/src/tls/listener.rs b/src/tls/listener.rs
new file mode 100644 (file)
index 0000000..047dcae
--- /dev/null
@@ -0,0 +1,132 @@
+use crate::tls::TlsConnectionMetadata;
+use crate::Connection;
+use async_std::net::{SocketAddr, TcpListener, ToSocketAddrs};
+use async_std::pin::Pin;
+use async_std::task::{Context, Poll};
+use async_tls::TlsAcceptor;
+use futures::{Stream, StreamExt};
+use log::*;
+
+/// Listens on a bound socket for incoming TLS connections to be handled as independent
+/// [`Connection`]s.
+///
+/// Implements the [`Stream`] trait to asynchronously accept incoming TLS connections.
+///
+/// # Example
+///
+/// Basic usage:
+///
+/// ```ignore
+/// let mut server = TlsListener::bind("127.0.0.1:3456", config.into()).await?;
+///
+/// // wait for a connection to come in and be accepted
+/// while let Some(mut conn) = server.next().await {
+///     // do something with connection
+/// }
+/// ```
+#[allow(dead_code)]
+pub struct TlsListener {
+    local_addrs: SocketAddr,
+    listener: TcpListener,
+    acceptor: TlsAcceptor,
+}
+
+impl TlsListener {
+    /// Creates a [`TlsListener`] by binding to an IP address and port and listens for incoming TLS
+    /// connections that have successfully been accepted.
+    ///
+    /// # Example
+    ///
+    /// Basic usage:
+    ///
+    /// ```ignore
+    /// let mut server = TlsListener::bind("127.0.0.1:3456", config.into()).await?;
+    /// ```
+    pub async fn bind<A: ToSocketAddrs + std::fmt::Display>(
+        ip_addrs: A,
+        acceptor: TlsAcceptor,
+    ) -> anyhow::Result<Self> {
+        let listener = TcpListener::bind(ip_addrs).await?;
+        info!("Started TLS server at {}", listener.local_addr()?);
+
+        Ok(Self {
+            local_addrs: listener.local_addr()?,
+            listener,
+            acceptor,
+        })
+    }
+
+    /// Creates a [`Connection`] for the next `accept`ed TCP connection at the bound socket.
+    ///
+    /// # Example
+    ///
+    /// Basic usage:
+    ///
+    /// ```ignore
+    /// let mut server = TlsListener::bind("127.0.0.1:3456", config.into()).await?;
+    /// while let Some(mut conn) = server.next().await {
+    ///     // do something with connection
+    /// }
+    /// ```
+    pub async fn accept(&self) -> anyhow::Result<Connection> {
+        let (tcp_stream, peer_addr) = self.listener.accept().await?;
+        debug!("Received connection attempt from {}", peer_addr);
+
+        match self.acceptor.accept(tcp_stream).await {
+            Ok(tls_stream) => {
+                debug!("Completed TLS handshake with {}", peer_addr);
+                Ok(Connection::from(TlsConnectionMetadata::Listener {
+                    local_addr: self.local_addrs.clone(),
+                    peer_addr,
+                    stream: tls_stream,
+                }))
+            }
+
+            Err(e) => {
+                warn!("Could not encrypt connection with TLS from {}", peer_addr);
+                Err(anyhow::Error::new(e))
+            }
+        }
+    }
+}
+
+impl Stream for TlsListener {
+    type Item = Connection;
+
+    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        match futures::executor::block_on(self.listener.incoming().next()) {
+            Some(Ok(tcp_stream)) => {
+                let peer_addr = tcp_stream
+                    .peer_addr()
+                    .expect("Could not retrieve peer IP address");
+                debug!("Received connection attempt from {}", peer_addr);
+
+                match futures::executor::block_on(self.acceptor.accept(tcp_stream)) {
+                    Ok(tls_stream) => {
+                        debug!("Completed TLS handshake with {}", peer_addr);
+                        Poll::Ready(Some(Connection::from(TlsConnectionMetadata::Listener {
+                            local_addr: self.local_addrs.clone(),
+                            peer_addr,
+                            stream: tls_stream,
+                        })))
+                    }
+
+                    Err(_e) => {
+                        warn!("Could not encrypt connection with TLS from {}", peer_addr);
+                        Poll::Pending
+                    }
+                }
+            }
+
+            Some(Err(e)) => {
+                error!(
+                    "Encountered error when trying to accept new connection {}",
+                    e
+                );
+                Poll::Pending
+            }
+
+            None => Poll::Ready(None),
+        }
+    }
+}
index da8703d5c9fbf2b1fd60884a3e959bfcd60275f9..dfc535fcd99d562f5dbb155a78e42b175b5445cc 100644 (file)
@@ -1,8 +1,41 @@
-pub(crate) mod client;
-pub(crate) mod server;
+//! TLS transport client and listener implementations.
+//!
+//! <br/>
+//!
+//! This module primarily exposes the TLS client implementation over a [`Connection`] type and the
+//! TLS listener implementation as [`TlsListener`].
+//!
 
-pub use client::*;
-pub use server::*;
+#[allow(unused_imports)]
+pub(crate) use crate::Connection;
+
+pub(crate) mod client;
+pub(crate) mod listener;
 
 pub use async_tls;
+pub use client::*;
+pub use listener::*;
 pub use rustls;
+
+use async_std::net::TcpStream;
+use async_tls::server;
+use std::net::SocketAddr;
+
+/// Used to differentiate between an outgoing connection ([`TlsConnectionMetadata::Client`]) or
+/// incoming connection listener ([`TlsConnectionMetadata::Listener`]).
+///
+/// The async TLS library used by this crate has two differing stream types based on whether the
+/// connection being established is either a client or server. This is to aid with handling that
+/// distinction during connection instantiation.
+pub enum TlsConnectionMetadata {
+    Client {
+        local_addr: SocketAddr,
+        peer_addr: SocketAddr,
+        stream: async_tls::client::TlsStream<TcpStream>,
+    },
+    Listener {
+        local_addr: SocketAddr,
+        peer_addr: SocketAddr,
+        stream: server::TlsStream<TcpStream>,
+    },
+}
diff --git a/src/tls/server.rs b/src/tls/server.rs
deleted file mode 100644 (file)
index 55647f0..0000000
+++ /dev/null
@@ -1,93 +0,0 @@
-use crate::tls::TlsConnectionMetadata;
-use crate::Connection;
-use async_std::net::{SocketAddr, TcpListener, ToSocketAddrs};
-use async_std::pin::Pin;
-use async_std::task::{Context, Poll};
-use async_tls::TlsAcceptor;
-use futures::{Stream, StreamExt};
-use log::*;
-
-#[allow(dead_code)]
-pub struct TlsServer {
-    local_addrs: SocketAddr,
-    listener: TcpListener,
-    acceptor: TlsAcceptor,
-}
-
-impl TlsServer {
-    pub async fn new<A: ToSocketAddrs + std::fmt::Display>(
-        ip_addrs: A,
-        acceptor: TlsAcceptor,
-    ) -> anyhow::Result<Self> {
-        let listener = TcpListener::bind(ip_addrs).await?;
-        info!("Started TLS server at {}", listener.local_addr()?);
-
-        Ok(Self {
-            local_addrs: listener.local_addr()?,
-            listener,
-            acceptor,
-        })
-    }
-
-    pub async fn accept(&self) -> anyhow::Result<Connection> {
-        let (tcp_stream, peer_addr) = self.listener.accept().await?;
-        debug!("Received connection attempt from {}", peer_addr);
-
-        match self.acceptor.accept(tcp_stream).await {
-            Ok(tls_stream) => {
-                debug!("Completed TLS handshake with {}", peer_addr);
-                Ok(Connection::from(TlsConnectionMetadata::Server {
-                    local_addr: self.local_addrs.clone(),
-                    peer_addr,
-                    stream: tls_stream,
-                }))
-            }
-
-            Err(e) => {
-                warn!("Could not encrypt connection with TLS from {}", peer_addr);
-                Err(anyhow::Error::new(e))
-            }
-        }
-    }
-}
-
-impl Stream for TlsServer {
-    type Item = Connection;
-
-    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        match futures::executor::block_on(self.listener.incoming().next()) {
-            Some(Ok(tcp_stream)) => {
-                let peer_addr = tcp_stream
-                    .peer_addr()
-                    .expect("Could not retrieve peer IP address");
-                debug!("Received connection attempt from {}", peer_addr);
-
-                match futures::executor::block_on(self.acceptor.accept(tcp_stream)) {
-                    Ok(tls_stream) => {
-                        debug!("Completed TLS handshake with {}", peer_addr);
-                        Poll::Ready(Some(Connection::from(TlsConnectionMetadata::Server {
-                            local_addr: self.local_addrs.clone(),
-                            peer_addr,
-                            stream: tls_stream,
-                        })))
-                    }
-
-                    Err(_e) => {
-                        warn!("Could not encrypt connection with TLS from {}", peer_addr);
-                        Poll::Pending
-                    }
-                }
-            }
-
-            Some(Err(e)) => {
-                error!(
-                    "Encountered error when trying to accept new connection {}",
-                    e
-                );
-                Poll::Pending
-            }
-
-            None => Poll::Ready(None),
-        }
-    }
-}
index 24030249110beb3768a0211e46654c779e488d71..9a6f20af85ab3e3bc43243b94e29a325141bc4a1 100644 (file)
@@ -11,6 +11,21 @@ use protobuf::Message;
 pub use futures::SinkExt;
 pub use futures::StreamExt;
 
+/// An interface to write messages to the network connection
+///
+/// Implements the [`Sink`] trait to asynchronously write messages to the network connection.
+///
+/// # Example
+///
+/// Basic usage:
+///
+/// ```ignore
+/// writer.send(msg).await?;
+/// ```
+///
+/// Please see the [tcp-client](https://github.com/sachanganesh/connect-rs/blob/main/examples/tcp-client/)
+/// example program or other client example programs for a more thorough showcase.
+///
 pub struct ConnectionWriter {
     local_addr: SocketAddr,
     peer_addr: SocketAddr,
@@ -20,6 +35,8 @@ pub struct ConnectionWriter {
 }
 
 impl ConnectionWriter {
+    /// Creates a new [`ConnectionWriter`] from an [`AsyncWrite`] trait object and the local and peer
+    /// socket metadata
     pub fn new(
         local_addr: SocketAddr,
         peer_addr: SocketAddr,
@@ -34,14 +51,17 @@ impl ConnectionWriter {
         }
     }
 
+    /// Get the local IP address and port
     pub fn local_addr(&self) -> SocketAddr {
         self.local_addr.clone()
     }
 
+    /// Get the peer IP address and port
     pub fn peer_addr(&self) -> SocketAddr {
         self.peer_addr.clone()
     }
 
+    /// Check if the [`Sink`] of messages to the network is closed
     pub fn is_closed(&self) -> bool {
         self.closed
     }