From: Sachandhan Ganesh Date: Fri, 15 Jan 2021 08:03:19 +0000 (-0800) Subject: working tls examples X-Git-Url: https://git.lizzy.rs/?a=commitdiff_plain;h=d6746514ba62b577be68d4dc2dac0f02bb96d068;p=connect-rs.git working tls examples --- diff --git a/examples/tls-client/Cargo.toml b/examples/tls-client/Cargo.toml index f952662..407ccb6 100644 --- a/examples/tls-client/Cargo.toml +++ b/examples/tls-client/Cargo.toml @@ -11,5 +11,9 @@ anyhow = "1.0.31" async-std = { version = "1.6.2", features = ["attributes"] } env_logger = "0.7" log = "0.4" +protobuf = "2.18.1" -seam-channel = { path = "../../" } \ No newline at end of file +connect = { path = "../../" } + +[build-dependencies] +protobuf-codegen-pure = "2.18.1" diff --git a/examples/tls-client/build.rs b/examples/tls-client/build.rs new file mode 100644 index 0000000..ae9222f --- /dev/null +++ b/examples/tls-client/build.rs @@ -0,0 +1,9 @@ +fn main() -> Result<(), Box> { + protobuf_codegen_pure::Codegen::new() + .out_dir("src/schema") + .inputs(&["../tcp-client/schema/hello_world.proto"]) + .include("../tcp-client/schema") + .run() + .expect("Codegen failed."); + Ok(()) +} diff --git a/examples/tls-client/src/main.rs b/examples/tls-client/src/main.rs index 6d57a35..b513a67 100644 --- a/examples/tls-client/src/main.rs +++ b/examples/tls-client/src/main.rs @@ -1,6 +1,10 @@ +mod schema; + +use crate::schema::hello_world::HelloWorld; +use connect::tls::rustls::ClientConfig; +use connect::{Connection, SinkExt, StreamExt}; use log::*; -use seam_channel::net::tls::rustls::ClientConfig; -use seam_channel::net::{StitchClient, StitchNetClient}; +use protobuf::well_known_types::Any; use std::env; #[async_std::main] @@ -22,22 +26,24 @@ async fn main() -> anyhow::Result<()> { .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid cert"))?; // create a client connection to the server - let dist_chan = StitchNetClient::tls_client(ip_addr, &domain, client_config.into())?; + let mut conn = Connection::tls_client(ip_addr, &domain, client_config.into())?; - // create a channel for String messages on the TCP connection - let (sender, receiver) = dist_chan.bounded::(Some(100)); + // send a message to the server + let raw_msg = String::from("Hello world"); + info!("Sending message: {}", raw_msg); - // alert the connection that you are ready to read and write messages - dist_chan.ready()?; + let mut msg = HelloWorld::new(); + msg.set_message(raw_msg); - // send a message to the server - let msg = String::from("Hello world"); - info!("Sending message: {}", msg); - sender.send(msg).await?; + conn.writer().send(msg).await?; // wait for the server to reply with an ack - if let Ok(msg) = receiver.recv().await { - info!("Received reply: {}", msg); + while let Some(reply) = conn.reader().next().await { + info!("Received message"); + + let msg: HelloWorld = Any::unpack(&reply)?.unwrap(); + + info!("Unpacked reply: {}", msg.get_message()); } Ok(()) diff --git a/examples/tls-client/src/schema/hello_world.rs b/examples/tls-client/src/schema/hello_world.rs new file mode 100644 index 0000000..5af4935 --- /dev/null +++ b/examples/tls-client/src/schema/hello_world.rs @@ -0,0 +1,200 @@ +// This file is generated by rust-protobuf 2.19.0. Do not edit +// @generated + +// https://github.com/rust-lang/rust-clippy/issues/702 +#![allow(unknown_lints)] +#![allow(clippy::all)] + +#![allow(unused_attributes)] +#![rustfmt::skip] + +#![allow(box_pointers)] +#![allow(dead_code)] +#![allow(missing_docs)] +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] +#![allow(non_upper_case_globals)] +#![allow(trivial_casts)] +#![allow(unused_imports)] +#![allow(unused_results)] +//! Generated file from `hello_world.proto` + +/// Generated files are compatible only with the same version +/// of protobuf runtime. +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_19_0; + +#[derive(PartialEq,Clone,Default)] +pub struct HelloWorld { + // message fields + pub message: ::std::string::String, + // special fields + pub unknown_fields: ::protobuf::UnknownFields, + pub cached_size: ::protobuf::CachedSize, +} + +impl<'a> ::std::default::Default for &'a HelloWorld { + fn default() -> &'a HelloWorld { + ::default_instance() + } +} + +impl HelloWorld { + pub fn new() -> HelloWorld { + ::std::default::Default::default() + } + + // string message = 1; + + + pub fn get_message(&self) -> &str { + &self.message + } + pub fn clear_message(&mut self) { + self.message.clear(); + } + + // Param is passed by value, moved + pub fn set_message(&mut self, v: ::std::string::String) { + self.message = v; + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_message(&mut self) -> &mut ::std::string::String { + &mut self.message + } + + // Take field + pub fn take_message(&mut self) -> ::std::string::String { + ::std::mem::replace(&mut self.message, ::std::string::String::new()) + } +} + +impl ::protobuf::Message for HelloWorld { + fn is_initialized(&self) -> bool { + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> { + while !is.eof()? { + let (field_number, wire_type) = is.read_tag_unpack()?; + match field_number { + 1 => { + ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.message)?; + }, + _ => { + ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u32 { + let mut my_size = 0; + if !self.message.is_empty() { + my_size += ::protobuf::rt::string_size(1, &self.message); + } + my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); + self.cached_size.set(my_size); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { + if !self.message.is_empty() { + os.write_string(1, &self.message)?; + } + os.write_unknown_fields(self.get_unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn get_cached_size(&self) -> u32 { + self.cached_size.get() + } + + fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { + &self.unknown_fields + } + + fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { + &mut self.unknown_fields + } + + fn as_any(&self) -> &dyn (::std::any::Any) { + self as &dyn (::std::any::Any) + } + fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { + self as &mut dyn (::std::any::Any) + } + fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { + self + } + + fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { + Self::descriptor_static() + } + + fn new() -> HelloWorld { + HelloWorld::new() + } + + fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { + static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; + descriptor.get(|| { + let mut fields = ::std::vec::Vec::new(); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( + "message", + |m: &HelloWorld| { &m.message }, + |m: &mut HelloWorld| { &mut m.message }, + )); + ::protobuf::reflect::MessageDescriptor::new_pb_name::( + "HelloWorld", + fields, + file_descriptor_proto() + ) + }) + } + + fn default_instance() -> &'static HelloWorld { + static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; + instance.get(HelloWorld::new) + } +} + +impl ::protobuf::Clear for HelloWorld { + fn clear(&mut self) { + self.message.clear(); + self.unknown_fields.clear(); + } +} + +impl ::std::fmt::Debug for HelloWorld { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for HelloWorld { + fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { + ::protobuf::reflect::ReflectValueRef::Message(self) + } +} + +static file_descriptor_proto_data: &'static [u8] = b"\ + \n\x11hello_world.proto\x12\x0bhello_world\"*\n\nHelloWorld\x12\x1a\n\ + \x07message\x18\x01\x20\x01(\tR\x07messageB\0:\0B\0b\x06proto3\ +"; + +static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; + +fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto { + ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap() +} + +pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto { + file_descriptor_proto_lazy.get(|| { + parse_descriptor_proto() + }) +} diff --git a/examples/tls-client/src/schema/mod.rs b/examples/tls-client/src/schema/mod.rs new file mode 100644 index 0000000..c6dbc18 --- /dev/null +++ b/examples/tls-client/src/schema/mod.rs @@ -0,0 +1 @@ +pub mod hello_world; diff --git a/examples/tls-echo-server/Cargo.toml b/examples/tls-echo-server/Cargo.toml index 31bff03..60a13bc 100644 --- a/examples/tls-echo-server/Cargo.toml +++ b/examples/tls-echo-server/Cargo.toml @@ -11,5 +11,9 @@ anyhow = "1.0.31" async-std = { version = "1.6.2", features = ["attributes"] } env_logger = "0.7" log = "0.4" +protobuf = "2.18.1" -seam-channel = { path = "../../" } \ No newline at end of file +connect = { path = "../../" } + +[build-dependencies] +protobuf-codegen-pure = "2.18.1" diff --git a/examples/tls-echo-server/build.rs b/examples/tls-echo-server/build.rs new file mode 100644 index 0000000..ae9222f --- /dev/null +++ b/examples/tls-echo-server/build.rs @@ -0,0 +1,9 @@ +fn main() -> Result<(), Box> { + protobuf_codegen_pure::Codegen::new() + .out_dir("src/schema") + .inputs(&["../tcp-client/schema/hello_world.proto"]) + .include("../tcp-client/schema") + .run() + .expect("Codegen failed."); + Ok(()) +} diff --git a/examples/tls-echo-server/src/main.rs b/examples/tls-echo-server/src/main.rs index 1129657..3e8b576 100644 --- a/examples/tls-echo-server/src/main.rs +++ b/examples/tls-echo-server/src/main.rs @@ -1,8 +1,12 @@ +mod schema; + +use crate::schema::hello_world::HelloWorld; use async_std::{io, task}; +use connect::tls::rustls::internal::pemfile::{certs, rsa_private_keys}; +use connect::tls::rustls::{NoClientAuth, ServerConfig}; +use connect::tls::TlsServer; +use connect::{SinkExt, StreamExt}; use log::*; -use seam_channel::net::tls::rustls::internal::pemfile::{certs, rsa_private_keys}; -use seam_channel::net::tls::rustls::{NoClientAuth, ServerConfig}; -use seam_channel::net::{StitchClient, StitchNetServer}; use std::env; use std::fs::File; use std::io::BufReader; @@ -26,31 +30,31 @@ async fn main() -> anyhow::Result<()> { .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?; // create a server - let (_server, conns) = StitchNetServer::tls_server(ip_address, config.into())?; + let mut server = TlsServer::new(ip_address, config.into())?; // handle server connections // wait for a connection to come in and be accepted - while let Ok(conn) = conns.recv().await { - info!("Handling connection: {}", conn.peer_addr()); - - // register for String-typed messages - let (sender, receiver) = conn.unbounded::(); - - // let the connection know you are ready to send and receive messages - conn.ready() - .expect("could not ready the connection for reading and writing"); + while let Some(mut conn) = server.next().await { + info!("Handling connection from {}", conn.peer_addr()); - // handle String messages task::spawn(async move { - // for every String message - while let Ok(msg) = receiver.recv().await { - info!("Echoing message: {}", msg); - - let response = format!("{}, right back at you!", msg); - - // Send the message back to its source - if let Err(err) = sender.send(response).await { - error!("Could not echo message: {:#?}", err); + while let Some(msg) = conn.reader().next().await { + if msg.is::() { + if let Ok(Some(contents)) = msg.unpack::() { + info!( + "Received a message \"{}\" from {}", + contents.get_message(), + conn.peer_addr() + ); + + conn.writer() + .send(contents) + .await + .expect("Could not send message back to source connection"); + info!("Sent message back to original sender"); + } + } else { + error!("Received a message of unknown type") } } }); diff --git a/examples/tls-echo-server/src/schema/hello_world.rs b/examples/tls-echo-server/src/schema/hello_world.rs new file mode 100644 index 0000000..5af4935 --- /dev/null +++ b/examples/tls-echo-server/src/schema/hello_world.rs @@ -0,0 +1,200 @@ +// This file is generated by rust-protobuf 2.19.0. Do not edit +// @generated + +// https://github.com/rust-lang/rust-clippy/issues/702 +#![allow(unknown_lints)] +#![allow(clippy::all)] + +#![allow(unused_attributes)] +#![rustfmt::skip] + +#![allow(box_pointers)] +#![allow(dead_code)] +#![allow(missing_docs)] +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] +#![allow(non_upper_case_globals)] +#![allow(trivial_casts)] +#![allow(unused_imports)] +#![allow(unused_results)] +//! Generated file from `hello_world.proto` + +/// Generated files are compatible only with the same version +/// of protobuf runtime. +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_19_0; + +#[derive(PartialEq,Clone,Default)] +pub struct HelloWorld { + // message fields + pub message: ::std::string::String, + // special fields + pub unknown_fields: ::protobuf::UnknownFields, + pub cached_size: ::protobuf::CachedSize, +} + +impl<'a> ::std::default::Default for &'a HelloWorld { + fn default() -> &'a HelloWorld { + ::default_instance() + } +} + +impl HelloWorld { + pub fn new() -> HelloWorld { + ::std::default::Default::default() + } + + // string message = 1; + + + pub fn get_message(&self) -> &str { + &self.message + } + pub fn clear_message(&mut self) { + self.message.clear(); + } + + // Param is passed by value, moved + pub fn set_message(&mut self, v: ::std::string::String) { + self.message = v; + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_message(&mut self) -> &mut ::std::string::String { + &mut self.message + } + + // Take field + pub fn take_message(&mut self) -> ::std::string::String { + ::std::mem::replace(&mut self.message, ::std::string::String::new()) + } +} + +impl ::protobuf::Message for HelloWorld { + fn is_initialized(&self) -> bool { + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> { + while !is.eof()? { + let (field_number, wire_type) = is.read_tag_unpack()?; + match field_number { + 1 => { + ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.message)?; + }, + _ => { + ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u32 { + let mut my_size = 0; + if !self.message.is_empty() { + my_size += ::protobuf::rt::string_size(1, &self.message); + } + my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); + self.cached_size.set(my_size); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { + if !self.message.is_empty() { + os.write_string(1, &self.message)?; + } + os.write_unknown_fields(self.get_unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn get_cached_size(&self) -> u32 { + self.cached_size.get() + } + + fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { + &self.unknown_fields + } + + fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { + &mut self.unknown_fields + } + + fn as_any(&self) -> &dyn (::std::any::Any) { + self as &dyn (::std::any::Any) + } + fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { + self as &mut dyn (::std::any::Any) + } + fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { + self + } + + fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { + Self::descriptor_static() + } + + fn new() -> HelloWorld { + HelloWorld::new() + } + + fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { + static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; + descriptor.get(|| { + let mut fields = ::std::vec::Vec::new(); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( + "message", + |m: &HelloWorld| { &m.message }, + |m: &mut HelloWorld| { &mut m.message }, + )); + ::protobuf::reflect::MessageDescriptor::new_pb_name::( + "HelloWorld", + fields, + file_descriptor_proto() + ) + }) + } + + fn default_instance() -> &'static HelloWorld { + static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; + instance.get(HelloWorld::new) + } +} + +impl ::protobuf::Clear for HelloWorld { + fn clear(&mut self) { + self.message.clear(); + self.unknown_fields.clear(); + } +} + +impl ::std::fmt::Debug for HelloWorld { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for HelloWorld { + fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { + ::protobuf::reflect::ReflectValueRef::Message(self) + } +} + +static file_descriptor_proto_data: &'static [u8] = b"\ + \n\x11hello_world.proto\x12\x0bhello_world\"*\n\nHelloWorld\x12\x1a\n\ + \x07message\x18\x01\x20\x01(\tR\x07messageB\0:\0B\0b\x06proto3\ +"; + +static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; + +fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto { + ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap() +} + +pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto { + file_descriptor_proto_lazy.get(|| { + parse_descriptor_proto() + }) +} diff --git a/examples/tls-echo-server/src/schema/mod.rs b/examples/tls-echo-server/src/schema/mod.rs new file mode 100644 index 0000000..c6dbc18 --- /dev/null +++ b/examples/tls-echo-server/src/schema/mod.rs @@ -0,0 +1 @@ +pub mod hello_world; diff --git a/src/tls/client.rs b/src/tls/client.rs index ef0ea64..af60056 100644 --- a/src/tls/client.rs +++ b/src/tls/client.rs @@ -3,14 +3,22 @@ use async_tls::TlsConnector; use log::*; use crate::Connection; -use async_std::net::{TcpStream, SocketAddr, ToSocketAddrs}; +use async_std::net::{SocketAddr, TcpStream, ToSocketAddrs}; use async_tls::client; use async_tls::server; use futures::AsyncReadExt; pub enum TlsConnectionMetadata { - Client { local_addr: SocketAddr, peer_addr: SocketAddr, stream: client::TlsStream }, - Server { local_addr: SocketAddr, peer_addr: SocketAddr, stream: server::TlsStream }, + Client { + local_addr: SocketAddr, + peer_addr: SocketAddr, + stream: client::TlsStream, + }, + Server { + local_addr: SocketAddr, + peer_addr: SocketAddr, + stream: server::TlsStream, + }, } impl Connection { @@ -30,14 +38,22 @@ impl Connection { task::block_on(connector.connect(domain, stream))?; info!("Completed TLS handshake with {}", peer_addr); - Ok(Self::from(TlsConnectionMetadata::Client { local_addr, peer_addr, stream: encrypted_stream })) + Ok(Self::from(TlsConnectionMetadata::Client { + local_addr, + peer_addr, + stream: encrypted_stream, + })) } } impl From for Connection { fn from(metadata: TlsConnectionMetadata) -> Self { match metadata { - TlsConnectionMetadata::Client { local_addr, peer_addr, stream } => { + TlsConnectionMetadata::Client { + local_addr, + peer_addr, + stream, + } => { let (read_stream, write_stream) = stream.split(); Self::new( @@ -46,9 +62,13 @@ impl From for Connection { Box::new(read_stream), Box::new(write_stream), ) - }, + } - TlsConnectionMetadata::Server { local_addr, peer_addr, stream } => { + TlsConnectionMetadata::Server { + local_addr, + peer_addr, + stream, + } => { let (read_stream, write_stream) = stream.split(); Self::new( @@ -59,7 +79,5 @@ impl From for Connection { ) } } - - } } diff --git a/src/tls/server.rs b/src/tls/server.rs index 66e4206..64e44c9 100644 --- a/src/tls/server.rs +++ b/src/tls/server.rs @@ -1,5 +1,5 @@ -use crate::Connection; use crate::tls::TlsConnectionMetadata; +use crate::Connection; use async_std::net::*; use async_std::pin::Pin; use async_std::prelude::*; @@ -16,7 +16,10 @@ pub struct TlsServer { } impl TlsServer { - pub fn new(ip_addrs: A, acceptor: TlsAcceptor) -> anyhow::Result { + pub fn new( + ip_addrs: A, + acceptor: TlsAcceptor, + ) -> anyhow::Result { let listener = task::block_on(TcpListener::bind(ip_addrs))?; info!("Started TLS server at {}", listener.local_addr()?); @@ -33,18 +36,22 @@ impl Stream for TlsServer { fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { if let Some(Ok(tcp_stream)) = futures::executor::block_on(self.listener.incoming().next()) { - let local_addr = tcp_stream.local_addr().expect( - "Local address could not be retrieved", - ); + let local_addr = tcp_stream + .local_addr() + .expect("Local address could not be retrieved"); - let peer_addr = tcp_stream.peer_addr().expect( - "Peer address could not be retrieved", - ); + let peer_addr = tcp_stream + .peer_addr() + .expect("Peer address could not be retrieved"); debug!("Received connection attempt from {}", peer_addr); if let Ok(tls_stream) = futures::executor::block_on(self.acceptor.accept(tcp_stream)) { debug!("Established TLS connection from {}", peer_addr); - Poll::Ready(Some(Connection::from(TlsConnectionMetadata::Server{ local_addr, peer_addr, stream: tls_stream }))) + Poll::Ready(Some(Connection::from(TlsConnectionMetadata::Server { + local_addr, + peer_addr, + stream: tls_stream, + }))) } else { debug!("Could not encrypt connection with TLS from {}", peer_addr); Poll::Pending