From bc38aad0614b9e11d91e810e8473a9d0788b9867 Mon Sep 17 00:00:00 2001 From: Sachandhan Ganesh Date: Thu, 4 Feb 2021 00:42:01 -0800 Subject: [PATCH] renamed server to listener and add thorough documentation --- README.md | 3 + examples/tcp-client/README.md | 2 +- examples/tcp-echo-server/README.md | 4 +- examples/tcp-echo-server/src/main.rs | 4 +- examples/tls-client/README.md | 2 +- examples/tls-echo-server/README.md | 4 +- examples/tls-echo-server/src/main.rs | 54 +++++++--------- src/lib.rs | 32 +++++++++ src/reader.rs | 24 +++++++ src/tcp/client.rs | 10 +++ src/tcp/listener.rs | 97 ++++++++++++++++++++++++++++ src/tcp/mod.rs | 14 +++- src/tcp/server.rs | 58 ----------------- src/tls/client.rs | 36 +++++------ src/tls/{server.rs => listener.rs} | 51 +++++++++++++-- src/tls/mod.rs | 41 ++++++++++-- src/writer.rs | 20 ++++++ 17 files changed, 330 insertions(+), 126 deletions(-) create mode 100644 src/tcp/listener.rs delete mode 100644 src/tcp/server.rs rename src/tls/{server.rs => listener.rs} (66%) diff --git a/README.md b/README.md index 4f6cbe0..abed806 100644 --- a/README.md +++ b/README.md @@ -16,3 +16,6 @@ By using a message queue, crate users can focus on sending and receiving message | SCTP Server | | | DTLS-SCTP Client | | | DTLS-SCTP Server | | + + +## Why Protobuf? \ No newline at end of file diff --git a/examples/tcp-client/README.md b/examples/tcp-client/README.md index b25cbba..1afa04b 100644 --- a/examples/tcp-client/README.md +++ b/examples/tcp-client/README.md @@ -1,4 +1,4 @@ -# seam-channel tcp-client example +# connect tcp-client example This example program will: diff --git a/examples/tcp-echo-server/README.md b/examples/tcp-echo-server/README.md index 72695ae..c13289c 100644 --- a/examples/tcp-echo-server/README.md +++ b/examples/tcp-echo-server/README.md @@ -1,8 +1,8 @@ -# seam-channel tcp-echo-server example +# connect tcp-echo-server example This example program will: -1. Bind to an IP address +1. Bind to an IP address and port 2. Accept any number of TCP connections 3. Handle each connection by: 1. Waiting for `String` messages to be received diff --git a/examples/tcp-echo-server/src/main.rs b/examples/tcp-echo-server/src/main.rs index f480928..4d7bfde 100644 --- a/examples/tcp-echo-server/src/main.rs +++ b/examples/tcp-echo-server/src/main.rs @@ -2,7 +2,7 @@ mod schema; use crate::schema::hello_world::HelloWorld; use async_std::task; -use connect::tcp::TcpServer; +use connect::tcp::TcpListener; use connect::{SinkExt, StreamExt}; use log::*; use std::env; @@ -23,7 +23,7 @@ async fn main() -> anyhow::Result<()> { }; // create a server - let mut server = TcpServer::new(ip_address).await?; + let mut server = TcpListener::bind(ip_address).await?; // handle server connections // wait for a connection to come in and be accepted diff --git a/examples/tls-client/README.md b/examples/tls-client/README.md index a55c0b8..8b44f9c 100644 --- a/examples/tls-client/README.md +++ b/examples/tls-client/README.md @@ -1,4 +1,4 @@ -# seam-channel tls-client example +# connect tls-client example This example program will: diff --git a/examples/tls-echo-server/README.md b/examples/tls-echo-server/README.md index eeafd03..3d02137 100644 --- a/examples/tls-echo-server/README.md +++ b/examples/tls-echo-server/README.md @@ -1,8 +1,8 @@ -# seam-channel tls-echo-server example +# connect tls-echo-server example This example program will: -1. Bind to an IP address +1. Bind to an IP address and port 2. Accept any number of secure TLS connections 3. Handle each connection by: 1. Waiting for `String` messages to be received diff --git a/examples/tls-echo-server/src/main.rs b/examples/tls-echo-server/src/main.rs index 74326af..bf9d476 100644 --- a/examples/tls-echo-server/src/main.rs +++ b/examples/tls-echo-server/src/main.rs @@ -4,7 +4,7 @@ use crate::schema::hello_world::HelloWorld; use async_std::{io, task}; use connect::tls::rustls::internal::pemfile::{certs, rsa_private_keys}; use connect::tls::rustls::{NoClientAuth, ServerConfig}; -use connect::tls::TlsServer; +use connect::tls::TlsListener; use connect::{SinkExt, StreamExt}; use log::*; use std::env; @@ -30,40 +30,34 @@ async fn main() -> anyhow::Result<()> { .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?; // create a server - let mut server = TlsServer::new(ip_address, config.into()).await?; + let mut server = TlsListener::bind(ip_address, config.into()).await?; // handle server connections // wait for a connection to come in and be accepted - loop { - match server.next().await { - Some(mut conn) => { - info!("Handling connection from {}", conn.peer_addr()); - - task::spawn(async move { - while let Some(msg) = conn.reader().next().await { - if msg.is::() { - if let Ok(Some(contents)) = msg.unpack::() { - info!( - "Received a message \"{}\" from {}", - contents.get_message(), - conn.peer_addr() - ); - - conn.writer() - .send(contents) - .await - .expect("Could not send message back to source connection"); - info!("Sent message back to original sender"); - } - } else { - error!("Received a message of unknown type") - } + while let Some(mut conn) = server.next().await { + info!("Handling connection from {}", conn.peer_addr()); + + task::spawn(async move { + while let Some(msg) = conn.reader().next().await { + if msg.is::() { + if let Ok(Some(contents)) = msg.unpack::() { + info!( + "Received a message \"{}\" from {}", + contents.get_message(), + conn.peer_addr() + ); + + conn.writer() + .send(contents) + .await + .expect("Could not send message back to source connection"); + info!("Sent message back to original sender"); } - }); + } else { + error!("Received a message of unknown type") + } } - - None => break, - } + }); } Ok(()) diff --git a/src/lib.rs b/src/lib.rs index 8a71a37..8550027 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,23 @@ +//! This crate provides a reliable, fault-tolerant, and brokerless message-queue abstraction over +//! asynchronous network streams. +//! +//! # Why? +//! When building networked applications, developers shouldn't have to focus on repeatedly solving +//! the problem of reliable, fault-tolerant message delivery over byte-streams. By using a message +//! queue abstraction, crate users can focus on core application logic and leave the low-level +//! networking and message-queue guarantees to the abstraction. +//! +//! # Protobuf +//! This crate relies on the use of [Protocol Buffers](https://developers.google.com/protocol-buffers) +//! due to it being widely adopted and industry-proven. All messages are Protobuf messages that +//! are packed into a Protobuf `Any` type and then sent over the wire. Message recipients must +//! decide what Protobuf message type it is, and correspondingly unpack the `Any` into a particular +//! message type. +//! +//! # Examples +//! Please use the [examples](https://github.com/sachanganesh/connect-rs/tree/main/examples) +//! provided to help understand crate usage. + mod reader; pub(crate) mod schema; pub mod tcp; @@ -10,6 +30,8 @@ use async_std::{net::SocketAddr, pin::Pin}; use futures::{AsyncRead, AsyncWrite}; pub use futures::{SinkExt, StreamExt}; +/// Wrapper around a [`ConnectionReader`] and [`ConnectionWriter`] to read and write on a network +/// connection pub struct Connection { local_addr: SocketAddr, peer_addr: SocketAddr, @@ -33,18 +55,25 @@ impl Connection { } } + /// Get the local IP address and port pub fn local_addr(&self) -> SocketAddr { self.local_addr.clone() } + /// Get the peer IP address and port pub fn peer_addr(&self) -> SocketAddr { self.peer_addr.clone() } + /// Consume the [`Connection`] to split into separate [`ConnectionReader`] and + /// [`ConnectionWriter`] halves + /// + /// [`Connection`]s are split when reading and writing must be concurrent operations. pub fn split(self) -> (ConnectionReader, ConnectionWriter) { (self.reader, self.writer) } + /// Re-wrap the [`ConnectionReader`] and [`ConnectionWriter`] halves into a [`Connection`] pub fn join(reader: ConnectionReader, writer: ConnectionWriter) -> Self { Self { local_addr: reader.local_addr(), @@ -54,14 +83,17 @@ impl Connection { } } + /// Get mutable access to the underlying [`ConnectionReader`] pub fn reader(&mut self) -> &mut ConnectionReader { &mut self.reader } + /// Get mutable access to the underlying [`ConnectionWriter`] pub fn writer(&mut self) -> &mut ConnectionWriter { &mut self.writer } + /// Close the connection by closing both the reading and writing halves pub async fn close(self) -> SocketAddr { let peer_addr = self.peer_addr(); let (reader, writer) = self.split(); diff --git a/src/reader.rs b/src/reader.rs index c859dd5..428f6d1 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -12,8 +12,27 @@ pub use futures::SinkExt; pub use futures::StreamExt; use protobuf::well_known_types::Any; +/// A default buffer size to read in bytes and then deserialize as messages const BUFFER_SIZE: usize = 8192; +/// An interface to read messages from the network connection +/// +/// Implements the [`Stream`] trait to asynchronously read messages from the network connection. +/// +/// # Example +/// +/// Basic usage: +/// +/// ```ignore +/// while let Some(msg) = reader.next().await { +/// // handle the received message +/// } +/// ``` +/// +/// Please see the [tcp-client](https://github.com/sachanganesh/connect-rs/blob/main/examples/tcp-client/) +/// example program or other client example programs for a more thorough showcase. +/// + pub struct ConnectionReader { local_addr: SocketAddr, peer_addr: SocketAddr, @@ -23,6 +42,8 @@ pub struct ConnectionReader { } impl ConnectionReader { + /// Creates a new [`ConnectionReader`] from an [`AsyncRead`] trait object and the local and peer + /// socket metadata pub fn new( local_addr: SocketAddr, peer_addr: SocketAddr, @@ -37,14 +58,17 @@ impl ConnectionReader { } } + /// Get the local IP address and port pub fn local_addr(&self) -> SocketAddr { self.local_addr.clone() } + /// Get the peer IP address and port pub fn peer_addr(&self) -> SocketAddr { self.peer_addr.clone() } + /// Check if the [`Stream`] of messages from the network is closed pub fn is_closed(&self) -> bool { self.closed } diff --git a/src/tcp/client.rs b/src/tcp/client.rs index 221825d..28ae2a3 100644 --- a/src/tcp/client.rs +++ b/src/tcp/client.rs @@ -4,6 +4,15 @@ use crate::Connection; use async_std::net::{TcpStream, ToSocketAddrs}; impl Connection { + /// Creates a [`Connection`] that uses a TCP transport + /// + /// # Example + /// + /// Basic usage: + /// + /// ```ignore + /// let mut conn = Connection::tcp_client("127.0.0.1:3456").await?; + /// ``` pub async fn tcp_client( ip_addrs: A, ) -> anyhow::Result { @@ -16,6 +25,7 @@ impl Connection { } impl From for Connection { + /// Creates a [`Connection`] using a TCP transport from an async [`TcpStream`]. fn from(stream: TcpStream) -> Self { let write_stream = stream.clone(); diff --git a/src/tcp/listener.rs b/src/tcp/listener.rs new file mode 100644 index 0000000..997975a --- /dev/null +++ b/src/tcp/listener.rs @@ -0,0 +1,97 @@ +use crate::Connection; +use async_std::net::{SocketAddr, TcpListener as AsyncListener, ToSocketAddrs}; +use async_std::pin::Pin; +use async_std::task::{Context, Poll}; +use futures::{Stream, StreamExt}; +use log::*; + +/// Listens on a bound socket for incoming TCP connections to be handled as independent +/// [`Connection`]s. +/// +/// Implements the [`Stream`] trait to asynchronously accept incoming TCP connections. +/// +/// # Example +/// +/// Basic usage: +/// +/// ```ignore +/// let mut server = TcpListener::bind(ip_address).await?; +/// +/// // wait for a connection to come in and be accepted +/// while let Some(mut conn) = server.next().await { +/// // do something with connection +/// } +/// ``` +#[allow(dead_code)] +pub struct TcpListener { + local_addrs: SocketAddr, + listener: AsyncListener, +} + +impl TcpListener { + /// Creates a [`TcpListener`] by binding to an IP address and port and listens for incoming TCP + /// connections. + /// + /// # Example + /// + /// Basic usage: + /// + /// ```ignore + /// let mut server = TcpListener::bind("127.0.0.1:3456").await?; + /// ``` + pub async fn bind(ip_addrs: A) -> anyhow::Result { + let listener = AsyncListener::bind(&ip_addrs).await?; + info!("Started TCP server at {}", &ip_addrs); + + Ok(Self { + local_addrs: listener.local_addr()?, + listener, + }) + } + + /// Creates a [`Connection`] for the next `accept`ed TCP connection at the bound socket. + /// + /// # Example + /// + /// Basic usage: + /// + /// ```ignore + /// let mut server = TcpListener::bind("127.0.0.1:3456").await?; + /// while let Ok(mut conn) = server.accept().await? { + /// // handle the connection + /// } + /// ``` + pub async fn accept(&self) -> anyhow::Result { + let (stream, ip_addr) = self.listener.accept().await?; + debug!("Received connection attempt from {}", ip_addr); + + Ok(Connection::from(stream)) + } +} + +impl Stream for TcpListener { + type Item = Connection; + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + match futures::executor::block_on(self.listener.incoming().next()) { + Some(Ok(tcp_stream)) => { + let peer_addr = tcp_stream + .peer_addr() + .expect("Could not retrieve peer IP address"); + debug!("Received connection attempt from {}", peer_addr); + + Poll::Ready(Some(Connection::from(tcp_stream))) + } + + Some(Err(e)) => { + error!( + "Encountered error when trying to accept new connection {}", + e + ); + Poll::Pending + } + + None => Poll::Ready(None), + } + } +} diff --git a/src/tcp/mod.rs b/src/tcp/mod.rs index fbc24b2..654f1ca 100644 --- a/src/tcp/mod.rs +++ b/src/tcp/mod.rs @@ -1,5 +1,15 @@ +//! TCP transport client and listener implementations. +//! +//!
+//! +//! This module primarily exposes the TCP client implementation over a [`Connection`] type and the +//! TCP listener implementation as [`TcpListener`]. + +#[allow(unused_imports)] +pub(crate) use crate::Connection; + pub(crate) mod client; -pub(crate) mod server; +pub(crate) mod listener; pub use client::*; -pub use server::*; +pub use listener::*; diff --git a/src/tcp/server.rs b/src/tcp/server.rs deleted file mode 100644 index 00c766e..0000000 --- a/src/tcp/server.rs +++ /dev/null @@ -1,58 +0,0 @@ -use crate::Connection; -use async_std::net::{SocketAddr, TcpListener, ToSocketAddrs}; -use async_std::pin::Pin; -use async_std::task::{Context, Poll}; -use futures::{Stream, StreamExt}; -use log::*; - -#[allow(dead_code)] -pub struct TcpServer { - local_addrs: SocketAddr, - listener: TcpListener, -} - -impl TcpServer { - pub async fn new(ip_addrs: A) -> anyhow::Result { - let listener = TcpListener::bind(&ip_addrs).await?; - info!("Started TCP server at {}", &ip_addrs); - - Ok(Self { - local_addrs: listener.local_addr()?, - listener, - }) - } - - pub async fn accept(&self) -> anyhow::Result { - let (stream, ip_addr) = self.listener.accept().await?; - debug!("Received connection attempt from {}", ip_addr); - - Ok(Connection::from(stream)) - } -} - -impl Stream for TcpServer { - type Item = Connection; - - fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - match futures::executor::block_on(self.listener.incoming().next()) { - Some(Ok(tcp_stream)) => { - let peer_addr = tcp_stream - .peer_addr() - .expect("Could not retrieve peer IP address"); - debug!("Received connection attempt from {}", peer_addr); - - Poll::Ready(Some(Connection::from(tcp_stream))) - } - - Some(Err(e)) => { - error!( - "Encountered error when trying to accept new connection {}", - e - ); - Poll::Pending - } - - None => Poll::Ready(None), - } - } -} diff --git a/src/tls/client.rs b/src/tls/client.rs index 60acdad..1fa8d08 100644 --- a/src/tls/client.rs +++ b/src/tls/client.rs @@ -1,26 +1,25 @@ +use async_std::net::{TcpStream, ToSocketAddrs}; +use async_tls::client; use async_tls::TlsConnector; +use futures::AsyncReadExt; use log::*; +use crate::tls::TlsConnectionMetadata; use crate::Connection; -use async_std::net::{SocketAddr, TcpStream, ToSocketAddrs}; -use async_tls::client; -use async_tls::server; -use futures::AsyncReadExt; - -pub enum TlsConnectionMetadata { - Client { - local_addr: SocketAddr, - peer_addr: SocketAddr, - stream: client::TlsStream, - }, - Server { - local_addr: SocketAddr, - peer_addr: SocketAddr, - stream: server::TlsStream, - }, -} impl Connection { + /// Creates a [`Connection`] that uses a TLS transport + /// + /// # Example + /// + /// Basic usage: + /// + /// ```ignore + /// let mut conn = Connection::tls_client("127.0.0.1:3456", "localhost", client_config.into()).await?; + /// ``` + /// + /// Please see the [tls-client](https://github.com/sachanganesh/connect-rs/blob/main/examples/tls-client/src/main.rs) + /// example program for a more thorough showcase. pub async fn tls_client( ip_addrs: A, domain: &str, @@ -46,6 +45,7 @@ impl Connection { } impl From for Connection { + /// Creates a [`Connection`] using a TLS transport from [`TlsConnectionMetadata`]. fn from(metadata: TlsConnectionMetadata) -> Self { match metadata { TlsConnectionMetadata::Client { @@ -63,7 +63,7 @@ impl From for Connection { ) } - TlsConnectionMetadata::Server { + TlsConnectionMetadata::Listener { local_addr, peer_addr, stream, diff --git a/src/tls/server.rs b/src/tls/listener.rs similarity index 66% rename from src/tls/server.rs rename to src/tls/listener.rs index 55647f0..047dcae 100644 --- a/src/tls/server.rs +++ b/src/tls/listener.rs @@ -7,15 +7,42 @@ use async_tls::TlsAcceptor; use futures::{Stream, StreamExt}; use log::*; +/// Listens on a bound socket for incoming TLS connections to be handled as independent +/// [`Connection`]s. +/// +/// Implements the [`Stream`] trait to asynchronously accept incoming TLS connections. +/// +/// # Example +/// +/// Basic usage: +/// +/// ```ignore +/// let mut server = TlsListener::bind("127.0.0.1:3456", config.into()).await?; +/// +/// // wait for a connection to come in and be accepted +/// while let Some(mut conn) = server.next().await { +/// // do something with connection +/// } +/// ``` #[allow(dead_code)] -pub struct TlsServer { +pub struct TlsListener { local_addrs: SocketAddr, listener: TcpListener, acceptor: TlsAcceptor, } -impl TlsServer { - pub async fn new( +impl TlsListener { + /// Creates a [`TlsListener`] by binding to an IP address and port and listens for incoming TLS + /// connections that have successfully been accepted. + /// + /// # Example + /// + /// Basic usage: + /// + /// ```ignore + /// let mut server = TlsListener::bind("127.0.0.1:3456", config.into()).await?; + /// ``` + pub async fn bind( ip_addrs: A, acceptor: TlsAcceptor, ) -> anyhow::Result { @@ -29,6 +56,18 @@ impl TlsServer { }) } + /// Creates a [`Connection`] for the next `accept`ed TCP connection at the bound socket. + /// + /// # Example + /// + /// Basic usage: + /// + /// ```ignore + /// let mut server = TlsListener::bind("127.0.0.1:3456", config.into()).await?; + /// while let Some(mut conn) = server.next().await { + /// // do something with connection + /// } + /// ``` pub async fn accept(&self) -> anyhow::Result { let (tcp_stream, peer_addr) = self.listener.accept().await?; debug!("Received connection attempt from {}", peer_addr); @@ -36,7 +75,7 @@ impl TlsServer { match self.acceptor.accept(tcp_stream).await { Ok(tls_stream) => { debug!("Completed TLS handshake with {}", peer_addr); - Ok(Connection::from(TlsConnectionMetadata::Server { + Ok(Connection::from(TlsConnectionMetadata::Listener { local_addr: self.local_addrs.clone(), peer_addr, stream: tls_stream, @@ -51,7 +90,7 @@ impl TlsServer { } } -impl Stream for TlsServer { +impl Stream for TlsListener { type Item = Connection; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { @@ -65,7 +104,7 @@ impl Stream for TlsServer { match futures::executor::block_on(self.acceptor.accept(tcp_stream)) { Ok(tls_stream) => { debug!("Completed TLS handshake with {}", peer_addr); - Poll::Ready(Some(Connection::from(TlsConnectionMetadata::Server { + Poll::Ready(Some(Connection::from(TlsConnectionMetadata::Listener { local_addr: self.local_addrs.clone(), peer_addr, stream: tls_stream, diff --git a/src/tls/mod.rs b/src/tls/mod.rs index da8703d..dfc535f 100644 --- a/src/tls/mod.rs +++ b/src/tls/mod.rs @@ -1,8 +1,41 @@ -pub(crate) mod client; -pub(crate) mod server; +//! TLS transport client and listener implementations. +//! +//!
+//! +//! This module primarily exposes the TLS client implementation over a [`Connection`] type and the +//! TLS listener implementation as [`TlsListener`]. +//! -pub use client::*; -pub use server::*; +#[allow(unused_imports)] +pub(crate) use crate::Connection; + +pub(crate) mod client; +pub(crate) mod listener; pub use async_tls; +pub use client::*; +pub use listener::*; pub use rustls; + +use async_std::net::TcpStream; +use async_tls::server; +use std::net::SocketAddr; + +/// Used to differentiate between an outgoing connection ([`TlsConnectionMetadata::Client`]) or +/// incoming connection listener ([`TlsConnectionMetadata::Listener`]). +/// +/// The async TLS library used by this crate has two differing stream types based on whether the +/// connection being established is either a client or server. This is to aid with handling that +/// distinction during connection instantiation. +pub enum TlsConnectionMetadata { + Client { + local_addr: SocketAddr, + peer_addr: SocketAddr, + stream: async_tls::client::TlsStream, + }, + Listener { + local_addr: SocketAddr, + peer_addr: SocketAddr, + stream: server::TlsStream, + }, +} diff --git a/src/writer.rs b/src/writer.rs index 2403024..9a6f20a 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -11,6 +11,21 @@ use protobuf::Message; pub use futures::SinkExt; pub use futures::StreamExt; +/// An interface to write messages to the network connection +/// +/// Implements the [`Sink`] trait to asynchronously write messages to the network connection. +/// +/// # Example +/// +/// Basic usage: +/// +/// ```ignore +/// writer.send(msg).await?; +/// ``` +/// +/// Please see the [tcp-client](https://github.com/sachanganesh/connect-rs/blob/main/examples/tcp-client/) +/// example program or other client example programs for a more thorough showcase. +/// pub struct ConnectionWriter { local_addr: SocketAddr, peer_addr: SocketAddr, @@ -20,6 +35,8 @@ pub struct ConnectionWriter { } impl ConnectionWriter { + /// Creates a new [`ConnectionWriter`] from an [`AsyncWrite`] trait object and the local and peer + /// socket metadata pub fn new( local_addr: SocketAddr, peer_addr: SocketAddr, @@ -34,14 +51,17 @@ impl ConnectionWriter { } } + /// Get the local IP address and port pub fn local_addr(&self) -> SocketAddr { self.local_addr.clone() } + /// Get the peer IP address and port pub fn peer_addr(&self) -> SocketAddr { self.peer_addr.clone() } + /// Check if the [`Sink`] of messages to the network is closed pub fn is_closed(&self) -> bool { self.closed } -- 2.44.0