1 //! This crate provides a reliable, fault-tolerant, and brokerless message-queue abstraction over
2 //! asynchronous network streams.
5 //! When building networked applications, developers shouldn't have to focus on repeatedly solving
6 //! the problem of reliable, fault-tolerant message delivery over byte-streams. By using a message
7 //! queue abstraction, crate users can focus on core application logic and leave the low-level
8 //! networking and message-queue guarantees to the abstraction.
//! This crate relies on [Protocol Buffers](https://developers.google.com/protocol-buffers)
//! because the format is widely adopted and industry-proven. All messages are Protobuf messages
//! that are packed into a Protobuf `Any` type and then sent over the wire. Message recipients must
//! determine which Protobuf message type was sent, and correspondingly unpack the `Any` into that
//! particular message type.
18 //! Please use the [examples](https://github.com/sachanganesh/connect-rs/tree/main/examples)
19 //! provided to help understand crate usage.
22 pub(crate) mod schema;
27 pub use crate::reader::ConnectionReader;
28 pub use crate::writer::ConnectionWriter;
29 use async_std::{net::SocketAddr, pin::Pin};
30 use futures::{AsyncRead, AsyncWrite};
31 pub use futures::{SinkExt, StreamExt};
33 /// Wrapper around a [`ConnectionReader`] and [`ConnectionWriter`] to read and write on a network
35 pub struct Connection {
36 local_addr: SocketAddr,
37 peer_addr: SocketAddr,
38 reader: ConnectionReader,
39 writer: ConnectionWriter,
// NOTE(review): the function header these parameters belong to is not visible in this view —
// presumably the `Connection` constructor; confirm against the full file.
45 local_addr: SocketAddr,
46 peer_addr: SocketAddr,
// The read side and the write side are supplied as two separate pinned, boxed trait objects.
47 read_stream: Pin<Box<dyn AsyncRead + Send + Sync>>,
48 write_stream: Pin<Box<dyn AsyncWrite + Send + Sync>>,
// Each half is constructed from the same address pair plus its own stream.
53 reader: ConnectionReader::new(local_addr, peer_addr, read_stream),
54 writer: ConnectionWriter::new(local_addr, peer_addr, write_stream),
58 /// Get the local IP address and port
59 pub fn local_addr(&self) -> SocketAddr {
60 self.local_addr.clone()
63 /// Get the peer IP address and port
64 pub fn peer_addr(&self) -> SocketAddr {
65 self.peer_addr.clone()
68 /// Consume the [`Connection`] to split into separate [`ConnectionReader`] and
69 /// [`ConnectionWriter`] halves
71 /// [`Connection`]s are split when reading and writing must be concurrent operations.
72 pub fn split(self) -> (ConnectionReader, ConnectionWriter) {
73 (self.reader, self.writer)
76 /// Re-wrap the [`ConnectionReader`] and [`ConnectionWriter`] halves into a [`Connection`].
///
/// Takes ownership of both halves.
77 pub fn join(reader: ConnectionReader, writer: ConnectionWriter) -> Self {
// Both addresses are read from the reader half in the lines visible here.
// NOTE(review): the remaining field initialisers are not visible in this view — confirm
// against the full file.
79 local_addr: reader.local_addr(),
80 peer_addr: reader.peer_addr(),
86 /// Get mutable access to the underlying [`ConnectionReader`].
///
/// Takes `&mut self`, so the returned reference borrows this [`Connection`] exclusively for as
/// long as it is held.
87 pub fn reader(&mut self) -> &mut ConnectionReader {
91 /// Get mutable access to the underlying [`ConnectionWriter`].
///
/// Takes `&mut self`, so the returned reference borrows this [`Connection`] exclusively for as
/// long as it is held.
92 pub fn writer(&mut self) -> &mut ConnectionWriter {
96 /// Close the connection by closing both the reading and writing halves
97 pub async fn close(self) -> SocketAddr {
98 let peer_addr = self.peer_addr();
99 let (reader, writer) = self.split();
103 // writer.close().await;