]> git.lizzy.rs Git - connect-rs.git/commitdiff
rename stitch-net to connect
authorSachandhan Ganesh <sachan.ganesh@gmail.com>
Fri, 15 Jan 2021 05:25:55 +0000 (21:25 -0800)
committerSachandhan Ganesh <sachan.ganesh@gmail.com>
Fri, 15 Jan 2021 05:27:38 +0000 (21:27 -0800)
37 files changed:
Cargo.toml [new file with mode: 0644]
README.md [new file with mode: 0644]
build.rs [new file with mode: 0644]
examples/tcp-client/Cargo.toml [new file with mode: 0644]
examples/tcp-client/README.md [new file with mode: 0644]
examples/tcp-client/build.rs [new file with mode: 0644]
examples/tcp-client/schema/hello_world.proto [new file with mode: 0644]
examples/tcp-client/src/main.rs [new file with mode: 0644]
examples/tcp-client/src/schema/hello_world.rs [new file with mode: 0644]
examples/tcp-client/src/schema/mod.rs [new file with mode: 0644]
examples/tcp-echo-server/Cargo.toml [new file with mode: 0644]
examples/tcp-echo-server/README.md [new file with mode: 0644]
examples/tcp-echo-server/build.rs [new file with mode: 0644]
examples/tcp-echo-server/src/main.rs [new file with mode: 0644]
examples/tcp-echo-server/src/schema/hello_world.rs [new file with mode: 0644]
examples/tcp-echo-server/src/schema/mod.rs [new file with mode: 0644]
examples/tls-client/Cargo.toml [new file with mode: 0644]
examples/tls-client/README.md [new file with mode: 0644]
examples/tls-client/end.chain [new file with mode: 0644]
examples/tls-client/src/main.rs [new file with mode: 0644]
examples/tls-echo-server/Cargo.toml [new file with mode: 0644]
examples/tls-echo-server/README.md [new file with mode: 0644]
examples/tls-echo-server/end.cert [new file with mode: 0644]
examples/tls-echo-server/end.rsa [new file with mode: 0644]
examples/tls-echo-server/src/main.rs [new file with mode: 0644]
schema/message.proto [new file with mode: 0644]
src/lib.rs [new file with mode: 0644]
src/reader.rs [new file with mode: 0644]
src/schema/message.rs [new file with mode: 0644]
src/schema/mod.rs [new file with mode: 0644]
src/tcp/client.rs [new file with mode: 0644]
src/tcp/mod.rs [new file with mode: 0644]
src/tcp/server.rs [new file with mode: 0644]
src/tls/client.rs [new file with mode: 0644]
src/tls/mod.rs [new file with mode: 0644]
src/tls/server.rs [new file with mode: 0644]
src/writer.rs [new file with mode: 0644]

diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644 (file)
index 0000000..f5bf2a0
--- /dev/null
@@ -0,0 +1,21 @@
+[package]
+name = "connect"
+version = "0.0.2"
+authors = ["Sachandhan Ganesh <sachan.ganesh@gmail.com>"]
+edition = "2018"
+
+[dependencies]
+anyhow = "1.0.31"
+async-channel = "1.4.0"
+async-std = { version = "1.6.2", features = ["tokio02", "unstable"] }
+async-tls = { version = "0.9.0", default-features = false, features = ["client", "server"]}
+async-trait = "0.1.39"
+bytes = "0.5.5"
+dashmap = "3.11.10"
+futures = "0.3.8"
+log = "0.4"
+protobuf = "2.18.1"
+rustls = "0.18.0"
+
+[build-dependencies]
+protobuf-codegen-pure = "2.18.1"
diff --git a/README.md b/README.md
new file mode 100644 (file)
index 0000000..224f95d
--- /dev/null
+++ b/README.md
@@ -0,0 +1,34 @@
+# connect
+
+This crate provides a distributed message queue abstraction over asynchronous network streams.
+
+By using a message queue, crate users can focus on sending and receiving messages between clients instead of low-level networking and failure recovery.
+
+## Future Goals
+
+- Documentation
+- Connection pool (+ accounting for ordering of messages)
+- Configurable policies for handling of non-registered (unexpected) message types
+- Testing
+- Benchmarking
+
+## Feature Status
+
+| Feature                                              | Status        |
+|----------------------------------------------------- |--------       |
+| [TCP Client](examples/tcp-client)                        |    ✓    |
+| [TCP Server](examples/tcp-echo-server)                   |    ✓    |
+| UDP Client                                           |               |
+| UDP Server                                           |               |
+| [TLS Client](examples/tls-client)                        |    ✓    |
+| [TLS Server](examples/tls-echo-server)                   |    ✓    |
+| QUIC Client                                          |               |
+| QUIC Server                                          |               |
+| SCTP Client                                          |               |
+| SCTP Server                                          |               |
+| DTLS-SCTP Client                                     |               |
+| DTLS-SCTP Server                                     |               |
+| Kafka Client                                         |               |
+| RMQ Client                                           |               |
+| SQS Client                                           |               |
+| NSQ Client                                           |               |
diff --git a/build.rs b/build.rs
new file mode 100644 (file)
index 0000000..efa836d
--- /dev/null
+++ b/build.rs
@@ -0,0 +1,9 @@
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    protobuf_codegen_pure::Codegen::new()
+        .out_dir("src/schema")
+        .inputs(&["schema/message.proto"])
+        .include("schema")
+        .run()
+        .expect("Codegen failed.");
+    Ok(())
+}
diff --git a/examples/tcp-client/Cargo.toml b/examples/tcp-client/Cargo.toml
new file mode 100644 (file)
index 0000000..1449e16
--- /dev/null
@@ -0,0 +1,19 @@
+[package]
+name = "tcp-client"
+version = "0.1.0"
+authors = ["Sachandhan Ganesh <sachan.ganesh@gmail.com>"]
+edition = "2018"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+anyhow = "1.0.31"
+async-std = { version = "1.6.2", features = ["attributes"] }
+env_logger = "0.7"
+log = "0.4"
+protobuf = "2.18.1"
+
+stitch-net = { path = "../../" }
+
+[build-dependencies]
+protobuf-codegen-pure = "2.18.1"
\ No newline at end of file
diff --git a/examples/tcp-client/README.md b/examples/tcp-client/README.md
new file mode 100644 (file)
index 0000000..b25cbba
--- /dev/null
@@ -0,0 +1,21 @@
+# seam-channel tcp-client example
+
+This example program will:
+
+1. Establish a connection with a TCP server
+2. Send a `String` message to the server
+3. Wait for a `String` message reply from the server
+
+## Usage
+
+```
+export RUST_LOG=info
+cargo run <ip-address-to-connect-to>
+```
+
+## Example Usage
+
+```
+export RUST_LOG=info
+cargo run localhost:5678
+```
\ No newline at end of file
diff --git a/examples/tcp-client/build.rs b/examples/tcp-client/build.rs
new file mode 100644 (file)
index 0000000..aa2ff90
--- /dev/null
@@ -0,0 +1,9 @@
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    protobuf_codegen_pure::Codegen::new()
+        .out_dir("src/schema")
+        .inputs(&["schema/hello_world.proto"])
+        .include("schema")
+        .run()
+        .expect("Codegen failed.");
+    Ok(())
+}
diff --git a/examples/tcp-client/schema/hello_world.proto b/examples/tcp-client/schema/hello_world.proto
new file mode 100644 (file)
index 0000000..5ffe7b2
--- /dev/null
@@ -0,0 +1,6 @@
+syntax = "proto3";
+package hello_world;
+
+message HelloWorld {
+  string message = 1;
+}
\ No newline at end of file
diff --git a/examples/tcp-client/src/main.rs b/examples/tcp-client/src/main.rs
new file mode 100644 (file)
index 0000000..a5de8e5
--- /dev/null
@@ -0,0 +1,45 @@
+pub mod schema;
+
+use crate::schema::hello_world::HelloWorld;
+use log::*;
+use protobuf::well_known_types::Any;
+use std::env;
+use stitch_net::{SinkExt, StitchConnection, StreamExt};
+
+#[async_std::main]
+async fn main() -> anyhow::Result<()> {
+    env_logger::init();
+
+    // Get ip address from cmd line args
+    let args: Vec<String> = env::args().collect();
+    let ip_address = match args.get(1) {
+        Some(addr) => addr,
+        None => {
+            error!("Need to pass IP address to connect to as command line argument");
+            panic!();
+        }
+    };
+
+    // create a client connection to the server
+    let mut conn = StitchConnection::tcp_client(ip_address)?;
+
+    // send a message to the server
+    let raw_msg = String::from("Hello world");
+    info!("Sending message: {}", raw_msg);
+
+    let mut msg = HelloWorld::new();
+    msg.set_message(raw_msg);
+
+    conn.writer().send(msg).await?;
+
+    // wait for the server to reply with an ack
+    while let Some(reply) = conn.reader().next().await {
+        info!("Received message");
+
+        let msg: HelloWorld = Any::unpack(&reply)?.unwrap();
+
+        info!("Unpacked reply: {}", msg.get_message());
+    }
+
+    Ok(())
+}
diff --git a/examples/tcp-client/src/schema/hello_world.rs b/examples/tcp-client/src/schema/hello_world.rs
new file mode 100644 (file)
index 0000000..5af4935
--- /dev/null
@@ -0,0 +1,200 @@
+// This file is generated by rust-protobuf 2.19.0. Do not edit
+// @generated
+
+// https://github.com/rust-lang/rust-clippy/issues/702
+#![allow(unknown_lints)]
+#![allow(clippy::all)]
+
+#![allow(unused_attributes)]
+#![rustfmt::skip]
+
+#![allow(box_pointers)]
+#![allow(dead_code)]
+#![allow(missing_docs)]
+#![allow(non_camel_case_types)]
+#![allow(non_snake_case)]
+#![allow(non_upper_case_globals)]
+#![allow(trivial_casts)]
+#![allow(unused_imports)]
+#![allow(unused_results)]
+//! Generated file from `hello_world.proto`
+
+/// Generated files are compatible only with the same version
+/// of protobuf runtime.
+// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_19_0;
+
+#[derive(PartialEq,Clone,Default)]
+pub struct HelloWorld {
+    // message fields
+    pub message: ::std::string::String,
+    // special fields
+    pub unknown_fields: ::protobuf::UnknownFields,
+    pub cached_size: ::protobuf::CachedSize,
+}
+
+impl<'a> ::std::default::Default for &'a HelloWorld {
+    fn default() -> &'a HelloWorld {
+        <HelloWorld as ::protobuf::Message>::default_instance()
+    }
+}
+
+impl HelloWorld {
+    pub fn new() -> HelloWorld {
+        ::std::default::Default::default()
+    }
+
+    // string message = 1;
+
+
+    pub fn get_message(&self) -> &str {
+        &self.message
+    }
+    pub fn clear_message(&mut self) {
+        self.message.clear();
+    }
+
+    // Param is passed by value, moved
+    pub fn set_message(&mut self, v: ::std::string::String) {
+        self.message = v;
+    }
+
+    // Mutable pointer to the field.
+    // If field is not initialized, it is initialized with default value first.
+    pub fn mut_message(&mut self) -> &mut ::std::string::String {
+        &mut self.message
+    }
+
+    // Take field
+    pub fn take_message(&mut self) -> ::std::string::String {
+        ::std::mem::replace(&mut self.message, ::std::string::String::new())
+    }
+}
+
+impl ::protobuf::Message for HelloWorld {
+    fn is_initialized(&self) -> bool {
+        true
+    }
+
+    fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
+        while !is.eof()? {
+            let (field_number, wire_type) = is.read_tag_unpack()?;
+            match field_number {
+                1 => {
+                    ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.message)?;
+                },
+                _ => {
+                    ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
+                },
+            };
+        }
+        ::std::result::Result::Ok(())
+    }
+
+    // Compute sizes of nested messages
+    #[allow(unused_variables)]
+    fn compute_size(&self) -> u32 {
+        let mut my_size = 0;
+        if !self.message.is_empty() {
+            my_size += ::protobuf::rt::string_size(1, &self.message);
+        }
+        my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
+        self.cached_size.set(my_size);
+        my_size
+    }
+
+    fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
+        if !self.message.is_empty() {
+            os.write_string(1, &self.message)?;
+        }
+        os.write_unknown_fields(self.get_unknown_fields())?;
+        ::std::result::Result::Ok(())
+    }
+
+    fn get_cached_size(&self) -> u32 {
+        self.cached_size.get()
+    }
+
+    fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
+        &self.unknown_fields
+    }
+
+    fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
+        &mut self.unknown_fields
+    }
+
+    fn as_any(&self) -> &dyn (::std::any::Any) {
+        self as &dyn (::std::any::Any)
+    }
+    fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
+        self as &mut dyn (::std::any::Any)
+    }
+    fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
+        self
+    }
+
+    fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
+        Self::descriptor_static()
+    }
+
+    fn new() -> HelloWorld {
+        HelloWorld::new()
+    }
+
+    fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
+        static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
+        descriptor.get(|| {
+            let mut fields = ::std::vec::Vec::new();
+            fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
+                "message",
+                |m: &HelloWorld| { &m.message },
+                |m: &mut HelloWorld| { &mut m.message },
+            ));
+            ::protobuf::reflect::MessageDescriptor::new_pb_name::<HelloWorld>(
+                "HelloWorld",
+                fields,
+                file_descriptor_proto()
+            )
+        })
+    }
+
+    fn default_instance() -> &'static HelloWorld {
+        static instance: ::protobuf::rt::LazyV2<HelloWorld> = ::protobuf::rt::LazyV2::INIT;
+        instance.get(HelloWorld::new)
+    }
+}
+
+impl ::protobuf::Clear for HelloWorld {
+    fn clear(&mut self) {
+        self.message.clear();
+        self.unknown_fields.clear();
+    }
+}
+
+impl ::std::fmt::Debug for HelloWorld {
+    fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
+        ::protobuf::text_format::fmt(self, f)
+    }
+}
+
+impl ::protobuf::reflect::ProtobufValue for HelloWorld {
+    fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
+        ::protobuf::reflect::ReflectValueRef::Message(self)
+    }
+}
+
+static file_descriptor_proto_data: &'static [u8] = b"\
+    \n\x11hello_world.proto\x12\x0bhello_world\"*\n\nHelloWorld\x12\x1a\n\
+    \x07message\x18\x01\x20\x01(\tR\x07messageB\0:\0B\0b\x06proto3\
+";
+
+static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;
+
+fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
+    ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap()
+}
+
+pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
+    file_descriptor_proto_lazy.get(|| {
+        parse_descriptor_proto()
+    })
+}
diff --git a/examples/tcp-client/src/schema/mod.rs b/examples/tcp-client/src/schema/mod.rs
new file mode 100644 (file)
index 0000000..c6dbc18
--- /dev/null
@@ -0,0 +1 @@
+pub mod hello_world;
diff --git a/examples/tcp-echo-server/Cargo.toml b/examples/tcp-echo-server/Cargo.toml
new file mode 100644 (file)
index 0000000..d44763c
--- /dev/null
@@ -0,0 +1,19 @@
+[package]
+name = "tcp-echo-server"
+version = "0.1.0"
+authors = ["Sachandhan Ganesh <sachan.ganesh@gmail.com>"]
+edition = "2018"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+anyhow = "1.0.31"
+async-std = { version = "1.6.2", features = ["attributes"] }
+env_logger = "0.7"
+log = "0.4"
+protobuf = "2.18.1"
+
+stitch-net = { path = "../../" }
+
+[build-dependencies]
+protobuf-codegen-pure = "2.18.1"
\ No newline at end of file
diff --git a/examples/tcp-echo-server/README.md b/examples/tcp-echo-server/README.md
new file mode 100644 (file)
index 0000000..72695ae
--- /dev/null
@@ -0,0 +1,23 @@
+# seam-channel tcp-echo-server example
+
+This example program will:
+
+1. Bind to an IP address
+2. Accept any number of TCP connections
+3. Handle each connection by:
+    1. Waiting for `String` messages to be received
+    2. Echoing the `String` message back to the source
+
+## Usage
+
+```
+export RUST_LOG=info
+cargo run <ip-address-to-bind-to>
+```
+
+## Example Usage
+
+```
+export RUST_LOG=info
+cargo run localhost:5678
+```
\ No newline at end of file
diff --git a/examples/tcp-echo-server/build.rs b/examples/tcp-echo-server/build.rs
new file mode 100644 (file)
index 0000000..ae9222f
--- /dev/null
@@ -0,0 +1,9 @@
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    protobuf_codegen_pure::Codegen::new()
+        .out_dir("src/schema")
+        .inputs(&["../tcp-client/schema/hello_world.proto"])
+        .include("../tcp-client/schema")
+        .run()
+        .expect("Codegen failed.");
+    Ok(())
+}
diff --git a/examples/tcp-echo-server/src/main.rs b/examples/tcp-echo-server/src/main.rs
new file mode 100644 (file)
index 0000000..376ab0c
--- /dev/null
@@ -0,0 +1,57 @@
+mod schema;
+
+use crate::schema::hello_world::HelloWorld;
+use async_std::task;
+use log::*;
+use std::env;
+use stitch_net::tcp::StitchTcpServer;
+use stitch_net::{SinkExt, StreamExt};
+
+#[async_std::main]
+async fn main() -> anyhow::Result<()> {
+    env_logger::init();
+
+    // Get ip address from cmd line args
+    let args: Vec<String> = env::args().collect();
+
+    let ip_address = match args.get(1) {
+        Some(addr) => addr,
+        None => {
+            error!("Need to pass IP address to bind to as command line argument");
+            panic!();
+        }
+    };
+
+    // create a server
+    let mut server = StitchTcpServer::new(ip_address)?;
+
+    // handle server connections
+    // wait for a connection to come in and be accepted
+    while let Some(mut conn) = server.next().await {
+        info!("Handling connection from {}", conn.peer_addr());
+
+        task::spawn(async move {
+            while let Some(msg) = conn.reader().next().await {
+                if msg.is::<HelloWorld>() {
+                    if let Ok(Some(contents)) = msg.unpack::<HelloWorld>() {
+                        info!(
+                            "Received a message \"{}\" from {}",
+                            contents.get_message(),
+                            conn.peer_addr()
+                        );
+
+                        conn.writer()
+                            .send(contents)
+                            .await
+                            .expect("Could not send message back to source connection");
+                        info!("Sent message back to original sender");
+                    }
+                } else {
+                    error!("Received a message of unknown type")
+                }
+            }
+        });
+    }
+
+    Ok(())
+}
diff --git a/examples/tcp-echo-server/src/schema/hello_world.rs b/examples/tcp-echo-server/src/schema/hello_world.rs
new file mode 100644 (file)
index 0000000..5af4935
--- /dev/null
@@ -0,0 +1,200 @@
+// This file is generated by rust-protobuf 2.19.0. Do not edit
+// @generated
+
+// https://github.com/rust-lang/rust-clippy/issues/702
+#![allow(unknown_lints)]
+#![allow(clippy::all)]
+
+#![allow(unused_attributes)]
+#![rustfmt::skip]
+
+#![allow(box_pointers)]
+#![allow(dead_code)]
+#![allow(missing_docs)]
+#![allow(non_camel_case_types)]
+#![allow(non_snake_case)]
+#![allow(non_upper_case_globals)]
+#![allow(trivial_casts)]
+#![allow(unused_imports)]
+#![allow(unused_results)]
+//! Generated file from `hello_world.proto`
+
+/// Generated files are compatible only with the same version
+/// of protobuf runtime.
+// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_19_0;
+
+#[derive(PartialEq,Clone,Default)]
+pub struct HelloWorld {
+    // message fields
+    pub message: ::std::string::String,
+    // special fields
+    pub unknown_fields: ::protobuf::UnknownFields,
+    pub cached_size: ::protobuf::CachedSize,
+}
+
+impl<'a> ::std::default::Default for &'a HelloWorld {
+    fn default() -> &'a HelloWorld {
+        <HelloWorld as ::protobuf::Message>::default_instance()
+    }
+}
+
+impl HelloWorld {
+    pub fn new() -> HelloWorld {
+        ::std::default::Default::default()
+    }
+
+    // string message = 1;
+
+
+    pub fn get_message(&self) -> &str {
+        &self.message
+    }
+    pub fn clear_message(&mut self) {
+        self.message.clear();
+    }
+
+    // Param is passed by value, moved
+    pub fn set_message(&mut self, v: ::std::string::String) {
+        self.message = v;
+    }
+
+    // Mutable pointer to the field.
+    // If field is not initialized, it is initialized with default value first.
+    pub fn mut_message(&mut self) -> &mut ::std::string::String {
+        &mut self.message
+    }
+
+    // Take field
+    pub fn take_message(&mut self) -> ::std::string::String {
+        ::std::mem::replace(&mut self.message, ::std::string::String::new())
+    }
+}
+
+impl ::protobuf::Message for HelloWorld {
+    fn is_initialized(&self) -> bool {
+        true
+    }
+
+    fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
+        while !is.eof()? {
+            let (field_number, wire_type) = is.read_tag_unpack()?;
+            match field_number {
+                1 => {
+                    ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.message)?;
+                },
+                _ => {
+                    ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
+                },
+            };
+        }
+        ::std::result::Result::Ok(())
+    }
+
+    // Compute sizes of nested messages
+    #[allow(unused_variables)]
+    fn compute_size(&self) -> u32 {
+        let mut my_size = 0;
+        if !self.message.is_empty() {
+            my_size += ::protobuf::rt::string_size(1, &self.message);
+        }
+        my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
+        self.cached_size.set(my_size);
+        my_size
+    }
+
+    fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
+        if !self.message.is_empty() {
+            os.write_string(1, &self.message)?;
+        }
+        os.write_unknown_fields(self.get_unknown_fields())?;
+        ::std::result::Result::Ok(())
+    }
+
+    fn get_cached_size(&self) -> u32 {
+        self.cached_size.get()
+    }
+
+    fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
+        &self.unknown_fields
+    }
+
+    fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
+        &mut self.unknown_fields
+    }
+
+    fn as_any(&self) -> &dyn (::std::any::Any) {
+        self as &dyn (::std::any::Any)
+    }
+    fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
+        self as &mut dyn (::std::any::Any)
+    }
+    fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
+        self
+    }
+
+    fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
+        Self::descriptor_static()
+    }
+
+    fn new() -> HelloWorld {
+        HelloWorld::new()
+    }
+
+    fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
+        static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
+        descriptor.get(|| {
+            let mut fields = ::std::vec::Vec::new();
+            fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
+                "message",
+                |m: &HelloWorld| { &m.message },
+                |m: &mut HelloWorld| { &mut m.message },
+            ));
+            ::protobuf::reflect::MessageDescriptor::new_pb_name::<HelloWorld>(
+                "HelloWorld",
+                fields,
+                file_descriptor_proto()
+            )
+        })
+    }
+
+    fn default_instance() -> &'static HelloWorld {
+        static instance: ::protobuf::rt::LazyV2<HelloWorld> = ::protobuf::rt::LazyV2::INIT;
+        instance.get(HelloWorld::new)
+    }
+}
+
+impl ::protobuf::Clear for HelloWorld {
+    fn clear(&mut self) {
+        self.message.clear();
+        self.unknown_fields.clear();
+    }
+}
+
+impl ::std::fmt::Debug for HelloWorld {
+    fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
+        ::protobuf::text_format::fmt(self, f)
+    }
+}
+
+impl ::protobuf::reflect::ProtobufValue for HelloWorld {
+    fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
+        ::protobuf::reflect::ReflectValueRef::Message(self)
+    }
+}
+
+static file_descriptor_proto_data: &'static [u8] = b"\
+    \n\x11hello_world.proto\x12\x0bhello_world\"*\n\nHelloWorld\x12\x1a\n\
+    \x07message\x18\x01\x20\x01(\tR\x07messageB\0:\0B\0b\x06proto3\
+";
+
+static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;
+
+fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
+    ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap()
+}
+
+pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
+    file_descriptor_proto_lazy.get(|| {
+        parse_descriptor_proto()
+    })
+}
diff --git a/examples/tcp-echo-server/src/schema/mod.rs b/examples/tcp-echo-server/src/schema/mod.rs
new file mode 100644 (file)
index 0000000..c6dbc18
--- /dev/null
@@ -0,0 +1 @@
+pub mod hello_world;
diff --git a/examples/tls-client/Cargo.toml b/examples/tls-client/Cargo.toml
new file mode 100644 (file)
index 0000000..f952662
--- /dev/null
@@ -0,0 +1,15 @@
+[package]
+name = "tls-client"
+version = "0.1.0"
+authors = ["Sachandhan Ganesh <sachan.ganesh@gmail.com>"]
+edition = "2018"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+anyhow = "1.0.31"
+async-std = { version = "1.6.2", features = ["attributes"] }
+env_logger = "0.7"
+log = "0.4"
+
+seam-channel = { path = "../../" }
\ No newline at end of file
diff --git a/examples/tls-client/README.md b/examples/tls-client/README.md
new file mode 100644 (file)
index 0000000..a55c0b8
--- /dev/null
@@ -0,0 +1,21 @@
+# seam-channel tls-client example
+
+This example program will:
+
+1. Establish a secure connection with a TLS server
+2. Send a `String` message to the server
+3. Wait for a `String` message reply from the server
+
+## Usage
+
+```
+export RUST_LOG=info
+cargo run <ip-address-to-connect-to> <domain-name> <ca-file>
+```
+
+## Example Usage
+
+```
+export RUST_LOG=info
+cargo run 127.0.0.1:5678 localhost end.chain
+```
\ No newline at end of file
diff --git a/examples/tls-client/end.chain b/examples/tls-client/end.chain
new file mode 100644 (file)
index 0000000..7c39013
--- /dev/null
@@ -0,0 +1,89 @@
+-----BEGIN CERTIFICATE-----
+MIIGnzCCAoegAwIBAgIBezANBgkqhkiG9w0BAQsFADAaMRgwFgYDVQQDDA9wb255
+dG93biBSU0EgQ0EwHhcNMTYxMjEwMTc0MjMzWhcNMjYxMjA4MTc0MjMzWjAsMSow
+KAYDVQQDDCFwb255dG93biBSU0EgbGV2ZWwgMiBpbnRlcm1lZGlhdGUwggGiMA0G
+CSqGSIb3DQEBAQUAA4IBjwAwggGKAoIBgQDnfb7vaJbaHEyVTflswWhmHqx5W0NO
+KyKbDp2zXEJwDO+NDJq6i1HGnFd/vO4LyjJBU1wUsKtE+m55cfRmUHVuZ2w4n/VF
+p7Z7n+SNuvJNcrzDxyKVy4GIZ39zQePnniqtLqXh6eI8Ow6jiMgVxC/wbWcVLKv6
+4RM+2fLjJAC9b27QfjhOlMKVeMOEvPrrpjLSauaHAktQPhuzIAwzxM0+KnvDkWWy
+NVqAV/lq6fSO/9vJRhM4E2nxo6yqi7qTdxVxMmKsNn7L6HvjQgx+FXziAUs55Qd9
+cP7etCmPmoefkcgdbxDOIKH8D+DvfacZwngqcnr/q96Ff4uJ13d2OzR1mWVSZ2hE
+JQt/BbZBANciqu9OZf3dj6uOOXgFF705ak0GfLtpZpc29M+fVnknXPDSiKFqjzOO
+KL+SRGyuNc9ZYjBKkXPJ1OToAs6JSvgDxfOfX0thuo2rslqfpj2qCFugsRIRAqvb
+eyFwg+BPM/P/EfauXlAcQtBF04fOi7xN2okCAwEAAaNeMFwwHQYDVR0OBBYEFNwu
+Py4Do//Sm5CZDrocHWTrNr96MCAGA1UdJQEB/wQWMBQGCCsGAQUFBwMBBggrBgEF
+BQcDAjAMBgNVHRMEBTADAQH/MAsGA1UdDwQEAwIB/jANBgkqhkiG9w0BAQsFAAOC
+BAEAMHZpBqDIUAVFZNw4XbuimXQ4K8q4uePrLGHLb4F/gHbr8kYrU4H+cy4l+xXf
+2dlEBdZoqjSF7uXzQg5Fd8Ff3ZgutXd1xeUJnxo0VdpKIhqeaTPqhffC2X6FQQH5
+KrN7NVWQSnUhPNpBFELpmdpY1lHigFW7nytYj0C6VJ4QsbqhfW+n/t+Zgqtfh/Od
+ZbclzxFwMM55zRA2HP6IwXS2+d61Jk/RpDHTzhWdjGH4906zGNNMa7slHpCTA9Ju
+TrtjEAGt2PBSievBJOHZW80KVAoEX2n9B3ZABaz+uX0VVZG0D2FwhPpUeA57YiXu
+qiktZR4Ankph3LabXp4IlAX16qpYsEW8TWE/HLreeqoM0WDoI6rF9qnTpV2KWqBf
+ziMYkfSkT7hQ2bWc493lW+QwSxCsuBsDwlrCwAl6jFSf1+jEQx98/8n9rDNyD9dL
+PvECmtF30WY98nwZ9/kO2DufQrd0mwSHcIT0pAwl5fimpkwTjj+TTbytO3M4jK5L
+tuIzsViQ95BmJQ3XuLdkQ/Ug8rpECYRX5fQX1qXkkvl920ohpKqKyEji1OmfmJ0Z
+tZChaEcu3Mp3U+gD4az2ogmle3i/Phz8ZEPFo4/21G5Qd72z0lBgaQIeyyCk5MHt
+Yg0vA7X0/w4bz+OJv5tf7zJsPCYSprr+c/7YUJk9Fqu6+g9ZAavI99xFKdGhz4Og
+w0trnKNCxYc6+NPopTDbXuY+fo4DK7C0CSae5sKs7013Ne6w4KvgfLKpvlemkGfg
+ZA3+1FMXVfFIEH7Cw9cx6F02Sr3k1VrU68oM3wH5nvTUkELOf8nRMlzliQjVCpKB
+yFSe9dzRVSFEbMDxChiEulGgNUHj/6wwpg0ZmCwPRHutppT3jkfEqizN5iHb69GH
+k6kol6knJofkaL656Q3Oc9o0ZrMlFh1RwmOvAk5fVK0/CV88/phROz2Wdmy5Bz4a
+t0vzqFWA54y6+9EEVoOk9SU0CYfpGtpX4URjLK1EUG/l+RR3366Uee6TPrtEZ9cg
+56VQMxhSaRNAvJ6DfiSuscSCNJzwuXaMXSZydGYnnP9Tb9p6c1uy1sXdluZkBIcK
+CgC+gdDMSNlDn9ghc4xZGkuA8bjzfAYuRuGKmfTt8uuklkjw2b9w3SHjC4/Cmd2W
+cFRnzfg2oL6e78hNg2ZGgsLzvb6Lu6/5IhXCO7RitzYf2+HLBbc+YLFsnG3qeGe1
+28yGnXOQd97Cr4+IzFucVy/33gMQkesNUSDFJSq1gE/hGrMgTTMQJ7yC3PRqg0kG
+tpqTyKNdM0g1adxlR1qfDPvpUBApkgBbySnMyWEr5+tBuoHUtH2m49oV9YD4odMJ
+yJjlGxituO/YNN6O8oANlraG1Q==
+-----END CERTIFICATE-----
+-----BEGIN CERTIFICATE-----
+MIIJBzCCBO+gAwIBAgIJAN7WS1mRS9A+MA0GCSqGSIb3DQEBCwUAMBoxGDAWBgNV
+BAMMD3Bvbnl0b3duIFJTQSBDQTAeFw0xNjEyMTAxNzQyMzNaFw0yNjEyMDgxNzQy
+MzNaMBoxGDAWBgNVBAMMD3Bvbnl0b3duIFJTQSBDQTCCBCIwDQYJKoZIhvcNAQEB
+BQADggQPADCCBAoCggQBAMNEzJ7aNdD2JSk9+NF9Hh2za9OQnt1d/7j6DtE3ieoT
+ms8mMSXzoImXZayZ9Glx3yx/RhEb2vmINyb0vRUM4I/GH+XHdOBcs9kaJNv/Mpw4
+Ggd4e1LUqV1pzNrhYwRrTQTKyaDiDX2WEBNfQaaYnHltmSmsfyt3Klj+IMc6CyqV
+q8SOQ6Go414Vn++Jj7p3E6owdwuvSvO8ERLobiA6vYB+qrS7E48c4zRIAFIO4uwt
+g4TiCJLLWc1fRSoqGGX7KS+LzQF8Pq67IOHVna4e9peSe6nQnm0LQZAmaosYHvF4
+AX0Bj6TLv9PXCAGtB7Pciev5Br0tRZEdVyYfmwiVKUWcp77TghV3W+VaJVhPh5LN
+X91ktvpeYek3uglqv2ZHtSG2S1KkBtTkbMOD+a2BEUfq0c0+BIsj6jdvt4cvIfet
+4gUOxCvYMBs4/dmNT1zoe/kJ0lf8YXYLsXwVWdIW3jEE8QdkLtLI9XfyU9OKLZuD
+mmoAf7ezvv/T3nKLFqhcwUFGgGtCIX+oWC16XSbDPBcKDBwNZn8C49b7BLdxqAg3
+msfxwhYzSs9F1MXt/h2dh7FVmkCSxtgNDX3NJn5/yT6USws2y0AS5vXVP9hRf0NV
+KfKn9XlmHCxnZExwm68uZkUUYHB05jSWFojbfWE+Mf9djUeQ4FuwusztZdbyQ4yS
+mMtBXO0I6SQBmjCoOa1ySW3DTuw/eKCfq+PoxqWD434bYA9nUa+pE27MP7GLyjCS
+6+ED3MACizSF0YxkcC9pWUo4L5FKp+DxnNbtzMIILnsDZTVHOvKUy/gjTyTWm/+7
+2t98l7vBE8gn3Aux0V5WFe2uZIZ07wIi/OThoBO8mpt9Bm5cJTG07JStKEXX/UH1
+nL7cDZ2V5qbf4hJdDy4qixxxIZtmf//1BRlVQ9iYTOsMoy+36DXWbc3vSmjRefW1
+YENt4zxOPe4LUq2Z+LXq1OgVQrHrVevux0vieys7Rr2gA1sH8FaaNwTr7Q8dq+Av
+Evk+iOUH4FuYorU1HuGHPkAkvLWosVwlB+VhfEai0V6+PmttmaOnCJNHfFTu5wCu
+B9CFJ1tdzTzAbrLwgtWmO70KV7CfZPHO7lMWhSvplU0i5T9WytxP91IoFtXwRSO8
++Ghyu0ynB3HywCH2dez89Vy903P6PEU0qTnYWRz6D/wi5+yHHNrm9CilWurs/Qex
+kyB7lLD7Cb1JJc8QIFTqT6vj+cids3xd245hUdpFyZTX99YbF6IkiB2zGi5wvUmP
+f1GPvkTLb7eF7bne9OClEjEqvc0hVJ2abO2WXkqxlQFEYZHNofm+y6bnby/BZZJo
+beaSFcLOCe2Z8iZvVnzfHBCeLyWE89gc94z784S3LEsCAwEAAaNQME4wHQYDVR0O
+BBYEFNz2wEPCQbx9OdRCNE4eALwHJfIgMB8GA1UdIwQYMBaAFNz2wEPCQbx9OdRC
+NE4eALwHJfIgMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggQBACbm2YX7
+sBG0Aslj36gmVlCTTluNg2tuK2isHbK3YhNwujrH/o/o2OV7UeUkZkPwE4g4/SjC
+OwDWYniRNyDKBOeD9Q0XxR5z5IZQO+pRVvXF8DXO6kygWCOJM9XheKxp9Uke0aDg
+m8F02NslKLUdy7piGlLSz1sgdjiE3izIwFZRpZY7sMozNWWvSAmzprbkE78LghIm
+VEydQzIQlr5soWqc65uFLNbEA6QBPoFc6dDW+mnzXf8nrZUM03CACxAsuq/YkjRp
+OHgwgfdNRdlu4YhZtuQNak4BUvDmigTGxDC+aMJw0ldL1bLtqLG6BvQbyLNPOOfo
+5S8lGh4y06gb//052xHaqtCh5Ax5sHUE5By6wKHAKbuJy26qyKfaRoc3Jigs4Fd5
+3CuoDWHbyXfkgKiU+sc+1mvCxQKFRJ2fpGEFP8iEcLvdUae7ZkRM4Kb0vST+QhQV
+fDaFkM3Bwqtui5YaZ6cHHQVyXQdujCmfesoZXKil2yduQ3KWgePjewzRV+aDWMzk
+qKaF+TRANSqWbBU6JTwwQ4veKQThU3ir7nS2ovdPbhNS/FnWoKodj6eaqXfdYuBh
+XOXLewIF568MJsLOuBubeAO2a9LOlhnv6eLGp2P4M7vwEdN/LRRQtwBBmqq8C3h+
+ewrJP12B/ag0bJDi9vCgPhYtDEpjpfsnxZEIqVZwshJ/MqXykFp2kYk62ylyfDWq
+veI/aHwpzT2k+4CI/XmPWXl9NlI50HPdpcwCBDy8xVHwb/x7stNgQdIhaj9tzmKa
+S+eqitclc8Iqrbd523H//QDzm8yiqRZUdveNa9gioTMErR0ujCpK8tO8mVZcVfNX
+i1/Vsar5++nXcPhxKsd1t8XV2dk3gUZIfMgzLLzs+KSiFg+bT3c7LkCd+I3w30Iv
+fh9cxFBAyYO9giwxaCfJgoz7OYqaHOOtASF85UV7gK9ELT7/z+RAcS/UfY1xbd54
+hIi1vRZj8lfkAYNtnYlud44joi1BvW/GZGFCiJ13SSvfHNs9v/5xguyCSgyCc0qx
+ZkN/fzj/5wFQbxSl3MPn/JrsvlH6wvJht1SA50uVdUvJ5e5V8EgLYfMqlJNNpTHP
+wZcHF+Dw126oyu2KhUxD126Gusxp+tV6I0EEZnVwwduFQWq9xm/gT+qohpveeylf
+Q2XGz56DF2udJJnSFGSqzQOl9XopNC/4ecBMwIzqdFSpaWgK3VNAcigyDajgoE4v
+ZuiVDEiLhLowZvi1V8GOWzcka7R2BQBjhOLWByQGDcm8cOMS7w8oCSQCaYmJyHvE
+tTHq7fX6/sXv0AJqM3ysSdU01IVBNahnr5WEkmQMaFF0DGvRfqkVdKcChwrKv7r2
+DLxargy39i2aQGg=
+-----END CERTIFICATE-----
diff --git a/examples/tls-client/src/main.rs b/examples/tls-client/src/main.rs
new file mode 100644 (file)
index 0000000..6d57a35
--- /dev/null
@@ -0,0 +1,78 @@
+use log::*;
+use seam_channel::net::tls::rustls::ClientConfig;
+use seam_channel::net::{StitchClient, StitchNetClient};
+use std::env;
+
+#[async_std::main]
+async fn main() -> anyhow::Result<()> {
+    env_logger::init();
+
+    // get ip address and domain from cmd line args
+    let (ip_addr, domain, cafile_path) = parse_args();
+
+    // construct `rustls` client config
+    let cafile = std::fs::read(cafile_path)?;
+
+    let mut client_pem = std::io::Cursor::new(cafile);
+
+    let mut client_config = ClientConfig::new();
+    client_config
+        .root_store
+        .add_pem_file(&mut client_pem)
+        .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid cert"))?;
+
+    // create a client connection to the server
+    let dist_chan = StitchNetClient::tls_client(ip_addr, &domain, client_config.into())?;
+
+    // create a channel for String messages on the TCP connection
+    let (sender, receiver) = dist_chan.bounded::<String>(Some(100));
+
+    // alert the connection that you are ready to read and write messages
+    dist_chan.ready()?;
+
+    // send a message to the server
+    let msg = String::from("Hello world");
+    info!("Sending message: {}", msg);
+    sender.send(msg).await?;
+
+    // wait for the server to reply with an ack
+    if let Ok(msg) = receiver.recv().await {
+        info!("Received reply: {}", msg);
+    }
+
+    Ok(())
+}
+
+fn parse_args() -> (String, String, String) {
+    let args: Vec<String> = env::args().collect();
+
+    let ip_address = match args.get(1) {
+        Some(addr) => addr,
+        None => {
+            error!("Need to pass IP address to connect to as first command line argument");
+            panic!();
+        }
+    };
+
+    let domain = match args.get(2) {
+        Some(d) => d,
+        None => {
+            error!("Need to pass domain name as second command line argument");
+            panic!();
+        }
+    };
+
+    let cafile_path = match args.get(3) {
+        Some(d) => d,
+        None => {
+            error!("Need to pass path to cafile as third command line argument");
+            panic!();
+        }
+    };
+
+    (
+        ip_address.to_string(),
+        domain.to_string(),
+        cafile_path.to_string(),
+    )
+}
diff --git a/examples/tls-echo-server/Cargo.toml b/examples/tls-echo-server/Cargo.toml
new file mode 100644 (file)
index 0000000..31bff03
--- /dev/null
@@ -0,0 +1,15 @@
+[package]
+name = "tls-echo-server"
+version = "0.1.0"
+authors = ["Sachandhan Ganesh <sachan.ganesh@gmail.com>"]
+edition = "2018"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+anyhow = "1.0.31"
+async-std = { version = "1.6.2", features = ["attributes"] }
+env_logger = "0.7"
+log = "0.4"
+
+seam-channel = { path = "../../" }
\ No newline at end of file
diff --git a/examples/tls-echo-server/README.md b/examples/tls-echo-server/README.md
new file mode 100644 (file)
index 0000000..eeafd03
--- /dev/null
@@ -0,0 +1,23 @@
+# seam-channel tls-echo-server example
+
+This example program will:
+
+1. Bind to an IP address
+2. Accept any number of secure TLS connections
+3. Handle each connection by:
+    1. Waiting for `String` messages to be received
+    2. Echoing the `String` message back to the source
+
+## Usage
+
+```
+export RUST_LOG=info
+cargo run <ip-address-to-bind-to> <cert-file> <key-file>
+```
+
+## Example Usage
+
+```
+export RUST_LOG=info
+cargo run 127.0.0.1:5678 end.cert end.rsa
+```
\ No newline at end of file
diff --git a/examples/tls-echo-server/end.cert b/examples/tls-echo-server/end.cert
new file mode 100644 (file)
index 0000000..66f087e
--- /dev/null
@@ -0,0 +1,24 @@
+-----BEGIN CERTIFICATE-----
+MIIEADCCAmigAwIBAgICAcgwDQYJKoZIhvcNAQELBQAwLDEqMCgGA1UEAwwhcG9u
+eXRvd24gUlNBIGxldmVsIDIgaW50ZXJtZWRpYXRlMB4XDTE2MTIxMDE3NDIzM1oX
+DTIyMDYwMjE3NDIzM1owGTEXMBUGA1UEAwwOdGVzdHNlcnZlci5jb20wggEiMA0G
+CSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC1YDz66+7VD4DL1+/sVHMQ+BbDRgmD
+OQlX++mfW8D3QNQm/qDBEbu7T7qqdc9GKDar4WIzBN8SBkzM1EjMGwNnZPV/Tfz0
+qUAR1L/7Zzf1GaFZvWXgksyUpfwvmprH3Iy/dpkETwtPthpTPNlui3hZnm/5kkjR
+RWg9HmID4O04Ld6SK313v2ZgrPZbkKvbqlqhUnYWjL3blKVGbpXIsuZzEU9Ph+gH
+tPcEhZpFsM6eLe+2TVscIrycMEOTXqAAmO6zZ9sQWtfllu3CElm904H6+jA/9Leg
+al72pMmkYr8wWniqDDuijXuCPlVx5EDFFyxBmW18UeDEQaKV3kNfelaTAgMBAAGj
+gb4wgbswDAYDVR0TAQH/BAIwADALBgNVHQ8EBAMCBsAwHQYDVR0OBBYEFIYhJkVy
+AAKT6cY/ruH1Eu+NNxteMEIGA1UdIwQ7MDmAFNwuPy4Do//Sm5CZDrocHWTrNr96
+oR6kHDAaMRgwFgYDVQQDDA9wb255dG93biBSU0EgQ0GCAXswOwYDVR0RBDQwMoIO
+dGVzdHNlcnZlci5jb22CFXNlY29uZC50ZXN0c2VydmVyLmNvbYIJbG9jYWxob3N0
+MA0GCSqGSIb3DQEBCwUAA4IBgQCWV76jfQDZKtfmj45fTwZzoe/PxjWPRbAvSEnt
+LRHrPhqQfpMLqpun8uu/w86mHiR/AmiAySMu3zivW6wfGzlRWLi/zCyO6r9LGsgH
+bNk5CF642cdZFvn1SiSm1oGXQrolIpcyXu88nUpt74RnY4ETCC1dRQKqxsYufe5T
+DOmTm3ChinNW4QRG3yvW6DVuyxVAgZvofyKJOsM3GO6oogIM41aBqZ3UTwmIwp6D
+oISdiATslFOzYzjnyXNR8DG8OOkv1ehWuyb8x+hQCZAuogQOWYtCSd6k3kKgd0EM
+4CWbt1XDV9ZJwBf2uxZeKuCu/KIy9auNtijAwPsUv9qxuzko018zhl3lWm5p2Sqw
+O7fFshU3A6df8hMw7ST6/tgFY7geT88U4iJhfWMwr/CZSRSVMXhTyJgbLIXxKYZj
+Ym5v4NAIQP6hI4HixzQaYgrhW6YX6myk+emMjQLRJHT8uHvmT7fuxMJVWWgsCkr1
+C75pRQEagykN/Uzr5e6Tm8sVu88=
+-----END CERTIFICATE-----
diff --git a/examples/tls-echo-server/end.rsa b/examples/tls-echo-server/end.rsa
new file mode 100644 (file)
index 0000000..744bba5
--- /dev/null
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEogIBAAKCAQEAtWA8+uvu1Q+Ay9fv7FRzEPgWw0YJgzkJV/vpn1vA90DUJv6g
+wRG7u0+6qnXPRig2q+FiMwTfEgZMzNRIzBsDZ2T1f0389KlAEdS/+2c39RmhWb1l
+4JLMlKX8L5qax9yMv3aZBE8LT7YaUzzZbot4WZ5v+ZJI0UVoPR5iA+DtOC3ekit9
+d79mYKz2W5Cr26paoVJ2Foy925SlRm6VyLLmcxFPT4foB7T3BIWaRbDOni3vtk1b
+HCK8nDBDk16gAJjus2fbEFrX5ZbtwhJZvdOB+vowP/S3oGpe9qTJpGK/MFp4qgw7
+oo17gj5VceRAxRcsQZltfFHgxEGild5DX3pWkwIDAQABAoIBAFDTazlSbGML/pRY
+TTWeyIw2UkaA7npIr45C13BJfitw+1nJPK/tDCDDveZ6i3yzLPHZhV5A/HtWzWC1
+9R7nptOrnO83PNN2nPOVQFxzOe+ClXGdQkoagQp5EXHRTspj0WD9I+FUrDDAcOjJ
+BAgMJPyi6zlnZAXGDVa3NGyQDoZqwU2k36L4rEsJIkG0NVurZhpiCexNkkf32495
+TOINQ0iKdfJ4iZoEYQ9G+x4NiuAJRCHuIcH76SNfT+Uv3wX0ut5EFPtflnvtdgcp
+QVcoKwYdO0+mgO5xqWlBcsujSvgBdiNAGnAxKHWiEaacuIJi4+yYovyEebP6QI2X
+Zg/U2wkCgYEA794dE5CPXLOmv6nioVC/ubOESk7vjSlEka/XFbKr4EY794YEqrB1
+8TUqg09Bn3396AS1e6P2shr3bxos5ybhOxDGSLnJ+aC0tRFjd1BPKnA80vZM7ggt
+5cjmdD5Zp0tIQTIAAYU5bONQOwj0ej4PE7lny26eLa5vfvCwlrD+rM0CgYEAwZMN
+W/5PA2A+EM08IaHic8my0dCunrNLF890ouZnDG99SbgMGvvEsGIcCP1sai702hNh
+VgGDxCz6/HUy+4O4YNFVtjY7uGEpfIEcEI7CsLQRP2ggWEFxThZtnEtO8PbM3J/i
+qcS6njHdE+0XuCjgZwGgva5xH2pkWFzw/AIpEN8CgYB2HOo2axWc8T2n3TCifI+c
+EqCOsqXU3cBM+MgxgASQcCUxMkX0AuZguuxPMmS+85xmdoMi+c8NTqgOhlYcEJIR
+sqXgw9OH3zF8g6513w7Md+4Ld4rUHyTypGWOUfF1pmVS7RsBpKdtTdWA7FzuIMbt
+0HsiujqbheyTFlPuMAOH9QKBgBWS1gJSrWuq5j/pH7J/4EUXTZ6kq1F0mgHlVRJy
+qzlvk38LzA2V0a32wTkfRV3wLcnALzDuqkjK2o4YYb42R+5CZlMQaEd8TKtbmE0g
+HAKljuaKLFCpun8BcOXiXsHsP5i3GQPisQnAdOsrmWEk7R2NyORa9LCToutWMGVl
+uD3xAoGAA183Vldm+m4KPsKS17t8MbwBryDXvowGzruh/Z+PGA0spr+ke4XxwT1y
+kMMP1+5flzmjlAf4+W8LehKuVqvQoMlPn5UVHmSxQ7cGx/O/o6Gbn8Q25/6UT+sM
+B1Y0rlLoKG62pnkeXp1O4I57gnClatWRg5qw11a8V8e3jvDKIYM=
+-----END RSA PRIVATE KEY-----
diff --git a/examples/tls-echo-server/src/main.rs b/examples/tls-echo-server/src/main.rs
new file mode 100644 (file)
index 0000000..1129657
--- /dev/null
@@ -0,0 +1,94 @@
+use async_std::{io, task};
+use log::*;
+use seam_channel::net::tls::rustls::internal::pemfile::{certs, rsa_private_keys};
+use seam_channel::net::tls::rustls::{NoClientAuth, ServerConfig};
+use seam_channel::net::{StitchClient, StitchNetServer};
+use std::env;
+use std::fs::File;
+use std::io::BufReader;
+
+#[async_std::main]
+async fn main() -> anyhow::Result<()> {
+    env_logger::init();
+
+    // Get ip address from cmd line args
+    let (ip_address, cert_path, key_path) = parse_args();
+
+    let certs = certs(&mut BufReader::new(File::open(cert_path)?))
+        .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid cert"))?;
+
+    let mut keys = rsa_private_keys(&mut BufReader::new(File::open(key_path)?))
+        .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid key"))?;
+
+    let mut config = ServerConfig::new(NoClientAuth::new());
+    config
+        .set_single_cert(certs, keys.remove(0))
+        .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
+
+    // create a server
+    let (_server, conns) = StitchNetServer::tls_server(ip_address, config.into())?;
+
+    // handle server connections
+    // wait for a connection to come in and be accepted
+    while let Ok(conn) = conns.recv().await {
+        info!("Handling connection: {}", conn.peer_addr());
+
+        // register for String-typed messages
+        let (sender, receiver) = conn.unbounded::<String>();
+
+        // let the connection know you are ready to send and receive messages
+        conn.ready()
+            .expect("could not ready the connection for reading and writing");
+
+        // handle String messages
+        task::spawn(async move {
+            // for every String message
+            while let Ok(msg) = receiver.recv().await {
+                info!("Echoing message: {}", msg);
+
+                let response = format!("{}, right back at you!", msg);
+
+                // Send the message back to its source
+                if let Err(err) = sender.send(response).await {
+                    error!("Could not echo message: {:#?}", err);
+                }
+            }
+        });
+    }
+
+    Ok(())
+}
+
+fn parse_args() -> (String, String, String) {
+    let args: Vec<String> = env::args().collect();
+
+    let ip_address = match args.get(1) {
+        Some(addr) => addr,
+        None => {
+            error!("Need to pass IP address to connect to as first command line argument");
+            panic!();
+        }
+    };
+
+    let cert_path = match args.get(2) {
+        Some(d) => d,
+        None => {
+            error!("Need to pass path to cert file as second command line argument");
+            panic!();
+        }
+    };
+
+    let key_path = match args.get(3) {
+        Some(d) => d,
+        None => {
+            error!("Need to pass path to key file as third command line argument");
+            panic!();
+        }
+    };
+
+    (
+        ip_address.to_string(),
+        cert_path.to_string(),
+        key_path.to_string(),
+    )
+}
diff --git a/schema/message.proto b/schema/message.proto
new file mode 100644 (file)
index 0000000..e80daff
--- /dev/null
@@ -0,0 +1,8 @@
+syntax = "proto3";
+package message;
+
+import "google/protobuf/any.proto";
+
+message StitchMessage {
+    google.protobuf.Any payload = 1;
+}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644 (file)
index 0000000..c227762
--- /dev/null
@@ -0,0 +1,85 @@
+pub mod schema;
+pub mod tcp;
+// @todo pub mod tls;
+mod reader;
+mod writer;
+
+pub use crate::reader::StitchConnectionReader;
+use crate::schema::StitchMessage;
+pub use crate::writer::StitchConnectionWriter;
+use async_channel::RecvError;
+use async_std::net::SocketAddr;
+use async_std::pin::Pin;
+use bytes::{Buf, BytesMut};
+use futures::task::{Context, Poll};
+use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Sink, Stream};
+use log::*;
+use protobuf::Message;
+use std::convert::TryInto;
+
+pub use futures::SinkExt;
+pub use futures::StreamExt;
+use protobuf::well_known_types::Any;
+
+pub struct StitchConnection {
+    local_addr: SocketAddr,
+    peer_addr: SocketAddr,
+    reader: StitchConnectionReader,
+    writer: StitchConnectionWriter,
+}
+
+#[allow(dead_code)]
+impl StitchConnection {
+    pub(crate) fn new(
+        local_addr: SocketAddr,
+        peer_addr: SocketAddr,
+        read_stream: Box<dyn AsyncRead + Send + Sync + Unpin>,
+        write_stream: Box<dyn AsyncWrite + Send + Sync + Unpin>,
+    ) -> Self {
+        Self {
+            local_addr,
+            peer_addr,
+            reader: StitchConnectionReader::new(local_addr, peer_addr, read_stream),
+            writer: StitchConnectionWriter::new(local_addr, peer_addr, write_stream),
+        }
+    }
+
+    pub fn local_addr(&self) -> SocketAddr {
+        self.local_addr.clone()
+    }
+
+    pub fn peer_addr(&self) -> SocketAddr {
+        self.peer_addr.clone()
+    }
+
+    pub fn split(self) -> (StitchConnectionReader, StitchConnectionWriter) {
+        (self.reader, self.writer)
+    }
+
+    pub fn join(reader: StitchConnectionReader, writer: StitchConnectionWriter) -> Self {
+        Self {
+            local_addr: reader.local_addr(),
+            peer_addr: reader.peer_addr(),
+            reader,
+            writer,
+        }
+    }
+
+    pub fn reader(&mut self) -> &mut StitchConnectionReader {
+        &mut self.reader
+    }
+
+    pub fn writer(&mut self) -> &mut StitchConnectionWriter {
+        &mut self.writer
+    }
+
+    pub async fn close(self) -> SocketAddr {
+        let peer_addr = self.peer_addr();
+
+        drop(self.reader);
+        // self.writer.close().await;
+        drop(self.writer);
+
+        return peer_addr;
+    }
+}
diff --git a/src/reader.rs b/src/reader.rs
new file mode 100644 (file)
index 0000000..568e40a
--- /dev/null
@@ -0,0 +1,124 @@
+use crate::schema::StitchMessage;
+use async_std::net::SocketAddr;
+use async_std::pin::Pin;
+use bytes::{Buf, BytesMut};
+use futures::task::{Context, Poll};
+use futures::{AsyncRead, AsyncReadExt, Stream};
+use log::*;
+use protobuf::Message;
+use std::convert::TryInto;
+
+pub use futures::SinkExt;
+pub use futures::StreamExt;
+use protobuf::well_known_types::Any;
+
+const BUFFER_SIZE: usize = 8192;
+
+pub struct StitchConnectionReader {
+    local_addr: SocketAddr,
+    peer_addr: SocketAddr,
+    read_stream: Box<dyn AsyncRead + Send + Sync + Unpin>,
+    pending_read: Option<BytesMut>,
+}
+
+impl StitchConnectionReader {
+    pub fn new(
+        local_addr: SocketAddr,
+        peer_addr: SocketAddr,
+        read_stream: Box<dyn AsyncRead + Send + Sync + Unpin>,
+    ) -> Self {
+        Self {
+            local_addr,
+            peer_addr,
+            read_stream,
+            pending_read: None,
+        }
+    }
+
+    pub fn local_addr(&self) -> SocketAddr {
+        self.local_addr.clone()
+    }
+
+    pub fn peer_addr(&self) -> SocketAddr {
+        self.peer_addr.clone()
+    }
+}
+
+impl Stream for StitchConnectionReader {
+    type Item = Any;
+
+    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut buffer = BytesMut::new();
+        buffer.resize(BUFFER_SIZE, 0);
+
+        debug!("Starting new read loop for {}", self.local_addr);
+        loop {
+            trace!("Reading from the stream");
+            match futures::executor::block_on(self.read_stream.read(&mut buffer)) {
+                Ok(mut bytes_read) => {
+                    if bytes_read > 0 {
+                        debug!("Read {} bytes from the network stream", bytes_read)
+                    }
+
+                    if let Some(mut pending_buf) = self.pending_read.take() {
+                        debug!("Prepending broken data ({} bytes) encountered from earlier read of network stream", pending_buf.len());
+                        bytes_read += pending_buf.len();
+
+                        pending_buf.unsplit(buffer);
+                        buffer = pending_buf;
+                    }
+
+                    let mut bytes_read_u64: u64 = bytes_read.try_into().expect(
+                        format!("Conversion from usize ({}) to u64 failed", bytes_read).as_str(),
+                    );
+                    while bytes_read_u64 > 0 {
+                        debug!(
+                            "{} bytes from network stream still unprocessed",
+                            bytes_read_u64
+                        );
+
+                        buffer.resize(bytes_read, 0);
+                        debug!("{:?}", buffer.as_ref());
+
+                        match StitchMessage::parse_from_bytes(buffer.as_ref()) {
+                            Ok(mut data) => {
+                                let serialized_size = data.compute_size();
+                                debug!("Deserialized message of size {} bytes", serialized_size);
+
+                                buffer.advance(serialized_size as usize);
+
+                                let serialized_size_u64: u64 = serialized_size.try_into().expect(
+                                    format!(
+                                        "Conversion from usize ({}) to u64 failed",
+                                        serialized_size
+                                    )
+                                    .as_str(),
+                                );
+                                bytes_read_u64 -= serialized_size_u64;
+                                debug!("{} bytes still unprocessed", bytes_read_u64);
+
+                                debug!("Sending deserialized message downstream");
+                                return Poll::Ready(Some(data.take_payload()));
+                            }
+
+                            Err(err) => {
+                                warn!(
+                                    "Could not deserialize data from the received bytes: {:#?}",
+                                    err
+                                );
+
+                                self.pending_read = Some(buffer);
+                                buffer = BytesMut::new();
+                                break;
+                            }
+                        }
+                    }
+
+                    buffer.resize(BUFFER_SIZE, 0);
+                }
+
+                Err(_err) => return Poll::Pending,
+            }
+        }
+    }
+}
diff --git a/src/schema/message.rs b/src/schema/message.rs
new file mode 100644 (file)
index 0000000..352580a
--- /dev/null
@@ -0,0 +1,216 @@
+// This file is generated by rust-protobuf 2.19.0. Do not edit
+// @generated
+
+// https://github.com/rust-lang/rust-clippy/issues/702
+#![allow(unknown_lints)]
+#![allow(clippy::all)]
+
+#![allow(unused_attributes)]
+#![rustfmt::skip]
+
+#![allow(box_pointers)]
+#![allow(dead_code)]
+#![allow(missing_docs)]
+#![allow(non_camel_case_types)]
+#![allow(non_snake_case)]
+#![allow(non_upper_case_globals)]
+#![allow(trivial_casts)]
+#![allow(unused_imports)]
+#![allow(unused_results)]
+//! Generated file from `message.proto`
+
+/// Generated files are compatible only with the same version
+/// of protobuf runtime.
+// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_19_0;
+
+#[derive(PartialEq,Clone,Default)]
+pub struct StitchMessage {
+    // message fields
+    pub payload: ::protobuf::SingularPtrField<::protobuf::well_known_types::Any>,
+    // special fields
+    pub unknown_fields: ::protobuf::UnknownFields,
+    pub cached_size: ::protobuf::CachedSize,
+}
+
+impl<'a> ::std::default::Default for &'a StitchMessage {
+    fn default() -> &'a StitchMessage {
+        <StitchMessage as ::protobuf::Message>::default_instance()
+    }
+}
+
+impl StitchMessage {
+    pub fn new() -> StitchMessage {
+        ::std::default::Default::default()
+    }
+
+    // .google.protobuf.Any payload = 1;
+
+
+    pub fn get_payload(&self) -> &::protobuf::well_known_types::Any {
+        self.payload.as_ref().unwrap_or_else(|| <::protobuf::well_known_types::Any as ::protobuf::Message>::default_instance())
+    }
+    pub fn clear_payload(&mut self) {
+        self.payload.clear();
+    }
+
+    pub fn has_payload(&self) -> bool {
+        self.payload.is_some()
+    }
+
+    // Param is passed by value, moved
+    pub fn set_payload(&mut self, v: ::protobuf::well_known_types::Any) {
+        self.payload = ::protobuf::SingularPtrField::some(v);
+    }
+
+    // Mutable pointer to the field.
+    // If field is not initialized, it is initialized with default value first.
+    pub fn mut_payload(&mut self) -> &mut ::protobuf::well_known_types::Any {
+        if self.payload.is_none() {
+            self.payload.set_default();
+        }
+        self.payload.as_mut().unwrap()
+    }
+
+    // Take field
+    pub fn take_payload(&mut self) -> ::protobuf::well_known_types::Any {
+        self.payload.take().unwrap_or_else(|| ::protobuf::well_known_types::Any::new())
+    }
+}
+
+impl ::protobuf::Message for StitchMessage {
+    fn is_initialized(&self) -> bool {
+        for v in &self.payload {
+            if !v.is_initialized() {
+                return false;
+            }
+        };
+        true
+    }
+
+    fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
+        while !is.eof()? {
+            let (field_number, wire_type) = is.read_tag_unpack()?;
+            match field_number {
+                1 => {
+                    ::protobuf::rt::read_singular_message_into(wire_type, is, &mut self.payload)?;
+                },
+                _ => {
+                    ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
+                },
+            };
+        }
+        ::std::result::Result::Ok(())
+    }
+
+    // Compute sizes of nested messages
+    #[allow(unused_variables)]
+    fn compute_size(&self) -> u32 {
+        let mut my_size = 0;
+        if let Some(ref v) = self.payload.as_ref() {
+            let len = v.compute_size();
+            my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len;
+        }
+        my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
+        self.cached_size.set(my_size);
+        my_size
+    }
+
+    fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
+        if let Some(ref v) = self.payload.as_ref() {
+            os.write_tag(1, ::protobuf::wire_format::WireTypeLengthDelimited)?;
+            os.write_raw_varint32(v.get_cached_size())?;
+            v.write_to_with_cached_sizes(os)?;
+        }
+        os.write_unknown_fields(self.get_unknown_fields())?;
+        ::std::result::Result::Ok(())
+    }
+
+    fn get_cached_size(&self) -> u32 {
+        self.cached_size.get()
+    }
+
+    fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
+        &self.unknown_fields
+    }
+
+    fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
+        &mut self.unknown_fields
+    }
+
+    fn as_any(&self) -> &dyn (::std::any::Any) {
+        self as &dyn (::std::any::Any)
+    }
+    fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
+        self as &mut dyn (::std::any::Any)
+    }
+    fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
+        self
+    }
+
+    fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
+        Self::descriptor_static()
+    }
+
+    fn new() -> StitchMessage {
+        StitchMessage::new()
+    }
+
+    fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
+        static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
+        descriptor.get(|| {
+            let mut fields = ::std::vec::Vec::new();
+            fields.push(::protobuf::reflect::accessor::make_singular_ptr_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage<::protobuf::well_known_types::Any>>(
+                "payload",
+                |m: &StitchMessage| { &m.payload },
+                |m: &mut StitchMessage| { &mut m.payload },
+            ));
+            ::protobuf::reflect::MessageDescriptor::new_pb_name::<StitchMessage>(
+                "StitchMessage",
+                fields,
+                file_descriptor_proto()
+            )
+        })
+    }
+
+    fn default_instance() -> &'static StitchMessage {
+        static instance: ::protobuf::rt::LazyV2<StitchMessage> = ::protobuf::rt::LazyV2::INIT;
+        instance.get(StitchMessage::new)
+    }
+}
+
+impl ::protobuf::Clear for StitchMessage {
+    fn clear(&mut self) {
+        self.payload.clear();
+        self.unknown_fields.clear();
+    }
+}
+
+impl ::std::fmt::Debug for StitchMessage {
+    fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
+        ::protobuf::text_format::fmt(self, f)
+    }
+}
+
+impl ::protobuf::reflect::ProtobufValue for StitchMessage {
+    fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
+        ::protobuf::reflect::ReflectValueRef::Message(self)
+    }
+}
+
+static file_descriptor_proto_data: &'static [u8] = b"\
+    \n\rmessage.proto\x12\x07message\x1a\x19google/protobuf/any.proto\"C\n\r\
+    StitchMessage\x120\n\x07payload\x18\x01\x20\x01(\x0b2\x14.google.protobu\
+    f.AnyR\x07payloadB\0:\0B\0b\x06proto3\
+";
+
+static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;
+
+fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
+    ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap()
+}
+
+pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
+    file_descriptor_proto_lazy.get(|| {
+        parse_descriptor_proto()
+    })
+}
diff --git a/src/schema/mod.rs b/src/schema/mod.rs
new file mode 100644 (file)
index 0000000..1b8ac93
--- /dev/null
@@ -0,0 +1,16 @@
+mod message;
+
+pub use message::StitchMessage;
+use protobuf::well_known_types::Any;
+use protobuf::Message;
+
+impl StitchMessage {
+    // @todo make pub(crate)
+    pub fn from_msg<T: Message>(msg: T) -> Self {
+        let mut sm = Self::new();
+        let payload = Any::pack(&msg).expect("Protobuf Message could not be packed into Any type");
+
+        sm.set_payload(payload);
+        return sm;
+    }
+}
diff --git a/src/tcp/client.rs b/src/tcp/client.rs
new file mode 100644 (file)
index 0000000..a0994d4
--- /dev/null
@@ -0,0 +1,37 @@
+use async_std::task;
+use log::*;
+
+use crate::StitchConnection;
+use async_std::net::{TcpStream, ToSocketAddrs};
+
+impl StitchConnection {
+    pub fn tcp_client<A: ToSocketAddrs + std::fmt::Display>(
+        ip_addrs: A,
+    ) -> anyhow::Result<StitchConnection> {
+        let read_stream = task::block_on(TcpStream::connect(&ip_addrs))?;
+        info!("Established client TCP connection to {}", ip_addrs);
+
+        Ok(Self::from(read_stream))
+    }
+}
+
+impl From<TcpStream> for StitchConnection {
+    fn from(stream: TcpStream) -> Self {
+        let write_stream = stream.clone();
+
+        let local_addr = stream
+            .local_addr()
+            .expect("Local address could not be retrieved");
+
+        let peer_addr = stream
+            .peer_addr()
+            .expect("Peer address could not be retrieved");
+
+        Self::new(
+            local_addr,
+            peer_addr,
+            Box::new(stream),
+            Box::new(write_stream),
+        )
+    }
+}
diff --git a/src/tcp/mod.rs b/src/tcp/mod.rs
new file mode 100644 (file)
index 0000000..fbc24b2
--- /dev/null
@@ -0,0 +1,5 @@
+pub(crate) mod client;
+pub(crate) mod server;
+
+pub use client::*;
+pub use server::*;
diff --git a/src/tcp/server.rs b/src/tcp/server.rs
new file mode 100644 (file)
index 0000000..adae4cc
--- /dev/null
@@ -0,0 +1,37 @@
+use crate::StitchConnection;
+use async_std::net::{SocketAddr, TcpListener, ToSocketAddrs};
+use async_std::pin::Pin;
+use async_std::task;
+use futures::task::{Context, Poll};
+use futures::{Stream, StreamExt};
+use log::*;
+
+#[allow(dead_code)]
+pub struct StitchTcpServer {
+    local_addrs: SocketAddr,
+    listener: TcpListener,
+}
+
+impl StitchTcpServer {
+    pub fn new<A: ToSocketAddrs + std::fmt::Display>(ip_addrs: A) -> anyhow::Result<Self> {
+        let listener = task::block_on(TcpListener::bind(&ip_addrs))?;
+        info!("Started TCP server at {}", &ip_addrs);
+
+        Ok(Self {
+            local_addrs: listener.local_addr()?,
+            listener,
+        })
+    }
+}
+
+impl Stream for StitchTcpServer {
+    type Item = StitchConnection;
+
+    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        if let Some(Ok(conn)) = futures::executor::block_on(self.listener.incoming().next()) {
+            Poll::Ready(Some(StitchConnection::from(conn)))
+        } else {
+            Poll::Ready(None)
+        }
+    }
+}
diff --git a/src/tls/client.rs b/src/tls/client.rs
new file mode 100644 (file)
index 0000000..5f9acb2
--- /dev/null
@@ -0,0 +1,76 @@
+use async_channel::{Receiver, Sender};
+use async_std::io::*;
+use async_std::net::*;
+use async_std::task;
+use async_tls::TlsConnector;
+use futures_util::io::AsyncReadExt;
+use log::*;
+
+use crate::registry::StitchRegistry;
+use crate::StitchNetClient;
+use crate::{channel_factory, StitchMessage};
+use async_std::sync::{Arc, Condvar, Mutex};
+
+impl StitchNetClient {
+    pub fn tls_client<A: ToSocketAddrs + std::fmt::Display>(
+        ip_addrs: A,
+        domain: &str,
+        connector: TlsConnector,
+    ) -> Result<Self> {
+        Self::tls_client_with_bound(ip_addrs, domain, connector, None)
+    }
+
+    pub fn tls_client_with_bound<A: ToSocketAddrs + std::fmt::Display>(
+        ip_addrs: A,
+        domain: &str,
+        connector: TlsConnector,
+        cap: Option<usize>,
+    ) -> Result<Self> {
+        let stream = task::block_on(TcpStream::connect(&ip_addrs))?;
+        stream.set_nodelay(true)?;
+        info!("Established client TCP connection to {}", ip_addrs);
+
+        Self::tls_client_from_parts(stream, domain, connector, channel_factory(cap))
+    }
+
+    pub fn tls_client_from_parts(
+        stream: TcpStream,
+        domain: &str,
+        connector: TlsConnector,
+        (tls_write_sender, tls_write_receiver): (Sender<StitchMessage>, Receiver<StitchMessage>),
+    ) -> Result<Self> {
+        let local_addr = stream.local_addr()?;
+        let peer_addr = stream.peer_addr()?;
+
+        let encrypted_stream = task::block_on(connector.connect(domain, stream))?;
+        let (read_stream, write_stream) = encrypted_stream.split();
+        info!("Completed TLS handshake with {}", peer_addr);
+
+        let registry: StitchRegistry = crate::registry::new();
+        let read_readiness = Arc::new((Mutex::new(false), Condvar::new()));
+        let write_readiness = Arc::new((Mutex::new(false), Condvar::new()));
+
+        let read_task = task::spawn(crate::tasks::read_from_stream(
+            registry.clone(),
+            read_stream,
+            read_readiness.clone(),
+        ));
+
+        let write_task = task::spawn(crate::tasks::write_to_stream(
+            tls_write_receiver.clone(),
+            write_stream,
+            write_readiness.clone(),
+        ));
+
+        Ok(Self {
+            local_addr,
+            peer_addr,
+            registry,
+            stream_writer_chan: (tls_write_sender, tls_write_receiver),
+            read_readiness,
+            write_readiness,
+            read_task,
+            write_task,
+        })
+    }
+}
diff --git a/src/tls/mod.rs b/src/tls/mod.rs
new file mode 100644 (file)
index 0000000..da8703d
--- /dev/null
@@ -0,0 +1,8 @@
+pub(crate) mod client;
+pub(crate) mod server;
+
+pub use client::*;
+pub use server::*;
+
+pub use async_tls;
+pub use rustls;
diff --git a/src/tls/server.rs b/src/tls/server.rs
new file mode 100644 (file)
index 0000000..79dba44
--- /dev/null
@@ -0,0 +1,135 @@
+use crate::channel_factory;
+use crate::registry::StitchRegistry;
+use crate::{ServerRegistry, StitchClient, StitchNetClient, StitchNetServer};
+use async_channel::{Receiver, Sender};
+use async_std::io::*;
+use async_std::net::*;
+use async_std::prelude::*;
+use async_std::sync::{Arc, Condvar, Mutex};
+use async_std::task;
+use async_tls::TlsAcceptor;
+use dashmap::DashMap;
+use futures_util::AsyncReadExt;
+use log::*;
+
+impl StitchNetServer {
+    pub fn tls_server<A: ToSocketAddrs + std::fmt::Display>(
+        ip_addrs: A,
+        acceptor: TlsAcceptor,
+    ) -> Result<(StitchNetServer, Receiver<Arc<StitchNetClient>>)> {
+        Self::tls_server_with_bound(ip_addrs, acceptor, None)
+    }
+
+    pub fn tls_server_with_bound<A: ToSocketAddrs + std::fmt::Display>(
+        ip_addrs: A,
+        acceptor: TlsAcceptor,
+        cap: Option<usize>,
+    ) -> Result<(Self, Receiver<Arc<StitchNetClient>>)> {
+        let listener = task::block_on(TcpListener::bind(ip_addrs))?;
+        info!("Started TLS server at {}", listener.local_addr()?);
+
+        let registry = Arc::new(DashMap::new());
+        let (sender, receiver) = channel_factory(cap);
+
+        let handler = task::spawn(handle_server_connections(
+            acceptor,
+            registry.clone(),
+            listener,
+            sender.clone(),
+            cap,
+        ));
+
+        Ok((
+            Self {
+                registry,
+                connections_chan: (sender, receiver.clone()),
+                accept_loop_task: handler,
+            },
+            receiver,
+        ))
+    }
+}
+
+async fn handle_server_connections<'a>(
+    acceptor: TlsAcceptor,
+    registry: ServerRegistry,
+    input: TcpListener,
+    output: Sender<Arc<StitchNetClient>>,
+    cap: Option<usize>,
+) -> anyhow::Result<()> {
+    let mut conns = input.incoming();
+
+    debug!("Reading from the stream of incoming connections");
+    loop {
+        match conns.next().await {
+            Some(Ok(tcp_stream)) => {
+                let local_addr = tcp_stream.local_addr()?;
+                let peer_addr = tcp_stream.peer_addr()?;
+
+                debug!("Received connection attempt from {}", peer_addr);
+
+                let tls_stream = acceptor.accept(tcp_stream).await?;
+
+                let (read_stream, write_stream) = tls_stream.split();
+                let (tls_write_sender, tls_write_receiver) = channel_factory(cap);
+
+                let client_registry: StitchRegistry = crate::registry::new();
+                let read_readiness = Arc::new((Mutex::new(false), Condvar::new()));
+                let write_readiness = Arc::new((Mutex::new(false), Condvar::new()));
+
+                let read_task = task::spawn(crate::tasks::read_from_stream(
+                    client_registry.clone(),
+                    read_stream,
+                    read_readiness.clone(),
+                ));
+
+                let write_task = task::spawn(crate::tasks::write_to_stream(
+                    tls_write_receiver.clone(),
+                    write_stream,
+                    write_readiness.clone(),
+                ));
+
+                let conn = StitchNetClient {
+                    local_addr,
+                    peer_addr,
+                    registry: client_registry,
+                    stream_writer_chan: (tls_write_sender, tls_write_receiver),
+                    read_readiness,
+                    write_readiness,
+                    read_task,
+                    write_task,
+                };
+
+                debug!("Attempting to register connection from {}", peer_addr);
+                let conn = Arc::new(conn);
+                registry.insert(conn.peer_addr(), conn.clone());
+                debug!(
+                    "Registered client connection for {} in server registry",
+                    peer_addr
+                );
+
+                if let Err(err) = output.send(conn).await {
+                    error!(
+                        "Stopping the server accept loop - could not send accepted TLS client connection to channel: {:#?}",
+                        err
+                    );
+
+                    break Err(anyhow::Error::from(err));
+                } else {
+                    info!("Accepted connection from {}", peer_addr);
+                }
+            }
+
+            Some(Err(err)) => error!(
+                "Encountered error when accepting TLS connection: {:#?}",
+                err
+            ),
+
+            None => {
+                warn!("Stopping the server accept loop - unable to accept any more connections");
+
+                break Ok(());
+            }
+        }
+    }
+}
diff --git a/src/writer.rs b/src/writer.rs
new file mode 100644 (file)
index 0000000..f6f267e
--- /dev/null
@@ -0,0 +1,111 @@
+use crate::schema::StitchMessage;
+use async_channel::RecvError;
+use async_std::net::SocketAddr;
+use async_std::pin::Pin;
+use futures::task::{Context, Poll};
+use futures::{AsyncWrite, AsyncWriteExt, Sink};
+use log::*;
+use protobuf::Message;
+
+pub use futures::SinkExt;
+pub use futures::StreamExt;
+
+pub struct StitchConnectionWriter {
+    local_addr: SocketAddr,
+    peer_addr: SocketAddr,
+    write_stream: Box<dyn AsyncWrite + Send + Sync + Unpin>,
+    pending_write: Option<StitchMessage>,
+}
+
+impl StitchConnectionWriter {
+    pub fn new(
+        local_addr: SocketAddr,
+        peer_addr: SocketAddr,
+        write_stream: Box<dyn AsyncWrite + Send + Sync + Unpin>,
+    ) -> Self {
+        Self {
+            local_addr,
+            peer_addr,
+            write_stream,
+            pending_write: None,
+        }
+    }
+
+    pub fn local_addr(&self) -> SocketAddr {
+        self.local_addr.clone()
+    }
+
+    pub fn peer_addr(&self) -> SocketAddr {
+        self.peer_addr.clone()
+    }
+}
+
+impl<T: Message> Sink<T> for StitchConnectionWriter {
+    type Error = RecvError;
+
+    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        if self.pending_write.is_some() {
+            debug!("Connection not ready to send message yet, waiting for prior message");
+            Poll::Pending
+        } else {
+            debug!("Connection ready to send message");
+            Poll::Ready(Ok(()))
+        }
+    }
+
+    fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
+        debug!("Preparing message to be sent next");
+        let stitch_msg: StitchMessage = StitchMessage::from_msg(item);
+        self.pending_write.replace(stitch_msg);
+
+        Ok(())
+    }
+
+    fn poll_flush(
+        mut self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Result<(), Self::Error>> {
+        if let Some(pending_msg) = self.pending_write.take() {
+            debug!("Send pending message");
+            if let Ok(buffer) = pending_msg.write_to_bytes() {
+                let msg_size = buffer.len();
+                debug!("{} bytes to be sent over network connection", msg_size);
+
+                debug!("{:?}", buffer.as_slice());
+
+                return if let Ok(_) =
+                    futures::executor::block_on(self.write_stream.write_all(buffer.as_slice()))
+                {
+                    if let Ok(_) = futures::executor::block_on(self.write_stream.flush()) {
+                        debug!("Sent message of {} bytes", msg_size);
+                        Poll::Ready(Ok(()))
+                    } else {
+                        debug!("Encountered error while flushing queued bytes to network stream");
+                        Poll::Ready(Err(RecvError))
+                    }
+                } else {
+                    debug!("Encountered error when writing to network stream");
+                    Poll::Ready(Err(RecvError))
+                };
+            } else {
+                debug!("Encountered error when serializing message to bytes");
+                return Poll::Ready(Err(RecvError));
+            }
+        } else {
+            debug!("No message to send over connection");
+        }
+
+        Poll::Ready(Ok(()))
+    }
+
+    fn poll_close(
+        mut self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Result<(), Self::Error>> {
+        if let Ok(_) = futures::executor::block_on(self.write_stream.close()) {
+            Poll::Ready(Ok(()))
+        } else {
+            Poll::Ready(Err(RecvError))
+        }
+    }
+}