]> git.lizzy.rs Git - connect-rs.git/commitdiff
revert back to using vectored writes to the network stream
authorSachandhan Ganesh <sachan.ganesh@gmail.com>
Sun, 14 Feb 2021 07:06:41 +0000 (23:06 -0800)
committerSachandhan Ganesh <sachan.ganesh@gmail.com>
Sun, 14 Feb 2021 07:06:41 +0000 (23:06 -0800)
src/writer.rs

index 040bafde9220a13456315253bc6a6784808718d4..87ad3e6a029c93b1b6069a48832ebffa8a74d3e9 100644 (file)
@@ -1,13 +1,13 @@
 use crate::protocol::ConnectDatagram;
 use async_std::net::SocketAddr;
 use async_std::pin::Pin;
+use futures::io::IoSlice;
 use futures::task::{Context, Poll};
 use futures::{AsyncWrite, Sink};
 use log::*;
 use std::error::Error;
 
-pub use futures::SinkExt;
-pub use futures::StreamExt;
+pub use futures::{SinkExt, StreamExt};
 use std::fmt::Debug;
 
 /// Encountered when there is an issue with writing messages on the network stream.
@@ -53,7 +53,7 @@ pub struct ConnectionWriter {
     local_addr: SocketAddr,
     peer_addr: SocketAddr,
     write_stream: Pin<Box<dyn AsyncWrite + Send + Sync>>,
-    pending_writes: Vec<u8>,
+    pending_writes: Vec<Vec<u8>>,
     closed: bool,
 }
 
@@ -102,8 +102,12 @@ impl ConnectionWriter {
                 Poll::Ready(Ok(_)) => {
                     let stream = self.write_stream.as_mut();
 
+                    let pending = self.pending_writes.split_off(0);
+                    let writeable_vec: Vec<IoSlice> =
+                        pending.iter().map(|p| IoSlice::new(p)).collect();
+
                     trace!("sending pending bytes to network stream");
-                    match stream.poll_write(cx, self.pending_writes.as_slice()) {
+                    match stream.poll_write_vectored(cx, writeable_vec.as_slice()) {
                         Poll::Pending => Poll::Pending,
 
                         Poll::Ready(Ok(bytes_written)) => {
@@ -150,7 +154,7 @@ impl Sink<ConnectDatagram> for ConnectionWriter {
         let msg_size = buffer.len();
         trace!("serialized pending message into {} bytes", msg_size);
 
-        self.pending_writes.extend(buffer);
+        self.pending_writes.push(buffer);
 
         Ok(())
     }