]> git.lizzy.rs Git - connect-rs.git/blob - src/lib.rs
renamed server to listener and add thorough documentation
[connect-rs.git] / src / lib.rs
1 //! This crate provides a reliable, fault-tolerant, and brokerless message-queue abstraction over
2 //! asynchronous network streams.
3 //!
4 //! # Why?
5 //! When building networked applications, developers shouldn't have to focus on repeatedly solving
6 //! the problem of reliable, fault-tolerant message delivery over byte-streams. By using a message
7 //! queue abstraction, crate users can focus on core application logic and leave the low-level
8 //! networking and message-queue guarantees to the abstraction.
9 //!
10 //! # Protobuf
11 //! This crate relies on the use of [Protocol Buffers](https://developers.google.com/protocol-buffers)
12 //! due to it being widely adopted and industry-proven. All messages are Protobuf messages that
13 //! are packed into a Protobuf `Any` type and then sent over the wire. Message recipients must
14 //! decide what Protobuf message type it is, and correspondingly unpack the `Any` into a particular
15 //! message type.
16 //!
17 //! # Examples
18 //! Please use the [examples](https://github.com/sachanganesh/connect-rs/tree/main/examples)
19 //! provided to help understand crate usage.
20
21 mod reader;
22 pub(crate) mod schema;
23 pub mod tcp;
24 pub mod tls;
25 mod writer;
26
27 pub use crate::reader::ConnectionReader;
28 pub use crate::writer::ConnectionWriter;
29 use async_std::{net::SocketAddr, pin::Pin};
30 use futures::{AsyncRead, AsyncWrite};
31 pub use futures::{SinkExt, StreamExt};
32
33 /// Wrapper around a [`ConnectionReader`] and [`ConnectionWriter`] to read and write on a network
34 /// connection
35 pub struct Connection {
36     local_addr: SocketAddr,
37     peer_addr: SocketAddr,
38     reader: ConnectionReader,
39     writer: ConnectionWriter,
40 }
41
42 #[allow(dead_code)]
43 impl Connection {
44     pub(crate) fn new(
45         local_addr: SocketAddr,
46         peer_addr: SocketAddr,
47         read_stream: Pin<Box<dyn AsyncRead + Send + Sync>>,
48         write_stream: Pin<Box<dyn AsyncWrite + Send + Sync>>,
49     ) -> Self {
50         Self {
51             local_addr,
52             peer_addr,
53             reader: ConnectionReader::new(local_addr, peer_addr, read_stream),
54             writer: ConnectionWriter::new(local_addr, peer_addr, write_stream),
55         }
56     }
57
58     /// Get the local IP address and port
59     pub fn local_addr(&self) -> SocketAddr {
60         self.local_addr.clone()
61     }
62
63     /// Get the peer IP address and port
64     pub fn peer_addr(&self) -> SocketAddr {
65         self.peer_addr.clone()
66     }
67
68     /// Consume the [`Connection`] to split into separate [`ConnectionReader`] and
69     /// [`ConnectionWriter`] halves
70     ///
71     /// [`Connection`]s are split when reading and writing must be concurrent operations.
72     pub fn split(self) -> (ConnectionReader, ConnectionWriter) {
73         (self.reader, self.writer)
74     }
75
76     /// Re-wrap the [`ConnectionReader`] and [`ConnectionWriter`] halves into a [`Connection`]
77     pub fn join(reader: ConnectionReader, writer: ConnectionWriter) -> Self {
78         Self {
79             local_addr: reader.local_addr(),
80             peer_addr: reader.peer_addr(),
81             reader,
82             writer,
83         }
84     }
85
86     /// Get mutable access to the underlying [`ConnectionReader`]
87     pub fn reader(&mut self) -> &mut ConnectionReader {
88         &mut self.reader
89     }
90
91     /// Get mutable access to the underlying [`ConnectionWriter`]
92     pub fn writer(&mut self) -> &mut ConnectionWriter {
93         &mut self.writer
94     }
95
96     /// Close the connection by closing both the reading and writing halves
97     pub async fn close(self) -> SocketAddr {
98         let peer_addr = self.peer_addr();
99         let (reader, writer) = self.split();
100
101         drop(reader);
102
103         // writer.close().await;
104         drop(writer);
105
106         return peer_addr;
107     }
108 }