--- /dev/null
+[package]
+name = "connect"
+version = "0.0.2"
+authors = ["Sachandhan Ganesh <sachan.ganesh@gmail.com>"]
+edition = "2018"
+
+[dependencies]
+anyhow = "1.0.31"
+async-channel = "1.4.0"
+async-std = { version = "1.6.2", features = ["tokio02", "unstable"] }
+async-tls = { version = "0.9.0", default-features = false, features = ["client", "server"]}
+async-trait = "0.1.39"
+bytes = "0.5.5"
+dashmap = "3.11.10"
+futures = "0.3.8"
+log = "0.4"
+protobuf = "2.18.1"
+rustls = "0.18.0"
+
+[build-dependencies]
+protobuf-codegen-pure = "2.18.1"
--- /dev/null
+# connect
+
+This crate provides a distributed message queue abstraction over asynchronous network streams.
+
+By using a message queue, crate users can focus on sending and receiving messages between clients instead of low-level networking and failure recovery.
+
+## Future Goals
+
+- Documentation
+- Connection pool (+ accounting for ordering of messages)
+- Configurable policies for handling of non-registered (unexpected) message types
+- Testing
+- Benchmarking
+
+## Feature Status
+
+| Feature | Status |
+|----------------------------------------------------- |-------- |
+| [TCP Client](examples/tcp-client) | ✓ |
+| [TCP Server](examples/tcp-echo-server) | ✓ |
+| UDP Client | |
+| UDP Server | |
+| [TLS Client](examples/tls-client) | ✓ |
+| [TLS Server](examples/tls-echo-server) | ✓ |
+| QUIC Client | |
+| QUIC Server | |
+| SCTP Client | |
+| SCTP Server | |
+| DTLS-SCTP Client | |
+| DTLS-SCTP Server | |
+| Kafka Client | |
+| RMQ Client | |
+| SQS Client | |
+| NSQ Client | |
--- /dev/null
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+ protobuf_codegen_pure::Codegen::new()
+ .out_dir("src/schema")
+ .inputs(&["schema/message.proto"])
+ .include("schema")
+ .run()
+ .expect("Codegen failed.");
+ Ok(())
+}
--- /dev/null
+[package]
+name = "tcp-client"
+version = "0.1.0"
+authors = ["Sachandhan Ganesh <sachan.ganesh@gmail.com>"]
+edition = "2018"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+anyhow = "1.0.31"
+async-std = { version = "1.6.2", features = ["attributes"] }
+env_logger = "0.7"
+log = "0.4"
+protobuf = "2.18.1"
+
+stitch-net = { path = "../../" }
+
+[build-dependencies]
+protobuf-codegen-pure = "2.18.1"
\ No newline at end of file
--- /dev/null
+# seam-channel tcp-client example
+
+This example program will:
+
+1. Establish a connection with a TCP server
+2. Send a `String` message to the server
+3. Wait for a `String` message reply from the server
+
+## Usage
+
+```
+export RUST_LOG=info
+cargo run <ip-address-to-connect-to>
+```
+
+## Example Usage
+
+```
+export RUST_LOG=info
+cargo run localhost:5678
+```
\ No newline at end of file
--- /dev/null
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+ protobuf_codegen_pure::Codegen::new()
+ .out_dir("src/schema")
+ .inputs(&["schema/hello_world.proto"])
+ .include("schema")
+ .run()
+ .expect("Codegen failed.");
+ Ok(())
+}
--- /dev/null
+syntax = "proto3";
+package hello_world;
+
+message HelloWorld {
+ string message = 1;
+}
\ No newline at end of file
--- /dev/null
+pub mod schema;
+
+use crate::schema::hello_world::HelloWorld;
+use log::*;
+use protobuf::well_known_types::Any;
+use std::env;
+use stitch_net::{SinkExt, StitchConnection, StreamExt};
+
+#[async_std::main]
+async fn main() -> anyhow::Result<()> {
+ env_logger::init();
+
+ // Get ip address from cmd line args
+ let args: Vec<String> = env::args().collect();
+ let ip_address = match args.get(1) {
+ Some(addr) => addr,
+ None => {
+ error!("Need to pass IP address to connect to as command line argument");
+ panic!();
+ }
+ };
+
+ // create a client connection to the server
+ let mut conn = StitchConnection::tcp_client(ip_address)?;
+
+ // send a message to the server
+ let raw_msg = String::from("Hello world");
+ info!("Sending message: {}", raw_msg);
+
+ let mut msg = HelloWorld::new();
+ msg.set_message(raw_msg);
+
+ conn.writer().send(msg).await?;
+
+ // wait for the server to reply with an ack
+ while let Some(reply) = conn.reader().next().await {
+ info!("Received message");
+
+ let msg: HelloWorld = Any::unpack(&reply)?.unwrap();
+
+ info!("Unpacked reply: {}", msg.get_message());
+ }
+
+ Ok(())
+}
--- /dev/null
+// This file is generated by rust-protobuf 2.19.0. Do not edit
+// @generated
+
+// https://github.com/rust-lang/rust-clippy/issues/702
+#![allow(unknown_lints)]
+#![allow(clippy::all)]
+
+#![allow(unused_attributes)]
+#![rustfmt::skip]
+
+#![allow(box_pointers)]
+#![allow(dead_code)]
+#![allow(missing_docs)]
+#![allow(non_camel_case_types)]
+#![allow(non_snake_case)]
+#![allow(non_upper_case_globals)]
+#![allow(trivial_casts)]
+#![allow(unused_imports)]
+#![allow(unused_results)]
+//! Generated file from `hello_world.proto`
+
+/// Generated files are compatible only with the same version
+/// of protobuf runtime.
+// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_19_0;
+
+#[derive(PartialEq,Clone,Default)]
+pub struct HelloWorld {
+ // message fields
+ pub message: ::std::string::String,
+ // special fields
+ pub unknown_fields: ::protobuf::UnknownFields,
+ pub cached_size: ::protobuf::CachedSize,
+}
+
+impl<'a> ::std::default::Default for &'a HelloWorld {
+ fn default() -> &'a HelloWorld {
+ <HelloWorld as ::protobuf::Message>::default_instance()
+ }
+}
+
+impl HelloWorld {
+ pub fn new() -> HelloWorld {
+ ::std::default::Default::default()
+ }
+
+ // string message = 1;
+
+
+ pub fn get_message(&self) -> &str {
+ &self.message
+ }
+ pub fn clear_message(&mut self) {
+ self.message.clear();
+ }
+
+ // Param is passed by value, moved
+ pub fn set_message(&mut self, v: ::std::string::String) {
+ self.message = v;
+ }
+
+ // Mutable pointer to the field.
+ // If field is not initialized, it is initialized with default value first.
+ pub fn mut_message(&mut self) -> &mut ::std::string::String {
+ &mut self.message
+ }
+
+ // Take field
+ pub fn take_message(&mut self) -> ::std::string::String {
+ ::std::mem::replace(&mut self.message, ::std::string::String::new())
+ }
+}
+
+impl ::protobuf::Message for HelloWorld {
+ fn is_initialized(&self) -> bool {
+ true
+ }
+
+ fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
+ while !is.eof()? {
+ let (field_number, wire_type) = is.read_tag_unpack()?;
+ match field_number {
+ 1 => {
+ ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.message)?;
+ },
+ _ => {
+ ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
+ },
+ };
+ }
+ ::std::result::Result::Ok(())
+ }
+
+ // Compute sizes of nested messages
+ #[allow(unused_variables)]
+ fn compute_size(&self) -> u32 {
+ let mut my_size = 0;
+ if !self.message.is_empty() {
+ my_size += ::protobuf::rt::string_size(1, &self.message);
+ }
+ my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
+ self.cached_size.set(my_size);
+ my_size
+ }
+
+ fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
+ if !self.message.is_empty() {
+ os.write_string(1, &self.message)?;
+ }
+ os.write_unknown_fields(self.get_unknown_fields())?;
+ ::std::result::Result::Ok(())
+ }
+
+ fn get_cached_size(&self) -> u32 {
+ self.cached_size.get()
+ }
+
+ fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
+ &self.unknown_fields
+ }
+
+ fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
+ &mut self.unknown_fields
+ }
+
+ fn as_any(&self) -> &dyn (::std::any::Any) {
+ self as &dyn (::std::any::Any)
+ }
+ fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
+ self as &mut dyn (::std::any::Any)
+ }
+ fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
+ self
+ }
+
+ fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
+ Self::descriptor_static()
+ }
+
+ fn new() -> HelloWorld {
+ HelloWorld::new()
+ }
+
+ fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
+ static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
+ descriptor.get(|| {
+ let mut fields = ::std::vec::Vec::new();
+ fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
+ "message",
+ |m: &HelloWorld| { &m.message },
+ |m: &mut HelloWorld| { &mut m.message },
+ ));
+ ::protobuf::reflect::MessageDescriptor::new_pb_name::<HelloWorld>(
+ "HelloWorld",
+ fields,
+ file_descriptor_proto()
+ )
+ })
+ }
+
+ fn default_instance() -> &'static HelloWorld {
+ static instance: ::protobuf::rt::LazyV2<HelloWorld> = ::protobuf::rt::LazyV2::INIT;
+ instance.get(HelloWorld::new)
+ }
+}
+
+impl ::protobuf::Clear for HelloWorld {
+ fn clear(&mut self) {
+ self.message.clear();
+ self.unknown_fields.clear();
+ }
+}
+
+impl ::std::fmt::Debug for HelloWorld {
+ fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
+ ::protobuf::text_format::fmt(self, f)
+ }
+}
+
+impl ::protobuf::reflect::ProtobufValue for HelloWorld {
+ fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
+ ::protobuf::reflect::ReflectValueRef::Message(self)
+ }
+}
+
+static file_descriptor_proto_data: &'static [u8] = b"\
+ \n\x11hello_world.proto\x12\x0bhello_world\"*\n\nHelloWorld\x12\x1a\n\
+ \x07message\x18\x01\x20\x01(\tR\x07messageB\0:\0B\0b\x06proto3\
+";
+
+static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;
+
+fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
+ ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap()
+}
+
+pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
+ file_descriptor_proto_lazy.get(|| {
+ parse_descriptor_proto()
+ })
+}
--- /dev/null
+pub mod hello_world;
--- /dev/null
+[package]
+name = "tcp-echo-server"
+version = "0.1.0"
+authors = ["Sachandhan Ganesh <sachan.ganesh@gmail.com>"]
+edition = "2018"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+anyhow = "1.0.31"
+async-std = { version = "1.6.2", features = ["attributes"] }
+env_logger = "0.7"
+log = "0.4"
+protobuf = "2.18.1"
+
+stitch-net = { path = "../../" }
+
+[build-dependencies]
+protobuf-codegen-pure = "2.18.1"
\ No newline at end of file
--- /dev/null
+# seam-channel tcp-echo-server example
+
+This example program will:
+
+1. Bind to an IP address
+2. Accept any number of TCP connections
+3. Handle each connection by:
+ 1. Waiting for `String` messages to be received
+ 2. Echoing the `String` message back to the source
+
+## Usage
+
+```
+export RUST_LOG=info
+cargo run <ip-address-to-bind-to>
+```
+
+## Example Usage
+
+```
+export RUST_LOG=info
+cargo run localhost:5678
+```
\ No newline at end of file
--- /dev/null
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+ protobuf_codegen_pure::Codegen::new()
+ .out_dir("src/schema")
+ .inputs(&["../tcp-client/schema/hello_world.proto"])
+ .include("../tcp-client/schema")
+ .run()
+ .expect("Codegen failed.");
+ Ok(())
+}
--- /dev/null
+mod schema;
+
+use crate::schema::hello_world::HelloWorld;
+use async_std::task;
+use log::*;
+use std::env;
+use stitch_net::tcp::StitchTcpServer;
+use stitch_net::{SinkExt, StreamExt};
+
+#[async_std::main]
+async fn main() -> anyhow::Result<()> {
+ env_logger::init();
+
+ // Get ip address from cmd line args
+ let args: Vec<String> = env::args().collect();
+
+ let ip_address = match args.get(1) {
+ Some(addr) => addr,
+ None => {
+ error!("Need to pass IP address to bind to as command line argument");
+ panic!();
+ }
+ };
+
+ // create a server
+ let mut server = StitchTcpServer::new(ip_address)?;
+
+ // handle server connections
+ // wait for a connection to come in and be accepted
+ while let Some(mut conn) = server.next().await {
+ info!("Handling connection from {}", conn.peer_addr());
+
+ task::spawn(async move {
+ while let Some(msg) = conn.reader().next().await {
+ if msg.is::<HelloWorld>() {
+ if let Ok(Some(contents)) = msg.unpack::<HelloWorld>() {
+ info!(
+ "Received a message \"{}\" from {}",
+ contents.get_message(),
+ conn.peer_addr()
+ );
+
+ conn.writer()
+ .send(contents)
+ .await
+ .expect("Could not send message back to source connection");
+ info!("Sent message back to original sender");
+ }
+ } else {
+ error!("Received a message of unknown type")
+ }
+ }
+ });
+ }
+
+ Ok(())
+}
--- /dev/null
+// This file is generated by rust-protobuf 2.19.0. Do not edit
+// @generated
+
+// https://github.com/rust-lang/rust-clippy/issues/702
+#![allow(unknown_lints)]
+#![allow(clippy::all)]
+
+#![allow(unused_attributes)]
+#![rustfmt::skip]
+
+#![allow(box_pointers)]
+#![allow(dead_code)]
+#![allow(missing_docs)]
+#![allow(non_camel_case_types)]
+#![allow(non_snake_case)]
+#![allow(non_upper_case_globals)]
+#![allow(trivial_casts)]
+#![allow(unused_imports)]
+#![allow(unused_results)]
+//! Generated file from `hello_world.proto`
+
+/// Generated files are compatible only with the same version
+/// of protobuf runtime.
+// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_19_0;
+
+#[derive(PartialEq,Clone,Default)]
+pub struct HelloWorld {
+ // message fields
+ pub message: ::std::string::String,
+ // special fields
+ pub unknown_fields: ::protobuf::UnknownFields,
+ pub cached_size: ::protobuf::CachedSize,
+}
+
+impl<'a> ::std::default::Default for &'a HelloWorld {
+ fn default() -> &'a HelloWorld {
+ <HelloWorld as ::protobuf::Message>::default_instance()
+ }
+}
+
+impl HelloWorld {
+ pub fn new() -> HelloWorld {
+ ::std::default::Default::default()
+ }
+
+ // string message = 1;
+
+
+ pub fn get_message(&self) -> &str {
+ &self.message
+ }
+ pub fn clear_message(&mut self) {
+ self.message.clear();
+ }
+
+ // Param is passed by value, moved
+ pub fn set_message(&mut self, v: ::std::string::String) {
+ self.message = v;
+ }
+
+ // Mutable pointer to the field.
+ // If field is not initialized, it is initialized with default value first.
+ pub fn mut_message(&mut self) -> &mut ::std::string::String {
+ &mut self.message
+ }
+
+ // Take field
+ pub fn take_message(&mut self) -> ::std::string::String {
+ ::std::mem::replace(&mut self.message, ::std::string::String::new())
+ }
+}
+
+impl ::protobuf::Message for HelloWorld {
+ fn is_initialized(&self) -> bool {
+ true
+ }
+
+ fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
+ while !is.eof()? {
+ let (field_number, wire_type) = is.read_tag_unpack()?;
+ match field_number {
+ 1 => {
+ ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.message)?;
+ },
+ _ => {
+ ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
+ },
+ };
+ }
+ ::std::result::Result::Ok(())
+ }
+
+ // Compute sizes of nested messages
+ #[allow(unused_variables)]
+ fn compute_size(&self) -> u32 {
+ let mut my_size = 0;
+ if !self.message.is_empty() {
+ my_size += ::protobuf::rt::string_size(1, &self.message);
+ }
+ my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
+ self.cached_size.set(my_size);
+ my_size
+ }
+
+ fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
+ if !self.message.is_empty() {
+ os.write_string(1, &self.message)?;
+ }
+ os.write_unknown_fields(self.get_unknown_fields())?;
+ ::std::result::Result::Ok(())
+ }
+
+ fn get_cached_size(&self) -> u32 {
+ self.cached_size.get()
+ }
+
+ fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
+ &self.unknown_fields
+ }
+
+ fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
+ &mut self.unknown_fields
+ }
+
+ fn as_any(&self) -> &dyn (::std::any::Any) {
+ self as &dyn (::std::any::Any)
+ }
+ fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
+ self as &mut dyn (::std::any::Any)
+ }
+ fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
+ self
+ }
+
+ fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
+ Self::descriptor_static()
+ }
+
+ fn new() -> HelloWorld {
+ HelloWorld::new()
+ }
+
+ fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
+ static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
+ descriptor.get(|| {
+ let mut fields = ::std::vec::Vec::new();
+ fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
+ "message",
+ |m: &HelloWorld| { &m.message },
+ |m: &mut HelloWorld| { &mut m.message },
+ ));
+ ::protobuf::reflect::MessageDescriptor::new_pb_name::<HelloWorld>(
+ "HelloWorld",
+ fields,
+ file_descriptor_proto()
+ )
+ })
+ }
+
+ fn default_instance() -> &'static HelloWorld {
+ static instance: ::protobuf::rt::LazyV2<HelloWorld> = ::protobuf::rt::LazyV2::INIT;
+ instance.get(HelloWorld::new)
+ }
+}
+
+impl ::protobuf::Clear for HelloWorld {
+ fn clear(&mut self) {
+ self.message.clear();
+ self.unknown_fields.clear();
+ }
+}
+
+impl ::std::fmt::Debug for HelloWorld {
+ fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
+ ::protobuf::text_format::fmt(self, f)
+ }
+}
+
+impl ::protobuf::reflect::ProtobufValue for HelloWorld {
+ fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
+ ::protobuf::reflect::ReflectValueRef::Message(self)
+ }
+}
+
+static file_descriptor_proto_data: &'static [u8] = b"\
+ \n\x11hello_world.proto\x12\x0bhello_world\"*\n\nHelloWorld\x12\x1a\n\
+ \x07message\x18\x01\x20\x01(\tR\x07messageB\0:\0B\0b\x06proto3\
+";
+
+static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;
+
+fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
+ ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap()
+}
+
+pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
+ file_descriptor_proto_lazy.get(|| {
+ parse_descriptor_proto()
+ })
+}
--- /dev/null
+pub mod hello_world;
--- /dev/null
+[package]
+name = "tls-client"
+version = "0.1.0"
+authors = ["Sachandhan Ganesh <sachan.ganesh@gmail.com>"]
+edition = "2018"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+anyhow = "1.0.31"
+async-std = { version = "1.6.2", features = ["attributes"] }
+env_logger = "0.7"
+log = "0.4"
+
+seam-channel = { path = "../../" }
\ No newline at end of file
--- /dev/null
+# seam-channel tls-client example
+
+This example program will:
+
+1. Establish a secure connection with a TLS server
+2. Send a `String` message to the server
+3. Wait for a `String` message reply from the server
+
+## Usage
+
+```
+export RUST_LOG=info
+cargo run <ip-address-to-connect-to> <domain-name> <ca-file>
+```
+
+## Example Usage
+
+```
+export RUST_LOG=info
+cargo run 127.0.0.1:5678 localhost end.chain
+```
\ No newline at end of file
--- /dev/null
+-----BEGIN CERTIFICATE-----
+MIIGnzCCAoegAwIBAgIBezANBgkqhkiG9w0BAQsFADAaMRgwFgYDVQQDDA9wb255
+dG93biBSU0EgQ0EwHhcNMTYxMjEwMTc0MjMzWhcNMjYxMjA4MTc0MjMzWjAsMSow
+KAYDVQQDDCFwb255dG93biBSU0EgbGV2ZWwgMiBpbnRlcm1lZGlhdGUwggGiMA0G
+CSqGSIb3DQEBAQUAA4IBjwAwggGKAoIBgQDnfb7vaJbaHEyVTflswWhmHqx5W0NO
+KyKbDp2zXEJwDO+NDJq6i1HGnFd/vO4LyjJBU1wUsKtE+m55cfRmUHVuZ2w4n/VF
+p7Z7n+SNuvJNcrzDxyKVy4GIZ39zQePnniqtLqXh6eI8Ow6jiMgVxC/wbWcVLKv6
+4RM+2fLjJAC9b27QfjhOlMKVeMOEvPrrpjLSauaHAktQPhuzIAwzxM0+KnvDkWWy
+NVqAV/lq6fSO/9vJRhM4E2nxo6yqi7qTdxVxMmKsNn7L6HvjQgx+FXziAUs55Qd9
+cP7etCmPmoefkcgdbxDOIKH8D+DvfacZwngqcnr/q96Ff4uJ13d2OzR1mWVSZ2hE
+JQt/BbZBANciqu9OZf3dj6uOOXgFF705ak0GfLtpZpc29M+fVnknXPDSiKFqjzOO
+KL+SRGyuNc9ZYjBKkXPJ1OToAs6JSvgDxfOfX0thuo2rslqfpj2qCFugsRIRAqvb
+eyFwg+BPM/P/EfauXlAcQtBF04fOi7xN2okCAwEAAaNeMFwwHQYDVR0OBBYEFNwu
+Py4Do//Sm5CZDrocHWTrNr96MCAGA1UdJQEB/wQWMBQGCCsGAQUFBwMBBggrBgEF
+BQcDAjAMBgNVHRMEBTADAQH/MAsGA1UdDwQEAwIB/jANBgkqhkiG9w0BAQsFAAOC
+BAEAMHZpBqDIUAVFZNw4XbuimXQ4K8q4uePrLGHLb4F/gHbr8kYrU4H+cy4l+xXf
+2dlEBdZoqjSF7uXzQg5Fd8Ff3ZgutXd1xeUJnxo0VdpKIhqeaTPqhffC2X6FQQH5
+KrN7NVWQSnUhPNpBFELpmdpY1lHigFW7nytYj0C6VJ4QsbqhfW+n/t+Zgqtfh/Od
+ZbclzxFwMM55zRA2HP6IwXS2+d61Jk/RpDHTzhWdjGH4906zGNNMa7slHpCTA9Ju
+TrtjEAGt2PBSievBJOHZW80KVAoEX2n9B3ZABaz+uX0VVZG0D2FwhPpUeA57YiXu
+qiktZR4Ankph3LabXp4IlAX16qpYsEW8TWE/HLreeqoM0WDoI6rF9qnTpV2KWqBf
+ziMYkfSkT7hQ2bWc493lW+QwSxCsuBsDwlrCwAl6jFSf1+jEQx98/8n9rDNyD9dL
+PvECmtF30WY98nwZ9/kO2DufQrd0mwSHcIT0pAwl5fimpkwTjj+TTbytO3M4jK5L
+tuIzsViQ95BmJQ3XuLdkQ/Ug8rpECYRX5fQX1qXkkvl920ohpKqKyEji1OmfmJ0Z
+tZChaEcu3Mp3U+gD4az2ogmle3i/Phz8ZEPFo4/21G5Qd72z0lBgaQIeyyCk5MHt
+Yg0vA7X0/w4bz+OJv5tf7zJsPCYSprr+c/7YUJk9Fqu6+g9ZAavI99xFKdGhz4Og
+w0trnKNCxYc6+NPopTDbXuY+fo4DK7C0CSae5sKs7013Ne6w4KvgfLKpvlemkGfg
+ZA3+1FMXVfFIEH7Cw9cx6F02Sr3k1VrU68oM3wH5nvTUkELOf8nRMlzliQjVCpKB
+yFSe9dzRVSFEbMDxChiEulGgNUHj/6wwpg0ZmCwPRHutppT3jkfEqizN5iHb69GH
+k6kol6knJofkaL656Q3Oc9o0ZrMlFh1RwmOvAk5fVK0/CV88/phROz2Wdmy5Bz4a
+t0vzqFWA54y6+9EEVoOk9SU0CYfpGtpX4URjLK1EUG/l+RR3366Uee6TPrtEZ9cg
+56VQMxhSaRNAvJ6DfiSuscSCNJzwuXaMXSZydGYnnP9Tb9p6c1uy1sXdluZkBIcK
+CgC+gdDMSNlDn9ghc4xZGkuA8bjzfAYuRuGKmfTt8uuklkjw2b9w3SHjC4/Cmd2W
+cFRnzfg2oL6e78hNg2ZGgsLzvb6Lu6/5IhXCO7RitzYf2+HLBbc+YLFsnG3qeGe1
+28yGnXOQd97Cr4+IzFucVy/33gMQkesNUSDFJSq1gE/hGrMgTTMQJ7yC3PRqg0kG
+tpqTyKNdM0g1adxlR1qfDPvpUBApkgBbySnMyWEr5+tBuoHUtH2m49oV9YD4odMJ
+yJjlGxituO/YNN6O8oANlraG1Q==
+-----END CERTIFICATE-----
+-----BEGIN CERTIFICATE-----
+MIIJBzCCBO+gAwIBAgIJAN7WS1mRS9A+MA0GCSqGSIb3DQEBCwUAMBoxGDAWBgNV
+BAMMD3Bvbnl0b3duIFJTQSBDQTAeFw0xNjEyMTAxNzQyMzNaFw0yNjEyMDgxNzQy
+MzNaMBoxGDAWBgNVBAMMD3Bvbnl0b3duIFJTQSBDQTCCBCIwDQYJKoZIhvcNAQEB
+BQADggQPADCCBAoCggQBAMNEzJ7aNdD2JSk9+NF9Hh2za9OQnt1d/7j6DtE3ieoT
+ms8mMSXzoImXZayZ9Glx3yx/RhEb2vmINyb0vRUM4I/GH+XHdOBcs9kaJNv/Mpw4
+Ggd4e1LUqV1pzNrhYwRrTQTKyaDiDX2WEBNfQaaYnHltmSmsfyt3Klj+IMc6CyqV
+q8SOQ6Go414Vn++Jj7p3E6owdwuvSvO8ERLobiA6vYB+qrS7E48c4zRIAFIO4uwt
+g4TiCJLLWc1fRSoqGGX7KS+LzQF8Pq67IOHVna4e9peSe6nQnm0LQZAmaosYHvF4
+AX0Bj6TLv9PXCAGtB7Pciev5Br0tRZEdVyYfmwiVKUWcp77TghV3W+VaJVhPh5LN
+X91ktvpeYek3uglqv2ZHtSG2S1KkBtTkbMOD+a2BEUfq0c0+BIsj6jdvt4cvIfet
+4gUOxCvYMBs4/dmNT1zoe/kJ0lf8YXYLsXwVWdIW3jEE8QdkLtLI9XfyU9OKLZuD
+mmoAf7ezvv/T3nKLFqhcwUFGgGtCIX+oWC16XSbDPBcKDBwNZn8C49b7BLdxqAg3
+msfxwhYzSs9F1MXt/h2dh7FVmkCSxtgNDX3NJn5/yT6USws2y0AS5vXVP9hRf0NV
+KfKn9XlmHCxnZExwm68uZkUUYHB05jSWFojbfWE+Mf9djUeQ4FuwusztZdbyQ4yS
+mMtBXO0I6SQBmjCoOa1ySW3DTuw/eKCfq+PoxqWD434bYA9nUa+pE27MP7GLyjCS
+6+ED3MACizSF0YxkcC9pWUo4L5FKp+DxnNbtzMIILnsDZTVHOvKUy/gjTyTWm/+7
+2t98l7vBE8gn3Aux0V5WFe2uZIZ07wIi/OThoBO8mpt9Bm5cJTG07JStKEXX/UH1
+nL7cDZ2V5qbf4hJdDy4qixxxIZtmf//1BRlVQ9iYTOsMoy+36DXWbc3vSmjRefW1
+YENt4zxOPe4LUq2Z+LXq1OgVQrHrVevux0vieys7Rr2gA1sH8FaaNwTr7Q8dq+Av
+Evk+iOUH4FuYorU1HuGHPkAkvLWosVwlB+VhfEai0V6+PmttmaOnCJNHfFTu5wCu
+B9CFJ1tdzTzAbrLwgtWmO70KV7CfZPHO7lMWhSvplU0i5T9WytxP91IoFtXwRSO8
++Ghyu0ynB3HywCH2dez89Vy903P6PEU0qTnYWRz6D/wi5+yHHNrm9CilWurs/Qex
+kyB7lLD7Cb1JJc8QIFTqT6vj+cids3xd245hUdpFyZTX99YbF6IkiB2zGi5wvUmP
+f1GPvkTLb7eF7bne9OClEjEqvc0hVJ2abO2WXkqxlQFEYZHNofm+y6bnby/BZZJo
+beaSFcLOCe2Z8iZvVnzfHBCeLyWE89gc94z784S3LEsCAwEAAaNQME4wHQYDVR0O
+BBYEFNz2wEPCQbx9OdRCNE4eALwHJfIgMB8GA1UdIwQYMBaAFNz2wEPCQbx9OdRC
+NE4eALwHJfIgMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggQBACbm2YX7
+sBG0Aslj36gmVlCTTluNg2tuK2isHbK3YhNwujrH/o/o2OV7UeUkZkPwE4g4/SjC
+OwDWYniRNyDKBOeD9Q0XxR5z5IZQO+pRVvXF8DXO6kygWCOJM9XheKxp9Uke0aDg
+m8F02NslKLUdy7piGlLSz1sgdjiE3izIwFZRpZY7sMozNWWvSAmzprbkE78LghIm
+VEydQzIQlr5soWqc65uFLNbEA6QBPoFc6dDW+mnzXf8nrZUM03CACxAsuq/YkjRp
+OHgwgfdNRdlu4YhZtuQNak4BUvDmigTGxDC+aMJw0ldL1bLtqLG6BvQbyLNPOOfo
+5S8lGh4y06gb//052xHaqtCh5Ax5sHUE5By6wKHAKbuJy26qyKfaRoc3Jigs4Fd5
+3CuoDWHbyXfkgKiU+sc+1mvCxQKFRJ2fpGEFP8iEcLvdUae7ZkRM4Kb0vST+QhQV
+fDaFkM3Bwqtui5YaZ6cHHQVyXQdujCmfesoZXKil2yduQ3KWgePjewzRV+aDWMzk
+qKaF+TRANSqWbBU6JTwwQ4veKQThU3ir7nS2ovdPbhNS/FnWoKodj6eaqXfdYuBh
+XOXLewIF568MJsLOuBubeAO2a9LOlhnv6eLGp2P4M7vwEdN/LRRQtwBBmqq8C3h+
+ewrJP12B/ag0bJDi9vCgPhYtDEpjpfsnxZEIqVZwshJ/MqXykFp2kYk62ylyfDWq
+veI/aHwpzT2k+4CI/XmPWXl9NlI50HPdpcwCBDy8xVHwb/x7stNgQdIhaj9tzmKa
+S+eqitclc8Iqrbd523H//QDzm8yiqRZUdveNa9gioTMErR0ujCpK8tO8mVZcVfNX
+i1/Vsar5++nXcPhxKsd1t8XV2dk3gUZIfMgzLLzs+KSiFg+bT3c7LkCd+I3w30Iv
+fh9cxFBAyYO9giwxaCfJgoz7OYqaHOOtASF85UV7gK9ELT7/z+RAcS/UfY1xbd54
+hIi1vRZj8lfkAYNtnYlud44joi1BvW/GZGFCiJ13SSvfHNs9v/5xguyCSgyCc0qx
+ZkN/fzj/5wFQbxSl3MPn/JrsvlH6wvJht1SA50uVdUvJ5e5V8EgLYfMqlJNNpTHP
+wZcHF+Dw126oyu2KhUxD126Gusxp+tV6I0EEZnVwwduFQWq9xm/gT+qohpveeylf
+Q2XGz56DF2udJJnSFGSqzQOl9XopNC/4ecBMwIzqdFSpaWgK3VNAcigyDajgoE4v
+ZuiVDEiLhLowZvi1V8GOWzcka7R2BQBjhOLWByQGDcm8cOMS7w8oCSQCaYmJyHvE
+tTHq7fX6/sXv0AJqM3ysSdU01IVBNahnr5WEkmQMaFF0DGvRfqkVdKcChwrKv7r2
+DLxargy39i2aQGg=
+-----END CERTIFICATE-----
--- /dev/null
+use log::*;
+use seam_channel::net::tls::rustls::ClientConfig;
+use seam_channel::net::{StitchClient, StitchNetClient};
+use std::env;
+
+#[async_std::main]
+async fn main() -> anyhow::Result<()> {
+ env_logger::init();
+
+ // get ip address and domain from cmd line args
+ let (ip_addr, domain, cafile_path) = parse_args();
+
+ // construct `rustls` client config
+ let cafile = std::fs::read(cafile_path)?;
+
+ let mut client_pem = std::io::Cursor::new(cafile);
+
+ let mut client_config = ClientConfig::new();
+ client_config
+ .root_store
+ .add_pem_file(&mut client_pem)
+ .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid cert"))?;
+
+ // create a client connection to the server
+ let dist_chan = StitchNetClient::tls_client(ip_addr, &domain, client_config.into())?;
+
+ // create a channel for String messages on the TCP connection
+ let (sender, receiver) = dist_chan.bounded::<String>(Some(100));
+
+ // alert the connection that you are ready to read and write messages
+ dist_chan.ready()?;
+
+ // send a message to the server
+ let msg = String::from("Hello world");
+ info!("Sending message: {}", msg);
+ sender.send(msg).await?;
+
+ // wait for the server to reply with an ack
+ if let Ok(msg) = receiver.recv().await {
+ info!("Received reply: {}", msg);
+ }
+
+ Ok(())
+}
+
+fn parse_args() -> (String, String, String) {
+ let args: Vec<String> = env::args().collect();
+
+ let ip_address = match args.get(1) {
+ Some(addr) => addr,
+ None => {
+ error!("Need to pass IP address to connect to as first command line argument");
+ panic!();
+ }
+ };
+
+ let domain = match args.get(2) {
+ Some(d) => d,
+ None => {
+ error!("Need to pass domain name as second command line argument");
+ panic!();
+ }
+ };
+
+ let cafile_path = match args.get(3) {
+ Some(d) => d,
+ None => {
+ error!("Need to pass path to cafile as third command line argument");
+ panic!();
+ }
+ };
+
+ (
+ ip_address.to_string(),
+ domain.to_string(),
+ cafile_path.to_string(),
+ )
+}
--- /dev/null
+[package]
+name = "tls-echo-server"
+version = "0.1.0"
+authors = ["Sachandhan Ganesh <sachan.ganesh@gmail.com>"]
+edition = "2018"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+anyhow = "1.0.31"
+async-std = { version = "1.6.2", features = ["attributes"] }
+env_logger = "0.7"
+log = "0.4"
+
+seam-channel = { path = "../../" }
\ No newline at end of file
--- /dev/null
+# seam-channel tls-echo-server example
+
+This example program will:
+
+1. Bind to an IP address
+2. Accept any number of secure TLS connections
+3. Handle each connection by:
+ 1. Waiting for `String` messages to be received
+ 2. Echoing the `String` message back to the source
+
+## Usage
+
+```
+export RUST_LOG=info
+cargo run <ip-address-to-bind-to> <cert-file> <key-file>
+```
+
+## Example Usage
+
+```
+export RUST_LOG=info
+cargo run 127.0.0.1:5678 end.cert end.rsa
+```
\ No newline at end of file
--- /dev/null
+-----BEGIN CERTIFICATE-----
+MIIEADCCAmigAwIBAgICAcgwDQYJKoZIhvcNAQELBQAwLDEqMCgGA1UEAwwhcG9u
+eXRvd24gUlNBIGxldmVsIDIgaW50ZXJtZWRpYXRlMB4XDTE2MTIxMDE3NDIzM1oX
+DTIyMDYwMjE3NDIzM1owGTEXMBUGA1UEAwwOdGVzdHNlcnZlci5jb20wggEiMA0G
+CSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC1YDz66+7VD4DL1+/sVHMQ+BbDRgmD
+OQlX++mfW8D3QNQm/qDBEbu7T7qqdc9GKDar4WIzBN8SBkzM1EjMGwNnZPV/Tfz0
+qUAR1L/7Zzf1GaFZvWXgksyUpfwvmprH3Iy/dpkETwtPthpTPNlui3hZnm/5kkjR
+RWg9HmID4O04Ld6SK313v2ZgrPZbkKvbqlqhUnYWjL3blKVGbpXIsuZzEU9Ph+gH
+tPcEhZpFsM6eLe+2TVscIrycMEOTXqAAmO6zZ9sQWtfllu3CElm904H6+jA/9Leg
+al72pMmkYr8wWniqDDuijXuCPlVx5EDFFyxBmW18UeDEQaKV3kNfelaTAgMBAAGj
+gb4wgbswDAYDVR0TAQH/BAIwADALBgNVHQ8EBAMCBsAwHQYDVR0OBBYEFIYhJkVy
+AAKT6cY/ruH1Eu+NNxteMEIGA1UdIwQ7MDmAFNwuPy4Do//Sm5CZDrocHWTrNr96
+oR6kHDAaMRgwFgYDVQQDDA9wb255dG93biBSU0EgQ0GCAXswOwYDVR0RBDQwMoIO
+dGVzdHNlcnZlci5jb22CFXNlY29uZC50ZXN0c2VydmVyLmNvbYIJbG9jYWxob3N0
+MA0GCSqGSIb3DQEBCwUAA4IBgQCWV76jfQDZKtfmj45fTwZzoe/PxjWPRbAvSEnt
+LRHrPhqQfpMLqpun8uu/w86mHiR/AmiAySMu3zivW6wfGzlRWLi/zCyO6r9LGsgH
+bNk5CF642cdZFvn1SiSm1oGXQrolIpcyXu88nUpt74RnY4ETCC1dRQKqxsYufe5T
+DOmTm3ChinNW4QRG3yvW6DVuyxVAgZvofyKJOsM3GO6oogIM41aBqZ3UTwmIwp6D
+oISdiATslFOzYzjnyXNR8DG8OOkv1ehWuyb8x+hQCZAuogQOWYtCSd6k3kKgd0EM
+4CWbt1XDV9ZJwBf2uxZeKuCu/KIy9auNtijAwPsUv9qxuzko018zhl3lWm5p2Sqw
+O7fFshU3A6df8hMw7ST6/tgFY7geT88U4iJhfWMwr/CZSRSVMXhTyJgbLIXxKYZj
+Ym5v4NAIQP6hI4HixzQaYgrhW6YX6myk+emMjQLRJHT8uHvmT7fuxMJVWWgsCkr1
+C75pRQEagykN/Uzr5e6Tm8sVu88=
+-----END CERTIFICATE-----
--- /dev/null
+-----BEGIN RSA PRIVATE KEY-----
+MIIEogIBAAKCAQEAtWA8+uvu1Q+Ay9fv7FRzEPgWw0YJgzkJV/vpn1vA90DUJv6g
+wRG7u0+6qnXPRig2q+FiMwTfEgZMzNRIzBsDZ2T1f0389KlAEdS/+2c39RmhWb1l
+4JLMlKX8L5qax9yMv3aZBE8LT7YaUzzZbot4WZ5v+ZJI0UVoPR5iA+DtOC3ekit9
+d79mYKz2W5Cr26paoVJ2Foy925SlRm6VyLLmcxFPT4foB7T3BIWaRbDOni3vtk1b
+HCK8nDBDk16gAJjus2fbEFrX5ZbtwhJZvdOB+vowP/S3oGpe9qTJpGK/MFp4qgw7
+oo17gj5VceRAxRcsQZltfFHgxEGild5DX3pWkwIDAQABAoIBAFDTazlSbGML/pRY
+TTWeyIw2UkaA7npIr45C13BJfitw+1nJPK/tDCDDveZ6i3yzLPHZhV5A/HtWzWC1
+9R7nptOrnO83PNN2nPOVQFxzOe+ClXGdQkoagQp5EXHRTspj0WD9I+FUrDDAcOjJ
+BAgMJPyi6zlnZAXGDVa3NGyQDoZqwU2k36L4rEsJIkG0NVurZhpiCexNkkf32495
+TOINQ0iKdfJ4iZoEYQ9G+x4NiuAJRCHuIcH76SNfT+Uv3wX0ut5EFPtflnvtdgcp
+QVcoKwYdO0+mgO5xqWlBcsujSvgBdiNAGnAxKHWiEaacuIJi4+yYovyEebP6QI2X
+Zg/U2wkCgYEA794dE5CPXLOmv6nioVC/ubOESk7vjSlEka/XFbKr4EY794YEqrB1
+8TUqg09Bn3396AS1e6P2shr3bxos5ybhOxDGSLnJ+aC0tRFjd1BPKnA80vZM7ggt
+5cjmdD5Zp0tIQTIAAYU5bONQOwj0ej4PE7lny26eLa5vfvCwlrD+rM0CgYEAwZMN
+W/5PA2A+EM08IaHic8my0dCunrNLF890ouZnDG99SbgMGvvEsGIcCP1sai702hNh
+VgGDxCz6/HUy+4O4YNFVtjY7uGEpfIEcEI7CsLQRP2ggWEFxThZtnEtO8PbM3J/i
+qcS6njHdE+0XuCjgZwGgva5xH2pkWFzw/AIpEN8CgYB2HOo2axWc8T2n3TCifI+c
+EqCOsqXU3cBM+MgxgASQcCUxMkX0AuZguuxPMmS+85xmdoMi+c8NTqgOhlYcEJIR
+sqXgw9OH3zF8g6513w7Md+4Ld4rUHyTypGWOUfF1pmVS7RsBpKdtTdWA7FzuIMbt
+0HsiujqbheyTFlPuMAOH9QKBgBWS1gJSrWuq5j/pH7J/4EUXTZ6kq1F0mgHlVRJy
+qzlvk38LzA2V0a32wTkfRV3wLcnALzDuqkjK2o4YYb42R+5CZlMQaEd8TKtbmE0g
+HAKljuaKLFCpun8BcOXiXsHsP5i3GQPisQnAdOsrmWEk7R2NyORa9LCToutWMGVl
+uD3xAoGAA183Vldm+m4KPsKS17t8MbwBryDXvowGzruh/Z+PGA0spr+ke4XxwT1y
+kMMP1+5flzmjlAf4+W8LehKuVqvQoMlPn5UVHmSxQ7cGx/O/o6Gbn8Q25/6UT+sM
+B1Y0rlLoKG62pnkeXp1O4I57gnClatWRg5qw11a8V8e3jvDKIYM=
+-----END RSA PRIVATE KEY-----
--- /dev/null
+use async_std::{io, task};
+use log::*;
+use seam_channel::net::tls::rustls::internal::pemfile::{certs, rsa_private_keys};
+use seam_channel::net::tls::rustls::{NoClientAuth, ServerConfig};
+use seam_channel::net::{StitchClient, StitchNetServer};
+use std::env;
+use std::fs::File;
+use std::io::BufReader;
+
+#[async_std::main]
+async fn main() -> anyhow::Result<()> {
+ env_logger::init();
+
+ // Get ip address from cmd line args
+ let (ip_address, cert_path, key_path) = parse_args();
+
+ let certs = certs(&mut BufReader::new(File::open(cert_path)?))
+ .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid cert"))?;
+
+ let mut keys = rsa_private_keys(&mut BufReader::new(File::open(key_path)?))
+ .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid key"))?;
+
+ let mut config = ServerConfig::new(NoClientAuth::new());
+ config
+ .set_single_cert(certs, keys.remove(0))
+ .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
+
+ // create a server
+ let (_server, conns) = StitchNetServer::tls_server(ip_address, config.into())?;
+
+ // handle server connections
+ // wait for a connection to come in and be accepted
+ while let Ok(conn) = conns.recv().await {
+ info!("Handling connection: {}", conn.peer_addr());
+
+ // register for String-typed messages
+ let (sender, receiver) = conn.unbounded::<String>();
+
+ // let the connection know you are ready to send and receive messages
+ conn.ready()
+ .expect("could not ready the connection for reading and writing");
+
+ // handle String messages
+ task::spawn(async move {
+ // for every String message
+ while let Ok(msg) = receiver.recv().await {
+ info!("Echoing message: {}", msg);
+
+ let response = format!("{}, right back at you!", msg);
+
+ // Send the message back to its source
+ if let Err(err) = sender.send(response).await {
+ error!("Could not echo message: {:#?}", err);
+ }
+ }
+ });
+ }
+
+ Ok(())
+}
+
+fn parse_args() -> (String, String, String) {
+ let args: Vec<String> = env::args().collect();
+
+ let ip_address = match args.get(1) {
+ Some(addr) => addr,
+ None => {
+ error!("Need to pass IP address to connect to as first command line argument");
+ panic!();
+ }
+ };
+
+ let cert_path = match args.get(2) {
+ Some(d) => d,
+ None => {
+ error!("Need to pass path to cert file as second command line argument");
+ panic!();
+ }
+ };
+
+ let key_path = match args.get(3) {
+ Some(d) => d,
+ None => {
+ error!("Need to pass path to key file as third command line argument");
+ panic!();
+ }
+ };
+
+ (
+ ip_address.to_string(),
+ cert_path.to_string(),
+ key_path.to_string(),
+ )
+}
--- /dev/null
+syntax = "proto3";
+package message;
+
+import "google/protobuf/any.proto";
+
+message StitchMessage {
+ google.protobuf.Any payload = 1;
+}
--- /dev/null
+pub mod schema;
+pub mod tcp;
+// @todo pub mod tls;
+mod reader;
+mod writer;
+
+pub use crate::reader::StitchConnectionReader;
+use crate::schema::StitchMessage;
+pub use crate::writer::StitchConnectionWriter;
+use async_channel::RecvError;
+use async_std::net::SocketAddr;
+use async_std::pin::Pin;
+use bytes::{Buf, BytesMut};
+use futures::task::{Context, Poll};
+use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Sink, Stream};
+use log::*;
+use protobuf::Message;
+use std::convert::TryInto;
+
+pub use futures::SinkExt;
+pub use futures::StreamExt;
+use protobuf::well_known_types::Any;
+
+pub struct StitchConnection {
+ local_addr: SocketAddr,
+ peer_addr: SocketAddr,
+ reader: StitchConnectionReader,
+ writer: StitchConnectionWriter,
+}
+
+#[allow(dead_code)]
+impl StitchConnection {
+ pub(crate) fn new(
+ local_addr: SocketAddr,
+ peer_addr: SocketAddr,
+ read_stream: Box<dyn AsyncRead + Send + Sync + Unpin>,
+ write_stream: Box<dyn AsyncWrite + Send + Sync + Unpin>,
+ ) -> Self {
+ Self {
+ local_addr,
+ peer_addr,
+ reader: StitchConnectionReader::new(local_addr, peer_addr, read_stream),
+ writer: StitchConnectionWriter::new(local_addr, peer_addr, write_stream),
+ }
+ }
+
+ pub fn local_addr(&self) -> SocketAddr {
+ self.local_addr.clone()
+ }
+
+ pub fn peer_addr(&self) -> SocketAddr {
+ self.peer_addr.clone()
+ }
+
+ pub fn split(self) -> (StitchConnectionReader, StitchConnectionWriter) {
+ (self.reader, self.writer)
+ }
+
+ pub fn join(reader: StitchConnectionReader, writer: StitchConnectionWriter) -> Self {
+ Self {
+ local_addr: reader.local_addr(),
+ peer_addr: reader.peer_addr(),
+ reader,
+ writer,
+ }
+ }
+
+ pub fn reader(&mut self) -> &mut StitchConnectionReader {
+ &mut self.reader
+ }
+
+ pub fn writer(&mut self) -> &mut StitchConnectionWriter {
+ &mut self.writer
+ }
+
+ pub async fn close(self) -> SocketAddr {
+ let peer_addr = self.peer_addr();
+
+ drop(self.reader);
+ // self.writer.close().await;
+ drop(self.writer);
+
+ return peer_addr;
+ }
+}
--- /dev/null
+use crate::schema::StitchMessage;
+use async_std::net::SocketAddr;
+use async_std::pin::Pin;
+use bytes::{Buf, BytesMut};
+use futures::task::{Context, Poll};
+use futures::{AsyncRead, AsyncReadExt, Stream};
+use log::*;
+use protobuf::Message;
+use std::convert::TryInto;
+
+pub use futures::SinkExt;
+pub use futures::StreamExt;
+use protobuf::well_known_types::Any;
+
+const BUFFER_SIZE: usize = 8192;
+
+pub struct StitchConnectionReader {
+ local_addr: SocketAddr,
+ peer_addr: SocketAddr,
+ read_stream: Box<dyn AsyncRead + Send + Sync + Unpin>,
+ pending_read: Option<BytesMut>,
+}
+
+impl StitchConnectionReader {
+ pub fn new(
+ local_addr: SocketAddr,
+ peer_addr: SocketAddr,
+ read_stream: Box<dyn AsyncRead + Send + Sync + Unpin>,
+ ) -> Self {
+ Self {
+ local_addr,
+ peer_addr,
+ read_stream,
+ pending_read: None,
+ }
+ }
+
+ pub fn local_addr(&self) -> SocketAddr {
+ self.local_addr.clone()
+ }
+
+ pub fn peer_addr(&self) -> SocketAddr {
+ self.peer_addr.clone()
+ }
+}
+
+impl Stream for StitchConnectionReader {
+ type Item = Any;
+
+ fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut buffer = BytesMut::new();
+ buffer.resize(BUFFER_SIZE, 0);
+
+ debug!("Starting new read loop for {}", self.local_addr);
+ loop {
+ trace!("Reading from the stream");
+ match futures::executor::block_on(self.read_stream.read(&mut buffer)) {
+ Ok(mut bytes_read) => {
+ if bytes_read > 0 {
+ debug!("Read {} bytes from the network stream", bytes_read)
+ }
+
+ if let Some(mut pending_buf) = self.pending_read.take() {
+ debug!("Prepending broken data ({} bytes) encountered from earlier read of network stream", pending_buf.len());
+ bytes_read += pending_buf.len();
+
+ pending_buf.unsplit(buffer);
+ buffer = pending_buf;
+ }
+
+ let mut bytes_read_u64: u64 = bytes_read.try_into().expect(
+ format!("Conversion from usize ({}) to u64 failed", bytes_read).as_str(),
+ );
+ while bytes_read_u64 > 0 {
+ debug!(
+ "{} bytes from network stream still unprocessed",
+ bytes_read_u64
+ );
+
+ buffer.resize(bytes_read, 0);
+ debug!("{:?}", buffer.as_ref());
+
+ match StitchMessage::parse_from_bytes(buffer.as_ref()) {
+ Ok(mut data) => {
+ let serialized_size = data.compute_size();
+ debug!("Deserialized message of size {} bytes", serialized_size);
+
+ buffer.advance(serialized_size as usize);
+
+ let serialized_size_u64: u64 = serialized_size.try_into().expect(
+ format!(
+ "Conversion from usize ({}) to u64 failed",
+ serialized_size
+ )
+ .as_str(),
+ );
+ bytes_read_u64 -= serialized_size_u64;
+ debug!("{} bytes still unprocessed", bytes_read_u64);
+
+ debug!("Sending deserialized message downstream");
+ return Poll::Ready(Some(data.take_payload()));
+ }
+
+ Err(err) => {
+ warn!(
+ "Could not deserialize data from the received bytes: {:#?}",
+ err
+ );
+
+ self.pending_read = Some(buffer);
+ buffer = BytesMut::new();
+ break;
+ }
+ }
+ }
+
+ buffer.resize(BUFFER_SIZE, 0);
+ }
+
+ Err(_err) => return Poll::Pending,
+ }
+ }
+ }
+}
--- /dev/null
+// This file is generated by rust-protobuf 2.19.0. Do not edit
+// @generated
+
+// https://github.com/rust-lang/rust-clippy/issues/702
+#![allow(unknown_lints)]
+#![allow(clippy::all)]
+
+#![allow(unused_attributes)]
+#![rustfmt::skip]
+
+#![allow(box_pointers)]
+#![allow(dead_code)]
+#![allow(missing_docs)]
+#![allow(non_camel_case_types)]
+#![allow(non_snake_case)]
+#![allow(non_upper_case_globals)]
+#![allow(trivial_casts)]
+#![allow(unused_imports)]
+#![allow(unused_results)]
+//! Generated file from `message.proto`
+
+/// Generated files are compatible only with the same version
+/// of protobuf runtime.
+// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_19_0;
+
+#[derive(PartialEq,Clone,Default)]
+pub struct StitchMessage {
+ // message fields
+ pub payload: ::protobuf::SingularPtrField<::protobuf::well_known_types::Any>,
+ // special fields
+ pub unknown_fields: ::protobuf::UnknownFields,
+ pub cached_size: ::protobuf::CachedSize,
+}
+
+impl<'a> ::std::default::Default for &'a StitchMessage {
+ fn default() -> &'a StitchMessage {
+ <StitchMessage as ::protobuf::Message>::default_instance()
+ }
+}
+
+impl StitchMessage {
+ pub fn new() -> StitchMessage {
+ ::std::default::Default::default()
+ }
+
+ // .google.protobuf.Any payload = 1;
+
+
+ pub fn get_payload(&self) -> &::protobuf::well_known_types::Any {
+ self.payload.as_ref().unwrap_or_else(|| <::protobuf::well_known_types::Any as ::protobuf::Message>::default_instance())
+ }
+ pub fn clear_payload(&mut self) {
+ self.payload.clear();
+ }
+
+ pub fn has_payload(&self) -> bool {
+ self.payload.is_some()
+ }
+
+ // Param is passed by value, moved
+ pub fn set_payload(&mut self, v: ::protobuf::well_known_types::Any) {
+ self.payload = ::protobuf::SingularPtrField::some(v);
+ }
+
+ // Mutable pointer to the field.
+ // If field is not initialized, it is initialized with default value first.
+ pub fn mut_payload(&mut self) -> &mut ::protobuf::well_known_types::Any {
+ if self.payload.is_none() {
+ self.payload.set_default();
+ }
+ self.payload.as_mut().unwrap()
+ }
+
+ // Take field
+ pub fn take_payload(&mut self) -> ::protobuf::well_known_types::Any {
+ self.payload.take().unwrap_or_else(|| ::protobuf::well_known_types::Any::new())
+ }
+}
+
+impl ::protobuf::Message for StitchMessage {
+ fn is_initialized(&self) -> bool {
+ for v in &self.payload {
+ if !v.is_initialized() {
+ return false;
+ }
+ };
+ true
+ }
+
+ fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
+ while !is.eof()? {
+ let (field_number, wire_type) = is.read_tag_unpack()?;
+ match field_number {
+ 1 => {
+ ::protobuf::rt::read_singular_message_into(wire_type, is, &mut self.payload)?;
+ },
+ _ => {
+ ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
+ },
+ };
+ }
+ ::std::result::Result::Ok(())
+ }
+
+ // Compute sizes of nested messages
+ #[allow(unused_variables)]
+ fn compute_size(&self) -> u32 {
+ let mut my_size = 0;
+ if let Some(ref v) = self.payload.as_ref() {
+ let len = v.compute_size();
+ my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len;
+ }
+ my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
+ self.cached_size.set(my_size);
+ my_size
+ }
+
+ fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
+ if let Some(ref v) = self.payload.as_ref() {
+ os.write_tag(1, ::protobuf::wire_format::WireTypeLengthDelimited)?;
+ os.write_raw_varint32(v.get_cached_size())?;
+ v.write_to_with_cached_sizes(os)?;
+ }
+ os.write_unknown_fields(self.get_unknown_fields())?;
+ ::std::result::Result::Ok(())
+ }
+
+ fn get_cached_size(&self) -> u32 {
+ self.cached_size.get()
+ }
+
+ fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
+ &self.unknown_fields
+ }
+
+ fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
+ &mut self.unknown_fields
+ }
+
+ fn as_any(&self) -> &dyn (::std::any::Any) {
+ self as &dyn (::std::any::Any)
+ }
+ fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
+ self as &mut dyn (::std::any::Any)
+ }
+ fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
+ self
+ }
+
+ fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
+ Self::descriptor_static()
+ }
+
+ fn new() -> StitchMessage {
+ StitchMessage::new()
+ }
+
+ fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
+ static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
+ descriptor.get(|| {
+ let mut fields = ::std::vec::Vec::new();
+ fields.push(::protobuf::reflect::accessor::make_singular_ptr_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage<::protobuf::well_known_types::Any>>(
+ "payload",
+ |m: &StitchMessage| { &m.payload },
+ |m: &mut StitchMessage| { &mut m.payload },
+ ));
+ ::protobuf::reflect::MessageDescriptor::new_pb_name::<StitchMessage>(
+ "StitchMessage",
+ fields,
+ file_descriptor_proto()
+ )
+ })
+ }
+
+ fn default_instance() -> &'static StitchMessage {
+ static instance: ::protobuf::rt::LazyV2<StitchMessage> = ::protobuf::rt::LazyV2::INIT;
+ instance.get(StitchMessage::new)
+ }
+}
+
+impl ::protobuf::Clear for StitchMessage {
+ fn clear(&mut self) {
+ self.payload.clear();
+ self.unknown_fields.clear();
+ }
+}
+
+impl ::std::fmt::Debug for StitchMessage {
+ fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
+ ::protobuf::text_format::fmt(self, f)
+ }
+}
+
+impl ::protobuf::reflect::ProtobufValue for StitchMessage {
+ fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
+ ::protobuf::reflect::ReflectValueRef::Message(self)
+ }
+}
+
+static file_descriptor_proto_data: &'static [u8] = b"\
+ \n\rmessage.proto\x12\x07message\x1a\x19google/protobuf/any.proto\"C\n\r\
+ StitchMessage\x120\n\x07payload\x18\x01\x20\x01(\x0b2\x14.google.protobu\
+ f.AnyR\x07payloadB\0:\0B\0b\x06proto3\
+";
+
+static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;
+
+fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
+ ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap()
+}
+
+pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
+ file_descriptor_proto_lazy.get(|| {
+ parse_descriptor_proto()
+ })
+}
--- /dev/null
+mod message;
+
+pub use message::StitchMessage;
+use protobuf::well_known_types::Any;
+use protobuf::Message;
+
+impl StitchMessage {
+ // @todo make pub(crate)
+ pub fn from_msg<T: Message>(msg: T) -> Self {
+ let mut sm = Self::new();
+ let payload = Any::pack(&msg).expect("Protobuf Message could not be packed into Any type");
+
+ sm.set_payload(payload);
+ return sm;
+ }
+}
--- /dev/null
+use async_std::task;
+use log::*;
+
+use crate::StitchConnection;
+use async_std::net::{TcpStream, ToSocketAddrs};
+
+impl StitchConnection {
+ pub fn tcp_client<A: ToSocketAddrs + std::fmt::Display>(
+ ip_addrs: A,
+ ) -> anyhow::Result<StitchConnection> {
+ let read_stream = task::block_on(TcpStream::connect(&ip_addrs))?;
+ info!("Established client TCP connection to {}", ip_addrs);
+
+ Ok(Self::from(read_stream))
+ }
+}
+
+impl From<TcpStream> for StitchConnection {
+ fn from(stream: TcpStream) -> Self {
+ let write_stream = stream.clone();
+
+ let local_addr = stream
+ .local_addr()
+ .expect("Local address could not be retrieved");
+
+ let peer_addr = stream
+ .peer_addr()
+ .expect("Peer address could not be retrieved");
+
+ Self::new(
+ local_addr,
+ peer_addr,
+ Box::new(stream),
+ Box::new(write_stream),
+ )
+ }
+}
--- /dev/null
+pub(crate) mod client;
+pub(crate) mod server;
+
+pub use client::*;
+pub use server::*;
--- /dev/null
+use crate::StitchConnection;
+use async_std::net::{SocketAddr, TcpListener, ToSocketAddrs};
+use async_std::pin::Pin;
+use async_std::task;
+use futures::task::{Context, Poll};
+use futures::{Stream, StreamExt};
+use log::*;
+
+#[allow(dead_code)]
+pub struct StitchTcpServer {
+ local_addrs: SocketAddr,
+ listener: TcpListener,
+}
+
+impl StitchTcpServer {
+ pub fn new<A: ToSocketAddrs + std::fmt::Display>(ip_addrs: A) -> anyhow::Result<Self> {
+ let listener = task::block_on(TcpListener::bind(&ip_addrs))?;
+ info!("Started TCP server at {}", &ip_addrs);
+
+ Ok(Self {
+ local_addrs: listener.local_addr()?,
+ listener,
+ })
+ }
+}
+
+impl Stream for StitchTcpServer {
+ type Item = StitchConnection;
+
+ fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ if let Some(Ok(conn)) = futures::executor::block_on(self.listener.incoming().next()) {
+ Poll::Ready(Some(StitchConnection::from(conn)))
+ } else {
+ Poll::Ready(None)
+ }
+ }
+}
--- /dev/null
+use async_channel::{Receiver, Sender};
+use async_std::io::*;
+use async_std::net::*;
+use async_std::task;
+use async_tls::TlsConnector;
+use futures_util::io::AsyncReadExt;
+use log::*;
+
+use crate::registry::StitchRegistry;
+use crate::StitchNetClient;
+use crate::{channel_factory, StitchMessage};
+use async_std::sync::{Arc, Condvar, Mutex};
+
+impl StitchNetClient {
+ pub fn tls_client<A: ToSocketAddrs + std::fmt::Display>(
+ ip_addrs: A,
+ domain: &str,
+ connector: TlsConnector,
+ ) -> Result<Self> {
+ Self::tls_client_with_bound(ip_addrs, domain, connector, None)
+ }
+
+ pub fn tls_client_with_bound<A: ToSocketAddrs + std::fmt::Display>(
+ ip_addrs: A,
+ domain: &str,
+ connector: TlsConnector,
+ cap: Option<usize>,
+ ) -> Result<Self> {
+ let stream = task::block_on(TcpStream::connect(&ip_addrs))?;
+ stream.set_nodelay(true)?;
+ info!("Established client TCP connection to {}", ip_addrs);
+
+ Self::tls_client_from_parts(stream, domain, connector, channel_factory(cap))
+ }
+
+ pub fn tls_client_from_parts(
+ stream: TcpStream,
+ domain: &str,
+ connector: TlsConnector,
+ (tls_write_sender, tls_write_receiver): (Sender<StitchMessage>, Receiver<StitchMessage>),
+ ) -> Result<Self> {
+ let local_addr = stream.local_addr()?;
+ let peer_addr = stream.peer_addr()?;
+
+ let encrypted_stream = task::block_on(connector.connect(domain, stream))?;
+ let (read_stream, write_stream) = encrypted_stream.split();
+ info!("Completed TLS handshake with {}", peer_addr);
+
+ let registry: StitchRegistry = crate::registry::new();
+ let read_readiness = Arc::new((Mutex::new(false), Condvar::new()));
+ let write_readiness = Arc::new((Mutex::new(false), Condvar::new()));
+
+ let read_task = task::spawn(crate::tasks::read_from_stream(
+ registry.clone(),
+ read_stream,
+ read_readiness.clone(),
+ ));
+
+ let write_task = task::spawn(crate::tasks::write_to_stream(
+ tls_write_receiver.clone(),
+ write_stream,
+ write_readiness.clone(),
+ ));
+
+ Ok(Self {
+ local_addr,
+ peer_addr,
+ registry,
+ stream_writer_chan: (tls_write_sender, tls_write_receiver),
+ read_readiness,
+ write_readiness,
+ read_task,
+ write_task,
+ })
+ }
+}
--- /dev/null
+pub(crate) mod client;
+pub(crate) mod server;
+
+pub use client::*;
+pub use server::*;
+
+pub use async_tls;
+pub use rustls;
--- /dev/null
+use crate::channel_factory;
+use crate::registry::StitchRegistry;
+use crate::{ServerRegistry, StitchClient, StitchNetClient, StitchNetServer};
+use async_channel::{Receiver, Sender};
+use async_std::io::*;
+use async_std::net::*;
+use async_std::prelude::*;
+use async_std::sync::{Arc, Condvar, Mutex};
+use async_std::task;
+use async_tls::TlsAcceptor;
+use dashmap::DashMap;
+use futures_util::AsyncReadExt;
+use log::*;
+
+impl StitchNetServer {
+ pub fn tls_server<A: ToSocketAddrs + std::fmt::Display>(
+ ip_addrs: A,
+ acceptor: TlsAcceptor,
+ ) -> Result<(StitchNetServer, Receiver<Arc<StitchNetClient>>)> {
+ Self::tls_server_with_bound(ip_addrs, acceptor, None)
+ }
+
+ pub fn tls_server_with_bound<A: ToSocketAddrs + std::fmt::Display>(
+ ip_addrs: A,
+ acceptor: TlsAcceptor,
+ cap: Option<usize>,
+ ) -> Result<(Self, Receiver<Arc<StitchNetClient>>)> {
+ let listener = task::block_on(TcpListener::bind(ip_addrs))?;
+ info!("Started TLS server at {}", listener.local_addr()?);
+
+ let registry = Arc::new(DashMap::new());
+ let (sender, receiver) = channel_factory(cap);
+
+ let handler = task::spawn(handle_server_connections(
+ acceptor,
+ registry.clone(),
+ listener,
+ sender.clone(),
+ cap,
+ ));
+
+ Ok((
+ Self {
+ registry,
+ connections_chan: (sender, receiver.clone()),
+ accept_loop_task: handler,
+ },
+ receiver,
+ ))
+ }
+}
+
+async fn handle_server_connections<'a>(
+ acceptor: TlsAcceptor,
+ registry: ServerRegistry,
+ input: TcpListener,
+ output: Sender<Arc<StitchNetClient>>,
+ cap: Option<usize>,
+) -> anyhow::Result<()> {
+ let mut conns = input.incoming();
+
+ debug!("Reading from the stream of incoming connections");
+ loop {
+ match conns.next().await {
+ Some(Ok(tcp_stream)) => {
+ let local_addr = tcp_stream.local_addr()?;
+ let peer_addr = tcp_stream.peer_addr()?;
+
+ debug!("Received connection attempt from {}", peer_addr);
+
+ let tls_stream = acceptor.accept(tcp_stream).await?;
+
+ let (read_stream, write_stream) = tls_stream.split();
+ let (tls_write_sender, tls_write_receiver) = channel_factory(cap);
+
+ let client_registry: StitchRegistry = crate::registry::new();
+ let read_readiness = Arc::new((Mutex::new(false), Condvar::new()));
+ let write_readiness = Arc::new((Mutex::new(false), Condvar::new()));
+
+ let read_task = task::spawn(crate::tasks::read_from_stream(
+ client_registry.clone(),
+ read_stream,
+ read_readiness.clone(),
+ ));
+
+ let write_task = task::spawn(crate::tasks::write_to_stream(
+ tls_write_receiver.clone(),
+ write_stream,
+ write_readiness.clone(),
+ ));
+
+ let conn = StitchNetClient {
+ local_addr,
+ peer_addr,
+ registry: client_registry,
+ stream_writer_chan: (tls_write_sender, tls_write_receiver),
+ read_readiness,
+ write_readiness,
+ read_task,
+ write_task,
+ };
+
+ debug!("Attempting to register connection from {}", peer_addr);
+ let conn = Arc::new(conn);
+ registry.insert(conn.peer_addr(), conn.clone());
+ debug!(
+ "Registered client connection for {} in server registry",
+ peer_addr
+ );
+
+ if let Err(err) = output.send(conn).await {
+ error!(
+ "Stopping the server accept loop - could not send accepted TLS client connection to channel: {:#?}",
+ err
+ );
+
+ break Err(anyhow::Error::from(err));
+ } else {
+ info!("Accepted connection from {}", peer_addr);
+ }
+ }
+
+ Some(Err(err)) => error!(
+ "Encountered error when accepting TLS connection: {:#?}",
+ err
+ ),
+
+ None => {
+ warn!("Stopping the server accept loop - unable to accept any more connections");
+
+ break Ok(());
+ }
+ }
+ }
+}
--- /dev/null
+use crate::schema::StitchMessage;
+use async_channel::RecvError;
+use async_std::net::SocketAddr;
+use async_std::pin::Pin;
+use futures::task::{Context, Poll};
+use futures::{AsyncWrite, AsyncWriteExt, Sink};
+use log::*;
+use protobuf::Message;
+
+pub use futures::SinkExt;
+pub use futures::StreamExt;
+
+pub struct StitchConnectionWriter {
+ local_addr: SocketAddr,
+ peer_addr: SocketAddr,
+ write_stream: Box<dyn AsyncWrite + Send + Sync + Unpin>,
+ pending_write: Option<StitchMessage>,
+}
+
+impl StitchConnectionWriter {
+ pub fn new(
+ local_addr: SocketAddr,
+ peer_addr: SocketAddr,
+ write_stream: Box<dyn AsyncWrite + Send + Sync + Unpin>,
+ ) -> Self {
+ Self {
+ local_addr,
+ peer_addr,
+ write_stream,
+ pending_write: None,
+ }
+ }
+
+ pub fn local_addr(&self) -> SocketAddr {
+ self.local_addr.clone()
+ }
+
+ pub fn peer_addr(&self) -> SocketAddr {
+ self.peer_addr.clone()
+ }
+}
+
+impl<T: Message> Sink<T> for StitchConnectionWriter {
+ type Error = RecvError;
+
+ fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ if self.pending_write.is_some() {
+ debug!("Connection not ready to send message yet, waiting for prior message");
+ Poll::Pending
+ } else {
+ debug!("Connection ready to send message");
+ Poll::Ready(Ok(()))
+ }
+ }
+
+ fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
+ debug!("Preparing message to be sent next");
+ let stitch_msg: StitchMessage = StitchMessage::from_msg(item);
+ self.pending_write.replace(stitch_msg);
+
+ Ok(())
+ }
+
+ fn poll_flush(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ if let Some(pending_msg) = self.pending_write.take() {
+ debug!("Send pending message");
+ if let Ok(buffer) = pending_msg.write_to_bytes() {
+ let msg_size = buffer.len();
+ debug!("{} bytes to be sent over network connection", msg_size);
+
+ debug!("{:?}", buffer.as_slice());
+
+ return if let Ok(_) =
+ futures::executor::block_on(self.write_stream.write_all(buffer.as_slice()))
+ {
+ if let Ok(_) = futures::executor::block_on(self.write_stream.flush()) {
+ debug!("Sent message of {} bytes", msg_size);
+ Poll::Ready(Ok(()))
+ } else {
+ debug!("Encountered error while flushing queued bytes to network stream");
+ Poll::Ready(Err(RecvError))
+ }
+ } else {
+ debug!("Encountered error when writing to network stream");
+ Poll::Ready(Err(RecvError))
+ };
+ } else {
+ debug!("Encountered error when serializing message to bytes");
+ return Poll::Ready(Err(RecvError));
+ }
+ } else {
+ debug!("No message to send over connection");
+ }
+
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_close(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ if let Ok(_) = futures::executor::block_on(self.write_stream.close()) {
+ Poll::Ready(Ok(()))
+ } else {
+ Poll::Ready(Err(RecvError))
+ }
+ }
+}