]> git.lizzy.rs Git - connect-rs.git/blob - src/lib.rs
refactor to have datagram already serialized in memory
[connect-rs.git] / src / lib.rs
1 //! This Rust crate provides a simple, brokerless message-queue abstraction over asynchronous
2 //! network streams. It guarantees ordered message delivery and reception, and both TCP and TLS
3 //! transports are supported.
4 //!
5 //! # Examples
6 //!
7 //! ````ignore
8 //! // create a client connection to the server
9 //! let mut conn = Connection::tcp_client(ip_address).await?;
10 //!
11 //! // construct a new message
12 //! let msg = String::from("Hello world!");
13 //! let envelope: ConnectDatagram = ConnectDatagram::new(65535, msg.into_bytes())?;
14 //!
15 //! // send a message to the server
16 //! conn.writer().send(envelope).await?;
17 //!
18 //! // wait for the echo-server to reply with an echo
19 //! if let Some(mut envelope) = conn.reader().next().await {
20 //!     // take the message payload from the envelope
21 //!     let data: Vec<u8> = envelope.take_data().unwrap();
22 //!
23 //!     // reconstruct the original message
24 //!     let msg = String::from_utf8(data)?;
25 //!     assert_eq!("Hello world!", msg.as_str());
26 //! }
27 //! ````
28 //!
29 //! In addition to the [crate documentation](https://docs.rs/connect/latest/connect/), please use
30 //! the provided [example programs](https://github.com/sachanganesh/connect-rs/tree/main/examples)
31 //! as a practical reference for crate usage.
32 //!
33 //! - TCP
34 //!     - [TCP Echo Server](https://github.com/sachanganesh/connect-rs/tree/main/examples/tcp-echo-server)
35 //!     - [TCP Client](https://github.com/sachanganesh/connect-rs/tree/main/examples/tcp-client)
36 //! - TLS (enable `tls` feature flag)
37 //!     - [TLS Echo Server](https://github.com/sachanganesh/connect-rs/tree/main/examples/tls-echo-server)
38 //!     - [TLS Client](https://github.com/sachanganesh/connect-rs/tree/main/examples/tls-client)
39 //!
40 //! # Why?
41 //!
42 //! When building networked applications, developers shouldn't have to focus on repeatedly solving
43 //! the problem of reliable, ordered message delivery over byte-streams. By using a message
44 //! queue abstraction, crate users can focus on core application logic and leave the low-level
45 //! networking and message-queue guarantees to the abstraction.
46 //!
47 //! Connect provides a `ConnectionWriter` and `ConnectionReader` interface to concurrently send
48 //! and receive messages over a network connection. Each user-provided message is prefixed by 8
49 //! bytes, containing a size-prefix (4 bytes), version tag (2 bytes), and recipient tag (2 bytes).
50 //! The size-prefix and version tag are used internally to deserialize messages received from the
51 //! network connection. The recipient tag is intended for crate users to identify the message
52 //! recipient, although the library leaves that up to the user's discretion.
53 //!
54 //! Library users must serialize their custom messages into bytes (`Vec<u8>`), prior to
55 //! constructing a `ConnectDatagram`, which can then be passed to a `ConnectionWriter`.
56 //! Consequently, `ConnectionReader`s will return `ConnectDatagram`s containing the message payload
57 //! (`Vec<u8>` again) to the user to deserialize.
58 //!
59 //! Requiring crate users to serialize data before constructing a datagram may appear redundant, but
60 //! gives the developer the freedom to use a serialization format of their choosing. This means that
61 //! library users can do interesting things such as:
62 //!
63 //! - Use the recipient tag to signify which serialization format was used for that message
64 //! - Use the recipient tag to signify the type of message being sent
65 //!
66 //! # Feature Flags
67 //!
68 //! - `tls`: enables usage of tls transport functionality
69 //!
70
71 // #![feature(doc_cfg)]
72
73 mod protocol;
74 mod reader;
75 pub mod tcp;
76 pub mod udp;
77 mod writer;
78
79 #[cfg(feature = "tls")]
80 // #[doc(cfg(feature = "tls"))]
81 pub mod tls;
82
83 use async_std::{net::SocketAddr, pin::Pin};
84 use futures::{AsyncRead, AsyncWrite, Sink, Stream};
85
86 pub use crate::protocol::{
87     ConnectDatagram, DatagramError, DATAGRAM_HEADER_BYTE_SIZE, SIZE_PREFIX_BYTE_SIZE,
88 };
89 pub use crate::reader::ConnectionReader;
90 pub use crate::writer::{ConnectionWriteError, ConnectionWriter};
91 pub use futures::{SinkExt, StreamExt};
92
93 /// Wrapper around a [`ConnectionReader`] and [`ConnectionWriter`] to read and write on a network
94 /// connection.
95 pub struct Connection {
96     local_addr: SocketAddr,
97     peer_addr: SocketAddr,
98     reader: Box<dyn Stream<Item = ConnectDatagram> + Unpin + Send + Sync>,
99     writer: Box<dyn Sink<ConnectDatagram, Error = ConnectionWriteError> + Unpin + Send + Sync>,
100 }
101
102 #[allow(dead_code)]
103 impl Connection {
104     pub(crate) fn new(
105         local_addr: SocketAddr,
106         peer_addr: SocketAddr,
107         read_stream: Pin<Box<dyn AsyncRead + Send + Sync>>,
108         write_stream: Pin<Box<dyn AsyncWrite + Send + Sync>>,
109     ) -> Self {
110         Self {
111             local_addr,
112             peer_addr,
113             reader: Box::new(ConnectionReader::new(local_addr, peer_addr, read_stream)),
114             writer: Box::new(ConnectionWriter::new(local_addr, peer_addr, write_stream)),
115         }
116     }
117
118     /// Get the local IP address and port.
119     pub fn local_addr(&self) -> SocketAddr {
120         self.local_addr.clone()
121     }
122
123     /// Get the peer IP address and port.
124     pub fn peer_addr(&self) -> SocketAddr {
125         self.peer_addr.clone()
126     }
127
128     /// Consume the [`Connection`] to split into separate [`ConnectionReader`] and
129     /// [`ConnectionWriter`] halves.
130     ///
131     /// [`Connection`]s are split when reading and writing must be concurrent operations.
132     pub fn split(
133         self,
134     ) -> (
135         impl Stream<Item = ConnectDatagram> + Send + Sync,
136         impl Sink<ConnectDatagram, Error = ConnectionWriteError> + Send + Sync,
137     ) {
138         (self.reader, self.writer)
139     }
140
141     /// Re-wrap the [`ConnectionReader`] and [`ConnectionWriter`] halves into a [`Connection`].
142     pub fn join(
143         local_addr: SocketAddr,
144         peer_addr: SocketAddr,
145         reader: impl Stream<Item = ConnectDatagram> + Unpin + Send + Sync + 'static,
146         writer: impl Sink<ConnectDatagram, Error = ConnectionWriteError> + Unpin + Send + Sync + 'static,
147     ) -> Self {
148         Self {
149             local_addr,
150             peer_addr,
151             reader: Box::new(reader),
152             writer: Box::new(writer),
153         }
154     }
155
156     /// Get mutable access to the underlying [`ConnectionReader`].
157     pub fn reader(&mut self) -> &mut impl Stream<Item = ConnectDatagram> {
158         &mut self.reader
159     }
160
161     /// Get mutable access to the underlying [`ConnectionWriter`].
162     pub fn writer(&mut self) -> &mut impl Sink<ConnectDatagram, Error = ConnectionWriteError> {
163         &mut self.writer
164     }
165
166     /// Close the connection by closing both the reading and writing halves.
167     pub async fn close(self) -> SocketAddr {
168         let peer_addr = self.peer_addr();
169         let (reader, writer) = self.split();
170
171         drop(reader);
172
173         // writer.close().await;
174         drop(writer);
175
176         return peer_addr;
177     }
178 }
179
180 #[cfg(test)]
181 mod tests {}