1 use crate::protocol::ConnectDatagram;
2 use async_std::net::SocketAddr;
3 use async_std::pin::Pin;
4 use futures::io::IoSlice;
5 use futures::task::{Context, Poll};
6 use futures::{AsyncWrite, Sink};
10 pub use futures::SinkExt;
11 pub use futures::StreamExt;
14 /// Encountered when there is an issue with writing messages on the network stream.
// This enum is the `Error` type of the `Sink<ConnectDatagram>` implementation on
// `ConnectionWriter`, so failures surfaced through that sink are one of these variants.
17 pub enum ConnectionWriteError {
18 /// Encountered when trying to send a message while the connection is closed.
// NOTE(review): the variant this doc line belongs to is not visible in this excerpt. The
// `Display` impl matches on `ConnectionWriteError::ConnectionClosed`, so it is presumably
// declared at this position -- confirm.
21 /// Encountered when there is an IO-level error with the connection.
// Wraps the `std::io::Error` reported by the stream when a flush, write, or close fails.
22 IoError(std::io::Error),
// Marks `ConnectionWriteError` as an error type. The empty body adds no methods of its own;
// the user-facing message comes from the `Display` impl for the same type.
// NOTE(review): `Error` is presumably `std::error::Error`; its import is not visible in this
// excerpt -- confirm.
25 impl Error for ConnectionWriteError {}
// User-facing text for each `ConnectionWriteError` variant.
27 impl std::fmt::Display for ConnectionWriteError {
// Writes the message for `self` into `formatter` and returns the formatter's result.
// NOTE(review): the `match` head for the arms below is not visible in this excerpt.
28 fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
// A closed connection has no payload, so a fixed message is written.
30 ConnectionWriteError::ConnectionClosed => formatter.write_str("cannot send message when connection is closed"),
// An IO failure is rendered exactly as the wrapped `std::io::Error` renders itself.
31 ConnectionWriteError::IoError(err) => std::fmt::Display::fmt(&err, formatter),
36 /// An interface to write messages to the network connection.
38 /// Implements the [`Sink`] trait to asynchronously write messages to the network connection.
45 /// writer.send(msg).await?;
48 /// Please see the [tcp-client](https://github.com/sachanganesh/connect-rs/blob/main/examples/tcp-client/)
49 /// example program or other client example programs for a more thorough showcase.
// Messages are not written as soon as they are sent: `start_send` only queues the encoded
// bytes in `pending_writes`, and `write_pending_bytes` (driven by `poll_flush` / `poll_close`)
// hands the queue to `write_stream`.
51 pub struct ConnectionWriter {
// Local socket address; returned (as a copy) by `local_addr()`.
52 local_addr: SocketAddr,
// Remote socket address; returned (as a copy) by `peer_addr()`.
53 peer_addr: SocketAddr,
// Pinned, boxed writer that every flush, vectored write, and close is forwarded to.
54 write_stream: Pin<Box<dyn AsyncWrite + Send + Sync>>,
// Queue of encoded messages, one `Vec<u8>` per message, waiting to be written to the stream.
55 pending_writes: Vec<Vec<u8>>,
// NOTE(review): the struct may declare further fields on lines not visible in this excerpt
// (for example whatever `is_closed()` reads) -- confirm against the full file.
// Inherent methods: construction, address accessors, the closed check, and the crate-internal
// routine that pushes queued bytes to the stream.
59 impl ConnectionWriter {
60 /// Creates a new [`ConnectionWriter`] from an [`AsyncWrite`] trait object and the local and peer
// NOTE(review): the rest of this doc sentence, the `fn` header, and most of the initializer
// are on lines not visible in this excerpt. What is visible: three parameters (the two
// socket addresses and the pinned boxed stream) and a write queue that starts out empty.
63 local_addr: SocketAddr,
64 peer_addr: SocketAddr,
65 write_stream: Pin<Box<dyn AsyncWrite + Send + Sync>>,
71 pending_writes: Vec::new(),
76 /// Get the local IP address and port.
// Returns a copy of the stored address rather than a reference.
77 pub fn local_addr(&self) -> SocketAddr {
78 self.local_addr.clone()
81 /// Get the peer IP address and port.
// Returns a copy of the stored address rather than a reference.
82 pub fn peer_addr(&self) -> SocketAddr {
83 self.peer_addr.clone()
86 /// Check if the [`Sink`] of messages to the network is closed.
// NOTE(review): the body is not visible in this excerpt, so what state this reads cannot be
// determined from here. `poll_ready` uses the result to reject sends with `ConnectionClosed`.
87 pub fn is_closed(&self) -> bool {
/// Pushes the queued messages to the underlying stream.
///
/// When the queue is non-empty, the stream is flushed first and then every queued buffer is
/// handed to the stream in a single vectored write. IO failures from either step are logged
/// and returned as [`ConnectionWriteError::IoError`]; `Poll::Pending` from either step is
/// passed straight through.
// NOTE(review): the parameter list (including the `cx` used below) and the results for the
// successful-write and empty-queue cases are on lines not visible in this excerpt.
91 pub(crate) fn write_pending_bytes(
94 ) -> Poll<Result<(), ConnectionWriteError>> {
// Nothing below runs unless at least one message has been queued by `start_send`.
95 if self.pending_writes.len() > 0 {
96 let stream = self.write_stream.as_mut();
// Flush happens BEFORE the new bytes are written, so in the visible code the bytes written
// below are not themselves flushed during this call.
98 match stream.poll_flush(cx) {
// Flush not finished yet: the queue is left untouched and the caller must poll again.
99 Poll::Pending => Poll::Pending,
101 Poll::Ready(Ok(_)) => {
102 trace!("Sending pending bytes");
// `split_off(0)` moves every queued buffer into this local and leaves the queue empty.
// NOTE(review): if the write below returns `Pending` or an error, nothing in the visible
// code puts these buffers back, so the queued messages appear to be dropped -- confirm.
104 let pending = self.pending_writes.split_off(0);
// Borrow each buffer as an `IoSlice` so all messages go out in one vectored write call.
105 let writeable_vec: Vec<IoSlice> =
106 pending.iter().map(|p| IoSlice::new(p)).collect();
// Re-borrow the stream; the earlier binding was consumed by `poll_flush`.
108 let stream = self.write_stream.as_mut();
109 match stream.poll_write_vectored(cx, writeable_vec.as_slice()) {
110 Poll::Pending => Poll::Pending,
112 Poll::Ready(Ok(bytes_written)) => {
// NOTE(review): in the visible code `bytes_written` is only logged. A vectored write may
// accept fewer bytes than were offered; since the buffers were already removed from the
// queue, any unwritten tail appears to be lost -- confirm against the elided lines.
113 trace!("Wrote {} bytes to network stream", bytes_written);
117 Poll::Ready(Err(err)) => {
118 error!("Encountered error when writing to network stream");
119 Poll::Ready(Err(ConnectionWriteError::IoError(err)))
124 Poll::Ready(Err(err)) => {
125 error!("Encountered error when flushing network stream");
126 Poll::Ready(Err(ConnectionWriteError::IoError(err)))
// `Sink` implementation: `start_send` queues encoded messages, while `poll_flush` and
// `poll_close` drive `write_pending_bytes` to move the queue onto the stream.
135 impl Sink<ConnectDatagram> for ConnectionWriter {
// Every sink operation reports failures as a `ConnectionWriteError`.
136 type Error = ConnectionWriteError;
/// Rejects new messages with [`ConnectionWriteError::ConnectionClosed`] once `is_closed()`
/// reports true. The task context is unused (`_cx`).
// NOTE(review): the result returned on the "ready" path is on lines not visible here.
138 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
139 if self.is_closed() {
140 trace!("Connection is closed, cannot send message");
141 Poll::Ready(Err(ConnectionWriteError::ConnectionClosed))
143 trace!("Connection ready to send message");
/// Encodes `item` and appends the resulting bytes to the write queue. No stream IO is
/// performed in the visible code; the bytes are written later by `write_pending_bytes`.
// NOTE(review): the return expression is on a line not visible in this excerpt.
148 fn start_send(mut self: Pin<&mut Self>, item: ConnectDatagram) -> Result<(), Self::Error> {
149 trace!("Preparing message to be sent next");
// `encode()` is defined on `ConnectDatagram` elsewhere; its result is pushed onto the
// `Vec<Vec<u8>>` queue below, so it is an owned byte buffer.
151 let buffer = item.encode();
// The length is captured only for the trace message.
152 let msg_size = buffer.len();
153 trace!("Serialized pending message into {} bytes", msg_size);
155 self.pending_writes.push(buffer);
/// Delegates entirely to `write_pending_bytes`; see the notes there about flush ordering.
160 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
161 self.write_pending_bytes(cx)
/// Pushes any queued messages via `write_pending_bytes`, then closes the underlying stream.
/// A failure while closing the stream is returned as [`ConnectionWriteError::IoError`].
// NOTE(review): lines between this header and the `match` are not visible in this excerpt,
// and neither is the outer arm that handles an error from `write_pending_bytes`.
164 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
167 match self.write_pending_bytes(cx) {
// Queued bytes are not out yet: do not start closing the stream.
168 Poll::Pending => Poll::Pending,
170 Poll::Ready(Ok(_)) => {
171 let stream = self.write_stream.as_mut();
// The close result is passed through as-is, with an IO error wrapped in `IoError`.
173 match stream.poll_close(cx) {
174 Poll::Pending => Poll::Pending,
176 Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
178 Poll::Ready(Err(err)) => Poll::Ready(Err(ConnectionWriteError::IoError(err))),