From 9aa05a29582370a5aa91674297339ad16652559b Mon Sep 17 00:00:00 2001 From: Sachandhan Ganesh Date: Sat, 13 Feb 2021 00:06:08 -0800 Subject: [PATCH] move tls into a feature and remove error dep on async-channel --- Cargo.toml | 22 +++++++++++++++++----- README.md | 21 ++++----------------- src/lib.rs | 16 +++++++++++----- src/tls/mod.rs | 6 ++++++ src/writer.rs | 43 +++++++++++++++++++++++++++++++++---------- 5 files changed, 71 insertions(+), 37 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2cf5a0f..2e7b1c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "connect" -version = "0.1.3" +version = "0.2.0" edition = "2018" authors = ["Sachandhan Ganesh "] description = "message queue abstraction over async network streams" @@ -11,18 +11,30 @@ documentation = "https://docs.rs/connect/" readme = "README.md" license = "Apache-2.0" +[package.metadata.docs.rs] +features = ["async"] +rustdoc-args = ["--cfg", "docsrs"] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + [workspace] members = [ "examples/*", ] +[features] +tls = ["async-tls", "rustls"] + [dependencies] anyhow = "1.0.31" -async-channel = "1.4.0" -async-std = { version = "1.9.0", features = ["attributes", "unstable"] } -async-tls = { version = "0.9.0", default-features = false, features = ["client", "server"]} +async-std = { version = "1.9.0", features = ["unstable"] } bytes = "0.5.5" futures = "0.3.8" log = "0.4" -rustls = "0.18.0" + +async-tls = { version = "0.11.0", default-features = false, features = ["client", "server"], optional = true } +rustls = { version = "0.19.0", optional = true } + +[dev-dependencies] +async-std = { version = "1.9.0", features = ["attributes"] } \ No newline at end of file diff --git a/README.md b/README.md index 21e441a..bdf89a0 100644 --- a/README.md +++ b/README.md @@ -11,29 +11,16 @@ This Rust crate provides a simple brokerless message-queue abstraction over asynchronous network streams. +## Examples +Please use the [example programs](https://github.com/sachanganesh/connect-rs/tree/main/examples) +provided to help understand crate usage. + ## Why? When building networked applications, developers shouldn't have to focus on repeatedly solving the problem of reliable, fault-tolerant message delivery over byte-streams. By using a message queue abstraction, crate users can focus on core application logic and leave the low-level networking and message-queue guarantees to the abstraction. -## Examples -Please use the [examples](https://github.com/sachanganesh/connect-rs/tree/main/examples) -provided to help understand crate usage. - -## Protobuf -This crate relies on the use of [Protocol Buffers](https://developers.google.com/protocol-buffers) -due to it being widely adopted and industry-proven. All messages are Protobuf messages that -are packed into a Protobuf `Any` type and then sent over the wire. Message recipients must -decide what Protobuf message type it is, and correspondingly unpack the `Any` into a particular -message type. - -Protobuf was chosen when the library hit a roadblock with Rust's `TypeId` potentially not being -consistent between Rust compiler versions. The crate requires a consistent way to determine what -type of message is received, so it can appropriately deserialize the message from network bytes. -Until the Rust ecosystem around reflection improves, the crate will use Protobuf to fill the -void. - ## Feature Status | Feature | Status | diff --git a/src/lib.rs b/src/lib.rs index 1a0cb2b..7ead050 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,26 +1,32 @@ //! This crate provides a simple brokerless message-queue abstraction over asynchronous network //! streams. //! +//! # Examples +//! Please use the [example programs](https://github.com/sachanganesh/connect-rs/tree/main/examples) +//! provided to help understand crate usage. +//! //! # Why? //! When building networked applications, developers shouldn't have to focus on repeatedly solving //! the problem of reliable, fault-tolerant message delivery over byte-streams. By using a message //! queue abstraction, crate users can focus on core application logic and leave the low-level //! networking and message-queue guarantees to the abstraction. //! -//! # Examples -//! Please use the [examples](https://github.com/sachanganesh/connect-rs/tree/main/examples) -//! provided to help understand crate usage. -//! + +#![feature(doc_cfg)] mod protocol; mod reader; pub mod tcp; + +#[cfg(feature = "tls")] +#[doc(cfg(feature = "tls"))] pub mod tls; + mod writer; pub use crate::protocol::{ConnectDatagram, DatagramEmptyError}; pub use crate::reader::ConnectionReader; -pub use crate::writer::ConnectionWriter; +pub use crate::writer::{ConnectionWriter, ConnectionWriteError}; use async_std::{net::SocketAddr, pin::Pin}; use futures::{AsyncRead, AsyncWrite}; pub use futures::{SinkExt, StreamExt}; diff --git a/src/tls/mod.rs b/src/tls/mod.rs index dfc535f..eeefd47 100644 --- a/src/tls/mod.rs +++ b/src/tls/mod.rs @@ -12,9 +12,15 @@ pub(crate) use crate::Connection; pub(crate) mod client; pub(crate) mod listener; +#[cfg(feature = "tls")] +#[doc(cfg(feature = "tls"))] pub use async_tls; + pub use client::*; pub use listener::*; + +#[cfg(feature = "tls")] +#[doc(cfg(feature = "tls"))] pub use rustls; use async_std::net::TcpStream; diff --git a/src/writer.rs b/src/writer.rs index b8c8a95..c65a444 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -1,14 +1,37 @@ -use async_channel::RecvError; +use crate::protocol::ConnectDatagram; use async_std::net::SocketAddr; use async_std::pin::Pin; use futures::io::IoSlice; use futures::task::{Context, Poll}; use futures::{AsyncWrite, Sink}; use log::*; +use std::error::Error; -use crate::protocol::ConnectDatagram; pub use futures::SinkExt; pub use futures::StreamExt; +use std::fmt::Debug; + +/// Encountered when there is an issue with writing messages on the network stream. +/// +#[derive(Debug)] +pub enum ConnectionWriteError { + /// Encountered when trying to send a message while the connection is closed. + ConnectionClosed, + + /// Encountered when there is an IO-level error with the connection. + IoError(std::io::Error), +} + +impl Error for ConnectionWriteError {} + +impl std::fmt::Display for ConnectionWriteError { + fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + ConnectionWriteError::ConnectionClosed => formatter.write_str("cannot send message when connection is closed"), + ConnectionWriteError::IoError(err) => std::fmt::Display::fmt(&err, formatter), + } + } +} /// An interface to write messages to the network connection. /// @@ -68,7 +91,7 @@ impl ConnectionWriter { pub(crate) fn write_pending_bytes( &mut self, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { if self.pending_writes.len() > 0 { let stream = self.write_stream.as_mut(); @@ -91,16 +114,16 @@ impl ConnectionWriter { Poll::Ready(Ok(())) } - Poll::Ready(Err(_e)) => { + Poll::Ready(Err(err)) => { error!("Encountered error when writing to network stream"); - Poll::Ready(Err(RecvError)) + Poll::Ready(Err(ConnectionWriteError::IoError(err))) } } } - Poll::Ready(Err(_e)) => { + Poll::Ready(Err(err)) => { error!("Encountered error when flushing network stream"); - Poll::Ready(Err(RecvError)) + Poll::Ready(Err(ConnectionWriteError::IoError(err))) } } } else { @@ -110,12 +133,12 @@ impl ConnectionWriter { } impl Sink for ConnectionWriter { - type Error = RecvError; + type Error = ConnectionWriteError; fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { if self.is_closed() { trace!("Connection is closed, cannot send message"); - Poll::Ready(Err(RecvError)) + Poll::Ready(Err(ConnectionWriteError::ConnectionClosed)) } else { trace!("Connection ready to send message"); Poll::Ready(Ok(())) @@ -152,7 +175,7 @@ impl Sink for ConnectionWriter { Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), - Poll::Ready(Err(_e)) => Poll::Ready(Err(RecvError)), + Poll::Ready(Err(err)) => Poll::Ready(Err(ConnectionWriteError::IoError(err))), } } -- 2.44.0