From 3012c6c96b954d7867f039e51d1f5964c134356a Mon Sep 17 00:00:00 2001 From: Sachandhan Ganesh Date: Thu, 14 Jan 2021 21:25:55 -0800 Subject: [PATCH] rename stitch-net to connect --- Cargo.toml | 21 ++ README.md | 34 +++ build.rs | 9 + examples/tcp-client/Cargo.toml | 19 ++ examples/tcp-client/README.md | 21 ++ examples/tcp-client/build.rs | 9 + examples/tcp-client/schema/hello_world.proto | 6 + examples/tcp-client/src/main.rs | 45 ++++ examples/tcp-client/src/schema/hello_world.rs | 200 ++++++++++++++++ examples/tcp-client/src/schema/mod.rs | 1 + examples/tcp-echo-server/Cargo.toml | 19 ++ examples/tcp-echo-server/README.md | 23 ++ examples/tcp-echo-server/build.rs | 9 + examples/tcp-echo-server/src/main.rs | 57 +++++ .../tcp-echo-server/src/schema/hello_world.rs | 200 ++++++++++++++++ examples/tcp-echo-server/src/schema/mod.rs | 1 + examples/tls-client/Cargo.toml | 15 ++ examples/tls-client/README.md | 21 ++ examples/tls-client/end.chain | 89 ++++++++ examples/tls-client/src/main.rs | 78 +++++++ examples/tls-echo-server/Cargo.toml | 15 ++ examples/tls-echo-server/README.md | 23 ++ examples/tls-echo-server/end.cert | 24 ++ examples/tls-echo-server/end.rsa | 27 +++ examples/tls-echo-server/src/main.rs | 94 ++++++++ schema/message.proto | 8 + src/lib.rs | 85 +++++++ src/reader.rs | 124 ++++++++++ src/schema/message.rs | 216 ++++++++++++++++++ src/schema/mod.rs | 16 ++ src/tcp/client.rs | 37 +++ src/tcp/mod.rs | 5 + src/tcp/server.rs | 37 +++ src/tls/client.rs | 76 ++++++ src/tls/mod.rs | 8 + src/tls/server.rs | 135 +++++++++++ src/writer.rs | 111 +++++++++ 37 files changed, 1918 insertions(+) create mode 100644 Cargo.toml create mode 100644 README.md create mode 100644 build.rs create mode 100644 examples/tcp-client/Cargo.toml create mode 100644 examples/tcp-client/README.md create mode 100644 examples/tcp-client/build.rs create mode 100644 examples/tcp-client/schema/hello_world.proto create mode 100644 examples/tcp-client/src/main.rs create mode 100644 examples/tcp-client/src/schema/hello_world.rs create mode 100644 examples/tcp-client/src/schema/mod.rs create mode 100644 examples/tcp-echo-server/Cargo.toml create mode 100644 examples/tcp-echo-server/README.md create mode 100644 examples/tcp-echo-server/build.rs create mode 100644 examples/tcp-echo-server/src/main.rs create mode 100644 examples/tcp-echo-server/src/schema/hello_world.rs create mode 100644 examples/tcp-echo-server/src/schema/mod.rs create mode 100644 examples/tls-client/Cargo.toml create mode 100644 examples/tls-client/README.md create mode 100644 examples/tls-client/end.chain create mode 100644 examples/tls-client/src/main.rs create mode 100644 examples/tls-echo-server/Cargo.toml create mode 100644 examples/tls-echo-server/README.md create mode 100644 examples/tls-echo-server/end.cert create mode 100644 examples/tls-echo-server/end.rsa create mode 100644 examples/tls-echo-server/src/main.rs create mode 100644 schema/message.proto create mode 100644 src/lib.rs create mode 100644 src/reader.rs create mode 100644 src/schema/message.rs create mode 100644 src/schema/mod.rs create mode 100644 src/tcp/client.rs create mode 100644 src/tcp/mod.rs create mode 100644 src/tcp/server.rs create mode 100644 src/tls/client.rs create mode 100644 src/tls/mod.rs create mode 100644 src/tls/server.rs create mode 100644 src/writer.rs diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..f5bf2a0 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "connect" +version = "0.0.2" +authors = ["Sachandhan Ganesh "] +edition = "2018" + +[dependencies] +anyhow = "1.0.31" +async-channel = "1.4.0" +async-std = { version = "1.6.2", features = ["tokio02", "unstable"] } +async-tls = { version = "0.9.0", default-features = false, features = ["client", "server"]} +async-trait = "0.1.39" +bytes = "0.5.5" +dashmap = "3.11.10" +futures = "0.3.8" +log = "0.4" +protobuf = "2.18.1" +rustls = "0.18.0" + +[build-dependencies] +protobuf-codegen-pure = "2.18.1" diff --git a/README.md b/README.md new file mode 100644 index 0000000..224f95d --- /dev/null +++ b/README.md @@ -0,0 +1,34 @@ +# connect + +This crate provides a distributed message queue abstraction over asynchronous network streams. + +By using a message queue, crate users can focus on sending and receiving messages between clients instead of low-level networking and failure recovery. + +## Future Goals + +- Documentation +- Connection pool (+ accounting for ordering of messages) +- Configurable policies for handling of non-registered (unexpected) message types +- Testing +- Benchmarking + +## Feature Status + +| Feature | Status | +|----------------------------------------------------- |-------- | +| [TCP Client](examples/tcp-client) | ✓ | +| [TCP Server](examples/tcp-echo-server) | ✓ | +| UDP Client | | +| UDP Server | | +| [TLS Client](examples/tls-client) | ✓ | +| [TLS Server](examples/tls-echo-server) | ✓ | +| QUIC Client | | +| QUIC Server | | +| SCTP Client | | +| SCTP Server | | +| DTLS-SCTP Client | | +| DTLS-SCTP Server | | +| Kafka Client | | +| RMQ Client | | +| SQS Client | | +| NSQ Client | | diff --git a/build.rs b/build.rs new file mode 100644 index 0000000..efa836d --- /dev/null +++ b/build.rs @@ -0,0 +1,9 @@ +fn main() -> Result<(), Box> { + protobuf_codegen_pure::Codegen::new() + .out_dir("src/schema") + .inputs(&["schema/message.proto"]) + .include("schema") + .run() + .expect("Codegen failed."); + Ok(()) +} diff --git a/examples/tcp-client/Cargo.toml b/examples/tcp-client/Cargo.toml new file mode 100644 index 0000000..1449e16 --- /dev/null +++ b/examples/tcp-client/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "tcp-client" +version = "0.1.0" +authors = ["Sachandhan Ganesh "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0.31" +async-std = { version = "1.6.2", features = ["attributes"] } +env_logger = "0.7" +log = "0.4" +protobuf = "2.18.1" + +stitch-net = { path = "../../" } + +[build-dependencies] +protobuf-codegen-pure = "2.18.1" \ No newline at end of file diff --git a/examples/tcp-client/README.md b/examples/tcp-client/README.md new file mode 100644 index 0000000..b25cbba --- /dev/null +++ b/examples/tcp-client/README.md @@ -0,0 +1,21 @@ +# seam-channel tcp-client example + +This example program will: + +1. Establish a connection with a TCP server +2. Send a `String` message to the server +3. Wait for a `String` message reply from the server + +## Usage + +``` +export RUST_LOG=info +cargo run +``` + +## Example Usage + +``` +export RUST_LOG=info +cargo run localhost:5678 +``` \ No newline at end of file diff --git a/examples/tcp-client/build.rs b/examples/tcp-client/build.rs new file mode 100644 index 0000000..aa2ff90 --- /dev/null +++ b/examples/tcp-client/build.rs @@ -0,0 +1,9 @@ +fn main() -> Result<(), Box> { + protobuf_codegen_pure::Codegen::new() + .out_dir("src/schema") + .inputs(&["schema/hello_world.proto"]) + .include("schema") + .run() + .expect("Codegen failed."); + Ok(()) +} diff --git a/examples/tcp-client/schema/hello_world.proto b/examples/tcp-client/schema/hello_world.proto new file mode 100644 index 0000000..5ffe7b2 --- /dev/null +++ b/examples/tcp-client/schema/hello_world.proto @@ -0,0 +1,6 @@ +syntax = "proto3"; +package hello_world; + +message HelloWorld { + string message = 1; +} \ No newline at end of file diff --git a/examples/tcp-client/src/main.rs b/examples/tcp-client/src/main.rs new file mode 100644 index 0000000..a5de8e5 --- /dev/null +++ b/examples/tcp-client/src/main.rs @@ -0,0 +1,45 @@ +pub mod schema; + +use crate::schema::hello_world::HelloWorld; +use log::*; +use protobuf::well_known_types::Any; +use std::env; +use stitch_net::{SinkExt, StitchConnection, StreamExt}; + +#[async_std::main] +async fn main() -> anyhow::Result<()> { + env_logger::init(); + + // Get ip address from cmd line args + let args: Vec = env::args().collect(); + let ip_address = match args.get(1) { + Some(addr) => addr, + None => { + error!("Need to pass IP address to connect to as command line argument"); + panic!(); + } + }; + + // create a client connection to the server + let mut conn = StitchConnection::tcp_client(ip_address)?; + + // send a message to the server + let raw_msg = String::from("Hello world"); + info!("Sending message: {}", raw_msg); + + let mut msg = HelloWorld::new(); + msg.set_message(raw_msg); + + conn.writer().send(msg).await?; + + // wait for the server to reply with an ack + while let Some(reply) = conn.reader().next().await { + info!("Received message"); + + let msg: HelloWorld = Any::unpack(&reply)?.unwrap(); + + info!("Unpacked reply: {}", msg.get_message()); + } + + Ok(()) +} diff --git a/examples/tcp-client/src/schema/hello_world.rs b/examples/tcp-client/src/schema/hello_world.rs new file mode 100644 index 0000000..5af4935 --- /dev/null +++ b/examples/tcp-client/src/schema/hello_world.rs @@ -0,0 +1,200 @@ +// This file is generated by rust-protobuf 2.19.0. Do not edit +// @generated + +// https://github.com/rust-lang/rust-clippy/issues/702 +#![allow(unknown_lints)] +#![allow(clippy::all)] + +#![allow(unused_attributes)] +#![rustfmt::skip] + +#![allow(box_pointers)] +#![allow(dead_code)] +#![allow(missing_docs)] +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] +#![allow(non_upper_case_globals)] +#![allow(trivial_casts)] +#![allow(unused_imports)] +#![allow(unused_results)] +//! Generated file from `hello_world.proto` + +/// Generated files are compatible only with the same version +/// of protobuf runtime. +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_19_0; + +#[derive(PartialEq,Clone,Default)] +pub struct HelloWorld { + // message fields + pub message: ::std::string::String, + // special fields + pub unknown_fields: ::protobuf::UnknownFields, + pub cached_size: ::protobuf::CachedSize, +} + +impl<'a> ::std::default::Default for &'a HelloWorld { + fn default() -> &'a HelloWorld { + ::default_instance() + } +} + +impl HelloWorld { + pub fn new() -> HelloWorld { + ::std::default::Default::default() + } + + // string message = 1; + + + pub fn get_message(&self) -> &str { + &self.message + } + pub fn clear_message(&mut self) { + self.message.clear(); + } + + // Param is passed by value, moved + pub fn set_message(&mut self, v: ::std::string::String) { + self.message = v; + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_message(&mut self) -> &mut ::std::string::String { + &mut self.message + } + + // Take field + pub fn take_message(&mut self) -> ::std::string::String { + ::std::mem::replace(&mut self.message, ::std::string::String::new()) + } +} + +impl ::protobuf::Message for HelloWorld { + fn is_initialized(&self) -> bool { + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> { + while !is.eof()? { + let (field_number, wire_type) = is.read_tag_unpack()?; + match field_number { + 1 => { + ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.message)?; + }, + _ => { + ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u32 { + let mut my_size = 0; + if !self.message.is_empty() { + my_size += ::protobuf::rt::string_size(1, &self.message); + } + my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); + self.cached_size.set(my_size); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { + if !self.message.is_empty() { + os.write_string(1, &self.message)?; + } + os.write_unknown_fields(self.get_unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn get_cached_size(&self) -> u32 { + self.cached_size.get() + } + + fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { + &self.unknown_fields + } + + fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { + &mut self.unknown_fields + } + + fn as_any(&self) -> &dyn (::std::any::Any) { + self as &dyn (::std::any::Any) + } + fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { + self as &mut dyn (::std::any::Any) + } + fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { + self + } + + fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { + Self::descriptor_static() + } + + fn new() -> HelloWorld { + HelloWorld::new() + } + + fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { + static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; + descriptor.get(|| { + let mut fields = ::std::vec::Vec::new(); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( + "message", + |m: &HelloWorld| { &m.message }, + |m: &mut HelloWorld| { &mut m.message }, + )); + ::protobuf::reflect::MessageDescriptor::new_pb_name::( + "HelloWorld", + fields, + file_descriptor_proto() + ) + }) + } + + fn default_instance() -> &'static HelloWorld { + static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; + instance.get(HelloWorld::new) + } +} + +impl ::protobuf::Clear for HelloWorld { + fn clear(&mut self) { + self.message.clear(); + self.unknown_fields.clear(); + } +} + +impl ::std::fmt::Debug for HelloWorld { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for HelloWorld { + fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { + ::protobuf::reflect::ReflectValueRef::Message(self) + } +} + +static file_descriptor_proto_data: &'static [u8] = b"\ + \n\x11hello_world.proto\x12\x0bhello_world\"*\n\nHelloWorld\x12\x1a\n\ + \x07message\x18\x01\x20\x01(\tR\x07messageB\0:\0B\0b\x06proto3\ +"; + +static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; + +fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto { + ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap() +} + +pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto { + file_descriptor_proto_lazy.get(|| { + parse_descriptor_proto() + }) +} diff --git a/examples/tcp-client/src/schema/mod.rs b/examples/tcp-client/src/schema/mod.rs new file mode 100644 index 0000000..c6dbc18 --- /dev/null +++ b/examples/tcp-client/src/schema/mod.rs @@ -0,0 +1 @@ +pub mod hello_world; diff --git a/examples/tcp-echo-server/Cargo.toml b/examples/tcp-echo-server/Cargo.toml new file mode 100644 index 0000000..d44763c --- /dev/null +++ b/examples/tcp-echo-server/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "tcp-echo-server" +version = "0.1.0" +authors = ["Sachandhan Ganesh "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0.31" +async-std = { version = "1.6.2", features = ["attributes"] } +env_logger = "0.7" +log = "0.4" +protobuf = "2.18.1" + +stitch-net = { path = "../../" } + +[build-dependencies] +protobuf-codegen-pure = "2.18.1" \ No newline at end of file diff --git a/examples/tcp-echo-server/README.md b/examples/tcp-echo-server/README.md new file mode 100644 index 0000000..72695ae --- /dev/null +++ b/examples/tcp-echo-server/README.md @@ -0,0 +1,23 @@ +# seam-channel tcp-echo-server example + +This example program will: + +1. Bind to an IP address +2. Accept any number of TCP connections +3. Handle each connection by: + 1. Waiting for `String` messages to be received + 2. Echoing the `String` message back to the source + +## Usage + +``` +export RUST_LOG=info +cargo run +``` + +## Example Usage + +``` +export RUST_LOG=info +cargo run localhost:5678 +``` \ No newline at end of file diff --git a/examples/tcp-echo-server/build.rs b/examples/tcp-echo-server/build.rs new file mode 100644 index 0000000..ae9222f --- /dev/null +++ b/examples/tcp-echo-server/build.rs @@ -0,0 +1,9 @@ +fn main() -> Result<(), Box> { + protobuf_codegen_pure::Codegen::new() + .out_dir("src/schema") + .inputs(&["../tcp-client/schema/hello_world.proto"]) + .include("../tcp-client/schema") + .run() + .expect("Codegen failed."); + Ok(()) +} diff --git a/examples/tcp-echo-server/src/main.rs b/examples/tcp-echo-server/src/main.rs new file mode 100644 index 0000000..376ab0c --- /dev/null +++ b/examples/tcp-echo-server/src/main.rs @@ -0,0 +1,57 @@ +mod schema; + +use crate::schema::hello_world::HelloWorld; +use async_std::task; +use log::*; +use std::env; +use stitch_net::tcp::StitchTcpServer; +use stitch_net::{SinkExt, StreamExt}; + +#[async_std::main] +async fn main() -> anyhow::Result<()> { + env_logger::init(); + + // Get ip address from cmd line args + let args: Vec = env::args().collect(); + + let ip_address = match args.get(1) { + Some(addr) => addr, + None => { + error!("Need to pass IP address to bind to as command line argument"); + panic!(); + } + }; + + // create a server + let mut server = StitchTcpServer::new(ip_address)?; + + // handle server connections + // wait for a connection to come in and be accepted + while let Some(mut conn) = server.next().await { + info!("Handling connection from {}", conn.peer_addr()); + + task::spawn(async move { + while let Some(msg) = conn.reader().next().await { + if msg.is::() { + if let Ok(Some(contents)) = msg.unpack::() { + info!( + "Received a message \"{}\" from {}", + contents.get_message(), + conn.peer_addr() + ); + + conn.writer() + .send(contents) + .await + .expect("Could not send message back to source connection"); + info!("Sent message back to original sender"); + } + } else { + error!("Received a message of unknown type") + } + } + }); + } + + Ok(()) +} diff --git a/examples/tcp-echo-server/src/schema/hello_world.rs b/examples/tcp-echo-server/src/schema/hello_world.rs new file mode 100644 index 0000000..5af4935 --- /dev/null +++ b/examples/tcp-echo-server/src/schema/hello_world.rs @@ -0,0 +1,200 @@ +// This file is generated by rust-protobuf 2.19.0. Do not edit +// @generated + +// https://github.com/rust-lang/rust-clippy/issues/702 +#![allow(unknown_lints)] +#![allow(clippy::all)] + +#![allow(unused_attributes)] +#![rustfmt::skip] + +#![allow(box_pointers)] +#![allow(dead_code)] +#![allow(missing_docs)] +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] +#![allow(non_upper_case_globals)] +#![allow(trivial_casts)] +#![allow(unused_imports)] +#![allow(unused_results)] +//! Generated file from `hello_world.proto` + +/// Generated files are compatible only with the same version +/// of protobuf runtime. +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_19_0; + +#[derive(PartialEq,Clone,Default)] +pub struct HelloWorld { + // message fields + pub message: ::std::string::String, + // special fields + pub unknown_fields: ::protobuf::UnknownFields, + pub cached_size: ::protobuf::CachedSize, +} + +impl<'a> ::std::default::Default for &'a HelloWorld { + fn default() -> &'a HelloWorld { + ::default_instance() + } +} + +impl HelloWorld { + pub fn new() -> HelloWorld { + ::std::default::Default::default() + } + + // string message = 1; + + + pub fn get_message(&self) -> &str { + &self.message + } + pub fn clear_message(&mut self) { + self.message.clear(); + } + + // Param is passed by value, moved + pub fn set_message(&mut self, v: ::std::string::String) { + self.message = v; + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_message(&mut self) -> &mut ::std::string::String { + &mut self.message + } + + // Take field + pub fn take_message(&mut self) -> ::std::string::String { + ::std::mem::replace(&mut self.message, ::std::string::String::new()) + } +} + +impl ::protobuf::Message for HelloWorld { + fn is_initialized(&self) -> bool { + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> { + while !is.eof()? { + let (field_number, wire_type) = is.read_tag_unpack()?; + match field_number { + 1 => { + ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.message)?; + }, + _ => { + ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u32 { + let mut my_size = 0; + if !self.message.is_empty() { + my_size += ::protobuf::rt::string_size(1, &self.message); + } + my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); + self.cached_size.set(my_size); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { + if !self.message.is_empty() { + os.write_string(1, &self.message)?; + } + os.write_unknown_fields(self.get_unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn get_cached_size(&self) -> u32 { + self.cached_size.get() + } + + fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { + &self.unknown_fields + } + + fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { + &mut self.unknown_fields + } + + fn as_any(&self) -> &dyn (::std::any::Any) { + self as &dyn (::std::any::Any) + } + fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { + self as &mut dyn (::std::any::Any) + } + fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { + self + } + + fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { + Self::descriptor_static() + } + + fn new() -> HelloWorld { + HelloWorld::new() + } + + fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { + static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; + descriptor.get(|| { + let mut fields = ::std::vec::Vec::new(); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( + "message", + |m: &HelloWorld| { &m.message }, + |m: &mut HelloWorld| { &mut m.message }, + )); + ::protobuf::reflect::MessageDescriptor::new_pb_name::( + "HelloWorld", + fields, + file_descriptor_proto() + ) + }) + } + + fn default_instance() -> &'static HelloWorld { + static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; + instance.get(HelloWorld::new) + } +} + +impl ::protobuf::Clear for HelloWorld { + fn clear(&mut self) { + self.message.clear(); + self.unknown_fields.clear(); + } +} + +impl ::std::fmt::Debug for HelloWorld { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for HelloWorld { + fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { + ::protobuf::reflect::ReflectValueRef::Message(self) + } +} + +static file_descriptor_proto_data: &'static [u8] = b"\ + \n\x11hello_world.proto\x12\x0bhello_world\"*\n\nHelloWorld\x12\x1a\n\ + \x07message\x18\x01\x20\x01(\tR\x07messageB\0:\0B\0b\x06proto3\ +"; + +static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; + +fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto { + ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap() +} + +pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto { + file_descriptor_proto_lazy.get(|| { + parse_descriptor_proto() + }) +} diff --git a/examples/tcp-echo-server/src/schema/mod.rs b/examples/tcp-echo-server/src/schema/mod.rs new file mode 100644 index 0000000..c6dbc18 --- /dev/null +++ b/examples/tcp-echo-server/src/schema/mod.rs @@ -0,0 +1 @@ +pub mod hello_world; diff --git a/examples/tls-client/Cargo.toml b/examples/tls-client/Cargo.toml new file mode 100644 index 0000000..f952662 --- /dev/null +++ b/examples/tls-client/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "tls-client" +version = "0.1.0" +authors = ["Sachandhan Ganesh "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0.31" +async-std = { version = "1.6.2", features = ["attributes"] } +env_logger = "0.7" +log = "0.4" + +seam-channel = { path = "../../" } \ No newline at end of file diff --git a/examples/tls-client/README.md b/examples/tls-client/README.md new file mode 100644 index 0000000..a55c0b8 --- /dev/null +++ b/examples/tls-client/README.md @@ -0,0 +1,21 @@ +# seam-channel tls-client example + +This example program will: + +1. Establish a secure connection with a TLS server +2. Send a `String` message to the server +3. Wait for a `String` message reply from the server + +## Usage + +``` +export RUST_LOG=info +cargo run +``` + +## Example Usage + +``` +export RUST_LOG=info +cargo run 127.0.0.1:5678 localhost end.chain +``` \ No newline at end of file diff --git a/examples/tls-client/end.chain b/examples/tls-client/end.chain new file mode 100644 index 0000000..7c39013 --- /dev/null +++ b/examples/tls-client/end.chain @@ -0,0 +1,89 @@ +-----BEGIN CERTIFICATE----- +MIIGnzCCAoegAwIBAgIBezANBgkqhkiG9w0BAQsFADAaMRgwFgYDVQQDDA9wb255 +dG93biBSU0EgQ0EwHhcNMTYxMjEwMTc0MjMzWhcNMjYxMjA4MTc0MjMzWjAsMSow +KAYDVQQDDCFwb255dG93biBSU0EgbGV2ZWwgMiBpbnRlcm1lZGlhdGUwggGiMA0G +CSqGSIb3DQEBAQUAA4IBjwAwggGKAoIBgQDnfb7vaJbaHEyVTflswWhmHqx5W0NO +KyKbDp2zXEJwDO+NDJq6i1HGnFd/vO4LyjJBU1wUsKtE+m55cfRmUHVuZ2w4n/VF +p7Z7n+SNuvJNcrzDxyKVy4GIZ39zQePnniqtLqXh6eI8Ow6jiMgVxC/wbWcVLKv6 +4RM+2fLjJAC9b27QfjhOlMKVeMOEvPrrpjLSauaHAktQPhuzIAwzxM0+KnvDkWWy +NVqAV/lq6fSO/9vJRhM4E2nxo6yqi7qTdxVxMmKsNn7L6HvjQgx+FXziAUs55Qd9 +cP7etCmPmoefkcgdbxDOIKH8D+DvfacZwngqcnr/q96Ff4uJ13d2OzR1mWVSZ2hE +JQt/BbZBANciqu9OZf3dj6uOOXgFF705ak0GfLtpZpc29M+fVnknXPDSiKFqjzOO +KL+SRGyuNc9ZYjBKkXPJ1OToAs6JSvgDxfOfX0thuo2rslqfpj2qCFugsRIRAqvb +eyFwg+BPM/P/EfauXlAcQtBF04fOi7xN2okCAwEAAaNeMFwwHQYDVR0OBBYEFNwu +Py4Do//Sm5CZDrocHWTrNr96MCAGA1UdJQEB/wQWMBQGCCsGAQUFBwMBBggrBgEF +BQcDAjAMBgNVHRMEBTADAQH/MAsGA1UdDwQEAwIB/jANBgkqhkiG9w0BAQsFAAOC +BAEAMHZpBqDIUAVFZNw4XbuimXQ4K8q4uePrLGHLb4F/gHbr8kYrU4H+cy4l+xXf +2dlEBdZoqjSF7uXzQg5Fd8Ff3ZgutXd1xeUJnxo0VdpKIhqeaTPqhffC2X6FQQH5 +KrN7NVWQSnUhPNpBFELpmdpY1lHigFW7nytYj0C6VJ4QsbqhfW+n/t+Zgqtfh/Od +ZbclzxFwMM55zRA2HP6IwXS2+d61Jk/RpDHTzhWdjGH4906zGNNMa7slHpCTA9Ju +TrtjEAGt2PBSievBJOHZW80KVAoEX2n9B3ZABaz+uX0VVZG0D2FwhPpUeA57YiXu +qiktZR4Ankph3LabXp4IlAX16qpYsEW8TWE/HLreeqoM0WDoI6rF9qnTpV2KWqBf +ziMYkfSkT7hQ2bWc493lW+QwSxCsuBsDwlrCwAl6jFSf1+jEQx98/8n9rDNyD9dL +PvECmtF30WY98nwZ9/kO2DufQrd0mwSHcIT0pAwl5fimpkwTjj+TTbytO3M4jK5L +tuIzsViQ95BmJQ3XuLdkQ/Ug8rpECYRX5fQX1qXkkvl920ohpKqKyEji1OmfmJ0Z +tZChaEcu3Mp3U+gD4az2ogmle3i/Phz8ZEPFo4/21G5Qd72z0lBgaQIeyyCk5MHt +Yg0vA7X0/w4bz+OJv5tf7zJsPCYSprr+c/7YUJk9Fqu6+g9ZAavI99xFKdGhz4Og +w0trnKNCxYc6+NPopTDbXuY+fo4DK7C0CSae5sKs7013Ne6w4KvgfLKpvlemkGfg +ZA3+1FMXVfFIEH7Cw9cx6F02Sr3k1VrU68oM3wH5nvTUkELOf8nRMlzliQjVCpKB +yFSe9dzRVSFEbMDxChiEulGgNUHj/6wwpg0ZmCwPRHutppT3jkfEqizN5iHb69GH +k6kol6knJofkaL656Q3Oc9o0ZrMlFh1RwmOvAk5fVK0/CV88/phROz2Wdmy5Bz4a +t0vzqFWA54y6+9EEVoOk9SU0CYfpGtpX4URjLK1EUG/l+RR3366Uee6TPrtEZ9cg +56VQMxhSaRNAvJ6DfiSuscSCNJzwuXaMXSZydGYnnP9Tb9p6c1uy1sXdluZkBIcK +CgC+gdDMSNlDn9ghc4xZGkuA8bjzfAYuRuGKmfTt8uuklkjw2b9w3SHjC4/Cmd2W +cFRnzfg2oL6e78hNg2ZGgsLzvb6Lu6/5IhXCO7RitzYf2+HLBbc+YLFsnG3qeGe1 +28yGnXOQd97Cr4+IzFucVy/33gMQkesNUSDFJSq1gE/hGrMgTTMQJ7yC3PRqg0kG +tpqTyKNdM0g1adxlR1qfDPvpUBApkgBbySnMyWEr5+tBuoHUtH2m49oV9YD4odMJ +yJjlGxituO/YNN6O8oANlraG1Q== +-----END CERTIFICATE----- +-----BEGIN CERTIFICATE----- +MIIJBzCCBO+gAwIBAgIJAN7WS1mRS9A+MA0GCSqGSIb3DQEBCwUAMBoxGDAWBgNV +BAMMD3Bvbnl0b3duIFJTQSBDQTAeFw0xNjEyMTAxNzQyMzNaFw0yNjEyMDgxNzQy +MzNaMBoxGDAWBgNVBAMMD3Bvbnl0b3duIFJTQSBDQTCCBCIwDQYJKoZIhvcNAQEB +BQADggQPADCCBAoCggQBAMNEzJ7aNdD2JSk9+NF9Hh2za9OQnt1d/7j6DtE3ieoT +ms8mMSXzoImXZayZ9Glx3yx/RhEb2vmINyb0vRUM4I/GH+XHdOBcs9kaJNv/Mpw4 +Ggd4e1LUqV1pzNrhYwRrTQTKyaDiDX2WEBNfQaaYnHltmSmsfyt3Klj+IMc6CyqV +q8SOQ6Go414Vn++Jj7p3E6owdwuvSvO8ERLobiA6vYB+qrS7E48c4zRIAFIO4uwt +g4TiCJLLWc1fRSoqGGX7KS+LzQF8Pq67IOHVna4e9peSe6nQnm0LQZAmaosYHvF4 +AX0Bj6TLv9PXCAGtB7Pciev5Br0tRZEdVyYfmwiVKUWcp77TghV3W+VaJVhPh5LN +X91ktvpeYek3uglqv2ZHtSG2S1KkBtTkbMOD+a2BEUfq0c0+BIsj6jdvt4cvIfet +4gUOxCvYMBs4/dmNT1zoe/kJ0lf8YXYLsXwVWdIW3jEE8QdkLtLI9XfyU9OKLZuD +mmoAf7ezvv/T3nKLFqhcwUFGgGtCIX+oWC16XSbDPBcKDBwNZn8C49b7BLdxqAg3 +msfxwhYzSs9F1MXt/h2dh7FVmkCSxtgNDX3NJn5/yT6USws2y0AS5vXVP9hRf0NV +KfKn9XlmHCxnZExwm68uZkUUYHB05jSWFojbfWE+Mf9djUeQ4FuwusztZdbyQ4yS +mMtBXO0I6SQBmjCoOa1ySW3DTuw/eKCfq+PoxqWD434bYA9nUa+pE27MP7GLyjCS +6+ED3MACizSF0YxkcC9pWUo4L5FKp+DxnNbtzMIILnsDZTVHOvKUy/gjTyTWm/+7 +2t98l7vBE8gn3Aux0V5WFe2uZIZ07wIi/OThoBO8mpt9Bm5cJTG07JStKEXX/UH1 +nL7cDZ2V5qbf4hJdDy4qixxxIZtmf//1BRlVQ9iYTOsMoy+36DXWbc3vSmjRefW1 +YENt4zxOPe4LUq2Z+LXq1OgVQrHrVevux0vieys7Rr2gA1sH8FaaNwTr7Q8dq+Av +Evk+iOUH4FuYorU1HuGHPkAkvLWosVwlB+VhfEai0V6+PmttmaOnCJNHfFTu5wCu +B9CFJ1tdzTzAbrLwgtWmO70KV7CfZPHO7lMWhSvplU0i5T9WytxP91IoFtXwRSO8 ++Ghyu0ynB3HywCH2dez89Vy903P6PEU0qTnYWRz6D/wi5+yHHNrm9CilWurs/Qex +kyB7lLD7Cb1JJc8QIFTqT6vj+cids3xd245hUdpFyZTX99YbF6IkiB2zGi5wvUmP +f1GPvkTLb7eF7bne9OClEjEqvc0hVJ2abO2WXkqxlQFEYZHNofm+y6bnby/BZZJo +beaSFcLOCe2Z8iZvVnzfHBCeLyWE89gc94z784S3LEsCAwEAAaNQME4wHQYDVR0O +BBYEFNz2wEPCQbx9OdRCNE4eALwHJfIgMB8GA1UdIwQYMBaAFNz2wEPCQbx9OdRC +NE4eALwHJfIgMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggQBACbm2YX7 +sBG0Aslj36gmVlCTTluNg2tuK2isHbK3YhNwujrH/o/o2OV7UeUkZkPwE4g4/SjC +OwDWYniRNyDKBOeD9Q0XxR5z5IZQO+pRVvXF8DXO6kygWCOJM9XheKxp9Uke0aDg +m8F02NslKLUdy7piGlLSz1sgdjiE3izIwFZRpZY7sMozNWWvSAmzprbkE78LghIm +VEydQzIQlr5soWqc65uFLNbEA6QBPoFc6dDW+mnzXf8nrZUM03CACxAsuq/YkjRp +OHgwgfdNRdlu4YhZtuQNak4BUvDmigTGxDC+aMJw0ldL1bLtqLG6BvQbyLNPOOfo +5S8lGh4y06gb//052xHaqtCh5Ax5sHUE5By6wKHAKbuJy26qyKfaRoc3Jigs4Fd5 +3CuoDWHbyXfkgKiU+sc+1mvCxQKFRJ2fpGEFP8iEcLvdUae7ZkRM4Kb0vST+QhQV +fDaFkM3Bwqtui5YaZ6cHHQVyXQdujCmfesoZXKil2yduQ3KWgePjewzRV+aDWMzk +qKaF+TRANSqWbBU6JTwwQ4veKQThU3ir7nS2ovdPbhNS/FnWoKodj6eaqXfdYuBh +XOXLewIF568MJsLOuBubeAO2a9LOlhnv6eLGp2P4M7vwEdN/LRRQtwBBmqq8C3h+ +ewrJP12B/ag0bJDi9vCgPhYtDEpjpfsnxZEIqVZwshJ/MqXykFp2kYk62ylyfDWq +veI/aHwpzT2k+4CI/XmPWXl9NlI50HPdpcwCBDy8xVHwb/x7stNgQdIhaj9tzmKa +S+eqitclc8Iqrbd523H//QDzm8yiqRZUdveNa9gioTMErR0ujCpK8tO8mVZcVfNX +i1/Vsar5++nXcPhxKsd1t8XV2dk3gUZIfMgzLLzs+KSiFg+bT3c7LkCd+I3w30Iv +fh9cxFBAyYO9giwxaCfJgoz7OYqaHOOtASF85UV7gK9ELT7/z+RAcS/UfY1xbd54 +hIi1vRZj8lfkAYNtnYlud44joi1BvW/GZGFCiJ13SSvfHNs9v/5xguyCSgyCc0qx +ZkN/fzj/5wFQbxSl3MPn/JrsvlH6wvJht1SA50uVdUvJ5e5V8EgLYfMqlJNNpTHP +wZcHF+Dw126oyu2KhUxD126Gusxp+tV6I0EEZnVwwduFQWq9xm/gT+qohpveeylf +Q2XGz56DF2udJJnSFGSqzQOl9XopNC/4ecBMwIzqdFSpaWgK3VNAcigyDajgoE4v +ZuiVDEiLhLowZvi1V8GOWzcka7R2BQBjhOLWByQGDcm8cOMS7w8oCSQCaYmJyHvE +tTHq7fX6/sXv0AJqM3ysSdU01IVBNahnr5WEkmQMaFF0DGvRfqkVdKcChwrKv7r2 +DLxargy39i2aQGg= +-----END CERTIFICATE----- diff --git a/examples/tls-client/src/main.rs b/examples/tls-client/src/main.rs new file mode 100644 index 0000000..6d57a35 --- /dev/null +++ b/examples/tls-client/src/main.rs @@ -0,0 +1,78 @@ +use log::*; +use seam_channel::net::tls::rustls::ClientConfig; +use seam_channel::net::{StitchClient, StitchNetClient}; +use std::env; + +#[async_std::main] +async fn main() -> anyhow::Result<()> { + env_logger::init(); + + // get ip address and domain from cmd line args + let (ip_addr, domain, cafile_path) = parse_args(); + + // construct `rustls` client config + let cafile = std::fs::read(cafile_path)?; + + let mut client_pem = std::io::Cursor::new(cafile); + + let mut client_config = ClientConfig::new(); + client_config + .root_store + .add_pem_file(&mut client_pem) + .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid cert"))?; + + // create a client connection to the server + let dist_chan = StitchNetClient::tls_client(ip_addr, &domain, client_config.into())?; + + // create a channel for String messages on the TCP connection + let (sender, receiver) = dist_chan.bounded::(Some(100)); + + // alert the connection that you are ready to read and write messages + dist_chan.ready()?; + + // send a message to the server + let msg = String::from("Hello world"); + info!("Sending message: {}", msg); + sender.send(msg).await?; + + // wait for the server to reply with an ack + if let Ok(msg) = receiver.recv().await { + info!("Received reply: {}", msg); + } + + Ok(()) +} + +fn parse_args() -> (String, String, String) { + let args: Vec = env::args().collect(); + + let ip_address = match args.get(1) { + Some(addr) => addr, + None => { + error!("Need to pass IP address to connect to as first command line argument"); + panic!(); + } + }; + + let domain = match args.get(2) { + Some(d) => d, + None => { + error!("Need to pass domain name as second command line argument"); + panic!(); + } + }; + + let cafile_path = match args.get(3) { + Some(d) => d, + None => { + error!("Need to pass path to cafile as third command line argument"); + panic!(); + } + }; + + ( + ip_address.to_string(), + domain.to_string(), + cafile_path.to_string(), + ) +} diff --git a/examples/tls-echo-server/Cargo.toml b/examples/tls-echo-server/Cargo.toml new file mode 100644 index 0000000..31bff03 --- /dev/null +++ b/examples/tls-echo-server/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "tls-echo-server" +version = "0.1.0" +authors = ["Sachandhan Ganesh "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0.31" +async-std = { version = "1.6.2", features = ["attributes"] } +env_logger = "0.7" +log = "0.4" + +seam-channel = { path = "../../" } \ No newline at end of file diff --git a/examples/tls-echo-server/README.md b/examples/tls-echo-server/README.md new file mode 100644 index 0000000..eeafd03 --- /dev/null +++ b/examples/tls-echo-server/README.md @@ -0,0 +1,23 @@ +# seam-channel tls-echo-server example + +This example program will: + +1. Bind to an IP address +2. Accept any number of secure TLS connections +3. Handle each connection by: + 1. Waiting for `String` messages to be received + 2. Echoing the `String` message back to the source + +## Usage + +``` +export RUST_LOG=info +cargo run +``` + +## Example Usage + +``` +export RUST_LOG=info +cargo run 127.0.0.1:5678 end.cert end.rsa +``` \ No newline at end of file diff --git a/examples/tls-echo-server/end.cert b/examples/tls-echo-server/end.cert new file mode 100644 index 0000000..66f087e --- /dev/null +++ b/examples/tls-echo-server/end.cert @@ -0,0 +1,24 @@ +-----BEGIN CERTIFICATE----- +MIIEADCCAmigAwIBAgICAcgwDQYJKoZIhvcNAQELBQAwLDEqMCgGA1UEAwwhcG9u +eXRvd24gUlNBIGxldmVsIDIgaW50ZXJtZWRpYXRlMB4XDTE2MTIxMDE3NDIzM1oX +DTIyMDYwMjE3NDIzM1owGTEXMBUGA1UEAwwOdGVzdHNlcnZlci5jb20wggEiMA0G +CSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC1YDz66+7VD4DL1+/sVHMQ+BbDRgmD +OQlX++mfW8D3QNQm/qDBEbu7T7qqdc9GKDar4WIzBN8SBkzM1EjMGwNnZPV/Tfz0 +qUAR1L/7Zzf1GaFZvWXgksyUpfwvmprH3Iy/dpkETwtPthpTPNlui3hZnm/5kkjR +RWg9HmID4O04Ld6SK313v2ZgrPZbkKvbqlqhUnYWjL3blKVGbpXIsuZzEU9Ph+gH +tPcEhZpFsM6eLe+2TVscIrycMEOTXqAAmO6zZ9sQWtfllu3CElm904H6+jA/9Leg +al72pMmkYr8wWniqDDuijXuCPlVx5EDFFyxBmW18UeDEQaKV3kNfelaTAgMBAAGj +gb4wgbswDAYDVR0TAQH/BAIwADALBgNVHQ8EBAMCBsAwHQYDVR0OBBYEFIYhJkVy +AAKT6cY/ruH1Eu+NNxteMEIGA1UdIwQ7MDmAFNwuPy4Do//Sm5CZDrocHWTrNr96 +oR6kHDAaMRgwFgYDVQQDDA9wb255dG93biBSU0EgQ0GCAXswOwYDVR0RBDQwMoIO +dGVzdHNlcnZlci5jb22CFXNlY29uZC50ZXN0c2VydmVyLmNvbYIJbG9jYWxob3N0 +MA0GCSqGSIb3DQEBCwUAA4IBgQCWV76jfQDZKtfmj45fTwZzoe/PxjWPRbAvSEnt +LRHrPhqQfpMLqpun8uu/w86mHiR/AmiAySMu3zivW6wfGzlRWLi/zCyO6r9LGsgH +bNk5CF642cdZFvn1SiSm1oGXQrolIpcyXu88nUpt74RnY4ETCC1dRQKqxsYufe5T +DOmTm3ChinNW4QRG3yvW6DVuyxVAgZvofyKJOsM3GO6oogIM41aBqZ3UTwmIwp6D +oISdiATslFOzYzjnyXNR8DG8OOkv1ehWuyb8x+hQCZAuogQOWYtCSd6k3kKgd0EM +4CWbt1XDV9ZJwBf2uxZeKuCu/KIy9auNtijAwPsUv9qxuzko018zhl3lWm5p2Sqw +O7fFshU3A6df8hMw7ST6/tgFY7geT88U4iJhfWMwr/CZSRSVMXhTyJgbLIXxKYZj +Ym5v4NAIQP6hI4HixzQaYgrhW6YX6myk+emMjQLRJHT8uHvmT7fuxMJVWWgsCkr1 +C75pRQEagykN/Uzr5e6Tm8sVu88= +-----END CERTIFICATE----- diff --git a/examples/tls-echo-server/end.rsa b/examples/tls-echo-server/end.rsa new file mode 100644 index 0000000..744bba5 --- /dev/null +++ b/examples/tls-echo-server/end.rsa @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEogIBAAKCAQEAtWA8+uvu1Q+Ay9fv7FRzEPgWw0YJgzkJV/vpn1vA90DUJv6g +wRG7u0+6qnXPRig2q+FiMwTfEgZMzNRIzBsDZ2T1f0389KlAEdS/+2c39RmhWb1l +4JLMlKX8L5qax9yMv3aZBE8LT7YaUzzZbot4WZ5v+ZJI0UVoPR5iA+DtOC3ekit9 +d79mYKz2W5Cr26paoVJ2Foy925SlRm6VyLLmcxFPT4foB7T3BIWaRbDOni3vtk1b +HCK8nDBDk16gAJjus2fbEFrX5ZbtwhJZvdOB+vowP/S3oGpe9qTJpGK/MFp4qgw7 +oo17gj5VceRAxRcsQZltfFHgxEGild5DX3pWkwIDAQABAoIBAFDTazlSbGML/pRY +TTWeyIw2UkaA7npIr45C13BJfitw+1nJPK/tDCDDveZ6i3yzLPHZhV5A/HtWzWC1 +9R7nptOrnO83PNN2nPOVQFxzOe+ClXGdQkoagQp5EXHRTspj0WD9I+FUrDDAcOjJ +BAgMJPyi6zlnZAXGDVa3NGyQDoZqwU2k36L4rEsJIkG0NVurZhpiCexNkkf32495 +TOINQ0iKdfJ4iZoEYQ9G+x4NiuAJRCHuIcH76SNfT+Uv3wX0ut5EFPtflnvtdgcp +QVcoKwYdO0+mgO5xqWlBcsujSvgBdiNAGnAxKHWiEaacuIJi4+yYovyEebP6QI2X +Zg/U2wkCgYEA794dE5CPXLOmv6nioVC/ubOESk7vjSlEka/XFbKr4EY794YEqrB1 +8TUqg09Bn3396AS1e6P2shr3bxos5ybhOxDGSLnJ+aC0tRFjd1BPKnA80vZM7ggt +5cjmdD5Zp0tIQTIAAYU5bONQOwj0ej4PE7lny26eLa5vfvCwlrD+rM0CgYEAwZMN +W/5PA2A+EM08IaHic8my0dCunrNLF890ouZnDG99SbgMGvvEsGIcCP1sai702hNh +VgGDxCz6/HUy+4O4YNFVtjY7uGEpfIEcEI7CsLQRP2ggWEFxThZtnEtO8PbM3J/i +qcS6njHdE+0XuCjgZwGgva5xH2pkWFzw/AIpEN8CgYB2HOo2axWc8T2n3TCifI+c +EqCOsqXU3cBM+MgxgASQcCUxMkX0AuZguuxPMmS+85xmdoMi+c8NTqgOhlYcEJIR +sqXgw9OH3zF8g6513w7Md+4Ld4rUHyTypGWOUfF1pmVS7RsBpKdtTdWA7FzuIMbt +0HsiujqbheyTFlPuMAOH9QKBgBWS1gJSrWuq5j/pH7J/4EUXTZ6kq1F0mgHlVRJy +qzlvk38LzA2V0a32wTkfRV3wLcnALzDuqkjK2o4YYb42R+5CZlMQaEd8TKtbmE0g +HAKljuaKLFCpun8BcOXiXsHsP5i3GQPisQnAdOsrmWEk7R2NyORa9LCToutWMGVl +uD3xAoGAA183Vldm+m4KPsKS17t8MbwBryDXvowGzruh/Z+PGA0spr+ke4XxwT1y +kMMP1+5flzmjlAf4+W8LehKuVqvQoMlPn5UVHmSxQ7cGx/O/o6Gbn8Q25/6UT+sM +B1Y0rlLoKG62pnkeXp1O4I57gnClatWRg5qw11a8V8e3jvDKIYM= +-----END RSA PRIVATE KEY----- diff --git a/examples/tls-echo-server/src/main.rs b/examples/tls-echo-server/src/main.rs new file mode 100644 index 0000000..1129657 --- /dev/null +++ b/examples/tls-echo-server/src/main.rs @@ -0,0 +1,94 @@ +use async_std::{io, task}; +use log::*; +use seam_channel::net::tls::rustls::internal::pemfile::{certs, rsa_private_keys}; +use seam_channel::net::tls::rustls::{NoClientAuth, ServerConfig}; +use seam_channel::net::{StitchClient, StitchNetServer}; +use std::env; +use std::fs::File; +use std::io::BufReader; + +#[async_std::main] +async fn main() -> anyhow::Result<()> { + env_logger::init(); + + // Get ip address from cmd line args + let (ip_address, cert_path, key_path) = parse_args(); + + let certs = certs(&mut BufReader::new(File::open(cert_path)?)) + .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid cert"))?; + + let mut keys = rsa_private_keys(&mut BufReader::new(File::open(key_path)?)) + .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid key"))?; + + let mut config = ServerConfig::new(NoClientAuth::new()); + config + .set_single_cert(certs, keys.remove(0)) + .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?; + + // create a server + let (_server, conns) = StitchNetServer::tls_server(ip_address, config.into())?; + + // handle server connections + // wait for a connection to come in and be accepted + while let Ok(conn) = conns.recv().await { + info!("Handling connection: {}", conn.peer_addr()); + + // register for String-typed messages + let (sender, receiver) = conn.unbounded::(); + + // let the connection know you are ready to send and receive messages + conn.ready() + .expect("could not ready the connection for reading and writing"); + + // handle String messages + task::spawn(async move { + // for every String message + while let Ok(msg) = receiver.recv().await { + info!("Echoing message: {}", msg); + + let response = format!("{}, right back at you!", msg); + + // Send the message back to its source + if let Err(err) = sender.send(response).await { + error!("Could not echo message: {:#?}", err); + } + } + }); + } + + Ok(()) +} + +fn parse_args() -> (String, String, String) { + let args: Vec = env::args().collect(); + + let ip_address = match args.get(1) { + Some(addr) => addr, + None => { + error!("Need to pass IP address to connect to as first command line argument"); + panic!(); + } + }; + + let cert_path = match args.get(2) { + Some(d) => d, + None => { + error!("Need to pass path to cert file as second command line argument"); + panic!(); + } + }; + + let key_path = match args.get(3) { + Some(d) => d, + None => { + error!("Need to pass path to key file as third command line argument"); + panic!(); + } + }; + + ( + ip_address.to_string(), + cert_path.to_string(), + key_path.to_string(), + ) +} diff --git a/schema/message.proto b/schema/message.proto new file mode 100644 index 0000000..e80daff --- /dev/null +++ b/schema/message.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; +package message; + +import "google/protobuf/any.proto"; + +message StitchMessage { + google.protobuf.Any payload = 1; +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..c227762 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,85 @@ +pub mod schema; +pub mod tcp; +// @todo pub mod tls; +mod reader; +mod writer; + +pub use crate::reader::StitchConnectionReader; +use crate::schema::StitchMessage; +pub use crate::writer::StitchConnectionWriter; +use async_channel::RecvError; +use async_std::net::SocketAddr; +use async_std::pin::Pin; +use bytes::{Buf, BytesMut}; +use futures::task::{Context, Poll}; +use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Sink, Stream}; +use log::*; +use protobuf::Message; +use std::convert::TryInto; + +pub use futures::SinkExt; +pub use futures::StreamExt; +use protobuf::well_known_types::Any; + +pub struct StitchConnection { + local_addr: SocketAddr, + peer_addr: SocketAddr, + reader: StitchConnectionReader, + writer: StitchConnectionWriter, +} + +#[allow(dead_code)] +impl StitchConnection { + pub(crate) fn new( + local_addr: SocketAddr, + peer_addr: SocketAddr, + read_stream: Box, + write_stream: Box, + ) -> Self { + Self { + local_addr, + peer_addr, + reader: StitchConnectionReader::new(local_addr, peer_addr, read_stream), + writer: StitchConnectionWriter::new(local_addr, peer_addr, write_stream), + } + } + + pub fn local_addr(&self) -> SocketAddr { + self.local_addr.clone() + } + + pub fn peer_addr(&self) -> SocketAddr { + self.peer_addr.clone() + } + + pub fn split(self) -> (StitchConnectionReader, StitchConnectionWriter) { + (self.reader, self.writer) + } + + pub fn join(reader: StitchConnectionReader, writer: StitchConnectionWriter) -> Self { + Self { + local_addr: reader.local_addr(), + peer_addr: reader.peer_addr(), + reader, + writer, + } + } + + pub fn reader(&mut self) -> &mut StitchConnectionReader { + &mut self.reader + } + + pub fn writer(&mut self) -> &mut StitchConnectionWriter { + &mut self.writer + } + + pub async fn close(self) -> SocketAddr { + let peer_addr = self.peer_addr(); + + drop(self.reader); + // self.writer.close().await; + drop(self.writer); + + return peer_addr; + } +} diff --git a/src/reader.rs b/src/reader.rs new file mode 100644 index 0000000..568e40a --- /dev/null +++ b/src/reader.rs @@ -0,0 +1,124 @@ +use crate::schema::StitchMessage; +use async_std::net::SocketAddr; +use async_std::pin::Pin; +use bytes::{Buf, BytesMut}; +use futures::task::{Context, Poll}; +use futures::{AsyncRead, AsyncReadExt, Stream}; +use log::*; +use protobuf::Message; +use std::convert::TryInto; + +pub use futures::SinkExt; +pub use futures::StreamExt; +use protobuf::well_known_types::Any; + +const BUFFER_SIZE: usize = 8192; + +pub struct StitchConnectionReader { + local_addr: SocketAddr, + peer_addr: SocketAddr, + read_stream: Box, + pending_read: Option, +} + +impl StitchConnectionReader { + pub fn new( + local_addr: SocketAddr, + peer_addr: SocketAddr, + read_stream: Box, + ) -> Self { + Self { + local_addr, + peer_addr, + read_stream, + pending_read: None, + } + } + + pub fn local_addr(&self) -> SocketAddr { + self.local_addr.clone() + } + + pub fn peer_addr(&self) -> SocketAddr { + self.peer_addr.clone() + } +} + +impl Stream for StitchConnectionReader { + type Item = Any; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let mut buffer = BytesMut::new(); + buffer.resize(BUFFER_SIZE, 0); + + debug!("Starting new read loop for {}", self.local_addr); + loop { + trace!("Reading from the stream"); + match futures::executor::block_on(self.read_stream.read(&mut buffer)) { + Ok(mut bytes_read) => { + if bytes_read > 0 { + debug!("Read {} bytes from the network stream", bytes_read) + } + + if let Some(mut pending_buf) = self.pending_read.take() { + debug!("Prepending broken data ({} bytes) encountered from earlier read of network stream", pending_buf.len()); + bytes_read += pending_buf.len(); + + pending_buf.unsplit(buffer); + buffer = pending_buf; + } + + let mut bytes_read_u64: u64 = bytes_read.try_into().expect( + format!("Conversion from usize ({}) to u64 failed", bytes_read).as_str(), + ); + while bytes_read_u64 > 0 { + debug!( + "{} bytes from network stream still unprocessed", + bytes_read_u64 + ); + + buffer.resize(bytes_read, 0); + debug!("{:?}", buffer.as_ref()); + + match StitchMessage::parse_from_bytes(buffer.as_ref()) { + Ok(mut data) => { + let serialized_size = data.compute_size(); + debug!("Deserialized message of size {} bytes", serialized_size); + + buffer.advance(serialized_size as usize); + + let serialized_size_u64: u64 = serialized_size.try_into().expect( + format!( + "Conversion from usize ({}) to u64 failed", + serialized_size + ) + .as_str(), + ); + bytes_read_u64 -= serialized_size_u64; + debug!("{} bytes still unprocessed", bytes_read_u64); + + debug!("Sending deserialized message downstream"); + return Poll::Ready(Some(data.take_payload())); + } + + Err(err) => { + warn!( + "Could not deserialize data from the received bytes: {:#?}", + err + ); + + self.pending_read = Some(buffer); + buffer = BytesMut::new(); + break; + } + } + } + + buffer.resize(BUFFER_SIZE, 0); + } + + Err(_err) => return Poll::Pending, + } + } + } +} diff --git a/src/schema/message.rs b/src/schema/message.rs new file mode 100644 index 0000000..352580a --- /dev/null +++ b/src/schema/message.rs @@ -0,0 +1,216 @@ +// This file is generated by rust-protobuf 2.19.0. Do not edit +// @generated + +// https://github.com/rust-lang/rust-clippy/issues/702 +#![allow(unknown_lints)] +#![allow(clippy::all)] + +#![allow(unused_attributes)] +#![rustfmt::skip] + +#![allow(box_pointers)] +#![allow(dead_code)] +#![allow(missing_docs)] +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] +#![allow(non_upper_case_globals)] +#![allow(trivial_casts)] +#![allow(unused_imports)] +#![allow(unused_results)] +//! Generated file from `message.proto` + +/// Generated files are compatible only with the same version +/// of protobuf runtime. +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_19_0; + +#[derive(PartialEq,Clone,Default)] +pub struct StitchMessage { + // message fields + pub payload: ::protobuf::SingularPtrField<::protobuf::well_known_types::Any>, + // special fields + pub unknown_fields: ::protobuf::UnknownFields, + pub cached_size: ::protobuf::CachedSize, +} + +impl<'a> ::std::default::Default for &'a StitchMessage { + fn default() -> &'a StitchMessage { + ::default_instance() + } +} + +impl StitchMessage { + pub fn new() -> StitchMessage { + ::std::default::Default::default() + } + + // .google.protobuf.Any payload = 1; + + + pub fn get_payload(&self) -> &::protobuf::well_known_types::Any { + self.payload.as_ref().unwrap_or_else(|| <::protobuf::well_known_types::Any as ::protobuf::Message>::default_instance()) + } + pub fn clear_payload(&mut self) { + self.payload.clear(); + } + + pub fn has_payload(&self) -> bool { + self.payload.is_some() + } + + // Param is passed by value, moved + pub fn set_payload(&mut self, v: ::protobuf::well_known_types::Any) { + self.payload = ::protobuf::SingularPtrField::some(v); + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_payload(&mut self) -> &mut ::protobuf::well_known_types::Any { + if self.payload.is_none() { + self.payload.set_default(); + } + self.payload.as_mut().unwrap() + } + + // Take field + pub fn take_payload(&mut self) -> ::protobuf::well_known_types::Any { + self.payload.take().unwrap_or_else(|| ::protobuf::well_known_types::Any::new()) + } +} + +impl ::protobuf::Message for StitchMessage { + fn is_initialized(&self) -> bool { + for v in &self.payload { + if !v.is_initialized() { + return false; + } + }; + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> { + while !is.eof()? { + let (field_number, wire_type) = is.read_tag_unpack()?; + match field_number { + 1 => { + ::protobuf::rt::read_singular_message_into(wire_type, is, &mut self.payload)?; + }, + _ => { + ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u32 { + let mut my_size = 0; + if let Some(ref v) = self.payload.as_ref() { + let len = v.compute_size(); + my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len; + } + my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); + self.cached_size.set(my_size); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { + if let Some(ref v) = self.payload.as_ref() { + os.write_tag(1, ::protobuf::wire_format::WireTypeLengthDelimited)?; + os.write_raw_varint32(v.get_cached_size())?; + v.write_to_with_cached_sizes(os)?; + } + os.write_unknown_fields(self.get_unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn get_cached_size(&self) -> u32 { + self.cached_size.get() + } + + fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { + &self.unknown_fields + } + + fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { + &mut self.unknown_fields + } + + fn as_any(&self) -> &dyn (::std::any::Any) { + self as &dyn (::std::any::Any) + } + fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { + self as &mut dyn (::std::any::Any) + } + fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { + self + } + + fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { + Self::descriptor_static() + } + + fn new() -> StitchMessage { + StitchMessage::new() + } + + fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { + static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; + descriptor.get(|| { + let mut fields = ::std::vec::Vec::new(); + fields.push(::protobuf::reflect::accessor::make_singular_ptr_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage<::protobuf::well_known_types::Any>>( + "payload", + |m: &StitchMessage| { &m.payload }, + |m: &mut StitchMessage| { &mut m.payload }, + )); + ::protobuf::reflect::MessageDescriptor::new_pb_name::( + "StitchMessage", + fields, + file_descriptor_proto() + ) + }) + } + + fn default_instance() -> &'static StitchMessage { + static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; + instance.get(StitchMessage::new) + } +} + +impl ::protobuf::Clear for StitchMessage { + fn clear(&mut self) { + self.payload.clear(); + self.unknown_fields.clear(); + } +} + +impl ::std::fmt::Debug for StitchMessage { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for StitchMessage { + fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { + ::protobuf::reflect::ReflectValueRef::Message(self) + } +} + +static file_descriptor_proto_data: &'static [u8] = b"\ + \n\rmessage.proto\x12\x07message\x1a\x19google/protobuf/any.proto\"C\n\r\ + StitchMessage\x120\n\x07payload\x18\x01\x20\x01(\x0b2\x14.google.protobu\ + f.AnyR\x07payloadB\0:\0B\0b\x06proto3\ +"; + +static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; + +fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto { + ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap() +} + +pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto { + file_descriptor_proto_lazy.get(|| { + parse_descriptor_proto() + }) +} diff --git a/src/schema/mod.rs b/src/schema/mod.rs new file mode 100644 index 0000000..1b8ac93 --- /dev/null +++ b/src/schema/mod.rs @@ -0,0 +1,16 @@ +mod message; + +pub use message::StitchMessage; +use protobuf::well_known_types::Any; +use protobuf::Message; + +impl StitchMessage { + // @todo make pub(crate) + pub fn from_msg(msg: T) -> Self { + let mut sm = Self::new(); + let payload = Any::pack(&msg).expect("Protobuf Message could not be packed into Any type"); + + sm.set_payload(payload); + return sm; + } +} diff --git a/src/tcp/client.rs b/src/tcp/client.rs new file mode 100644 index 0000000..a0994d4 --- /dev/null +++ b/src/tcp/client.rs @@ -0,0 +1,37 @@ +use async_std::task; +use log::*; + +use crate::StitchConnection; +use async_std::net::{TcpStream, ToSocketAddrs}; + +impl StitchConnection { + pub fn tcp_client( + ip_addrs: A, + ) -> anyhow::Result { + let read_stream = task::block_on(TcpStream::connect(&ip_addrs))?; + info!("Established client TCP connection to {}", ip_addrs); + + Ok(Self::from(read_stream)) + } +} + +impl From for StitchConnection { + fn from(stream: TcpStream) -> Self { + let write_stream = stream.clone(); + + let local_addr = stream + .local_addr() + .expect("Local address could not be retrieved"); + + let peer_addr = stream + .peer_addr() + .expect("Peer address could not be retrieved"); + + Self::new( + local_addr, + peer_addr, + Box::new(stream), + Box::new(write_stream), + ) + } +} diff --git a/src/tcp/mod.rs b/src/tcp/mod.rs new file mode 100644 index 0000000..fbc24b2 --- /dev/null +++ b/src/tcp/mod.rs @@ -0,0 +1,5 @@ +pub(crate) mod client; +pub(crate) mod server; + +pub use client::*; +pub use server::*; diff --git a/src/tcp/server.rs b/src/tcp/server.rs new file mode 100644 index 0000000..adae4cc --- /dev/null +++ b/src/tcp/server.rs @@ -0,0 +1,37 @@ +use crate::StitchConnection; +use async_std::net::{SocketAddr, TcpListener, ToSocketAddrs}; +use async_std::pin::Pin; +use async_std::task; +use futures::task::{Context, Poll}; +use futures::{Stream, StreamExt}; +use log::*; + +#[allow(dead_code)] +pub struct StitchTcpServer { + local_addrs: SocketAddr, + listener: TcpListener, +} + +impl StitchTcpServer { + pub fn new(ip_addrs: A) -> anyhow::Result { + let listener = task::block_on(TcpListener::bind(&ip_addrs))?; + info!("Started TCP server at {}", &ip_addrs); + + Ok(Self { + local_addrs: listener.local_addr()?, + listener, + }) + } +} + +impl Stream for StitchTcpServer { + type Item = StitchConnection; + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + if let Some(Ok(conn)) = futures::executor::block_on(self.listener.incoming().next()) { + Poll::Ready(Some(StitchConnection::from(conn))) + } else { + Poll::Ready(None) + } + } +} diff --git a/src/tls/client.rs b/src/tls/client.rs new file mode 100644 index 0000000..5f9acb2 --- /dev/null +++ b/src/tls/client.rs @@ -0,0 +1,76 @@ +use async_channel::{Receiver, Sender}; +use async_std::io::*; +use async_std::net::*; +use async_std::task; +use async_tls::TlsConnector; +use futures_util::io::AsyncReadExt; +use log::*; + +use crate::registry::StitchRegistry; +use crate::StitchNetClient; +use crate::{channel_factory, StitchMessage}; +use async_std::sync::{Arc, Condvar, Mutex}; + +impl StitchNetClient { + pub fn tls_client( + ip_addrs: A, + domain: &str, + connector: TlsConnector, + ) -> Result { + Self::tls_client_with_bound(ip_addrs, domain, connector, None) + } + + pub fn tls_client_with_bound( + ip_addrs: A, + domain: &str, + connector: TlsConnector, + cap: Option, + ) -> Result { + let stream = task::block_on(TcpStream::connect(&ip_addrs))?; + stream.set_nodelay(true)?; + info!("Established client TCP connection to {}", ip_addrs); + + Self::tls_client_from_parts(stream, domain, connector, channel_factory(cap)) + } + + pub fn tls_client_from_parts( + stream: TcpStream, + domain: &str, + connector: TlsConnector, + (tls_write_sender, tls_write_receiver): (Sender, Receiver), + ) -> Result { + let local_addr = stream.local_addr()?; + let peer_addr = stream.peer_addr()?; + + let encrypted_stream = task::block_on(connector.connect(domain, stream))?; + let (read_stream, write_stream) = encrypted_stream.split(); + info!("Completed TLS handshake with {}", peer_addr); + + let registry: StitchRegistry = crate::registry::new(); + let read_readiness = Arc::new((Mutex::new(false), Condvar::new())); + let write_readiness = Arc::new((Mutex::new(false), Condvar::new())); + + let read_task = task::spawn(crate::tasks::read_from_stream( + registry.clone(), + read_stream, + read_readiness.clone(), + )); + + let write_task = task::spawn(crate::tasks::write_to_stream( + tls_write_receiver.clone(), + write_stream, + write_readiness.clone(), + )); + + Ok(Self { + local_addr, + peer_addr, + registry, + stream_writer_chan: (tls_write_sender, tls_write_receiver), + read_readiness, + write_readiness, + read_task, + write_task, + }) + } +} diff --git a/src/tls/mod.rs b/src/tls/mod.rs new file mode 100644 index 0000000..da8703d --- /dev/null +++ b/src/tls/mod.rs @@ -0,0 +1,8 @@ +pub(crate) mod client; +pub(crate) mod server; + +pub use client::*; +pub use server::*; + +pub use async_tls; +pub use rustls; diff --git a/src/tls/server.rs b/src/tls/server.rs new file mode 100644 index 0000000..79dba44 --- /dev/null +++ b/src/tls/server.rs @@ -0,0 +1,135 @@ +use crate::channel_factory; +use crate::registry::StitchRegistry; +use crate::{ServerRegistry, StitchClient, StitchNetClient, StitchNetServer}; +use async_channel::{Receiver, Sender}; +use async_std::io::*; +use async_std::net::*; +use async_std::prelude::*; +use async_std::sync::{Arc, Condvar, Mutex}; +use async_std::task; +use async_tls::TlsAcceptor; +use dashmap::DashMap; +use futures_util::AsyncReadExt; +use log::*; + +impl StitchNetServer { + pub fn tls_server( + ip_addrs: A, + acceptor: TlsAcceptor, + ) -> Result<(StitchNetServer, Receiver>)> { + Self::tls_server_with_bound(ip_addrs, acceptor, None) + } + + pub fn tls_server_with_bound( + ip_addrs: A, + acceptor: TlsAcceptor, + cap: Option, + ) -> Result<(Self, Receiver>)> { + let listener = task::block_on(TcpListener::bind(ip_addrs))?; + info!("Started TLS server at {}", listener.local_addr()?); + + let registry = Arc::new(DashMap::new()); + let (sender, receiver) = channel_factory(cap); + + let handler = task::spawn(handle_server_connections( + acceptor, + registry.clone(), + listener, + sender.clone(), + cap, + )); + + Ok(( + Self { + registry, + connections_chan: (sender, receiver.clone()), + accept_loop_task: handler, + }, + receiver, + )) + } +} + +async fn handle_server_connections<'a>( + acceptor: TlsAcceptor, + registry: ServerRegistry, + input: TcpListener, + output: Sender>, + cap: Option, +) -> anyhow::Result<()> { + let mut conns = input.incoming(); + + debug!("Reading from the stream of incoming connections"); + loop { + match conns.next().await { + Some(Ok(tcp_stream)) => { + let local_addr = tcp_stream.local_addr()?; + let peer_addr = tcp_stream.peer_addr()?; + + debug!("Received connection attempt from {}", peer_addr); + + let tls_stream = acceptor.accept(tcp_stream).await?; + + let (read_stream, write_stream) = tls_stream.split(); + let (tls_write_sender, tls_write_receiver) = channel_factory(cap); + + let client_registry: StitchRegistry = crate::registry::new(); + let read_readiness = Arc::new((Mutex::new(false), Condvar::new())); + let write_readiness = Arc::new((Mutex::new(false), Condvar::new())); + + let read_task = task::spawn(crate::tasks::read_from_stream( + client_registry.clone(), + read_stream, + read_readiness.clone(), + )); + + let write_task = task::spawn(crate::tasks::write_to_stream( + tls_write_receiver.clone(), + write_stream, + write_readiness.clone(), + )); + + let conn = StitchNetClient { + local_addr, + peer_addr, + registry: client_registry, + stream_writer_chan: (tls_write_sender, tls_write_receiver), + read_readiness, + write_readiness, + read_task, + write_task, + }; + + debug!("Attempting to register connection from {}", peer_addr); + let conn = Arc::new(conn); + registry.insert(conn.peer_addr(), conn.clone()); + debug!( + "Registered client connection for {} in server registry", + peer_addr + ); + + if let Err(err) = output.send(conn).await { + error!( + "Stopping the server accept loop - could not send accepted TLS client connection to channel: {:#?}", + err + ); + + break Err(anyhow::Error::from(err)); + } else { + info!("Accepted connection from {}", peer_addr); + } + } + + Some(Err(err)) => error!( + "Encountered error when accepting TLS connection: {:#?}", + err + ), + + None => { + warn!("Stopping the server accept loop - unable to accept any more connections"); + + break Ok(()); + } + } + } +} diff --git a/src/writer.rs b/src/writer.rs new file mode 100644 index 0000000..f6f267e --- /dev/null +++ b/src/writer.rs @@ -0,0 +1,111 @@ +use crate::schema::StitchMessage; +use async_channel::RecvError; +use async_std::net::SocketAddr; +use async_std::pin::Pin; +use futures::task::{Context, Poll}; +use futures::{AsyncWrite, AsyncWriteExt, Sink}; +use log::*; +use protobuf::Message; + +pub use futures::SinkExt; +pub use futures::StreamExt; + +pub struct StitchConnectionWriter { + local_addr: SocketAddr, + peer_addr: SocketAddr, + write_stream: Box, + pending_write: Option, +} + +impl StitchConnectionWriter { + pub fn new( + local_addr: SocketAddr, + peer_addr: SocketAddr, + write_stream: Box, + ) -> Self { + Self { + local_addr, + peer_addr, + write_stream, + pending_write: None, + } + } + + pub fn local_addr(&self) -> SocketAddr { + self.local_addr.clone() + } + + pub fn peer_addr(&self) -> SocketAddr { + self.peer_addr.clone() + } +} + +impl Sink for StitchConnectionWriter { + type Error = RecvError; + + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + if self.pending_write.is_some() { + debug!("Connection not ready to send message yet, waiting for prior message"); + Poll::Pending + } else { + debug!("Connection ready to send message"); + Poll::Ready(Ok(())) + } + } + + fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + debug!("Preparing message to be sent next"); + let stitch_msg: StitchMessage = StitchMessage::from_msg(item); + self.pending_write.replace(stitch_msg); + + Ok(()) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + if let Some(pending_msg) = self.pending_write.take() { + debug!("Send pending message"); + if let Ok(buffer) = pending_msg.write_to_bytes() { + let msg_size = buffer.len(); + debug!("{} bytes to be sent over network connection", msg_size); + + debug!("{:?}", buffer.as_slice()); + + return if let Ok(_) = + futures::executor::block_on(self.write_stream.write_all(buffer.as_slice())) + { + if let Ok(_) = futures::executor::block_on(self.write_stream.flush()) { + debug!("Sent message of {} bytes", msg_size); + Poll::Ready(Ok(())) + } else { + debug!("Encountered error while flushing queued bytes to network stream"); + Poll::Ready(Err(RecvError)) + } + } else { + debug!("Encountered error when writing to network stream"); + Poll::Ready(Err(RecvError)) + }; + } else { + debug!("Encountered error when serializing message to bytes"); + return Poll::Ready(Err(RecvError)); + } + } else { + debug!("No message to send over connection"); + } + + Poll::Ready(Ok(())) + } + + fn poll_close( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + if let Ok(_) = futures::executor::block_on(self.write_stream.close()) { + Poll::Ready(Ok(())) + } else { + Poll::Ready(Err(RecvError)) + } + } +} -- 2.44.0