]> git.lizzy.rs Git - connect-rs.git/blobdiff - src/writer.rs
make async-oriented, remove block_on
[connect-rs.git] / src / writer.rs
index f26ab4b12eba3719b98947c25fc80dc96fa5f24d..24030249110beb3768a0211e46654c779e488d71 100644 (file)
@@ -2,9 +2,9 @@ use crate::schema::ConnectionMessage;
 use async_channel::RecvError;
 use async_std::net::SocketAddr;
 use async_std::pin::Pin;
-use futures::{AsyncWrite, Sink};
 use futures::io::IoSlice;
 use futures::task::{Context, Poll};
+use futures::{AsyncWrite, Sink};
 use log::*;
 use protobuf::Message;
 
@@ -12,11 +12,11 @@ pub use futures::SinkExt;
 pub use futures::StreamExt;
 
 pub struct ConnectionWriter {
-    local_addr:     SocketAddr,
-    peer_addr:      SocketAddr,
-    write_stream:   Pin<Box<dyn AsyncWrite + Send + Sync>>,
+    local_addr: SocketAddr,
+    peer_addr: SocketAddr,
+    write_stream: Pin<Box<dyn AsyncWrite + Send + Sync>>,
     pending_writes: Vec<Vec<u8>>,
-    closed:         bool,
+    closed: bool,
 }
 
 impl ConnectionWriter {
@@ -77,10 +77,7 @@ impl<M: Message> Sink<M> for ConnectionWriter {
         }
     }
 
-    fn poll_flush(
-        mut self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<Result<(), Self::Error>> {
+    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
         if self.pending_writes.len() > 0 {
             let stream = self.write_stream.as_mut();
 
@@ -91,9 +88,8 @@ impl<M: Message> Sink<M> for ConnectionWriter {
                     trace!("Sending pending bytes");
 
                     let pending = self.pending_writes.split_off(0);
-                    let writeable_vec: Vec<IoSlice> = pending.iter().map(|p| {
-                        IoSlice::new(p)
-                    }).collect();
+                    let writeable_vec: Vec<IoSlice> =
+                        pending.iter().map(|p| IoSlice::new(p)).collect();
 
                     let stream = self.write_stream.as_mut();
                     match stream.poll_write_vectored(cx, writeable_vec.as_slice()) {
@@ -102,14 +98,14 @@ impl<M: Message> Sink<M> for ConnectionWriter {
                         Poll::Ready(Ok(bytes_written)) => {
                             trace!("Wrote {} bytes to network stream", bytes_written);
                             Poll::Ready(Ok(()))
-                        },
+                        }
 
                         Poll::Ready(Err(_e)) => {
                             error!("Encountered error when writing to network stream");
                             Poll::Ready(Err(RecvError))
-                        },
+                        }
                     }
-                },
+                }
 
                 Poll::Ready(Err(_e)) => {
                     error!("Encountered error when flushing network stream");
@@ -121,10 +117,7 @@ impl<M: Message> Sink<M> for ConnectionWriter {
         }
     }
 
-    fn poll_close(
-        mut self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<Result<(), Self::Error>> {
+    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
         self.closed = true;
 
         let flush = if self.pending_writes.len() > 0 {
@@ -137,9 +130,8 @@ impl<M: Message> Sink<M> for ConnectionWriter {
                     trace!("Sending pending bytes");
 
                     let pending = self.pending_writes.split_off(0);
-                    let writeable_vec: Vec<IoSlice> = pending.iter().map(|p| {
-                        IoSlice::new(p)
-                    }).collect();
+                    let writeable_vec: Vec<IoSlice> =
+                        pending.iter().map(|p| IoSlice::new(p)).collect();
 
                     let stream = self.write_stream.as_mut();
                     match stream.poll_write_vectored(cx, writeable_vec.as_slice()) {
@@ -148,14 +140,14 @@ impl<M: Message> Sink<M> for ConnectionWriter {
                         Poll::Ready(Ok(bytes_written)) => {
                             trace!("Wrote {} bytes to network stream", bytes_written);
                             Poll::Ready(Ok(()))
-                        },
+                        }
 
                         Poll::Ready(Err(_e)) => {
                             error!("Encountered error when writing to network stream");
                             Poll::Ready(Err(RecvError))
-                        },
+                        }
                     }
-                },
+                }
 
                 Poll::Ready(Err(_e)) => {
                     error!("Encountered error when flushing network stream");
@@ -179,7 +171,7 @@ impl<M: Message> Sink<M> for ConnectionWriter {
 
                     Poll::Ready(Err(_e)) => Poll::Ready(Err(RecvError)),
                 }
-            },
+            }
 
             err => err,
         }