1 use crate::schema::ConnectionMessage;
2 use async_channel::RecvError;
3 use async_std::net::SocketAddr;
4 use async_std::pin::Pin;
5 use futures::{AsyncWrite, Sink};
6 use futures::io::IoSlice;
7 use futures::task::{Context, Poll};
11 pub use futures::SinkExt;
12 pub use futures::StreamExt;
/// Outbound half of a connection: holds the boxed `AsyncWrite` stream, the two socket
/// addresses associated with it, and a queue of serialized messages awaiting a write.
// NOTE(review): this copy of the file carries leftover listing line numbers at the start of
// each line, and the numbering jumps from 18 to 22 with no closing brace in between. Lines
// of this definition are therefore missing here (possibly including a further field) —
// restore the file from version control before relying on the field list below.
14 pub struct ConnectionWriter {
/// Address handed back by `local_addr()`.
15 local_addr: SocketAddr,
/// Address handed back by `peer_addr()`.
16 peer_addr: SocketAddr,
/// Pinned, type-erased writer that the `Sink` implementation flushes, writes to and closes.
17 write_stream: Pin<Box<dyn AsyncWrite + Send + Sync>>,
/// Serialized messages queued by `start_send`; one byte buffer per message.
18 pending_writes: Vec<Vec<u8>>,
/// Constructor and simple accessors for `ConnectionWriter`.
// NOTE(review): lines are missing from this copy of the block: the constructor's `fn` line
// (listing line 23), most of the constructor body (27-31, 33-36), every closing brace, and
// the whole body of `is_closed` (46-47). Only what is visible is described below.
22 impl ConnectionWriter {
// Constructor parameters: both addresses and the boxed writer are supplied by the caller.
24 local_addr: SocketAddr,
25 peer_addr: SocketAddr,
26 write_stream: Pin<Box<dyn AsyncWrite + Send + Sync>>,
// A new writer starts with an empty queue of pending buffers.
32 pending_writes: Vec::new(),
/// Returns a copy of the stored local address.
// NOTE(review): `async_std::net::SocketAddr` is the std type, which is `Copy`, so the
// `.clone()` is redundant (clippy `clone_on_copy`); `self.local_addr` alone would do.
37 pub fn local_addr(&self) -> SocketAddr {
38 self.local_addr.clone()
/// Returns a copy of the stored peer address.
// NOTE(review): same redundant `.clone()` on a `Copy` type as in `local_addr`.
41 pub fn peer_addr(&self) -> SocketAddr {
42 self.peer_addr.clone()
/// Presumably reports whether this writer has been closed.
// NOTE(review): the body is not visible in this copy, so how "closed" is determined
// cannot be stated from here — confirm against the original file.
45 pub fn is_closed(&self) -> bool {
/// `Sink` implementation: accepts any `M: Message`, queues its serialized form in
/// `pending_writes`, and pushes the queued bytes to `write_stream` on flush and close.
// NOTE(review): neither `Message` nor the `trace!`/`error!` macros are brought into scope by
// the visible `use` lines (listing lines 8-10 are missing) — presumably they are imported
// there; confirm against the original file.
50 impl<M: Message> Sink<M> for ConnectionWriter {
/// Every failure surfaced by this sink is the unit-like `RecvError`; the underlying
/// serialization or I/O error is only logged or discarded, never returned to the caller.
// NOTE(review): `RecvError` is async-channel's receive-side error type; using it for a
// writer looks like reuse of a convenient type rather than a meaningful error — confirm.
51 type Error = RecvError;
/// Reports whether the sink can accept another message.
// NOTE(review): the condition selecting between the two paths below (listing line 54) and
// the success return value (59) are missing from this copy. Judging only by the log text,
// one path refuses sends on a closed connection and the other reports readiness.
// `_cx` is unused in the visible lines, so no waker is registered here.
53 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Refusal path: log, then fail immediately with `RecvError`.
55 trace!("Connection is closed, cannot send message");
56 Poll::Ready(Err(RecvError))
// Ready path: only the log call is visible; the returned value is not.
58 trace!("Connection ready to send message");
/// Converts `item` into a `ConnectionMessage`, serializes it to bytes and appends the buffer
/// to `pending_writes`. Nothing is written to the stream in the visible lines; the queued
/// buffers are consumed later by the flush/close logic.
// NOTE(review): the return expressions of both branches (listing lines 72-77) are missing
// from this copy, so the exact `Ok`/`Err` values cannot be confirmed from here.
63 fn start_send(mut self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> {
64 trace!("Preparing message to be sent next");
// Wrap the caller's message via `ConnectionMessage::from_msg` (defined in `crate::schema`).
65 let msg: ConnectionMessage = ConnectionMessage::from_msg(item);
// Success branch: the message serialized into an owned byte buffer.
67 if let Ok(buffer) = msg.write_to_bytes() {
68 let msg_size = buffer.len();
69 trace!("Serialized pending message into {} bytes", msg_size);
// Queue the whole buffer as one entry; ownership moves into the queue.
71 self.pending_writes.push(buffer);
// Failure branch: `if let Ok(..)` has already discarded the serialization error, so only
// a fixed message can be logged here.
75 error!("Encountered error when serializing message to bytes");
// Presumably `Sink::poll_flush`: the `fn` line (listing line 80) and the `cx` parameter line
// (82) are missing from this copy; the name is inferred from the method's position in the
// impl — TODO confirm.
// Visible behaviour: when buffers are queued, flush the underlying stream, then hand every
// queued buffer to the stream in a single vectored write. The branch taken when nothing is
// queued is not visible.
81 mut self: Pin<&mut Self>,
83 ) -> Poll<Result<(), Self::Error>> {
// Only touch the stream when something is queued.
// NOTE(review): `!self.pending_writes.is_empty()` is the idiomatic spelling (clippy `len_zero`).
84 if self.pending_writes.len() > 0 {
85 let stream = self.write_stream.as_mut();
// The stream is flushed BEFORE the queued bytes are written, so bytes written below are
// not themselves flushed by any visible line of this call.
87 match stream.poll_flush(cx) {
88 Poll::Pending => Poll::Pending,
90 Poll::Ready(Ok(_)) => {
91 trace!("Sending pending bytes");
// `split_off(0)` moves every queued buffer into `pending`, leaving the queue empty.
93 let pending = self.pending_writes.split_off(0);
// Build one `IoSlice` per buffer (the closure body and `collect` are not visible here).
94 let writeable_vec: Vec<IoSlice> = pending.iter().map(|p| {
// Re-borrow the stream; the earlier `stream` was consumed by `poll_flush`.
98 let stream = self.write_stream.as_mut();
99 match stream.poll_write_vectored(cx, writeable_vec.as_slice()) {
// NOTE(review): the queue was already emptied above and `pending` is a local, so on
// `Pending` the buffers appear to be dropped rather than retried — confirm; likely data loss.
100 Poll::Pending => Poll::Pending,
// NOTE(review): `bytes_written` may be smaller than the total queued length; no visible
// line re-queues the remainder (listing lines 104-106 are missing) — confirm.
102 Poll::Ready(Ok(bytes_written)) => {
103 trace!("Wrote {} bytes to network stream", bytes_written);
// Write failure: the I/O error `_e` is discarded and replaced by `RecvError`.
107 Poll::Ready(Err(_e)) => {
108 error!("Encountered error when writing to network stream");
109 Poll::Ready(Err(RecvError))
// Flush failure: likewise, the I/O error is discarded and replaced by `RecvError`.
114 Poll::Ready(Err(_e)) => {
115 error!("Encountered error when flushing network stream");
116 Poll::Ready(Err(RecvError))
// Presumably `Sink::poll_close`: the `fn` line (listing line 124) is missing from this copy
// and the definition runs past the last visible line; the name is inferred from the
// `poll_close` call on the stream below — TODO confirm.
// Visible behaviour: stage one repeats the flush-then-write sequence of the flush method and
// keeps its outcome in `flush`; stage two closes the underlying stream once stage one is
// ready with `Ok`.
125 mut self: Pin<&mut Self>,
126 cx: &mut Context<'_>,
127 ) -> Poll<Result<(), Self::Error>> {
// NOTE(review): listing lines 128-129 are missing; whatever ran before this point is unknown.
// Stage one: drain queued buffers, if any. The value used when the queue is empty is not visible.
// NOTE(review): this block duplicates the flush method's body; a shared helper would keep
// the two in sync. `!is_empty()` is the idiomatic test here as well.
130 let flush = if self.pending_writes.len() > 0 {
131 let stream = self.write_stream.as_mut();
// As in the flush method, the stream is flushed before the queued bytes are written.
133 match stream.poll_flush(cx) {
134 Poll::Pending => Poll::Pending,
136 Poll::Ready(Ok(_)) => {
137 trace!("Sending pending bytes");
// Moves every queued buffer into `pending`, leaving the queue empty.
139 let pending = self.pending_writes.split_off(0);
// One `IoSlice` per buffer (closure body and `collect` not visible here).
140 let writeable_vec: Vec<IoSlice> = pending.iter().map(|p| {
144 let stream = self.write_stream.as_mut();
145 match stream.poll_write_vectored(cx, writeable_vec.as_slice()) {
// NOTE(review): with the queue already emptied, a `Pending` result appears to drop the
// buffers; on the next poll the queue is empty and the close stage would proceed — confirm.
146 Poll::Pending => Poll::Pending,
// NOTE(review): a short write is not visibly handled (listing lines 150-152 are missing).
148 Poll::Ready(Ok(bytes_written)) => {
149 trace!("Wrote {} bytes to network stream", bytes_written);
// Write failure: the I/O error is discarded and replaced by `RecvError`.
153 Poll::Ready(Err(_e)) => {
154 error!("Encountered error when writing to network stream");
155 Poll::Ready(Err(RecvError))
// Flush failure: the I/O error is discarded and replaced by `RecvError`.
160 Poll::Ready(Err(_e)) => {
161 error!("Encountered error when flushing network stream");
162 Poll::Ready(Err(RecvError))
// Stage two. The `match` header (listing line 169) is not visible; presumably it matches on
// `flush`. A pending stage one keeps the whole close pending.
170 Poll::Pending => Poll::Pending,
172 Poll::Ready(Ok(_)) => {
173 let stream = self.write_stream.as_mut();
// Close the underlying stream and translate its result into this sink's error type.
175 match stream.poll_close(cx) {
176 Poll::Pending => Poll::Pending,
178 Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
// Close failure: unlike the arms above, this one is not logged before being replaced.
180 Poll::Ready(Err(_e)) => Poll::Ready(Err(RecvError)),