1 use crate::protocol::ConnectDatagram;
2 use async_std::net::SocketAddr;
3 use async_std::pin::Pin;
4 use bytes::{Buf, BytesMut};
5 use futures::task::{Context, Poll};
6 use futures::{AsyncRead, Stream};
10 pub use futures::SinkExt;
11 pub use futures::StreamExt;
13 /// Size in bytes that the read buffer is resized to before the underlying stream is polled.
/// The bytes read into that buffer are then deserialized as messages.
14 const BUFFER_SIZE: usize = 8192;
16 /// An interface to read messages from the network connection.
18 /// Implements the [`Stream`] trait to asynchronously read messages from the network connection.
25 /// while let Some(msg) = reader.next().await {
26 /// // handle the received message
30 /// Please see the [tcp-client](https://github.com/sachanganesh/connect-rs/blob/main/examples/tcp-client/)
31 /// example program or other client example programs for a more thorough showcase.
34 pub struct ConnectionReader {
/// Local socket address; handed back by `local_addr()` and used in trace output.
35 local_addr: SocketAddr,
/// Peer socket address; handed back by `peer_addr()` and logged when the stream is closed.
36 peer_addr: SocketAddr,
/// Pinned, boxed byte source that `poll_next` calls `poll_read` on.
37 read_stream: Pin<Box<dyn AsyncRead + Send + Sync>>,
/// Bytes that could not be deserialized on an earlier poll. They are placed in front of the
/// next read in `poll_next` and discarded by `close_stream`.
38 pending_read: Option<BytesMut>,
42 impl ConnectionReader {
43 /// Creates a new [`ConnectionReader`] from an [`AsyncRead`] trait object and the local and peer
// The parameters share names and types with the struct fields (presumably stored as-is — confirm).
46 local_addr: SocketAddr,
47 peer_addr: SocketAddr,
48 read_stream: Pin<Box<dyn AsyncRead + Send + Sync>>,
59 /// Get the local IP address and port.
60 pub fn local_addr(&self) -> SocketAddr {
// NOTE(review): std's `SocketAddr` is `Copy`; if async_std re-exports that type, this
// `.clone()` is redundant and a plain copy of the field would do — confirm.
61 self.local_addr.clone()
64 /// Get the peer IP address and port.
65 pub fn peer_addr(&self) -> SocketAddr {
// NOTE(review): same as `local_addr` — the explicit `.clone()` is likely unnecessary.
66 self.peer_addr.clone()
69 /// Check if the [`Stream`] of messages from the network is closed.
70 pub fn is_closed(&self) -> bool {
/// Logs the peer address at trace level and discards any partial read held in `pending_read`.
74 pub(crate) fn close_stream(&mut self) {
75 trace!("Closing the stream for connection with {}", self.peer_addr);
// `take()` leaves `None` in the field; the buffer it returns is dropped right away.
76 self.pending_read.take();
81 impl Stream for ConnectionReader {
/// Each item produced by the stream is one decoded datagram.
82 type Item = ConnectDatagram;
/// Polls the underlying reader and tries to decode a [`ConnectDatagram`] from the bytes read,
/// combined with any bytes stashed in `pending_read` by an earlier poll.
///
/// Returns `Poll::Pending` when the reader has nothing ready, `Poll::Ready(Some(_))` with a
/// decoded message, and `Poll::Ready(None)` on a read error or when the
/// `pending_read.is_none()` branch below is taken.
84 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Fresh zero-filled scratch buffer of `BUFFER_SIZE` bytes, allocated on every call.
85 let mut buffer = BytesMut::new();
86 buffer.resize(BUFFER_SIZE, 0);
88 trace!("Starting new read loop for {}", self.local_addr);
90 trace!("Reading from the stream");
91 let stream = self.read_stream.as_mut();
93 match stream.poll_read(cx, &mut buffer) {
// Nothing to read yet: hand the pending state straight back to the caller.
94 Poll::Pending => return Poll::Pending,
// `bytes_read` is mutable because the length of any stashed bytes is added to it below.
96 Poll::Ready(Ok(mut bytes_read)) => {
98 trace!("Read {} bytes from the network stream", bytes_read)
// NOTE(review): looks like the zero-byte-read (end of stream) case with nothing stashed —
// confirm against the condition that precedes this `else if`.
99 } else if self.pending_read.is_none() {
101 return Poll::Ready(None);
// Bytes left over from an earlier failed decode go in front of the newly read bytes.
104 if let Some(mut pending_buf) = self.pending_read.take() {
105 trace!("Prepending broken data ({} bytes) encountered from earlier read of network stream", pending_buf.len());
106 bytes_read += pending_buf.len();
// `unsplit` appends the scratch buffer onto the stashed bytes; the merged buffer replaces it.
108 pending_buf.unsplit(buffer);
109 buffer = pending_buf;
112 while bytes_read > 0 {
113 trace!("{} bytes from network stream still unprocessed", bytes_read);
// Size the buffer to exactly the unprocessed bytes, cutting off the unused zero-filled tail.
115 buffer.resize(bytes_read, 0);
117 let mut cursor = Cursor::new(buffer.as_mut());
118 match ConnectDatagram::decode(&mut cursor) {
// NOTE(review): this returns as soon as one message decodes. The bytes that remain after
// `advance` live only in the local `buffer`; nothing visible here stores them in
// `pending_read` before returning — confirm trailing data from the same read is not lost.
120 return match data.version() {
122 let serialized_size = data.size();
124 "Deserialized message of size {} bytes",
// Drop the decoded message's bytes from the front and shrink the unprocessed count to match.
128 buffer.advance(serialized_size);
129 bytes_read -= serialized_size;
130 trace!("{} bytes still unprocessed", bytes_read);
132 trace!("Sending deserialized message downstream");
133 Poll::Ready(Some(data))
140 "Could not deserialize data from the received bytes: {:#?}",
// Failed-deserialization path: keep the unprocessed bytes for a later poll and carry on with
// an empty buffer.
144 self.pending_read = Some(buffer);
145 buffer = BytesMut::new();
// Resize the buffer back to `BUFFER_SIZE`, zero-filling any bytes that are added
// (presumably in preparation for another read — confirm).
151 buffer.resize(BUFFER_SIZE, 0);
// A read error also ends the stream with `None`. The error is bound to `_e` and, since
// `Item` carries no error type, consumers cannot tell it apart from a normal end of stream.
155 Poll::Ready(Err(_e)) => {
157 return Poll::Ready(None);