From: Sachandhan Ganesh Date: Fri, 15 Jan 2021 07:42:33 +0000 (-0800) Subject: fix tls and cull warnings X-Git-Url: https://git.lizzy.rs/?a=commitdiff_plain;h=e7564af97e5143e83f7c8417883230d3bad8967a;p=connect-rs.git fix tls and cull warnings --- diff --git a/Cargo.toml b/Cargo.toml index f5bf2a0..e8c34c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,8 @@ name = "connect" version = "0.0.2" authors = ["Sachandhan Ganesh "] edition = "2018" +description = "message queue abstraction over async network streams" + [dependencies] anyhow = "1.0.31" diff --git a/examples/tcp-client/Cargo.toml b/examples/tcp-client/Cargo.toml index 1449e16..1fb8e2b 100644 --- a/examples/tcp-client/Cargo.toml +++ b/examples/tcp-client/Cargo.toml @@ -13,7 +13,7 @@ env_logger = "0.7" log = "0.4" protobuf = "2.18.1" -stitch-net = { path = "../../" } +connect = { path = "../../" } [build-dependencies] -protobuf-codegen-pure = "2.18.1" \ No newline at end of file +protobuf-codegen-pure = "2.18.1" diff --git a/examples/tcp-client/src/main.rs b/examples/tcp-client/src/main.rs index a5de8e5..f1b1d5f 100644 --- a/examples/tcp-client/src/main.rs +++ b/examples/tcp-client/src/main.rs @@ -1,10 +1,10 @@ pub mod schema; use crate::schema::hello_world::HelloWorld; +use connect::{Connection, SinkExt, StreamExt}; use log::*; use protobuf::well_known_types::Any; use std::env; -use stitch_net::{SinkExt, StitchConnection, StreamExt}; #[async_std::main] async fn main() -> anyhow::Result<()> { @@ -21,7 +21,7 @@ async fn main() -> anyhow::Result<()> { }; // create a client connection to the server - let mut conn = StitchConnection::tcp_client(ip_address)?; + let mut conn = Connection::tcp_client(ip_address)?; // send a message to the server let raw_msg = String::from("Hello world"); diff --git a/examples/tcp-echo-server/Cargo.toml b/examples/tcp-echo-server/Cargo.toml index d44763c..29af62d 100644 --- a/examples/tcp-echo-server/Cargo.toml +++ b/examples/tcp-echo-server/Cargo.toml @@ -13,7 +13,7 @@ env_logger = "0.7" log = "0.4" protobuf = "2.18.1" -stitch-net = { path = "../../" } +connect = { path = "../../" } [build-dependencies] -protobuf-codegen-pure = "2.18.1" \ No newline at end of file +protobuf-codegen-pure = "2.18.1" diff --git a/examples/tcp-echo-server/src/main.rs b/examples/tcp-echo-server/src/main.rs index 376ab0c..cc5b5d7 100644 --- a/examples/tcp-echo-server/src/main.rs +++ b/examples/tcp-echo-server/src/main.rs @@ -2,10 +2,10 @@ mod schema; use crate::schema::hello_world::HelloWorld; use async_std::task; +use connect::tcp::TcpServer; +use connect::{SinkExt, StreamExt}; use log::*; use std::env; -use stitch_net::tcp::StitchTcpServer; -use stitch_net::{SinkExt, StreamExt}; #[async_std::main] async fn main() -> anyhow::Result<()> { @@ -23,7 +23,7 @@ async fn main() -> anyhow::Result<()> { }; // create a server - let mut server = StitchTcpServer::new(ip_address)?; + let mut server = TcpServer::new(ip_address)?; // handle server connections // wait for a connection to come in and be accepted diff --git a/schema/message.proto b/schema/message.proto index e80daff..d1f6ee0 100644 --- a/schema/message.proto +++ b/schema/message.proto @@ -3,6 +3,6 @@ package message; import "google/protobuf/any.proto"; -message StitchMessage { +message ConnectionMessage { google.protobuf.Any payload = 1; } diff --git a/src/lib.rs b/src/lib.rs index c227762..b6163be 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,35 +1,24 @@ +mod reader; pub mod schema; pub mod tcp; -// @todo pub mod tls; -mod reader; +pub mod tls; mod writer; -pub use crate::reader::StitchConnectionReader; -use crate::schema::StitchMessage; -pub use crate::writer::StitchConnectionWriter; -use async_channel::RecvError; +pub use crate::reader::ConnectionReader; +pub use crate::writer::ConnectionWriter; use async_std::net::SocketAddr; -use async_std::pin::Pin; -use bytes::{Buf, BytesMut}; -use futures::task::{Context, Poll}; -use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Sink, Stream}; -use log::*; -use protobuf::Message; -use std::convert::TryInto; - -pub use futures::SinkExt; -pub use futures::StreamExt; -use protobuf::well_known_types::Any; +use futures::{AsyncRead, AsyncWrite}; +pub use futures::{SinkExt, StreamExt}; -pub struct StitchConnection { +pub struct Connection { local_addr: SocketAddr, peer_addr: SocketAddr, - reader: StitchConnectionReader, - writer: StitchConnectionWriter, + reader: ConnectionReader, + writer: ConnectionWriter, } #[allow(dead_code)] -impl StitchConnection { +impl Connection { pub(crate) fn new( local_addr: SocketAddr, peer_addr: SocketAddr, @@ -39,8 +28,8 @@ impl StitchConnection { Self { local_addr, peer_addr, - reader: StitchConnectionReader::new(local_addr, peer_addr, read_stream), - writer: StitchConnectionWriter::new(local_addr, peer_addr, write_stream), + reader: ConnectionReader::new(local_addr, peer_addr, read_stream), + writer: ConnectionWriter::new(local_addr, peer_addr, write_stream), } } @@ -52,11 +41,11 @@ impl StitchConnection { self.peer_addr.clone() } - pub fn split(self) -> (StitchConnectionReader, StitchConnectionWriter) { + pub fn split(self) -> (ConnectionReader, ConnectionWriter) { (self.reader, self.writer) } - pub fn join(reader: StitchConnectionReader, writer: StitchConnectionWriter) -> Self { + pub fn join(reader: ConnectionReader, writer: ConnectionWriter) -> Self { Self { local_addr: reader.local_addr(), peer_addr: reader.peer_addr(), @@ -65,11 +54,11 @@ impl StitchConnection { } } - pub fn reader(&mut self) -> &mut StitchConnectionReader { + pub fn reader(&mut self) -> &mut ConnectionReader { &mut self.reader } - pub fn writer(&mut self) -> &mut StitchConnectionWriter { + pub fn writer(&mut self) -> &mut ConnectionWriter { &mut self.writer } diff --git a/src/reader.rs b/src/reader.rs index 568e40a..1dd9f98 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -1,4 +1,4 @@ -use crate::schema::StitchMessage; +use crate::schema::ConnectionMessage; use async_std::net::SocketAddr; use async_std::pin::Pin; use bytes::{Buf, BytesMut}; @@ -14,14 +14,14 @@ use protobuf::well_known_types::Any; const BUFFER_SIZE: usize = 8192; -pub struct StitchConnectionReader { +pub struct ConnectionReader { local_addr: SocketAddr, peer_addr: SocketAddr, read_stream: Box, pending_read: Option, } -impl StitchConnectionReader { +impl ConnectionReader { pub fn new( local_addr: SocketAddr, peer_addr: SocketAddr, @@ -44,7 +44,7 @@ impl StitchConnectionReader { } } -impl Stream for StitchConnectionReader { +impl Stream for ConnectionReader { type Item = Any; fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { @@ -80,7 +80,7 @@ impl Stream for StitchConnectionReader { buffer.resize(bytes_read, 0); debug!("{:?}", buffer.as_ref()); - match StitchMessage::parse_from_bytes(buffer.as_ref()) { + match ConnectionMessage::parse_from_bytes(buffer.as_ref()) { Ok(mut data) => { let serialized_size = data.compute_size(); debug!("Deserialized message of size {} bytes", serialized_size); diff --git a/src/schema/message.rs b/src/schema/message.rs index 352580a..c41fbb0 100644 --- a/src/schema/message.rs +++ b/src/schema/message.rs @@ -24,7 +24,7 @@ // const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_19_0; #[derive(PartialEq,Clone,Default)] -pub struct StitchMessage { +pub struct ConnectionMessage { // message fields pub payload: ::protobuf::SingularPtrField<::protobuf::well_known_types::Any>, // special fields @@ -32,14 +32,14 @@ pub struct StitchMessage { pub cached_size: ::protobuf::CachedSize, } -impl<'a> ::std::default::Default for &'a StitchMessage { - fn default() -> &'a StitchMessage { - ::default_instance() +impl<'a> ::std::default::Default for &'a ConnectionMessage { + fn default() -> &'a ConnectionMessage { + ::default_instance() } } -impl StitchMessage { - pub fn new() -> StitchMessage { +impl ConnectionMessage { + pub fn new() -> ConnectionMessage { ::std::default::Default::default() } @@ -77,7 +77,7 @@ impl StitchMessage { } } -impl ::protobuf::Message for StitchMessage { +impl ::protobuf::Message for ConnectionMessage { fn is_initialized(&self) -> bool { for v in &self.payload { if !v.is_initialized() { @@ -151,8 +151,8 @@ impl ::protobuf::Message for StitchMessage { Self::descriptor_static() } - fn new() -> StitchMessage { - StitchMessage::new() + fn new() -> ConnectionMessage { + ConnectionMessage::new() } fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { @@ -161,46 +161,46 @@ impl ::protobuf::Message for StitchMessage { let mut fields = ::std::vec::Vec::new(); fields.push(::protobuf::reflect::accessor::make_singular_ptr_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage<::protobuf::well_known_types::Any>>( "payload", - |m: &StitchMessage| { &m.payload }, - |m: &mut StitchMessage| { &mut m.payload }, + |m: &ConnectionMessage| { &m.payload }, + |m: &mut ConnectionMessage| { &mut m.payload }, )); - ::protobuf::reflect::MessageDescriptor::new_pb_name::( - "StitchMessage", + ::protobuf::reflect::MessageDescriptor::new_pb_name::( + "ConnectionMessage", fields, file_descriptor_proto() ) }) } - fn default_instance() -> &'static StitchMessage { - static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; - instance.get(StitchMessage::new) + fn default_instance() -> &'static ConnectionMessage { + static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; + instance.get(ConnectionMessage::new) } } -impl ::protobuf::Clear for StitchMessage { +impl ::protobuf::Clear for ConnectionMessage { fn clear(&mut self) { self.payload.clear(); self.unknown_fields.clear(); } } -impl ::std::fmt::Debug for StitchMessage { +impl ::std::fmt::Debug for ConnectionMessage { fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { ::protobuf::text_format::fmt(self, f) } } -impl ::protobuf::reflect::ProtobufValue for StitchMessage { +impl ::protobuf::reflect::ProtobufValue for ConnectionMessage { fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { ::protobuf::reflect::ReflectValueRef::Message(self) } } static file_descriptor_proto_data: &'static [u8] = b"\ - \n\rmessage.proto\x12\x07message\x1a\x19google/protobuf/any.proto\"C\n\r\ - StitchMessage\x120\n\x07payload\x18\x01\x20\x01(\x0b2\x14.google.protobu\ - f.AnyR\x07payloadB\0:\0B\0b\x06proto3\ + \n\rmessage.proto\x12\x07message\x1a\x19google/protobuf/any.proto\"G\n\ + \x11ConnectionMessage\x120\n\x07payload\x18\x01\x20\x01(\x0b2\x14.google\ + .protobuf.AnyR\x07payloadB\0:\0B\0b\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/src/schema/mod.rs b/src/schema/mod.rs index 1b8ac93..49c0b7c 100644 --- a/src/schema/mod.rs +++ b/src/schema/mod.rs @@ -1,12 +1,11 @@ mod message; -pub use message::StitchMessage; +pub use message::ConnectionMessage; use protobuf::well_known_types::Any; use protobuf::Message; -impl StitchMessage { - // @todo make pub(crate) - pub fn from_msg(msg: T) -> Self { +impl ConnectionMessage { + pub(crate) fn from_msg(msg: T) -> Self { let mut sm = Self::new(); let payload = Any::pack(&msg).expect("Protobuf Message could not be packed into Any type"); diff --git a/src/tcp/client.rs b/src/tcp/client.rs index a0994d4..4c85138 100644 --- a/src/tcp/client.rs +++ b/src/tcp/client.rs @@ -1,21 +1,20 @@ use async_std::task; use log::*; -use crate::StitchConnection; +use crate::Connection; use async_std::net::{TcpStream, ToSocketAddrs}; -impl StitchConnection { - pub fn tcp_client( - ip_addrs: A, - ) -> anyhow::Result { - let read_stream = task::block_on(TcpStream::connect(&ip_addrs))?; +impl Connection { + pub fn tcp_client(ip_addrs: A) -> anyhow::Result { + let stream = task::block_on(TcpStream::connect(&ip_addrs))?; info!("Established client TCP connection to {}", ip_addrs); - Ok(Self::from(read_stream)) + stream.set_nodelay(true)?; + Ok(Self::from(stream)) } } -impl From for StitchConnection { +impl From for Connection { fn from(stream: TcpStream) -> Self { let write_stream = stream.clone(); diff --git a/src/tcp/server.rs b/src/tcp/server.rs index adae4cc..43fd1de 100644 --- a/src/tcp/server.rs +++ b/src/tcp/server.rs @@ -1,4 +1,4 @@ -use crate::StitchConnection; +use crate::Connection; use async_std::net::{SocketAddr, TcpListener, ToSocketAddrs}; use async_std::pin::Pin; use async_std::task; @@ -7,12 +7,12 @@ use futures::{Stream, StreamExt}; use log::*; #[allow(dead_code)] -pub struct StitchTcpServer { +pub struct TcpServer { local_addrs: SocketAddr, listener: TcpListener, } -impl StitchTcpServer { +impl TcpServer { pub fn new(ip_addrs: A) -> anyhow::Result { let listener = task::block_on(TcpListener::bind(&ip_addrs))?; info!("Started TCP server at {}", &ip_addrs); @@ -24,12 +24,12 @@ impl StitchTcpServer { } } -impl Stream for StitchTcpServer { - type Item = StitchConnection; +impl Stream for TcpServer { + type Item = Connection; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { if let Some(Ok(conn)) = futures::executor::block_on(self.listener.incoming().next()) { - Poll::Ready(Some(StitchConnection::from(conn))) + Poll::Ready(Some(Connection::from(conn))) } else { Poll::Ready(None) } diff --git a/src/tls/client.rs b/src/tls/client.rs index 5f9acb2..ef0ea64 100644 --- a/src/tls/client.rs +++ b/src/tls/client.rs @@ -1,76 +1,65 @@ -use async_channel::{Receiver, Sender}; -use async_std::io::*; -use async_std::net::*; use async_std::task; use async_tls::TlsConnector; -use futures_util::io::AsyncReadExt; use log::*; -use crate::registry::StitchRegistry; -use crate::StitchNetClient; -use crate::{channel_factory, StitchMessage}; -use async_std::sync::{Arc, Condvar, Mutex}; +use crate::Connection; +use async_std::net::{TcpStream, SocketAddr, ToSocketAddrs}; +use async_tls::client; +use async_tls::server; +use futures::AsyncReadExt; -impl StitchNetClient { - pub fn tls_client( - ip_addrs: A, - domain: &str, - connector: TlsConnector, - ) -> Result { - Self::tls_client_with_bound(ip_addrs, domain, connector, None) - } +pub enum TlsConnectionMetadata { + Client { local_addr: SocketAddr, peer_addr: SocketAddr, stream: client::TlsStream }, + Server { local_addr: SocketAddr, peer_addr: SocketAddr, stream: server::TlsStream }, +} - pub fn tls_client_with_bound( +impl Connection { + pub fn tls_client( ip_addrs: A, domain: &str, connector: TlsConnector, - cap: Option, - ) -> Result { + ) -> anyhow::Result { let stream = task::block_on(TcpStream::connect(&ip_addrs))?; - stream.set_nodelay(true)?; info!("Established client TCP connection to {}", ip_addrs); + stream.set_nodelay(true)?; - Self::tls_client_from_parts(stream, domain, connector, channel_factory(cap)) - } - - pub fn tls_client_from_parts( - stream: TcpStream, - domain: &str, - connector: TlsConnector, - (tls_write_sender, tls_write_receiver): (Sender, Receiver), - ) -> Result { - let local_addr = stream.local_addr()?; + let local_addr = stream.peer_addr()?; let peer_addr = stream.peer_addr()?; - let encrypted_stream = task::block_on(connector.connect(domain, stream))?; - let (read_stream, write_stream) = encrypted_stream.split(); + let encrypted_stream: client::TlsStream = + task::block_on(connector.connect(domain, stream))?; info!("Completed TLS handshake with {}", peer_addr); - let registry: StitchRegistry = crate::registry::new(); - let read_readiness = Arc::new((Mutex::new(false), Condvar::new())); - let write_readiness = Arc::new((Mutex::new(false), Condvar::new())); + Ok(Self::from(TlsConnectionMetadata::Client { local_addr, peer_addr, stream: encrypted_stream })) + } +} + +impl From for Connection { + fn from(metadata: TlsConnectionMetadata) -> Self { + match metadata { + TlsConnectionMetadata::Client { local_addr, peer_addr, stream } => { + let (read_stream, write_stream) = stream.split(); + + Self::new( + local_addr, + peer_addr, + Box::new(read_stream), + Box::new(write_stream), + ) + }, + + TlsConnectionMetadata::Server { local_addr, peer_addr, stream } => { + let (read_stream, write_stream) = stream.split(); - let read_task = task::spawn(crate::tasks::read_from_stream( - registry.clone(), - read_stream, - read_readiness.clone(), - )); + Self::new( + local_addr, + peer_addr, + Box::new(read_stream), + Box::new(write_stream), + ) + } + } - let write_task = task::spawn(crate::tasks::write_to_stream( - tls_write_receiver.clone(), - write_stream, - write_readiness.clone(), - )); - Ok(Self { - local_addr, - peer_addr, - registry, - stream_writer_chan: (tls_write_sender, tls_write_receiver), - read_readiness, - write_readiness, - read_task, - write_task, - }) } } diff --git a/src/tls/server.rs b/src/tls/server.rs index 79dba44..66e4206 100644 --- a/src/tls/server.rs +++ b/src/tls/server.rs @@ -1,135 +1,56 @@ -use crate::channel_factory; -use crate::registry::StitchRegistry; -use crate::{ServerRegistry, StitchClient, StitchNetClient, StitchNetServer}; -use async_channel::{Receiver, Sender}; -use async_std::io::*; +use crate::Connection; +use crate::tls::TlsConnectionMetadata; use async_std::net::*; +use async_std::pin::Pin; use async_std::prelude::*; -use async_std::sync::{Arc, Condvar, Mutex}; use async_std::task; use async_tls::TlsAcceptor; -use dashmap::DashMap; -use futures_util::AsyncReadExt; +use futures::task::{Context, Poll}; use log::*; -impl StitchNetServer { - pub fn tls_server( - ip_addrs: A, - acceptor: TlsAcceptor, - ) -> Result<(StitchNetServer, Receiver>)> { - Self::tls_server_with_bound(ip_addrs, acceptor, None) - } +#[allow(dead_code)] +pub struct TlsServer { + local_addrs: SocketAddr, + listener: TcpListener, + acceptor: TlsAcceptor, +} - pub fn tls_server_with_bound( - ip_addrs: A, - acceptor: TlsAcceptor, - cap: Option, - ) -> Result<(Self, Receiver>)> { +impl TlsServer { + pub fn new(ip_addrs: A, acceptor: TlsAcceptor) -> anyhow::Result { let listener = task::block_on(TcpListener::bind(ip_addrs))?; info!("Started TLS server at {}", listener.local_addr()?); - let registry = Arc::new(DashMap::new()); - let (sender, receiver) = channel_factory(cap); - - let handler = task::spawn(handle_server_connections( - acceptor, - registry.clone(), + Ok(Self { + local_addrs: listener.local_addr()?, listener, - sender.clone(), - cap, - )); - - Ok(( - Self { - registry, - connections_chan: (sender, receiver.clone()), - accept_loop_task: handler, - }, - receiver, - )) + acceptor, + }) } } -async fn handle_server_connections<'a>( - acceptor: TlsAcceptor, - registry: ServerRegistry, - input: TcpListener, - output: Sender>, - cap: Option, -) -> anyhow::Result<()> { - let mut conns = input.incoming(); - - debug!("Reading from the stream of incoming connections"); - loop { - match conns.next().await { - Some(Ok(tcp_stream)) => { - let local_addr = tcp_stream.local_addr()?; - let peer_addr = tcp_stream.peer_addr()?; - - debug!("Received connection attempt from {}", peer_addr); - - let tls_stream = acceptor.accept(tcp_stream).await?; - - let (read_stream, write_stream) = tls_stream.split(); - let (tls_write_sender, tls_write_receiver) = channel_factory(cap); - - let client_registry: StitchRegistry = crate::registry::new(); - let read_readiness = Arc::new((Mutex::new(false), Condvar::new())); - let write_readiness = Arc::new((Mutex::new(false), Condvar::new())); - - let read_task = task::spawn(crate::tasks::read_from_stream( - client_registry.clone(), - read_stream, - read_readiness.clone(), - )); - - let write_task = task::spawn(crate::tasks::write_to_stream( - tls_write_receiver.clone(), - write_stream, - write_readiness.clone(), - )); - - let conn = StitchNetClient { - local_addr, - peer_addr, - registry: client_registry, - stream_writer_chan: (tls_write_sender, tls_write_receiver), - read_readiness, - write_readiness, - read_task, - write_task, - }; - - debug!("Attempting to register connection from {}", peer_addr); - let conn = Arc::new(conn); - registry.insert(conn.peer_addr(), conn.clone()); - debug!( - "Registered client connection for {} in server registry", - peer_addr - ); - - if let Err(err) = output.send(conn).await { - error!( - "Stopping the server accept loop - could not send accepted TLS client connection to channel: {:#?}", - err - ); - - break Err(anyhow::Error::from(err)); - } else { - info!("Accepted connection from {}", peer_addr); - } - } - - Some(Err(err)) => error!( - "Encountered error when accepting TLS connection: {:#?}", - err - ), - - None => { - warn!("Stopping the server accept loop - unable to accept any more connections"); - - break Ok(()); +impl Stream for TlsServer { + type Item = Connection; + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + if let Some(Ok(tcp_stream)) = futures::executor::block_on(self.listener.incoming().next()) { + let local_addr = tcp_stream.local_addr().expect( + "Local address could not be retrieved", + ); + + let peer_addr = tcp_stream.peer_addr().expect( + "Peer address could not be retrieved", + ); + debug!("Received connection attempt from {}", peer_addr); + + if let Ok(tls_stream) = futures::executor::block_on(self.acceptor.accept(tcp_stream)) { + debug!("Established TLS connection from {}", peer_addr); + Poll::Ready(Some(Connection::from(TlsConnectionMetadata::Server{ local_addr, peer_addr, stream: tls_stream }))) + } else { + debug!("Could not encrypt connection with TLS from {}", peer_addr); + Poll::Pending } + } else { + Poll::Ready(None) } } } diff --git a/src/writer.rs b/src/writer.rs index f6f267e..c2275ac 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -1,4 +1,4 @@ -use crate::schema::StitchMessage; +use crate::schema::ConnectionMessage; use async_channel::RecvError; use async_std::net::SocketAddr; use async_std::pin::Pin; @@ -10,14 +10,14 @@ use protobuf::Message; pub use futures::SinkExt; pub use futures::StreamExt; -pub struct StitchConnectionWriter { +pub struct ConnectionWriter { local_addr: SocketAddr, peer_addr: SocketAddr, write_stream: Box, - pending_write: Option, + pending_write: Option, } -impl StitchConnectionWriter { +impl ConnectionWriter { pub fn new( local_addr: SocketAddr, peer_addr: SocketAddr, @@ -40,7 +40,7 @@ impl StitchConnectionWriter { } } -impl Sink for StitchConnectionWriter { +impl Sink for ConnectionWriter { type Error = RecvError; fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { @@ -55,7 +55,7 @@ impl Sink for StitchConnectionWriter { fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { debug!("Preparing message to be sent next"); - let stitch_msg: StitchMessage = StitchMessage::from_msg(item); + let stitch_msg: ConnectionMessage = ConnectionMessage::from_msg(item); self.pending_write.replace(stitch_msg); Ok(())