]> git.lizzy.rs Git - connect-rs.git/blobdiff - src/writer.rs
refactor read/write for correctness and ordering of messages
[connect-rs.git] / src / writer.rs
index 9b48c02899f7b9d73d1db13ffb693a9dfa9147b8..040bafde9220a13456315253bc6a6784808718d4 100644 (file)
@@ -1,7 +1,6 @@
 use crate::protocol::ConnectDatagram;
 use async_std::net::SocketAddr;
 use async_std::pin::Pin;
-use futures::io::IoSlice;
 use futures::task::{Context, Poll};
 use futures::{AsyncWrite, Sink};
 use log::*;
@@ -54,7 +53,7 @@ pub struct ConnectionWriter {
     local_addr: SocketAddr,
     peer_addr: SocketAddr,
     write_stream: Pin<Box<dyn AsyncWrite + Send + Sync>>,
-    pending_writes: Vec<Vec<u8>>,
+    pending_writes: Vec<u8>,
     closed: bool,
 }
 
@@ -101,18 +100,15 @@ impl ConnectionWriter {
                 Poll::Pending => Poll::Pending,
 
                 Poll::Ready(Ok(_)) => {
-                    trace!("Sending pending bytes");
-
-                    let pending = self.pending_writes.split_off(0);
-                    let writeable_vec: Vec<IoSlice> =
-                        pending.iter().map(|p| IoSlice::new(p)).collect();
-
                     let stream = self.write_stream.as_mut();
-                    match stream.poll_write_vectored(cx, writeable_vec.as_slice()) {
+
+                    trace!("sending pending bytes to network stream");
+                    match stream.poll_write(cx, self.pending_writes.as_slice()) {
                         Poll::Pending => Poll::Pending,
 
                         Poll::Ready(Ok(bytes_written)) => {
-                            trace!("Wrote {} bytes to network stream", bytes_written);
+                            trace!("wrote {} bytes to network stream", bytes_written);
+                            self.pending_writes.clear();
                             Poll::Ready(Ok(()))
                         }
 
@@ -139,22 +135,22 @@ impl Sink<ConnectDatagram> for ConnectionWriter {
 
     fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
         if self.is_closed() {
-            trace!("Connection is closed, cannot send message");
+            trace!("connection is closed, cannot send message");
             Poll::Ready(Err(ConnectionWriteError::ConnectionClosed))
         } else {
-            trace!("Connection ready to send message");
+            trace!("connection ready to send message");
             Poll::Ready(Ok(()))
         }
     }
 
     fn start_send(mut self: Pin<&mut Self>, item: ConnectDatagram) -> Result<(), Self::Error> {
-        trace!("Preparing message to be sent next");
+        trace!("preparing datagram to be queued for sending");
 
         let buffer = item.encode();
         let msg_size = buffer.len();
-        trace!("Serialized pending message into {} bytes", msg_size);
+        trace!("serialized pending message into {} bytes", msg_size);
 
-        self.pending_writes.push(buffer);
+        self.pending_writes.extend(buffer);
 
         Ok(())
     }