| SCTP Server | |
| DTLS-SCTP Client | |
| DTLS-SCTP Server | |
+
+
+## Why Protobuf?
\ No newline at end of file
-# seam-channel tcp-client example
+# connect tcp-client example
This example program will:
-# seam-channel tcp-echo-server example
+# connect tcp-echo-server example
This example program will:
-1. Bind to an IP address
+1. Bind to an IP address and port
2. Accept any number of TCP connections
3. Handle each connection by:
1. Waiting for `String` messages to be received
use crate::schema::hello_world::HelloWorld;
use async_std::task;
-use connect::tcp::TcpServer;
+use connect::tcp::TcpListener;
use connect::{SinkExt, StreamExt};
use log::*;
use std::env;
};
// create a server
- let mut server = TcpServer::new(ip_address).await?;
+ let mut server = TcpListener::bind(ip_address).await?;
// handle server connections
// wait for a connection to come in and be accepted
-# seam-channel tls-client example
+# connect tls-client example
This example program will:
-# seam-channel tls-echo-server example
+# connect tls-echo-server example
This example program will:
-1. Bind to an IP address
+1. Bind to an IP address and port
2. Accept any number of secure TLS connections
3. Handle each connection by:
1. Waiting for `String` messages to be received
use async_std::{io, task};
use connect::tls::rustls::internal::pemfile::{certs, rsa_private_keys};
use connect::tls::rustls::{NoClientAuth, ServerConfig};
-use connect::tls::TlsServer;
+use connect::tls::TlsListener;
use connect::{SinkExt, StreamExt};
use log::*;
use std::env;
.map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
// create a server
- let mut server = TlsServer::new(ip_address, config.into()).await?;
+ let mut server = TlsListener::bind(ip_address, config.into()).await?;
// handle server connections
// wait for a connection to come in and be accepted
- loop {
- match server.next().await {
- Some(mut conn) => {
- info!("Handling connection from {}", conn.peer_addr());
-
- task::spawn(async move {
- while let Some(msg) = conn.reader().next().await {
- if msg.is::<HelloWorld>() {
- if let Ok(Some(contents)) = msg.unpack::<HelloWorld>() {
- info!(
- "Received a message \"{}\" from {}",
- contents.get_message(),
- conn.peer_addr()
- );
-
- conn.writer()
- .send(contents)
- .await
- .expect("Could not send message back to source connection");
- info!("Sent message back to original sender");
- }
- } else {
- error!("Received a message of unknown type")
- }
+ while let Some(mut conn) = server.next().await {
+ info!("Handling connection from {}", conn.peer_addr());
+
+ task::spawn(async move {
+ while let Some(msg) = conn.reader().next().await {
+ if msg.is::<HelloWorld>() {
+ if let Ok(Some(contents)) = msg.unpack::<HelloWorld>() {
+ info!(
+ "Received a message \"{}\" from {}",
+ contents.get_message(),
+ conn.peer_addr()
+ );
+
+ conn.writer()
+ .send(contents)
+ .await
+ .expect("Could not send message back to source connection");
+ info!("Sent message back to original sender");
}
- });
+ } else {
+ error!("Received a message of unknown type")
+ }
}
-
- None => break,
- }
+ });
}
Ok(())
+//! This crate provides a reliable, fault-tolerant, and brokerless message-queue abstraction over
+//! asynchronous network streams.
+//!
+//! # Why?
+//! When building networked applications, developers shouldn't have to focus on repeatedly solving
+//! the problem of reliable, fault-tolerant message delivery over byte-streams. By using a message
+//! queue abstraction, crate users can focus on core application logic and leave the low-level
+//! networking and message-queue guarantees to the abstraction.
+//!
+//! # Protobuf
+//! This crate relies on the use of [Protocol Buffers](https://developers.google.com/protocol-buffers)
+//! due to it being widely adopted and industry-proven. All messages are Protobuf messages that
+//! are packed into a Protobuf `Any` type and then sent over the wire. Message recipients must
+//! decide what Protobuf message type it is, and correspondingly unpack the `Any` into a particular
+//! message type.
+//!
+//! # Examples
+//! Please use the [examples](https://github.com/sachanganesh/connect-rs/tree/main/examples)
+//! provided to help understand crate usage.
+
mod reader;
pub(crate) mod schema;
pub mod tcp;
use futures::{AsyncRead, AsyncWrite};
pub use futures::{SinkExt, StreamExt};
+/// Wrapper around a [`ConnectionReader`] and [`ConnectionWriter`] to read and write on a network
+/// connection
pub struct Connection {
local_addr: SocketAddr,
peer_addr: SocketAddr,
}
}
+ /// Get the local IP address and port
pub fn local_addr(&self) -> SocketAddr {
self.local_addr.clone()
}
+ /// Get the peer IP address and port
pub fn peer_addr(&self) -> SocketAddr {
self.peer_addr.clone()
}
+ /// Consume the [`Connection`] to split into separate [`ConnectionReader`] and
+ /// [`ConnectionWriter`] halves
+ ///
+ /// [`Connection`]s are split when reading and writing must be concurrent operations.
pub fn split(self) -> (ConnectionReader, ConnectionWriter) {
(self.reader, self.writer)
}
+ /// Re-wrap the [`ConnectionReader`] and [`ConnectionWriter`] halves into a [`Connection`]
pub fn join(reader: ConnectionReader, writer: ConnectionWriter) -> Self {
Self {
local_addr: reader.local_addr(),
}
}
+ /// Get mutable access to the underlying [`ConnectionReader`]
pub fn reader(&mut self) -> &mut ConnectionReader {
&mut self.reader
}
+ /// Get mutable access to the underlying [`ConnectionWriter`]
pub fn writer(&mut self) -> &mut ConnectionWriter {
&mut self.writer
}
+ /// Close the connection by closing both the reading and writing halves
pub async fn close(self) -> SocketAddr {
let peer_addr = self.peer_addr();
let (reader, writer) = self.split();
pub use futures::StreamExt;
use protobuf::well_known_types::Any;
+/// A default buffer size to read in bytes and then deserialize as messages
const BUFFER_SIZE: usize = 8192;
+/// An interface to read messages from the network connection
+///
+/// Implements the [`Stream`] trait to asynchronously read messages from the network connection.
+///
+/// # Example
+///
+/// Basic usage:
+///
+/// ```ignore
+/// while let Some(msg) = reader.next().await {
+/// // handle the received message
+/// }
+/// ```
+///
+/// Please see the [tcp-client](https://github.com/sachanganesh/connect-rs/blob/main/examples/tcp-client/)
+/// example program or other client example programs for a more thorough showcase.
+///
+
pub struct ConnectionReader {
local_addr: SocketAddr,
peer_addr: SocketAddr,
}
impl ConnectionReader {
+ /// Creates a new [`ConnectionReader`] from an [`AsyncRead`] trait object and the local and peer
+ /// socket metadata
pub fn new(
local_addr: SocketAddr,
peer_addr: SocketAddr,
}
}
+ /// Get the local IP address and port
pub fn local_addr(&self) -> SocketAddr {
self.local_addr.clone()
}
+ /// Get the peer IP address and port
pub fn peer_addr(&self) -> SocketAddr {
self.peer_addr.clone()
}
+ /// Check if the [`Stream`] of messages from the network is closed
pub fn is_closed(&self) -> bool {
self.closed
}
use async_std::net::{TcpStream, ToSocketAddrs};
impl Connection {
+ /// Creates a [`Connection`] that uses a TCP transport
+ ///
+ /// # Example
+ ///
+ /// Basic usage:
+ ///
+ /// ```ignore
+ /// let mut conn = Connection::tcp_client("127.0.0.1:3456").await?;
+ /// ```
pub async fn tcp_client<A: ToSocketAddrs + std::fmt::Display>(
ip_addrs: A,
) -> anyhow::Result<Self> {
}
impl From<TcpStream> for Connection {
+ /// Creates a [`Connection`] using a TCP transport from an async [`TcpStream`].
fn from(stream: TcpStream) -> Self {
let write_stream = stream.clone();
--- /dev/null
+use crate::Connection;
+use async_std::net::{SocketAddr, TcpListener as AsyncListener, ToSocketAddrs};
+use async_std::pin::Pin;
+use async_std::task::{Context, Poll};
+use futures::{Stream, StreamExt};
+use log::*;
+
+/// Listens on a bound socket for incoming TCP connections to be handled as independent
+/// [`Connection`]s.
+///
+/// Implements the [`Stream`] trait to asynchronously accept incoming TCP connections.
+///
+/// # Example
+///
+/// Basic usage:
+///
+/// ```ignore
+/// let mut server = TcpListener::bind(ip_address).await?;
+///
+/// // wait for a connection to come in and be accepted
+/// while let Some(mut conn) = server.next().await {
+/// // do something with connection
+/// }
+/// ```
+#[allow(dead_code)]
+pub struct TcpListener {
+ local_addrs: SocketAddr,
+ listener: AsyncListener,
+}
+
+impl TcpListener {
+ /// Creates a [`TcpListener`] by binding to an IP address and port and listens for incoming TCP
+ /// connections.
+ ///
+ /// # Example
+ ///
+ /// Basic usage:
+ ///
+ /// ```ignore
+ /// let mut server = TcpListener::bind("127.0.0.1:3456").await?;
+ /// ```
+ pub async fn bind<A: ToSocketAddrs + std::fmt::Display>(ip_addrs: A) -> anyhow::Result<Self> {
+ let listener = AsyncListener::bind(&ip_addrs).await?;
+ info!("Started TCP server at {}", &ip_addrs);
+
+ Ok(Self {
+ local_addrs: listener.local_addr()?,
+ listener,
+ })
+ }
+
+ /// Creates a [`Connection`] for the next `accept`ed TCP connection at the bound socket.
+ ///
+ /// # Example
+ ///
+ /// Basic usage:
+ ///
+ /// ```ignore
+ /// let mut server = TcpListener::bind("127.0.0.1:3456").await?;
+ /// while let Ok(mut conn) = server.accept().await? {
+ /// // handle the connection
+ /// }
+ /// ```
+ pub async fn accept(&self) -> anyhow::Result<Connection> {
+ let (stream, ip_addr) = self.listener.accept().await?;
+ debug!("Received connection attempt from {}", ip_addr);
+
+ Ok(Connection::from(stream))
+ }
+}
+
+impl Stream for TcpListener {
+ type Item = Connection;
+
+ fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ match futures::executor::block_on(self.listener.incoming().next()) {
+ Some(Ok(tcp_stream)) => {
+ let peer_addr = tcp_stream
+ .peer_addr()
+ .expect("Could not retrieve peer IP address");
+ debug!("Received connection attempt from {}", peer_addr);
+
+ Poll::Ready(Some(Connection::from(tcp_stream)))
+ }
+
+ Some(Err(e)) => {
+ error!(
+ "Encountered error when trying to accept new connection {}",
+ e
+ );
+ Poll::Pending
+ }
+
+ None => Poll::Ready(None),
+ }
+ }
+}
+//! TCP transport client and listener implementations.
+//!
+//! <br/>
+//!
+//! This module primarily exposes the TCP client implementation over a [`Connection`] type and the
+//! TCP listener implementation as [`TcpListener`].
+
+#[allow(unused_imports)]
+pub(crate) use crate::Connection;
+
pub(crate) mod client;
-pub(crate) mod server;
+pub(crate) mod listener;
pub use client::*;
-pub use server::*;
+pub use listener::*;
+++ /dev/null
-use crate::Connection;
-use async_std::net::{SocketAddr, TcpListener, ToSocketAddrs};
-use async_std::pin::Pin;
-use async_std::task::{Context, Poll};
-use futures::{Stream, StreamExt};
-use log::*;
-
-#[allow(dead_code)]
-pub struct TcpServer {
- local_addrs: SocketAddr,
- listener: TcpListener,
-}
-
-impl TcpServer {
- pub async fn new<A: ToSocketAddrs + std::fmt::Display>(ip_addrs: A) -> anyhow::Result<Self> {
- let listener = TcpListener::bind(&ip_addrs).await?;
- info!("Started TCP server at {}", &ip_addrs);
-
- Ok(Self {
- local_addrs: listener.local_addr()?,
- listener,
- })
- }
-
- pub async fn accept(&self) -> anyhow::Result<Connection> {
- let (stream, ip_addr) = self.listener.accept().await?;
- debug!("Received connection attempt from {}", ip_addr);
-
- Ok(Connection::from(stream))
- }
-}
-
-impl Stream for TcpServer {
- type Item = Connection;
-
- fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- match futures::executor::block_on(self.listener.incoming().next()) {
- Some(Ok(tcp_stream)) => {
- let peer_addr = tcp_stream
- .peer_addr()
- .expect("Could not retrieve peer IP address");
- debug!("Received connection attempt from {}", peer_addr);
-
- Poll::Ready(Some(Connection::from(tcp_stream)))
- }
-
- Some(Err(e)) => {
- error!(
- "Encountered error when trying to accept new connection {}",
- e
- );
- Poll::Pending
- }
-
- None => Poll::Ready(None),
- }
- }
-}
+use async_std::net::{TcpStream, ToSocketAddrs};
+use async_tls::client;
use async_tls::TlsConnector;
+use futures::AsyncReadExt;
use log::*;
+use crate::tls::TlsConnectionMetadata;
use crate::Connection;
-use async_std::net::{SocketAddr, TcpStream, ToSocketAddrs};
-use async_tls::client;
-use async_tls::server;
-use futures::AsyncReadExt;
-
-pub enum TlsConnectionMetadata {
- Client {
- local_addr: SocketAddr,
- peer_addr: SocketAddr,
- stream: client::TlsStream<TcpStream>,
- },
- Server {
- local_addr: SocketAddr,
- peer_addr: SocketAddr,
- stream: server::TlsStream<TcpStream>,
- },
-}
impl Connection {
+ /// Creates a [`Connection`] that uses a TLS transport
+ ///
+ /// # Example
+ ///
+ /// Basic usage:
+ ///
+ /// ```ignore
+ /// let mut conn = Connection::tls_client("127.0.0.1:3456", "localhost", client_config.into()).await?;
+ /// ```
+ ///
+ /// Please see the [tls-client](https://github.com/sachanganesh/connect-rs/blob/main/examples/tls-client/src/main.rs)
+ /// example program for a more thorough showcase.
pub async fn tls_client<A: ToSocketAddrs + std::fmt::Display>(
ip_addrs: A,
domain: &str,
}
impl From<TlsConnectionMetadata> for Connection {
+ /// Creates a [`Connection`] using a TLS transport from [`TlsConnectionMetadata`].
fn from(metadata: TlsConnectionMetadata) -> Self {
match metadata {
TlsConnectionMetadata::Client {
)
}
- TlsConnectionMetadata::Server {
+ TlsConnectionMetadata::Listener {
local_addr,
peer_addr,
stream,
--- /dev/null
+use crate::tls::TlsConnectionMetadata;
+use crate::Connection;
+use async_std::net::{SocketAddr, TcpListener, ToSocketAddrs};
+use async_std::pin::Pin;
+use async_std::task::{Context, Poll};
+use async_tls::TlsAcceptor;
+use futures::{Stream, StreamExt};
+use log::*;
+
+/// Listens on a bound socket for incoming TLS connections to be handled as independent
+/// [`Connection`]s.
+///
+/// Implements the [`Stream`] trait to asynchronously accept incoming TLS connections.
+///
+/// # Example
+///
+/// Basic usage:
+///
+/// ```ignore
+/// let mut server = TlsListener::bind("127.0.0.1:3456", config.into()).await?;
+///
+/// // wait for a connection to come in and be accepted
+/// while let Some(mut conn) = server.next().await {
+/// // do something with connection
+/// }
+/// ```
+#[allow(dead_code)]
+pub struct TlsListener {
+ local_addrs: SocketAddr,
+ listener: TcpListener,
+ acceptor: TlsAcceptor,
+}
+
+impl TlsListener {
+ /// Creates a [`TlsListener`] by binding to an IP address and port and listens for incoming TLS
+ /// connections that have successfully been accepted.
+ ///
+ /// # Example
+ ///
+ /// Basic usage:
+ ///
+ /// ```ignore
+ /// let mut server = TlsListener::bind("127.0.0.1:3456", config.into()).await?;
+ /// ```
+ pub async fn bind<A: ToSocketAddrs + std::fmt::Display>(
+ ip_addrs: A,
+ acceptor: TlsAcceptor,
+ ) -> anyhow::Result<Self> {
+ let listener = TcpListener::bind(ip_addrs).await?;
+ info!("Started TLS server at {}", listener.local_addr()?);
+
+ Ok(Self {
+ local_addrs: listener.local_addr()?,
+ listener,
+ acceptor,
+ })
+ }
+
+ /// Creates a [`Connection`] for the next `accept`ed TCP connection at the bound socket.
+ ///
+ /// # Example
+ ///
+ /// Basic usage:
+ ///
+ /// ```ignore
+ /// let mut server = TlsListener::bind("127.0.0.1:3456", config.into()).await?;
+ /// while let Some(mut conn) = server.next().await {
+ /// // do something with connection
+ /// }
+ /// ```
+ pub async fn accept(&self) -> anyhow::Result<Connection> {
+ let (tcp_stream, peer_addr) = self.listener.accept().await?;
+ debug!("Received connection attempt from {}", peer_addr);
+
+ match self.acceptor.accept(tcp_stream).await {
+ Ok(tls_stream) => {
+ debug!("Completed TLS handshake with {}", peer_addr);
+ Ok(Connection::from(TlsConnectionMetadata::Listener {
+ local_addr: self.local_addrs.clone(),
+ peer_addr,
+ stream: tls_stream,
+ }))
+ }
+
+ Err(e) => {
+ warn!("Could not encrypt connection with TLS from {}", peer_addr);
+ Err(anyhow::Error::new(e))
+ }
+ }
+ }
+}
+
+impl Stream for TlsListener {
+ type Item = Connection;
+
+ fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ match futures::executor::block_on(self.listener.incoming().next()) {
+ Some(Ok(tcp_stream)) => {
+ let peer_addr = tcp_stream
+ .peer_addr()
+ .expect("Could not retrieve peer IP address");
+ debug!("Received connection attempt from {}", peer_addr);
+
+ match futures::executor::block_on(self.acceptor.accept(tcp_stream)) {
+ Ok(tls_stream) => {
+ debug!("Completed TLS handshake with {}", peer_addr);
+ Poll::Ready(Some(Connection::from(TlsConnectionMetadata::Listener {
+ local_addr: self.local_addrs.clone(),
+ peer_addr,
+ stream: tls_stream,
+ })))
+ }
+
+ Err(_e) => {
+ warn!("Could not encrypt connection with TLS from {}", peer_addr);
+ Poll::Pending
+ }
+ }
+ }
+
+ Some(Err(e)) => {
+ error!(
+ "Encountered error when trying to accept new connection {}",
+ e
+ );
+ Poll::Pending
+ }
+
+ None => Poll::Ready(None),
+ }
+ }
+}
-pub(crate) mod client;
-pub(crate) mod server;
+//! TLS transport client and listener implementations.
+//!
+//! <br/>
+//!
+//! This module primarily exposes the TLS client implementation over a [`Connection`] type and the
+//! TLS listener implementation as [`TlsListener`].
+//!
-pub use client::*;
-pub use server::*;
+#[allow(unused_imports)]
+pub(crate) use crate::Connection;
+
+pub(crate) mod client;
+pub(crate) mod listener;
pub use async_tls;
+pub use client::*;
+pub use listener::*;
pub use rustls;
+
+use async_std::net::TcpStream;
+use async_tls::server;
+use std::net::SocketAddr;
+
+/// Used to differentiate between an outgoing connection ([`TlsConnectionMetadata::Client`]) or
+/// incoming connection listener ([`TlsConnectionMetadata::Listener`]).
+///
+/// The async TLS library used by this crate has two differing stream types based on whether the
+/// connection being established is either a client or server. This is to aid with handling that
+/// distinction during connection instantiation.
+pub enum TlsConnectionMetadata {
+ Client {
+ local_addr: SocketAddr,
+ peer_addr: SocketAddr,
+ stream: async_tls::client::TlsStream<TcpStream>,
+ },
+ Listener {
+ local_addr: SocketAddr,
+ peer_addr: SocketAddr,
+ stream: server::TlsStream<TcpStream>,
+ },
+}
+++ /dev/null
-use crate::tls::TlsConnectionMetadata;
-use crate::Connection;
-use async_std::net::{SocketAddr, TcpListener, ToSocketAddrs};
-use async_std::pin::Pin;
-use async_std::task::{Context, Poll};
-use async_tls::TlsAcceptor;
-use futures::{Stream, StreamExt};
-use log::*;
-
-#[allow(dead_code)]
-pub struct TlsServer {
- local_addrs: SocketAddr,
- listener: TcpListener,
- acceptor: TlsAcceptor,
-}
-
-impl TlsServer {
- pub async fn new<A: ToSocketAddrs + std::fmt::Display>(
- ip_addrs: A,
- acceptor: TlsAcceptor,
- ) -> anyhow::Result<Self> {
- let listener = TcpListener::bind(ip_addrs).await?;
- info!("Started TLS server at {}", listener.local_addr()?);
-
- Ok(Self {
- local_addrs: listener.local_addr()?,
- listener,
- acceptor,
- })
- }
-
- pub async fn accept(&self) -> anyhow::Result<Connection> {
- let (tcp_stream, peer_addr) = self.listener.accept().await?;
- debug!("Received connection attempt from {}", peer_addr);
-
- match self.acceptor.accept(tcp_stream).await {
- Ok(tls_stream) => {
- debug!("Completed TLS handshake with {}", peer_addr);
- Ok(Connection::from(TlsConnectionMetadata::Server {
- local_addr: self.local_addrs.clone(),
- peer_addr,
- stream: tls_stream,
- }))
- }
-
- Err(e) => {
- warn!("Could not encrypt connection with TLS from {}", peer_addr);
- Err(anyhow::Error::new(e))
- }
- }
- }
-}
-
-impl Stream for TlsServer {
- type Item = Connection;
-
- fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- match futures::executor::block_on(self.listener.incoming().next()) {
- Some(Ok(tcp_stream)) => {
- let peer_addr = tcp_stream
- .peer_addr()
- .expect("Could not retrieve peer IP address");
- debug!("Received connection attempt from {}", peer_addr);
-
- match futures::executor::block_on(self.acceptor.accept(tcp_stream)) {
- Ok(tls_stream) => {
- debug!("Completed TLS handshake with {}", peer_addr);
- Poll::Ready(Some(Connection::from(TlsConnectionMetadata::Server {
- local_addr: self.local_addrs.clone(),
- peer_addr,
- stream: tls_stream,
- })))
- }
-
- Err(_e) => {
- warn!("Could not encrypt connection with TLS from {}", peer_addr);
- Poll::Pending
- }
- }
- }
-
- Some(Err(e)) => {
- error!(
- "Encountered error when trying to accept new connection {}",
- e
- );
- Poll::Pending
- }
-
- None => Poll::Ready(None),
- }
- }
-}
pub use futures::SinkExt;
pub use futures::StreamExt;
+/// An interface to write messages to the network connection
+///
+/// Implements the [`Sink`] trait to asynchronously write messages to the network connection.
+///
+/// # Example
+///
+/// Basic usage:
+///
+/// ```ignore
+/// writer.send(msg).await?;
+/// ```
+///
+/// Please see the [tcp-client](https://github.com/sachanganesh/connect-rs/blob/main/examples/tcp-client/)
+/// example program or other client example programs for a more thorough showcase.
+///
pub struct ConnectionWriter {
local_addr: SocketAddr,
peer_addr: SocketAddr,
}
impl ConnectionWriter {
+ /// Creates a new [`ConnectionWriter`] from an [`AsyncWrite`] trait object and the local and peer
+ /// socket metadata
pub fn new(
local_addr: SocketAddr,
peer_addr: SocketAddr,
}
}
+ /// Get the local IP address and port
pub fn local_addr(&self) -> SocketAddr {
self.local_addr.clone()
}
+ /// Get the peer IP address and port
pub fn peer_addr(&self) -> SocketAddr {
self.peer_addr.clone()
}
+ /// Check if the [`Sink`] of messages to the network is closed
pub fn is_closed(&self) -> bool {
self.closed
}