1 use async_channel::RecvError;
2 use async_std::net::SocketAddr;
3 use async_std::pin::Pin;
4 use futures::io::IoSlice;
5 use futures::task::{Context, Poll};
6 use futures::{AsyncWrite, Sink};
9 use crate::protocol::ConnectDatagram;
10 pub use futures::SinkExt;
11 pub use futures::StreamExt;
13 /// An interface to write messages to the network connection
15 /// Implements the [`Sink`] trait to asynchronously write messages to the network connection.
22 /// writer.send(msg).await?;
25 /// Please see the [tcp-client](https://github.com/sachanganesh/connect-rs/blob/main/examples/tcp-client/)
26 /// example program or other client example programs for a more thorough showcase.
28 pub struct ConnectionWriter {
// Address of the local end of the connection; handed back by `local_addr()`.
29 local_addr: SocketAddr,
// Address of the remote end of the connection; handed back by `peer_addr()`.
30 peer_addr: SocketAddr,
// Pinned, boxed, type-erased async writer that queued bytes are flushed, written and closed on.
31 write_stream: Pin<Box<dyn AsyncWrite + Send + Sync>>,
// Encoded messages (one byte buffer each) queued by `start_send` and taken out again by
// `write_pending_bytes`.
32 pending_writes: Vec<Vec<u8>>,
// Inherent methods: construction, address accessors, closed check, and the crate-internal
// routine that pushes queued bytes to the stream.
36 impl ConnectionWriter {
37 /// Creates a new [`ConnectionWriter`] from an [`AsyncWrite`] trait object and the local and peer
40 local_addr: SocketAddr,
41 peer_addr: SocketAddr,
42 write_stream: Pin<Box<dyn AsyncWrite + Send + Sync>>,
// A fresh writer starts with an empty queue of pending encoded messages.
48 pending_writes: Vec::new(),
53 /// Get the local IP address and port
54 pub fn local_addr(&self) -> SocketAddr {
// NOTE(review): if this `SocketAddr` is the std type (which is `Copy`), the explicit
// `.clone()` is redundant and a plain field read would do — confirm.
55 self.local_addr.clone()
58 /// Get the peer IP address and port
59 pub fn peer_addr(&self) -> SocketAddr {
// NOTE(review): same remark as for `local_addr` — `.clone()` is likely unnecessary.
60 self.peer_addr.clone()
63 /// Check if the [`Sink`] of messages to the network is closed
64 pub fn is_closed(&self) -> bool {
/// Tries to hand every buffer queued in `pending_writes` to `write_stream`.
///
/// When the queue is non-empty the stream is polled for a flush first; only once the flush is
/// ready are all queued buffers passed to a single `poll_write_vectored` call.
///
/// Returns `Poll::Pending` while the flush or the write is pending. An I/O error from either
/// step is logged and reported as [`RecvError`]; the underlying error value is discarded.
68 pub(crate) fn write_pending_bytes(
71 ) -> Poll<Result<(), RecvError>> {
// Only touch the stream when something has been queued.
// NOTE(review): `!self.pending_writes.is_empty()` would be the idiomatic spelling.
72 if self.pending_writes.len() > 0 {
73 let stream = self.write_stream.as_mut();
// Flush before writing; the write below is attempted only after the flush has completed.
75 match stream.poll_flush(cx) {
76 Poll::Pending => Poll::Pending,
78 Poll::Ready(Ok(_)) => {
79 trace!("Sending pending bytes");
// `split_off(0)` moves every queued buffer into the local `pending`, leaving
// `self.pending_writes` empty from this point on.
81 let pending = self.pending_writes.split_off(0);
// Borrow each buffer as an `IoSlice` so they can all go out in one vectored write.
82 let writeable_vec: Vec<IoSlice> =
83 pending.iter().map(|p| IoSlice::new(p)).collect();
// Re-borrow the stream; the earlier pinned borrow was consumed by `poll_flush`.
85 let stream = self.write_stream.as_mut();
86 match stream.poll_write_vectored(cx, writeable_vec.as_slice()) {
// NOTE(review): `pending` is a local here, so returning `Pending` appears to drop the
// buffers instead of re-queuing them — confirm this cannot lose data.
87 Poll::Pending => Poll::Pending,
// NOTE(review): `bytes_written` is only logged; a short write (fewer bytes than were
// queued) does not look like it is retried — confirm.
89 Poll::Ready(Ok(bytes_written)) => {
90 trace!("Wrote {} bytes to network stream", bytes_written);
// Write failed: log it and surface the generic `RecvError`; `_e` itself is not propagated.
94 Poll::Ready(Err(_e)) => {
95 error!("Encountered error when writing to network stream");
96 Poll::Ready(Err(RecvError))
// Flush failed: log it and surface the generic `RecvError`; `_e` itself is not propagated.
101 Poll::Ready(Err(_e)) => {
102 error!("Encountered error when flushing network stream");
103 Poll::Ready(Err(RecvError))
// `Sink` implementation: `start_send` encodes and queues a `ConnectDatagram`, while
// `poll_flush` and `poll_close` push the queue to the stream through `write_pending_bytes`.
112 impl Sink<ConnectDatagram> for ConnectionWriter {
// Every failure of this sink is reported as `RecvError`.
113 type Error = RecvError;
/// Reports [`RecvError`] when `is_closed()` is true; the context argument is not used by the
/// visible checks.
115 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
116 if self.is_closed() {
117 trace!("Connection is closed, cannot send message");
118 Poll::Ready(Err(RecvError))
120 trace!("Connection ready to send message");
/// Encodes `item` and appends the resulting bytes to `pending_writes`.
125 fn start_send(mut self: Pin<&mut Self>, item: ConnectDatagram) -> Result<(), Self::Error> {
126 trace!("Preparing message to be sent next");
// `encode()` produces the byte buffer that is queued below.
128 let buffer = item.encode();
// The length is captured only for the trace message.
129 let msg_size = buffer.len();
130 trace!("Serialized pending message into {} bytes", msg_size);
// Queue the bytes; the stream itself is driven from `write_pending_bytes`.
132 self.pending_writes.push(buffer);
/// Delegates to `write_pending_bytes` to push queued messages to the stream.
137 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
138 self.write_pending_bytes(cx)
/// Pushes any queued bytes via `write_pending_bytes` and, once that is ready, polls the
/// underlying stream to close.
141 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
144 match self.write_pending_bytes(cx) {
145 Poll::Pending => Poll::Pending,
// Queue has been handed to the stream (or was empty): now close the stream.
147 Poll::Ready(Ok(_)) => {
148 let stream = self.write_stream.as_mut();
150 match stream.poll_close(cx) {
151 Poll::Pending => Poll::Pending,
153 Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
// A close failure is mapped to `RecvError`; the I/O error itself is dropped.
155 Poll::Ready(Err(_e)) => Poll::Ready(Err(RecvError)),