[package]
name = "connect"
-version = "0.1.3"
+version = "0.2.0"
edition = "2018"
authors = ["Sachandhan Ganesh <sachan.ganesh@gmail.com>"]
description = "message queue abstraction over async network streams"
readme = "README.md"
license = "Apache-2.0"
+[package.metadata.docs.rs]
+features = ["async"]
+rustdoc-args = ["--cfg", "docsrs"]
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
[workspace]
members = [
"examples/*",
]
+[features]
+tls = ["async-tls", "rustls"]
+
[dependencies]
anyhow = "1.0.31"
-async-channel = "1.4.0"
-async-std = { version = "1.9.0", features = ["attributes", "unstable"] }
-async-tls = { version = "0.9.0", default-features = false, features = ["client", "server"]}
+async-std = { version = "1.9.0", features = ["unstable"] }
bytes = "0.5.5"
futures = "0.3.8"
log = "0.4"
-rustls = "0.18.0"
+
+async-tls = { version = "0.11.0", default-features = false, features = ["client", "server"], optional = true }
+rustls = { version = "0.19.0", optional = true }
+
+[dev-dependencies]
+async-std = { version = "1.9.0", features = ["attributes"] }
\ No newline at end of file
This Rust crate provides a simple brokerless message-queue abstraction over asynchronous network
streams.
+## Examples
+Please use the [example programs](https://github.com/sachanganesh/connect-rs/tree/main/examples)
+provided to help understand crate usage.
+
## Why?
When building networked applications, developers shouldn't have to focus on repeatedly solving
the problem of reliable, fault-tolerant message delivery over byte-streams. By using a message
queue abstraction, crate users can focus on core application logic and leave the low-level
networking and message-queue guarantees to the abstraction.
-## Examples
-Please use the [examples](https://github.com/sachanganesh/connect-rs/tree/main/examples)
-provided to help understand crate usage.
-
-## Protobuf
-This crate relies on the use of [Protocol Buffers](https://developers.google.com/protocol-buffers)
-due to it being widely adopted and industry-proven. All messages are Protobuf messages that
-are packed into a Protobuf `Any` type and then sent over the wire. Message recipients must
-decide what Protobuf message type it is, and correspondingly unpack the `Any` into a particular
-message type.
-
-Protobuf was chosen when the library hit a roadblock with Rust's `TypeId` potentially not being
-consistent between Rust compiler versions. The crate requires a consistent way to determine what
-type of message is received, so it can appropriately deserialize the message from network bytes.
-Until the Rust ecosystem around reflection improves, the crate will use Protobuf to fill the
-void.
-
## Feature Status
| Feature | Status |
//! This crate provides a simple brokerless message-queue abstraction over asynchronous network
//! streams.
//!
+//! # Examples
+//! Please use the [example programs](https://github.com/sachanganesh/connect-rs/tree/main/examples)
+//! provided to help understand crate usage.
+//!
//! # Why?
//! When building networked applications, developers shouldn't have to focus on repeatedly solving
//! the problem of reliable, fault-tolerant message delivery over byte-streams. By using a message
//! queue abstraction, crate users can focus on core application logic and leave the low-level
//! networking and message-queue guarantees to the abstraction.
//!
-//! # Examples
-//! Please use the [examples](https://github.com/sachanganesh/connect-rs/tree/main/examples)
-//! provided to help understand crate usage.
-//!
+
+#![feature(doc_cfg)]
mod protocol;
mod reader;
pub mod tcp;
+
+#[cfg(feature = "tls")]
+#[doc(cfg(feature = "tls"))]
pub mod tls;
+
mod writer;
pub use crate::protocol::{ConnectDatagram, DatagramEmptyError};
pub use crate::reader::ConnectionReader;
-pub use crate::writer::ConnectionWriter;
+pub use crate::writer::{ConnectionWriter, ConnectionWriteError};
use async_std::{net::SocketAddr, pin::Pin};
use futures::{AsyncRead, AsyncWrite};
pub use futures::{SinkExt, StreamExt};
pub(crate) mod client;
pub(crate) mod listener;
+#[cfg(feature = "tls")]
+#[doc(cfg(feature = "tls"))]
pub use async_tls;
+
pub use client::*;
pub use listener::*;
+
+#[cfg(feature = "tls")]
+#[doc(cfg(feature = "tls"))]
pub use rustls;
use async_std::net::TcpStream;
-use async_channel::RecvError;
+use crate::protocol::ConnectDatagram;
use async_std::net::SocketAddr;
use async_std::pin::Pin;
use futures::io::IoSlice;
use futures::task::{Context, Poll};
use futures::{AsyncWrite, Sink};
use log::*;
+use std::error::Error;
-use crate::protocol::ConnectDatagram;
pub use futures::SinkExt;
pub use futures::StreamExt;
+use std::fmt::Debug;
+
+/// Encountered when there is an issue with writing messages on the network stream.
+///
+#[derive(Debug)]
+pub enum ConnectionWriteError {
+ /// Encountered when trying to send a message while the connection is closed.
+ ConnectionClosed,
+
+ /// Encountered when there is an IO-level error with the connection.
+ IoError(std::io::Error),
+}
+
+impl Error for ConnectionWriteError {}
+
+impl std::fmt::Display for ConnectionWriteError {
+ fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
+ match self {
+ ConnectionWriteError::ConnectionClosed => formatter.write_str("cannot send message when connection is closed"),
+ ConnectionWriteError::IoError(err) => std::fmt::Display::fmt(&err, formatter),
+ }
+ }
+}
/// An interface to write messages to the network connection.
///
pub(crate) fn write_pending_bytes(
&mut self,
cx: &mut Context<'_>,
- ) -> Poll<Result<(), RecvError>> {
+ ) -> Poll<Result<(), ConnectionWriteError>> {
if self.pending_writes.len() > 0 {
let stream = self.write_stream.as_mut();
Poll::Ready(Ok(()))
}
- Poll::Ready(Err(_e)) => {
+ Poll::Ready(Err(err)) => {
error!("Encountered error when writing to network stream");
- Poll::Ready(Err(RecvError))
+ Poll::Ready(Err(ConnectionWriteError::IoError(err)))
}
}
}
- Poll::Ready(Err(_e)) => {
+ Poll::Ready(Err(err)) => {
error!("Encountered error when flushing network stream");
- Poll::Ready(Err(RecvError))
+ Poll::Ready(Err(ConnectionWriteError::IoError(err)))
}
}
} else {
}
impl Sink<ConnectDatagram> for ConnectionWriter {
- type Error = RecvError;
+ type Error = ConnectionWriteError;
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.is_closed() {
trace!("Connection is closed, cannot send message");
- Poll::Ready(Err(RecvError))
+ Poll::Ready(Err(ConnectionWriteError::ConnectionClosed))
} else {
trace!("Connection ready to send message");
Poll::Ready(Ok(()))
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
- Poll::Ready(Err(_e)) => Poll::Ready(Err(RecvError)),
+ Poll::Ready(Err(err)) => Poll::Ready(Err(ConnectionWriteError::IoError(err))),
}
}