1 use crate::protocol::ConnectDatagram;
2 use async_std::net::SocketAddr;
3 use async_std::pin::Pin;
4 use futures::io::IoSlice;
5 use futures::task::{Context, Poll};
6 use futures::{AsyncWrite, Sink};
10 pub use futures::{SinkExt, StreamExt};
13 /// Encountered when there is an issue with writing messages on the network stream.
16 pub enum ConnectionWriteError {
17 /// Encountered when trying to send a message while the connection is closed.
20 /// Encountered when there is an IO-level error with the connection.
21 IoError(std::io::Error),
24 impl Error for ConnectionWriteError {}
26 impl std::fmt::Display for ConnectionWriteError {
27 fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
29 ConnectionWriteError::ConnectionClosed => {
30 formatter.write_str("cannot send message when connection is closed")
32 ConnectionWriteError::IoError(err) => std::fmt::Display::fmt(&err, formatter),
37 /// An interface to write messages to the network connection.
39 /// Implements the `Sink` trait to asynchronously write messages to the network connection.
46 /// writer.send(msg).await?;
49 /// Please see the [tcp-client](https://github.com/sachanganesh/connect-rs/blob/main/examples/tcp-client/)
50 /// example program or other client example programs for a more thorough showcase.
52 pub struct ConnectionWriter {
53 local_addr: SocketAddr,
54 peer_addr: SocketAddr,
55 write_stream: Pin<Box<dyn AsyncWrite + Send + Sync>>,
56 pending_writes: Vec<Vec<u8>>,
60 impl ConnectionWriter {
61 /// Creates a new [`ConnectionWriter`] from an [`AsyncWrite`] trait object and the local and peer
64 local_addr: SocketAddr,
65 peer_addr: SocketAddr,
66 write_stream: Pin<Box<dyn AsyncWrite + Send + Sync>>,
72 pending_writes: Vec::new(),
77 /// Get the local IP address and port.
78 pub fn local_addr(&self) -> SocketAddr {
79 self.local_addr.clone()
82 /// Get the peer IP address and port.
83 pub fn peer_addr(&self) -> SocketAddr {
84 self.peer_addr.clone()
87 /// Check if the `Sink` of messages to the network is closed.
88 pub fn is_closed(&self) -> bool {
92 pub(crate) fn write_pending_bytes(
95 ) -> Poll<Result<(), ConnectionWriteError>> {
96 if self.pending_writes.len() > 0 {
97 let stream = self.write_stream.as_mut();
99 match stream.poll_flush(cx) {
100 Poll::Pending => Poll::Pending,
102 Poll::Ready(Ok(_)) => {
103 let stream = self.write_stream.as_mut();
105 let split = self.pending_writes.split_off(0);
106 let pending: Vec<IoSlice> = split.iter().map(|p| IoSlice::new(p)).collect();
108 trace!("sending pending bytes to network stream");
109 match stream.poll_write_vectored(cx, pending.as_slice()) {
111 self.pending_writes = split;
115 Poll::Ready(Ok(bytes_written)) => {
116 trace!("wrote {} bytes to network stream", bytes_written);
120 Poll::Ready(Err(err)) => {
121 error!("Encountered error when writing to network stream");
122 self.pending_writes = split;
123 Poll::Ready(Err(ConnectionWriteError::IoError(err)))
128 Poll::Ready(Err(err)) => {
129 error!("Encountered error when flushing network stream");
130 Poll::Ready(Err(ConnectionWriteError::IoError(err)))
139 impl Sink<ConnectDatagram> for ConnectionWriter {
140 type Error = ConnectionWriteError;
142 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
143 if self.is_closed() {
144 trace!("connection is closed - cannot send message");
145 Poll::Ready(Err(ConnectionWriteError::ConnectionClosed))
147 trace!("connection ready to send message");
152 fn start_send(mut self: Pin<&mut Self>, item: ConnectDatagram) -> Result<(), Self::Error> {
153 trace!("preparing datagram to be queued for sending");
155 let buffer = item.into_bytes();
156 let msg_size = buffer.len();
157 trace!("serialized pending message into {} bytes", msg_size);
159 self.pending_writes.push(buffer);
164 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
165 self.write_pending_bytes(cx)
168 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
170 debug!("Closing the sink for connection with {}", self.peer_addr);
172 match self.write_pending_bytes(cx) {
173 Poll::Pending => Poll::Pending,
175 Poll::Ready(Ok(_)) => {
176 let stream = self.write_stream.as_mut();
178 match stream.poll_close(cx) {
179 Poll::Pending => Poll::Pending,
181 Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
183 Poll::Ready(Err(err)) => Poll::Ready(Err(ConnectionWriteError::IoError(err))),