async-std = { version = "1.6.2", features = ["attributes"] }
env_logger = "0.7"
log = "0.4"
+protobuf = "2.18.1"
-seam-channel = { path = "../../" }
\ No newline at end of file
+connect = { path = "../../" }
+
+[build-dependencies]
+protobuf-codegen-pure = "2.18.1"
--- /dev/null
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+ protobuf_codegen_pure::Codegen::new()
+ .out_dir("src/schema")
+ .inputs(&["../tcp-client/schema/hello_world.proto"])
+ .include("../tcp-client/schema")
+ .run()
+ .expect("Codegen failed.");
+ Ok(())
+}
+mod schema;
+
+use crate::schema::hello_world::HelloWorld;
+use connect::tls::rustls::ClientConfig;
+use connect::{Connection, SinkExt, StreamExt};
use log::*;
-use seam_channel::net::tls::rustls::ClientConfig;
-use seam_channel::net::{StitchClient, StitchNetClient};
+use protobuf::well_known_types::Any;
use std::env;
#[async_std::main]
.map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid cert"))?;
// create a client connection to the server
- let dist_chan = StitchNetClient::tls_client(ip_addr, &domain, client_config.into())?;
+ let mut conn = Connection::tls_client(ip_addr, &domain, client_config.into())?;
- // create a channel for String messages on the TCP connection
- let (sender, receiver) = dist_chan.bounded::<String>(Some(100));
+ // send a message to the server
+ let raw_msg = String::from("Hello world");
+ info!("Sending message: {}", raw_msg);
- // alert the connection that you are ready to read and write messages
- dist_chan.ready()?;
+ let mut msg = HelloWorld::new();
+ msg.set_message(raw_msg);
- // send a message to the server
- let msg = String::from("Hello world");
- info!("Sending message: {}", msg);
- sender.send(msg).await?;
+ conn.writer().send(msg).await?;
// wait for the server to reply with an ack
- if let Ok(msg) = receiver.recv().await {
- info!("Received reply: {}", msg);
+ while let Some(reply) = conn.reader().next().await {
+ info!("Received message");
+
+ let msg: HelloWorld = Any::unpack(&reply)?.unwrap();
+
+ info!("Unpacked reply: {}", msg.get_message());
}
Ok(())
--- /dev/null
+// This file is generated by rust-protobuf 2.19.0. Do not edit
+// @generated
+
+// https://github.com/rust-lang/rust-clippy/issues/702
+#![allow(unknown_lints)]
+#![allow(clippy::all)]
+
+#![allow(unused_attributes)]
+#![rustfmt::skip]
+
+#![allow(box_pointers)]
+#![allow(dead_code)]
+#![allow(missing_docs)]
+#![allow(non_camel_case_types)]
+#![allow(non_snake_case)]
+#![allow(non_upper_case_globals)]
+#![allow(trivial_casts)]
+#![allow(unused_imports)]
+#![allow(unused_results)]
+//! Generated file from `hello_world.proto`
+
+/// Generated files are compatible only with the same version
+/// of protobuf runtime.
+// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_19_0;
+
+#[derive(PartialEq,Clone,Default)]
+pub struct HelloWorld {
+ // message fields
+ pub message: ::std::string::String,
+ // special fields
+ pub unknown_fields: ::protobuf::UnknownFields,
+ pub cached_size: ::protobuf::CachedSize,
+}
+
+impl<'a> ::std::default::Default for &'a HelloWorld {
+ fn default() -> &'a HelloWorld {
+ <HelloWorld as ::protobuf::Message>::default_instance()
+ }
+}
+
+impl HelloWorld {
+ pub fn new() -> HelloWorld {
+ ::std::default::Default::default()
+ }
+
+ // string message = 1;
+
+
+ pub fn get_message(&self) -> &str {
+ &self.message
+ }
+ pub fn clear_message(&mut self) {
+ self.message.clear();
+ }
+
+ // Param is passed by value, moved
+ pub fn set_message(&mut self, v: ::std::string::String) {
+ self.message = v;
+ }
+
+ // Mutable pointer to the field.
+ // If field is not initialized, it is initialized with default value first.
+ pub fn mut_message(&mut self) -> &mut ::std::string::String {
+ &mut self.message
+ }
+
+ // Take field
+ pub fn take_message(&mut self) -> ::std::string::String {
+ ::std::mem::replace(&mut self.message, ::std::string::String::new())
+ }
+}
+
+impl ::protobuf::Message for HelloWorld {
+ fn is_initialized(&self) -> bool {
+ true
+ }
+
+ fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
+ while !is.eof()? {
+ let (field_number, wire_type) = is.read_tag_unpack()?;
+ match field_number {
+ 1 => {
+ ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.message)?;
+ },
+ _ => {
+ ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
+ },
+ };
+ }
+ ::std::result::Result::Ok(())
+ }
+
+ // Compute sizes of nested messages
+ #[allow(unused_variables)]
+ fn compute_size(&self) -> u32 {
+ let mut my_size = 0;
+ if !self.message.is_empty() {
+ my_size += ::protobuf::rt::string_size(1, &self.message);
+ }
+ my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
+ self.cached_size.set(my_size);
+ my_size
+ }
+
+ fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
+ if !self.message.is_empty() {
+ os.write_string(1, &self.message)?;
+ }
+ os.write_unknown_fields(self.get_unknown_fields())?;
+ ::std::result::Result::Ok(())
+ }
+
+ fn get_cached_size(&self) -> u32 {
+ self.cached_size.get()
+ }
+
+ fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
+ &self.unknown_fields
+ }
+
+ fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
+ &mut self.unknown_fields
+ }
+
+ fn as_any(&self) -> &dyn (::std::any::Any) {
+ self as &dyn (::std::any::Any)
+ }
+ fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
+ self as &mut dyn (::std::any::Any)
+ }
+ fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
+ self
+ }
+
+ fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
+ Self::descriptor_static()
+ }
+
+ fn new() -> HelloWorld {
+ HelloWorld::new()
+ }
+
+ fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
+ static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
+ descriptor.get(|| {
+ let mut fields = ::std::vec::Vec::new();
+ fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
+ "message",
+ |m: &HelloWorld| { &m.message },
+ |m: &mut HelloWorld| { &mut m.message },
+ ));
+ ::protobuf::reflect::MessageDescriptor::new_pb_name::<HelloWorld>(
+ "HelloWorld",
+ fields,
+ file_descriptor_proto()
+ )
+ })
+ }
+
+ fn default_instance() -> &'static HelloWorld {
+ static instance: ::protobuf::rt::LazyV2<HelloWorld> = ::protobuf::rt::LazyV2::INIT;
+ instance.get(HelloWorld::new)
+ }
+}
+
+impl ::protobuf::Clear for HelloWorld {
+ fn clear(&mut self) {
+ self.message.clear();
+ self.unknown_fields.clear();
+ }
+}
+
+impl ::std::fmt::Debug for HelloWorld {
+ fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
+ ::protobuf::text_format::fmt(self, f)
+ }
+}
+
+impl ::protobuf::reflect::ProtobufValue for HelloWorld {
+ fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
+ ::protobuf::reflect::ReflectValueRef::Message(self)
+ }
+}
+
+static file_descriptor_proto_data: &'static [u8] = b"\
+ \n\x11hello_world.proto\x12\x0bhello_world\"*\n\nHelloWorld\x12\x1a\n\
+ \x07message\x18\x01\x20\x01(\tR\x07messageB\0:\0B\0b\x06proto3\
+";
+
+static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;
+
+fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
+ ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap()
+}
+
+pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
+ file_descriptor_proto_lazy.get(|| {
+ parse_descriptor_proto()
+ })
+}
--- /dev/null
+pub mod hello_world;
async-std = { version = "1.6.2", features = ["attributes"] }
env_logger = "0.7"
log = "0.4"
+protobuf = "2.18.1"
-seam-channel = { path = "../../" }
\ No newline at end of file
+connect = { path = "../../" }
+
+[build-dependencies]
+protobuf-codegen-pure = "2.18.1"
--- /dev/null
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+ protobuf_codegen_pure::Codegen::new()
+ .out_dir("src/schema")
+ .inputs(&["../tcp-client/schema/hello_world.proto"])
+ .include("../tcp-client/schema")
+ .run()
+ .expect("Codegen failed.");
+ Ok(())
+}
+mod schema;
+
+use crate::schema::hello_world::HelloWorld;
use async_std::{io, task};
+use connect::tls::rustls::internal::pemfile::{certs, rsa_private_keys};
+use connect::tls::rustls::{NoClientAuth, ServerConfig};
+use connect::tls::TlsServer;
+use connect::{SinkExt, StreamExt};
use log::*;
-use seam_channel::net::tls::rustls::internal::pemfile::{certs, rsa_private_keys};
-use seam_channel::net::tls::rustls::{NoClientAuth, ServerConfig};
-use seam_channel::net::{StitchClient, StitchNetServer};
use std::env;
use std::fs::File;
use std::io::BufReader;
.map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
// create a server
- let (_server, conns) = StitchNetServer::tls_server(ip_address, config.into())?;
+ let mut server = TlsServer::new(ip_address, config.into())?;
// handle server connections
// wait for a connection to come in and be accepted
- while let Ok(conn) = conns.recv().await {
- info!("Handling connection: {}", conn.peer_addr());
-
- // register for String-typed messages
- let (sender, receiver) = conn.unbounded::<String>();
-
- // let the connection know you are ready to send and receive messages
- conn.ready()
- .expect("could not ready the connection for reading and writing");
+ while let Some(mut conn) = server.next().await {
+ info!("Handling connection from {}", conn.peer_addr());
- // handle String messages
task::spawn(async move {
- // for every String message
- while let Ok(msg) = receiver.recv().await {
- info!("Echoing message: {}", msg);
-
- let response = format!("{}, right back at you!", msg);
-
- // Send the message back to its source
- if let Err(err) = sender.send(response).await {
- error!("Could not echo message: {:#?}", err);
+ while let Some(msg) = conn.reader().next().await {
+ if msg.is::<HelloWorld>() {
+ if let Ok(Some(contents)) = msg.unpack::<HelloWorld>() {
+ info!(
+ "Received a message \"{}\" from {}",
+ contents.get_message(),
+ conn.peer_addr()
+ );
+
+ conn.writer()
+ .send(contents)
+ .await
+ .expect("Could not send message back to source connection");
+ info!("Sent message back to original sender");
+ }
+ } else {
+ error!("Received a message of unknown type")
}
}
});
--- /dev/null
+// This file is generated by rust-protobuf 2.19.0. Do not edit
+// @generated
+
+// https://github.com/rust-lang/rust-clippy/issues/702
+#![allow(unknown_lints)]
+#![allow(clippy::all)]
+
+#![allow(unused_attributes)]
+#![rustfmt::skip]
+
+#![allow(box_pointers)]
+#![allow(dead_code)]
+#![allow(missing_docs)]
+#![allow(non_camel_case_types)]
+#![allow(non_snake_case)]
+#![allow(non_upper_case_globals)]
+#![allow(trivial_casts)]
+#![allow(unused_imports)]
+#![allow(unused_results)]
+//! Generated file from `hello_world.proto`
+
+/// Generated files are compatible only with the same version
+/// of protobuf runtime.
+// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_19_0;
+
+#[derive(PartialEq,Clone,Default)]
+pub struct HelloWorld {
+ // message fields
+ pub message: ::std::string::String,
+ // special fields
+ pub unknown_fields: ::protobuf::UnknownFields,
+ pub cached_size: ::protobuf::CachedSize,
+}
+
+impl<'a> ::std::default::Default for &'a HelloWorld {
+ fn default() -> &'a HelloWorld {
+ <HelloWorld as ::protobuf::Message>::default_instance()
+ }
+}
+
+impl HelloWorld {
+ pub fn new() -> HelloWorld {
+ ::std::default::Default::default()
+ }
+
+ // string message = 1;
+
+
+ pub fn get_message(&self) -> &str {
+ &self.message
+ }
+ pub fn clear_message(&mut self) {
+ self.message.clear();
+ }
+
+ // Param is passed by value, moved
+ pub fn set_message(&mut self, v: ::std::string::String) {
+ self.message = v;
+ }
+
+ // Mutable pointer to the field.
+ // If field is not initialized, it is initialized with default value first.
+ pub fn mut_message(&mut self) -> &mut ::std::string::String {
+ &mut self.message
+ }
+
+ // Take field
+ pub fn take_message(&mut self) -> ::std::string::String {
+ ::std::mem::replace(&mut self.message, ::std::string::String::new())
+ }
+}
+
+impl ::protobuf::Message for HelloWorld {
+ fn is_initialized(&self) -> bool {
+ true
+ }
+
+ fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
+ while !is.eof()? {
+ let (field_number, wire_type) = is.read_tag_unpack()?;
+ match field_number {
+ 1 => {
+ ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.message)?;
+ },
+ _ => {
+ ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
+ },
+ };
+ }
+ ::std::result::Result::Ok(())
+ }
+
+ // Compute sizes of nested messages
+ #[allow(unused_variables)]
+ fn compute_size(&self) -> u32 {
+ let mut my_size = 0;
+ if !self.message.is_empty() {
+ my_size += ::protobuf::rt::string_size(1, &self.message);
+ }
+ my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
+ self.cached_size.set(my_size);
+ my_size
+ }
+
+ fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
+ if !self.message.is_empty() {
+ os.write_string(1, &self.message)?;
+ }
+ os.write_unknown_fields(self.get_unknown_fields())?;
+ ::std::result::Result::Ok(())
+ }
+
+ fn get_cached_size(&self) -> u32 {
+ self.cached_size.get()
+ }
+
+ fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
+ &self.unknown_fields
+ }
+
+ fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
+ &mut self.unknown_fields
+ }
+
+ fn as_any(&self) -> &dyn (::std::any::Any) {
+ self as &dyn (::std::any::Any)
+ }
+ fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
+ self as &mut dyn (::std::any::Any)
+ }
+ fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
+ self
+ }
+
+ fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
+ Self::descriptor_static()
+ }
+
+ fn new() -> HelloWorld {
+ HelloWorld::new()
+ }
+
+ fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
+ static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
+ descriptor.get(|| {
+ let mut fields = ::std::vec::Vec::new();
+ fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
+ "message",
+ |m: &HelloWorld| { &m.message },
+ |m: &mut HelloWorld| { &mut m.message },
+ ));
+ ::protobuf::reflect::MessageDescriptor::new_pb_name::<HelloWorld>(
+ "HelloWorld",
+ fields,
+ file_descriptor_proto()
+ )
+ })
+ }
+
+ fn default_instance() -> &'static HelloWorld {
+ static instance: ::protobuf::rt::LazyV2<HelloWorld> = ::protobuf::rt::LazyV2::INIT;
+ instance.get(HelloWorld::new)
+ }
+}
+
+impl ::protobuf::Clear for HelloWorld {
+ fn clear(&mut self) {
+ self.message.clear();
+ self.unknown_fields.clear();
+ }
+}
+
+impl ::std::fmt::Debug for HelloWorld {
+ fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
+ ::protobuf::text_format::fmt(self, f)
+ }
+}
+
+impl ::protobuf::reflect::ProtobufValue for HelloWorld {
+ fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
+ ::protobuf::reflect::ReflectValueRef::Message(self)
+ }
+}
+
+static file_descriptor_proto_data: &'static [u8] = b"\
+ \n\x11hello_world.proto\x12\x0bhello_world\"*\n\nHelloWorld\x12\x1a\n\
+ \x07message\x18\x01\x20\x01(\tR\x07messageB\0:\0B\0b\x06proto3\
+";
+
+static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;
+
+fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
+ ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap()
+}
+
+pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
+ file_descriptor_proto_lazy.get(|| {
+ parse_descriptor_proto()
+ })
+}
--- /dev/null
+pub mod hello_world;
use log::*;
use crate::Connection;
-use async_std::net::{TcpStream, SocketAddr, ToSocketAddrs};
+use async_std::net::{SocketAddr, TcpStream, ToSocketAddrs};
use async_tls::client;
use async_tls::server;
use futures::AsyncReadExt;
pub enum TlsConnectionMetadata {
- Client { local_addr: SocketAddr, peer_addr: SocketAddr, stream: client::TlsStream<TcpStream> },
- Server { local_addr: SocketAddr, peer_addr: SocketAddr, stream: server::TlsStream<TcpStream> },
+ Client {
+ local_addr: SocketAddr,
+ peer_addr: SocketAddr,
+ stream: client::TlsStream<TcpStream>,
+ },
+ Server {
+ local_addr: SocketAddr,
+ peer_addr: SocketAddr,
+ stream: server::TlsStream<TcpStream>,
+ },
}
impl Connection {
task::block_on(connector.connect(domain, stream))?;
info!("Completed TLS handshake with {}", peer_addr);
- Ok(Self::from(TlsConnectionMetadata::Client { local_addr, peer_addr, stream: encrypted_stream }))
+ Ok(Self::from(TlsConnectionMetadata::Client {
+ local_addr,
+ peer_addr,
+ stream: encrypted_stream,
+ }))
}
}
impl From<TlsConnectionMetadata> for Connection {
fn from(metadata: TlsConnectionMetadata) -> Self {
match metadata {
- TlsConnectionMetadata::Client { local_addr, peer_addr, stream } => {
+ TlsConnectionMetadata::Client {
+ local_addr,
+ peer_addr,
+ stream,
+ } => {
let (read_stream, write_stream) = stream.split();
Self::new(
Box::new(read_stream),
Box::new(write_stream),
)
- },
+ }
- TlsConnectionMetadata::Server { local_addr, peer_addr, stream } => {
+ TlsConnectionMetadata::Server {
+ local_addr,
+ peer_addr,
+ stream,
+ } => {
let (read_stream, write_stream) = stream.split();
Self::new(
)
}
}
-
-
}
}
-use crate::Connection;
use crate::tls::TlsConnectionMetadata;
+use crate::Connection;
use async_std::net::*;
use async_std::pin::Pin;
use async_std::prelude::*;
}
impl TlsServer {
- pub fn new<A: ToSocketAddrs + std::fmt::Display>(ip_addrs: A, acceptor: TlsAcceptor) -> anyhow::Result<Self> {
+ pub fn new<A: ToSocketAddrs + std::fmt::Display>(
+ ip_addrs: A,
+ acceptor: TlsAcceptor,
+ ) -> anyhow::Result<Self> {
let listener = task::block_on(TcpListener::bind(ip_addrs))?;
info!("Started TLS server at {}", listener.local_addr()?);
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(Ok(tcp_stream)) = futures::executor::block_on(self.listener.incoming().next()) {
- let local_addr = tcp_stream.local_addr().expect(
- "Local address could not be retrieved",
- );
+ let local_addr = tcp_stream
+ .local_addr()
+ .expect("Local address could not be retrieved");
- let peer_addr = tcp_stream.peer_addr().expect(
- "Peer address could not be retrieved",
- );
+ let peer_addr = tcp_stream
+ .peer_addr()
+ .expect("Peer address could not be retrieved");
debug!("Received connection attempt from {}", peer_addr);
if let Ok(tls_stream) = futures::executor::block_on(self.acceptor.accept(tcp_stream)) {
debug!("Established TLS connection from {}", peer_addr);
- Poll::Ready(Some(Connection::from(TlsConnectionMetadata::Server{ local_addr, peer_addr, stream: tls_stream })))
+ Poll::Ready(Some(Connection::from(TlsConnectionMetadata::Server {
+ local_addr,
+ peer_addr,
+ stream: tls_stream,
+ })))
} else {
debug!("Could not encrypt connection with TLS from {}", peer_addr);
Poll::Pending