use async_channel::RecvError;
use async_std::net::SocketAddr;
use async_std::pin::Pin;
-use futures::{AsyncWrite, Sink};
use futures::io::IoSlice;
use futures::task::{Context, Poll};
+use futures::{AsyncWrite, Sink};
use log::*;
use protobuf::Message;
pub use futures::StreamExt;
pub struct ConnectionWriter {
- local_addr: SocketAddr,
- peer_addr: SocketAddr,
- write_stream: Pin<Box<dyn AsyncWrite + Send + Sync>>,
+ local_addr: SocketAddr,
+ peer_addr: SocketAddr,
+ write_stream: Pin<Box<dyn AsyncWrite + Send + Sync>>,
pending_writes: Vec<Vec<u8>>,
- closed: bool,
+ closed: bool,
}
impl ConnectionWriter {
}
}
- fn poll_flush(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.pending_writes.len() > 0 {
let stream = self.write_stream.as_mut();
trace!("Sending pending bytes");
let pending = self.pending_writes.split_off(0);
- let writeable_vec: Vec<IoSlice> = pending.iter().map(|p| {
- IoSlice::new(p)
- }).collect();
+ let writeable_vec: Vec<IoSlice> =
+ pending.iter().map(|p| IoSlice::new(p)).collect();
let stream = self.write_stream.as_mut();
match stream.poll_write_vectored(cx, writeable_vec.as_slice()) {
Poll::Ready(Ok(bytes_written)) => {
trace!("Wrote {} bytes to network stream", bytes_written);
Poll::Ready(Ok(()))
- },
+ }
Poll::Ready(Err(_e)) => {
error!("Encountered error when writing to network stream");
Poll::Ready(Err(RecvError))
- },
+ }
}
- },
+ }
Poll::Ready(Err(_e)) => {
error!("Encountered error when flushing network stream");
}
}
- fn poll_close(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.closed = true;
let flush = if self.pending_writes.len() > 0 {
trace!("Sending pending bytes");
let pending = self.pending_writes.split_off(0);
- let writeable_vec: Vec<IoSlice> = pending.iter().map(|p| {
- IoSlice::new(p)
- }).collect();
+ let writeable_vec: Vec<IoSlice> =
+ pending.iter().map(|p| IoSlice::new(p)).collect();
let stream = self.write_stream.as_mut();
match stream.poll_write_vectored(cx, writeable_vec.as_slice()) {
Poll::Ready(Ok(bytes_written)) => {
trace!("Wrote {} bytes to network stream", bytes_written);
Poll::Ready(Ok(()))
- },
+ }
Poll::Ready(Err(_e)) => {
error!("Encountered error when writing to network stream");
Poll::Ready(Err(RecvError))
- },
+ }
}
- },
+ }
Poll::Ready(Err(_e)) => {
error!("Encountered error when flushing network stream");
Poll::Ready(Err(_e)) => Poll::Ready(Err(RecvError)),
}
- },
+ }
err => err,
}