1 use crate::protocol::ConnectDatagram;
2 use async_std::net::SocketAddr;
3 use async_std::pin::Pin;
4 use futures::io::IoSlice;
5 use futures::task::{Context, Poll};
6 use futures::{AsyncWrite, Sink};
10 pub use futures::SinkExt;
11 pub use futures::StreamExt;
14 /// Encountered when there is an issue with writing messages on the network stream.
17 pub enum ConnectionWriteError {
18 /// Encountered when trying to send a message while the connection is closed.
21 /// Encountered when there is an IO-level error with the connection.
22 IoError(std::io::Error),
25 impl Error for ConnectionWriteError {}
27 impl std::fmt::Display for ConnectionWriteError {
28 fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
30 ConnectionWriteError::ConnectionClosed => {
31 formatter.write_str("cannot send message when connection is closed")
33 ConnectionWriteError::IoError(err) => std::fmt::Display::fmt(&err, formatter),
38 /// An interface to write messages to the network connection.
40 /// Implements the [`Sink`] trait to asynchronously write messages to the network connection.
47 /// writer.send(msg).await?;
50 /// Please see the [tcp-client](https://github.com/sachanganesh/connect-rs/blob/main/examples/tcp-client/)
51 /// example program or other client example programs for a more thorough showcase.
53 pub struct ConnectionWriter {
54 local_addr: SocketAddr,
55 peer_addr: SocketAddr,
56 write_stream: Pin<Box<dyn AsyncWrite + Send + Sync>>,
57 pending_writes: Vec<Vec<u8>>,
61 impl ConnectionWriter {
62 /// Creates a new [`ConnectionWriter`] from an [`AsyncWrite`] trait object and the local and peer
65 local_addr: SocketAddr,
66 peer_addr: SocketAddr,
67 write_stream: Pin<Box<dyn AsyncWrite + Send + Sync>>,
73 pending_writes: Vec::new(),
78 /// Get the local IP address and port.
79 pub fn local_addr(&self) -> SocketAddr {
80 self.local_addr.clone()
83 /// Get the peer IP address and port.
84 pub fn peer_addr(&self) -> SocketAddr {
85 self.peer_addr.clone()
88 /// Check if the [`Sink`] of messages to the network is closed.
89 pub fn is_closed(&self) -> bool {
93 pub(crate) fn write_pending_bytes(
96 ) -> Poll<Result<(), ConnectionWriteError>> {
97 if self.pending_writes.len() > 0 {
98 let stream = self.write_stream.as_mut();
100 match stream.poll_flush(cx) {
101 Poll::Pending => Poll::Pending,
103 Poll::Ready(Ok(_)) => {
104 trace!("Sending pending bytes");
106 let pending = self.pending_writes.split_off(0);
107 let writeable_vec: Vec<IoSlice> =
108 pending.iter().map(|p| IoSlice::new(p)).collect();
110 let stream = self.write_stream.as_mut();
111 match stream.poll_write_vectored(cx, writeable_vec.as_slice()) {
112 Poll::Pending => Poll::Pending,
114 Poll::Ready(Ok(bytes_written)) => {
115 trace!("Wrote {} bytes to network stream", bytes_written);
119 Poll::Ready(Err(err)) => {
120 error!("Encountered error when writing to network stream");
121 Poll::Ready(Err(ConnectionWriteError::IoError(err)))
126 Poll::Ready(Err(err)) => {
127 error!("Encountered error when flushing network stream");
128 Poll::Ready(Err(ConnectionWriteError::IoError(err)))
137 impl Sink<ConnectDatagram> for ConnectionWriter {
138 type Error = ConnectionWriteError;
140 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
141 if self.is_closed() {
142 trace!("Connection is closed, cannot send message");
143 Poll::Ready(Err(ConnectionWriteError::ConnectionClosed))
145 trace!("Connection ready to send message");
150 fn start_send(mut self: Pin<&mut Self>, item: ConnectDatagram) -> Result<(), Self::Error> {
151 trace!("Preparing message to be sent next");
153 let buffer = item.encode();
154 let msg_size = buffer.len();
155 trace!("Serialized pending message into {} bytes", msg_size);
157 self.pending_writes.push(buffer);
162 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
163 self.write_pending_bytes(cx)
166 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
169 match self.write_pending_bytes(cx) {
170 Poll::Pending => Poll::Pending,
172 Poll::Ready(Ok(_)) => {
173 let stream = self.write_stream.as_mut();
175 match stream.poll_close(cx) {
176 Poll::Pending => Poll::Pending,
178 Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
180 Poll::Ready(Err(err)) => Poll::Ready(Err(ConnectionWriteError::IoError(err))),