[package]
name = "connect"
-version = "0.1.2"
+version = "0.1.3"
edition = "2018"
authors = ["Sachandhan Ganesh <sachan.ganesh@gmail.com>"]
description = "message queue abstraction over async network streams"
[workspace]
members = [
- "examples/tcp-client",
- "examples/tcp-echo-server",
- "examples/tls-client",
- "examples/tls-echo-server",
+ "examples/*",
]
[dependencies]
anyhow = "1.0.31"
async-channel = "1.4.0"
-async-std = { version = "1.6.2", features = ["unstable"] }
+async-std = { version = "1.6.2", features = ["attributes", "unstable"] }
async-tls = { version = "0.9.0", default-features = false, features = ["client", "server"]}
bytes = "0.5.5"
futures = "0.3.8"
log = "0.4"
-protobuf = "2.18.1"
rustls = "0.18.0"
-
-[build-dependencies]
-protobuf-codegen-pure = "2.18.1"
+++ /dev/null
-use std::env;
-
-fn main() -> Result<(), Box<dyn std::error::Error>> {
- if Ok("dev".to_owned()) == env::var("PROFILE") {
- protobuf_codegen_pure::Codegen::new()
- .out_dir("src/schema")
- .inputs(&["schema/message.proto"])
- .include("schema")
- .run()
- .expect("Codegen failed.");
- }
-
- Ok(())
-}
async-std = { version = "1.6.2", features = ["attributes"] }
env_logger = "0.7"
log = "0.4"
-protobuf = "2.18.1"
connect = { path = "../../" }
-
-[build-dependencies]
-protobuf-codegen-pure = "2.18.1"
+++ /dev/null
-fn main() -> Result<(), Box<dyn std::error::Error>> {
- protobuf_codegen_pure::Codegen::new()
- .out_dir("src/schema")
- .inputs(&["schema/hello_world.proto"])
- .include("schema")
- .run()
- .expect("Codegen failed.");
- Ok(())
-}
+++ /dev/null
-syntax = "proto3";
-package hello_world;
-
-message HelloWorld {
- string message = 1;
-}
\ No newline at end of file
-pub mod schema;
-
-use crate::schema::hello_world::HelloWorld;
-use connect::{Connection, SinkExt, StreamExt};
+use connect::{ConnectDatagram, Connection, SinkExt, StreamExt};
use log::*;
-use protobuf::well_known_types::Any;
use std::env;
#[async_std::main]
let mut conn = Connection::tcp_client(ip_address).await?;
// send a message to the server
- let raw_msg = String::from("Hello world");
-
- let mut msg = HelloWorld::new();
- msg.set_message(raw_msg.clone());
+ let msg = String::from("Hello world");
+ info!("Sending message: {}", msg);
- conn.writer().send(msg).await?;
- info!("Sent message: {}", raw_msg);
+ let envelope = ConnectDatagram::new(65535, msg.into_bytes())?;
+ conn.writer().send(envelope).await?;
// wait for the server to reply with an ack
- while let Some(reply) = conn.reader().next().await {
+ if let Some(mut reply) = conn.reader().next().await {
info!("Received message");
- let msg: HelloWorld = Any::unpack(&reply)?.unwrap();
+ let data = reply.take_data().unwrap();
+ let msg = String::from_utf8(data)?;
- info!("Unpacked reply: {}", msg.get_message());
+ info!("Unpacked reply: {}", msg);
}
Ok(())
+++ /dev/null
-// This file is generated by rust-protobuf 2.20.0. Do not edit
-// @generated
-
-// https://github.com/rust-lang/rust-clippy/issues/702
-#![allow(unknown_lints)]
-#![allow(clippy::all)]
-
-#![allow(unused_attributes)]
-#![rustfmt::skip]
-
-#![allow(box_pointers)]
-#![allow(dead_code)]
-#![allow(missing_docs)]
-#![allow(non_camel_case_types)]
-#![allow(non_snake_case)]
-#![allow(non_upper_case_globals)]
-#![allow(trivial_casts)]
-#![allow(unused_imports)]
-#![allow(unused_results)]
-//! Generated file from `hello_world.proto`
-
-/// Generated files are compatible only with the same version
-/// of protobuf runtime.
-// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_20_0;
-
-#[derive(PartialEq,Clone,Default)]
-pub struct HelloWorld {
- // message fields
- pub message: ::std::string::String,
- // special fields
- pub unknown_fields: ::protobuf::UnknownFields,
- pub cached_size: ::protobuf::CachedSize,
-}
-
-impl<'a> ::std::default::Default for &'a HelloWorld {
- fn default() -> &'a HelloWorld {
- <HelloWorld as ::protobuf::Message>::default_instance()
- }
-}
-
-impl HelloWorld {
- pub fn new() -> HelloWorld {
- ::std::default::Default::default()
- }
-
- // string message = 1;
-
-
- pub fn get_message(&self) -> &str {
- &self.message
- }
- pub fn clear_message(&mut self) {
- self.message.clear();
- }
-
- // Param is passed by value, moved
- pub fn set_message(&mut self, v: ::std::string::String) {
- self.message = v;
- }
-
- // Mutable pointer to the field.
- // If field is not initialized, it is initialized with default value first.
- pub fn mut_message(&mut self) -> &mut ::std::string::String {
- &mut self.message
- }
-
- // Take field
- pub fn take_message(&mut self) -> ::std::string::String {
- ::std::mem::replace(&mut self.message, ::std::string::String::new())
- }
-}
-
-impl ::protobuf::Message for HelloWorld {
- fn is_initialized(&self) -> bool {
- true
- }
-
- fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
- while !is.eof()? {
- let (field_number, wire_type) = is.read_tag_unpack()?;
- match field_number {
- 1 => {
- ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.message)?;
- },
- _ => {
- ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
- },
- };
- }
- ::std::result::Result::Ok(())
- }
-
- // Compute sizes of nested messages
- #[allow(unused_variables)]
- fn compute_size(&self) -> u32 {
- let mut my_size = 0;
- if !self.message.is_empty() {
- my_size += ::protobuf::rt::string_size(1, &self.message);
- }
- my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
- self.cached_size.set(my_size);
- my_size
- }
-
- fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
- if !self.message.is_empty() {
- os.write_string(1, &self.message)?;
- }
- os.write_unknown_fields(self.get_unknown_fields())?;
- ::std::result::Result::Ok(())
- }
-
- fn get_cached_size(&self) -> u32 {
- self.cached_size.get()
- }
-
- fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
- &self.unknown_fields
- }
-
- fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
- &mut self.unknown_fields
- }
-
- fn as_any(&self) -> &dyn (::std::any::Any) {
- self as &dyn (::std::any::Any)
- }
- fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
- self as &mut dyn (::std::any::Any)
- }
- fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
- self
- }
-
- fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
- Self::descriptor_static()
- }
-
- fn new() -> HelloWorld {
- HelloWorld::new()
- }
-
- fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
- static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
- descriptor.get(|| {
- let mut fields = ::std::vec::Vec::new();
- fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
- "message",
- |m: &HelloWorld| { &m.message },
- |m: &mut HelloWorld| { &mut m.message },
- ));
- ::protobuf::reflect::MessageDescriptor::new_pb_name::<HelloWorld>(
- "HelloWorld",
- fields,
- file_descriptor_proto()
- )
- })
- }
-
- fn default_instance() -> &'static HelloWorld {
- static instance: ::protobuf::rt::LazyV2<HelloWorld> = ::protobuf::rt::LazyV2::INIT;
- instance.get(HelloWorld::new)
- }
-}
-
-impl ::protobuf::Clear for HelloWorld {
- fn clear(&mut self) {
- self.message.clear();
- self.unknown_fields.clear();
- }
-}
-
-impl ::std::fmt::Debug for HelloWorld {
- fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
- ::protobuf::text_format::fmt(self, f)
- }
-}
-
-impl ::protobuf::reflect::ProtobufValue for HelloWorld {
- fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
- ::protobuf::reflect::ReflectValueRef::Message(self)
- }
-}
-
-static file_descriptor_proto_data: &'static [u8] = b"\
- \n\x11hello_world.proto\x12\x0bhello_world\"*\n\nHelloWorld\x12\x1a\n\
- \x07message\x18\x01\x20\x01(\tR\x07messageB\0:\0B\0b\x06proto3\
-";
-
-static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;
-
-fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
- ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap()
-}
-
-pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
- file_descriptor_proto_lazy.get(|| {
- parse_descriptor_proto()
- })
-}
+++ /dev/null
-pub mod hello_world;
async-std = { version = "1.6.2", features = ["attributes"] }
env_logger = "0.7"
log = "0.4"
-protobuf = "2.18.1"
connect = { path = "../../" }
-
-[build-dependencies]
-protobuf-codegen-pure = "2.18.1"
+++ /dev/null
-fn main() -> Result<(), Box<dyn std::error::Error>> {
- protobuf_codegen_pure::Codegen::new()
- .out_dir("src/schema")
- .inputs(&["../tcp-client/schema/hello_world.proto"])
- .include("../tcp-client/schema")
- .run()
- .expect("Codegen failed.");
- Ok(())
-}
-mod schema;
-
-use crate::schema::hello_world::HelloWorld;
use async_std::task;
use connect::tcp::TcpListener;
-use connect::{SinkExt, StreamExt};
+use connect::{ConnectDatagram, SinkExt, StreamExt};
use log::*;
use std::env;
info!("Handling connection from {}", conn.peer_addr());
task::spawn(async move {
- while let Some(msg) = conn.reader().next().await {
- if msg.is::<HelloWorld>() {
- if let Ok(Some(contents)) = msg.unpack::<HelloWorld>() {
- info!(
- "Received a message \"{}\" from {}",
- contents.get_message(),
- conn.peer_addr()
- );
-
- conn.writer()
- .send(contents)
- .await
- .expect("Could not send message back to source connection");
- info!("Sent message back to original sender");
- }
+ while let Some(mut envelope) = conn.reader().next().await {
+ // handle message based on intended recipient
+ if envelope.recipient() == 65535 {
+ // if recipient is 65535, we do custom processing
+ let data = envelope.take_data().unwrap();
+ let msg =
+ String::from_utf8(data).expect("could not build String from payload bytes");
+ info!("Received a message \"{}\" from {}", msg, conn.peer_addr());
+
+ let reply = ConnectDatagram::new(envelope.recipient(), msg.into_bytes())
+ .expect("could not construct new datagram from built String");
+
+ conn.writer()
+ .send(reply)
+ .await
+ .expect("Could not send message back to source connection");
+ info!("Sent message back to original sender");
} else {
- error!("Received a message of unknown type")
+ // if recipient is anything else, we just send it back
+ warn!(
+ "Received a message for unknown recipient {} from {}",
+ envelope.recipient(),
+ conn.peer_addr()
+ );
+
+ conn.writer()
+ .send(envelope)
+ .await
+ .expect("Could not send message back to source connection");
+ info!("Sent message back to original sender");
}
}
});
+++ /dev/null
-// This file is generated by rust-protobuf 2.20.0. Do not edit
-// @generated
-
-// https://github.com/rust-lang/rust-clippy/issues/702
-#![allow(unknown_lints)]
-#![allow(clippy::all)]
-
-#![allow(unused_attributes)]
-#![rustfmt::skip]
-
-#![allow(box_pointers)]
-#![allow(dead_code)]
-#![allow(missing_docs)]
-#![allow(non_camel_case_types)]
-#![allow(non_snake_case)]
-#![allow(non_upper_case_globals)]
-#![allow(trivial_casts)]
-#![allow(unused_imports)]
-#![allow(unused_results)]
-//! Generated file from `hello_world.proto`
-
-/// Generated files are compatible only with the same version
-/// of protobuf runtime.
-// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_20_0;
-
-#[derive(PartialEq,Clone,Default)]
-pub struct HelloWorld {
- // message fields
- pub message: ::std::string::String,
- // special fields
- pub unknown_fields: ::protobuf::UnknownFields,
- pub cached_size: ::protobuf::CachedSize,
-}
-
-impl<'a> ::std::default::Default for &'a HelloWorld {
- fn default() -> &'a HelloWorld {
- <HelloWorld as ::protobuf::Message>::default_instance()
- }
-}
-
-impl HelloWorld {
- pub fn new() -> HelloWorld {
- ::std::default::Default::default()
- }
-
- // string message = 1;
-
-
- pub fn get_message(&self) -> &str {
- &self.message
- }
- pub fn clear_message(&mut self) {
- self.message.clear();
- }
-
- // Param is passed by value, moved
- pub fn set_message(&mut self, v: ::std::string::String) {
- self.message = v;
- }
-
- // Mutable pointer to the field.
- // If field is not initialized, it is initialized with default value first.
- pub fn mut_message(&mut self) -> &mut ::std::string::String {
- &mut self.message
- }
-
- // Take field
- pub fn take_message(&mut self) -> ::std::string::String {
- ::std::mem::replace(&mut self.message, ::std::string::String::new())
- }
-}
-
-impl ::protobuf::Message for HelloWorld {
- fn is_initialized(&self) -> bool {
- true
- }
-
- fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
- while !is.eof()? {
- let (field_number, wire_type) = is.read_tag_unpack()?;
- match field_number {
- 1 => {
- ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.message)?;
- },
- _ => {
- ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
- },
- };
- }
- ::std::result::Result::Ok(())
- }
-
- // Compute sizes of nested messages
- #[allow(unused_variables)]
- fn compute_size(&self) -> u32 {
- let mut my_size = 0;
- if !self.message.is_empty() {
- my_size += ::protobuf::rt::string_size(1, &self.message);
- }
- my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
- self.cached_size.set(my_size);
- my_size
- }
-
- fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
- if !self.message.is_empty() {
- os.write_string(1, &self.message)?;
- }
- os.write_unknown_fields(self.get_unknown_fields())?;
- ::std::result::Result::Ok(())
- }
-
- fn get_cached_size(&self) -> u32 {
- self.cached_size.get()
- }
-
- fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
- &self.unknown_fields
- }
-
- fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
- &mut self.unknown_fields
- }
-
- fn as_any(&self) -> &dyn (::std::any::Any) {
- self as &dyn (::std::any::Any)
- }
- fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
- self as &mut dyn (::std::any::Any)
- }
- fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
- self
- }
-
- fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
- Self::descriptor_static()
- }
-
- fn new() -> HelloWorld {
- HelloWorld::new()
- }
-
- fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
- static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
- descriptor.get(|| {
- let mut fields = ::std::vec::Vec::new();
- fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
- "message",
- |m: &HelloWorld| { &m.message },
- |m: &mut HelloWorld| { &mut m.message },
- ));
- ::protobuf::reflect::MessageDescriptor::new_pb_name::<HelloWorld>(
- "HelloWorld",
- fields,
- file_descriptor_proto()
- )
- })
- }
-
- fn default_instance() -> &'static HelloWorld {
- static instance: ::protobuf::rt::LazyV2<HelloWorld> = ::protobuf::rt::LazyV2::INIT;
- instance.get(HelloWorld::new)
- }
-}
-
-impl ::protobuf::Clear for HelloWorld {
- fn clear(&mut self) {
- self.message.clear();
- self.unknown_fields.clear();
- }
-}
-
-impl ::std::fmt::Debug for HelloWorld {
- fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
- ::protobuf::text_format::fmt(self, f)
- }
-}
-
-impl ::protobuf::reflect::ProtobufValue for HelloWorld {
- fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
- ::protobuf::reflect::ReflectValueRef::Message(self)
- }
-}
-
-static file_descriptor_proto_data: &'static [u8] = b"\
- \n\x11hello_world.proto\x12\x0bhello_world\"*\n\nHelloWorld\x12\x1a\n\
- \x07message\x18\x01\x20\x01(\tR\x07messageB\0:\0B\0b\x06proto3\
-";
-
-static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;
-
-fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
- ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap()
-}
-
-pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
- file_descriptor_proto_lazy.get(|| {
- parse_descriptor_proto()
- })
-}
+++ /dev/null
-pub mod hello_world;
async-std = { version = "1.6.2", features = ["attributes"] }
env_logger = "0.7"
log = "0.4"
-protobuf = "2.18.1"
connect = { path = "../../" }
-
-[build-dependencies]
-protobuf-codegen-pure = "2.18.1"
+++ /dev/null
-fn main() -> Result<(), Box<dyn std::error::Error>> {
- protobuf_codegen_pure::Codegen::new()
- .out_dir("src/schema")
- .inputs(&["../tcp-client/schema/hello_world.proto"])
- .include("../tcp-client/schema")
- .run()
- .expect("Codegen failed.");
- Ok(())
-}
-mod schema;
-
-use crate::schema::hello_world::HelloWorld;
use connect::tls::rustls::ClientConfig;
-use connect::{Connection, SinkExt, StreamExt};
+use connect::{ConnectDatagram, Connection, SinkExt, StreamExt};
use log::*;
-use protobuf::well_known_types::Any;
use std::env;
#[async_std::main]
let mut conn = Connection::tls_client(ip_addr, &domain, client_config.into()).await?;
// send a message to the server
- let raw_msg = String::from("Hello world");
- info!("Sending message: {}", raw_msg);
-
- let mut msg = HelloWorld::new();
- msg.set_message(raw_msg);
+ let msg = String::from("Hello world");
+ info!("Sending message: {}", msg);
- conn.writer().send(msg).await?;
+ let envelope = ConnectDatagram::new(65535, msg.into_bytes())?;
+ conn.writer().send(envelope).await?;
// wait for the server to reply with an ack
- while let Some(reply) = conn.reader().next().await {
+ if let Some(mut reply) = conn.reader().next().await {
info!("Received message");
- let msg: HelloWorld = Any::unpack(&reply)?.unwrap();
+ let data = reply.take_data().unwrap();
+ let msg = String::from_utf8(data)?;
- info!("Unpacked reply: {}", msg.get_message());
+ info!("Unpacked reply: {}", msg);
}
Ok(())
+++ /dev/null
-// This file is generated by rust-protobuf 2.20.0. Do not edit
-// @generated
-
-// https://github.com/rust-lang/rust-clippy/issues/702
-#![allow(unknown_lints)]
-#![allow(clippy::all)]
-
-#![allow(unused_attributes)]
-#![rustfmt::skip]
-
-#![allow(box_pointers)]
-#![allow(dead_code)]
-#![allow(missing_docs)]
-#![allow(non_camel_case_types)]
-#![allow(non_snake_case)]
-#![allow(non_upper_case_globals)]
-#![allow(trivial_casts)]
-#![allow(unused_imports)]
-#![allow(unused_results)]
-//! Generated file from `hello_world.proto`
-
-/// Generated files are compatible only with the same version
-/// of protobuf runtime.
-// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_20_0;
-
-#[derive(PartialEq,Clone,Default)]
-pub struct HelloWorld {
- // message fields
- pub message: ::std::string::String,
- // special fields
- pub unknown_fields: ::protobuf::UnknownFields,
- pub cached_size: ::protobuf::CachedSize,
-}
-
-impl<'a> ::std::default::Default for &'a HelloWorld {
- fn default() -> &'a HelloWorld {
- <HelloWorld as ::protobuf::Message>::default_instance()
- }
-}
-
-impl HelloWorld {
- pub fn new() -> HelloWorld {
- ::std::default::Default::default()
- }
-
- // string message = 1;
-
-
- pub fn get_message(&self) -> &str {
- &self.message
- }
- pub fn clear_message(&mut self) {
- self.message.clear();
- }
-
- // Param is passed by value, moved
- pub fn set_message(&mut self, v: ::std::string::String) {
- self.message = v;
- }
-
- // Mutable pointer to the field.
- // If field is not initialized, it is initialized with default value first.
- pub fn mut_message(&mut self) -> &mut ::std::string::String {
- &mut self.message
- }
-
- // Take field
- pub fn take_message(&mut self) -> ::std::string::String {
- ::std::mem::replace(&mut self.message, ::std::string::String::new())
- }
-}
-
-impl ::protobuf::Message for HelloWorld {
- fn is_initialized(&self) -> bool {
- true
- }
-
- fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
- while !is.eof()? {
- let (field_number, wire_type) = is.read_tag_unpack()?;
- match field_number {
- 1 => {
- ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.message)?;
- },
- _ => {
- ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
- },
- };
- }
- ::std::result::Result::Ok(())
- }
-
- // Compute sizes of nested messages
- #[allow(unused_variables)]
- fn compute_size(&self) -> u32 {
- let mut my_size = 0;
- if !self.message.is_empty() {
- my_size += ::protobuf::rt::string_size(1, &self.message);
- }
- my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
- self.cached_size.set(my_size);
- my_size
- }
-
- fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
- if !self.message.is_empty() {
- os.write_string(1, &self.message)?;
- }
- os.write_unknown_fields(self.get_unknown_fields())?;
- ::std::result::Result::Ok(())
- }
-
- fn get_cached_size(&self) -> u32 {
- self.cached_size.get()
- }
-
- fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
- &self.unknown_fields
- }
-
- fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
- &mut self.unknown_fields
- }
-
- fn as_any(&self) -> &dyn (::std::any::Any) {
- self as &dyn (::std::any::Any)
- }
- fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
- self as &mut dyn (::std::any::Any)
- }
- fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
- self
- }
-
- fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
- Self::descriptor_static()
- }
-
- fn new() -> HelloWorld {
- HelloWorld::new()
- }
-
- fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
- static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
- descriptor.get(|| {
- let mut fields = ::std::vec::Vec::new();
- fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
- "message",
- |m: &HelloWorld| { &m.message },
- |m: &mut HelloWorld| { &mut m.message },
- ));
- ::protobuf::reflect::MessageDescriptor::new_pb_name::<HelloWorld>(
- "HelloWorld",
- fields,
- file_descriptor_proto()
- )
- })
- }
-
- fn default_instance() -> &'static HelloWorld {
- static instance: ::protobuf::rt::LazyV2<HelloWorld> = ::protobuf::rt::LazyV2::INIT;
- instance.get(HelloWorld::new)
- }
-}
-
-impl ::protobuf::Clear for HelloWorld {
- fn clear(&mut self) {
- self.message.clear();
- self.unknown_fields.clear();
- }
-}
-
-impl ::std::fmt::Debug for HelloWorld {
- fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
- ::protobuf::text_format::fmt(self, f)
- }
-}
-
-impl ::protobuf::reflect::ProtobufValue for HelloWorld {
- fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
- ::protobuf::reflect::ReflectValueRef::Message(self)
- }
-}
-
-static file_descriptor_proto_data: &'static [u8] = b"\
- \n\x11hello_world.proto\x12\x0bhello_world\"*\n\nHelloWorld\x12\x1a\n\
- \x07message\x18\x01\x20\x01(\tR\x07messageB\0:\0B\0b\x06proto3\
-";
-
-static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;
-
-fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
- ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap()
-}
-
-pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
- file_descriptor_proto_lazy.get(|| {
- parse_descriptor_proto()
- })
-}
+++ /dev/null
-pub mod hello_world;
async-std = { version = "1.6.2", features = ["attributes"] }
env_logger = "0.7"
log = "0.4"
-protobuf = "2.18.1"
connect = { path = "../../" }
-
-[build-dependencies]
-protobuf-codegen-pure = "2.18.1"
+++ /dev/null
-fn main() -> Result<(), Box<dyn std::error::Error>> {
- protobuf_codegen_pure::Codegen::new()
- .out_dir("src/schema")
- .inputs(&["../tcp-client/schema/hello_world.proto"])
- .include("../tcp-client/schema")
- .run()
- .expect("Codegen failed.");
- Ok(())
-}
-mod schema;
-
-use crate::schema::hello_world::HelloWorld;
use async_std::{io, task};
use connect::tls::rustls::internal::pemfile::{certs, rsa_private_keys};
use connect::tls::rustls::{NoClientAuth, ServerConfig};
use connect::tls::TlsListener;
-use connect::{SinkExt, StreamExt};
+use connect::{ConnectDatagram, SinkExt, StreamExt};
use log::*;
use std::env;
use std::fs::File;
info!("Handling connection from {}", conn.peer_addr());
task::spawn(async move {
- while let Some(msg) = conn.reader().next().await {
- if msg.is::<HelloWorld>() {
- if let Ok(Some(contents)) = msg.unpack::<HelloWorld>() {
- info!(
- "Received a message \"{}\" from {}",
- contents.get_message(),
- conn.peer_addr()
- );
-
- conn.writer()
- .send(contents)
- .await
- .expect("Could not send message back to source connection");
- info!("Sent message back to original sender");
- }
+ while let Some(mut envelope) = conn.reader().next().await {
+ // handle message based on intended recipient
+ if envelope.recipient() == 65535 {
+ // if recipient is 65535, we do custom processing
+ let data = envelope.take_data().unwrap();
+ let msg =
+ String::from_utf8(data).expect("could not build String from payload bytes");
+ info!("Received a message \"{}\" from {}", msg, conn.peer_addr());
+
+ let reply = ConnectDatagram::new(envelope.recipient(), msg.into_bytes())
+ .expect("could not construct new datagram from built String");
+
+ conn.writer()
+ .send(reply)
+ .await
+ .expect("Could not send message back to source connection");
+ info!("Sent message back to original sender");
} else {
- error!("Received a message of unknown type")
+ // if recipient is anything else, we just send it back
+ warn!(
+ "Received a message for unknown recipient {} from {}",
+ envelope.recipient(),
+ conn.peer_addr()
+ );
+
+ conn.writer()
+ .send(envelope)
+ .await
+ .expect("Could not send message back to source connection");
+ info!("Sent message back to original sender");
}
}
});
+++ /dev/null
-// This file is generated by rust-protobuf 2.20.0. Do not edit
-// @generated
-
-// https://github.com/rust-lang/rust-clippy/issues/702
-#![allow(unknown_lints)]
-#![allow(clippy::all)]
-
-#![allow(unused_attributes)]
-#![rustfmt::skip]
-
-#![allow(box_pointers)]
-#![allow(dead_code)]
-#![allow(missing_docs)]
-#![allow(non_camel_case_types)]
-#![allow(non_snake_case)]
-#![allow(non_upper_case_globals)]
-#![allow(trivial_casts)]
-#![allow(unused_imports)]
-#![allow(unused_results)]
-//! Generated file from `hello_world.proto`
-
-/// Generated files are compatible only with the same version
-/// of protobuf runtime.
-// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_20_0;
-
-#[derive(PartialEq,Clone,Default)]
-pub struct HelloWorld {
- // message fields
- pub message: ::std::string::String,
- // special fields
- pub unknown_fields: ::protobuf::UnknownFields,
- pub cached_size: ::protobuf::CachedSize,
-}
-
-impl<'a> ::std::default::Default for &'a HelloWorld {
- fn default() -> &'a HelloWorld {
- <HelloWorld as ::protobuf::Message>::default_instance()
- }
-}
-
-impl HelloWorld {
- pub fn new() -> HelloWorld {
- ::std::default::Default::default()
- }
-
- // string message = 1;
-
-
- pub fn get_message(&self) -> &str {
- &self.message
- }
- pub fn clear_message(&mut self) {
- self.message.clear();
- }
-
- // Param is passed by value, moved
- pub fn set_message(&mut self, v: ::std::string::String) {
- self.message = v;
- }
-
- // Mutable pointer to the field.
- // If field is not initialized, it is initialized with default value first.
- pub fn mut_message(&mut self) -> &mut ::std::string::String {
- &mut self.message
- }
-
- // Take field
- pub fn take_message(&mut self) -> ::std::string::String {
- ::std::mem::replace(&mut self.message, ::std::string::String::new())
- }
-}
-
-impl ::protobuf::Message for HelloWorld {
- fn is_initialized(&self) -> bool {
- true
- }
-
- fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
- while !is.eof()? {
- let (field_number, wire_type) = is.read_tag_unpack()?;
- match field_number {
- 1 => {
- ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.message)?;
- },
- _ => {
- ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
- },
- };
- }
- ::std::result::Result::Ok(())
- }
-
- // Compute sizes of nested messages
- #[allow(unused_variables)]
- fn compute_size(&self) -> u32 {
- let mut my_size = 0;
- if !self.message.is_empty() {
- my_size += ::protobuf::rt::string_size(1, &self.message);
- }
- my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
- self.cached_size.set(my_size);
- my_size
- }
-
- fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
- if !self.message.is_empty() {
- os.write_string(1, &self.message)?;
- }
- os.write_unknown_fields(self.get_unknown_fields())?;
- ::std::result::Result::Ok(())
- }
-
- fn get_cached_size(&self) -> u32 {
- self.cached_size.get()
- }
-
- fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
- &self.unknown_fields
- }
-
- fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
- &mut self.unknown_fields
- }
-
- fn as_any(&self) -> &dyn (::std::any::Any) {
- self as &dyn (::std::any::Any)
- }
- fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
- self as &mut dyn (::std::any::Any)
- }
- fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
- self
- }
-
- fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
- Self::descriptor_static()
- }
-
- fn new() -> HelloWorld {
- HelloWorld::new()
- }
-
- fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
- static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
- descriptor.get(|| {
- let mut fields = ::std::vec::Vec::new();
- fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
- "message",
- |m: &HelloWorld| { &m.message },
- |m: &mut HelloWorld| { &mut m.message },
- ));
- ::protobuf::reflect::MessageDescriptor::new_pb_name::<HelloWorld>(
- "HelloWorld",
- fields,
- file_descriptor_proto()
- )
- })
- }
-
- fn default_instance() -> &'static HelloWorld {
- static instance: ::protobuf::rt::LazyV2<HelloWorld> = ::protobuf::rt::LazyV2::INIT;
- instance.get(HelloWorld::new)
- }
-}
-
-impl ::protobuf::Clear for HelloWorld {
- fn clear(&mut self) {
- self.message.clear();
- self.unknown_fields.clear();
- }
-}
-
-impl ::std::fmt::Debug for HelloWorld {
- fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
- ::protobuf::text_format::fmt(self, f)
- }
-}
-
-impl ::protobuf::reflect::ProtobufValue for HelloWorld {
- fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
- ::protobuf::reflect::ReflectValueRef::Message(self)
- }
-}
-
-static file_descriptor_proto_data: &'static [u8] = b"\
- \n\x11hello_world.proto\x12\x0bhello_world\"*\n\nHelloWorld\x12\x1a\n\
- \x07message\x18\x01\x20\x01(\tR\x07messageB\0:\0B\0b\x06proto3\
-";
-
-static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;
-
-fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
- ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap()
-}
-
-pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
- file_descriptor_proto_lazy.get(|| {
- parse_descriptor_proto()
- })
-}
+++ /dev/null
-pub mod hello_world;
+++ /dev/null
-syntax = "proto3";
-package message;
-
-import "google/protobuf/any.proto";
-
-message ConnectionMessage {
- google.protobuf.Any payload = 1;
-}
//! Until the Rust ecosystem around reflection improves, the crate will use Protobuf to fill the
//! void.
+pub mod protocol;
mod reader;
-pub(crate) mod schema;
pub mod tcp;
pub mod tls;
mod writer;
+pub use crate::protocol::{ConnectDatagram, DatagramEmptyError};
pub use crate::reader::ConnectionReader;
pub use crate::writer::ConnectionWriter;
use async_std::{net::SocketAddr, pin::Pin};
--- /dev/null
+use std::error::Error;
+use std::io::Read;
+
+const VERSION: u8 = 1;
+
+#[derive(Debug, Clone)]
+pub struct DatagramEmptyError;
+
+impl Error for DatagramEmptyError {}
+
+impl std::fmt::Display for DatagramEmptyError {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ write!(
+ f,
+ "datagram cannot be constructed when provided payload is empty"
+ )
+ }
+}
+
+pub struct ConnectDatagram {
+ version: u8,
+ recipient: u16,
+ data: Option<Vec<u8>>,
+}
+
+impl ConnectDatagram {
+ pub fn new(recipient: u16, data: Vec<u8>) -> Result<Self, DatagramEmptyError> {
+ if data.len() > 0 {
+ Ok(Self {
+ version: VERSION,
+ recipient,
+ data: Some(data),
+ })
+ } else {
+ Err(DatagramEmptyError)
+ }
+ }
+
+ pub fn version(&self) -> u8 {
+ self.version
+ }
+
+ pub fn recipient(&self) -> u16 {
+ self.recipient
+ }
+
+ pub fn data(&self) -> Option<&Vec<u8>> {
+ self.data.as_ref()
+ }
+
+ pub fn take_data(&mut self) -> Option<Vec<u8>> {
+ self.data.take()
+ }
+
+ pub fn size(&self) -> usize {
+ let data_len = if let Some(data) = self.data() {
+ data.len()
+ } else {
+ 0
+ };
+
+ 3 + data_len
+ }
+
+ pub fn bytes(&self) -> Vec<u8> {
+ let mut bytes = Vec::with_capacity(self.size());
+
+ bytes.extend(&self.version.to_be_bytes());
+ bytes.extend(&self.recipient.to_be_bytes());
+
+ if let Some(data) = self.data() {
+ bytes.extend(data.as_slice());
+ }
+
+ return bytes;
+ }
+
+ pub fn encode(&self) -> Vec<u8> {
+ let size: u32 = (self.size()) as u32;
+
+ let mut bytes = Vec::from(size.to_be_bytes());
+ bytes.extend(self.bytes());
+
+ return bytes;
+ }
+
+ pub fn decode(source: &mut (dyn Read + Send + Sync)) -> anyhow::Result<Self> {
+ // payload size
+ let mut payload_size_bytes: [u8; 4] = [0; 4];
+ source.read_exact(&mut payload_size_bytes)?;
+ let payload_size = u32::from_be_bytes(payload_size_bytes);
+
+ // read whole payload
+ let mut payload_bytes = vec![0; payload_size as usize];
+ source.read_exact(payload_bytes.as_mut_slice())?;
+
+ // version
+ let version_bytes = payload_bytes.remove(0);
+ let version = u8::from_be(version_bytes);
+
+ // recipient
+ let mut recipient_bytes: [u8; 2] = [0; 2];
+ for i in 0..recipient_bytes.len() {
+ recipient_bytes[i] = payload_bytes.remove(0);
+ }
+ let recipient = u16::from_be_bytes(recipient_bytes);
+
+ // data
+ let data = payload_bytes;
+
+ if data.len() > 0 {
+ Ok(Self {
+ version,
+ recipient,
+ data: Some(data),
+ })
+ } else {
+ Err(anyhow::Error::from(DatagramEmptyError))
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::protocol::ConnectDatagram;
+ use std::io::Cursor;
+
+ #[test]
+ fn encoded_size() -> anyhow::Result<()> {
+ let mut data = Vec::new();
+ for _ in 0..5 {
+ data.push(1);
+ }
+ assert_eq!(5, data.len());
+
+ let sample = ConnectDatagram::new(1, data)?;
+ assert_eq!(7 + 5, sample.encode().len());
+
+ Ok(())
+ }
+
+ #[test]
+ fn take_data() -> anyhow::Result<()> {
+ let mut data = Vec::new();
+ for _ in 0..5 {
+ data.push(1);
+ }
+
+ let mut sample = ConnectDatagram::new(1, data)?;
+
+ let taken_data = sample.take_data().unwrap();
+ assert!(sample.data().is_none());
+ assert_eq!(5, taken_data.len());
+
+ Ok(())
+ }
+
+ #[async_std::test]
+ async fn encode_and_decode() -> anyhow::Result<()> {
+ let mut data = Vec::new();
+ for _ in 0..5 {
+ data.push(1);
+ }
+ assert_eq!(5, data.len());
+
+ let sample = ConnectDatagram::new(1, data)?;
+
+ let mut payload = sample.encode();
+ assert_eq!(7 + 5, payload.len());
+
+ let mut cursor: Cursor<&mut [u8]> = Cursor::new(payload.as_mut());
+ let sample_back_res = ConnectDatagram::decode(&mut cursor);
+ assert!(sample_back_res.is_ok());
+
+ let sample_back = sample_back_res.unwrap();
+ assert_eq!(sample_back.version(), 1);
+ assert_eq!(sample_back.recipient(), 1);
+ assert_eq!(sample_back.data().unwrap().len(), 5);
+
+ Ok(())
+ }
+}
-use crate::schema::ConnectionMessage;
use async_std::net::SocketAddr;
use async_std::pin::Pin;
use bytes::{Buf, BytesMut};
use futures::task::{Context, Poll};
use futures::{AsyncRead, Stream};
use log::*;
-use protobuf::Message;
-use std::convert::TryInto;
+use crate::protocol::ConnectDatagram;
pub use futures::SinkExt;
pub use futures::StreamExt;
-use protobuf::well_known_types::Any;
+use std::io::Cursor;
/// A default buffer size to read in bytes and then deserialize as messages
const BUFFER_SIZE: usize = 8192;
}
impl Stream for ConnectionReader {
- type Item = Any;
+ type Item = ConnectDatagram;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut buffer = BytesMut::new();
buffer = pending_buf;
}
- let mut bytes_read_u64: u64 = bytes_read.try_into().expect(
- format!("Conversion from usize ({}) to u64 failed", bytes_read).as_str(),
- );
- while bytes_read_u64 > 0 {
- trace!(
- "{} bytes from network stream still unprocessed",
- bytes_read_u64
- );
+ while bytes_read > 0 {
+ trace!("{} bytes from network stream still unprocessed", bytes_read);
buffer.resize(bytes_read, 0);
- match ConnectionMessage::parse_from_bytes(buffer.as_ref()) {
- Ok(mut data) => {
- let serialized_size = data.compute_size();
- trace!("Deserialized message of size {} bytes", serialized_size);
-
- buffer.advance(serialized_size as usize);
-
- let serialized_size_u64: u64 = serialized_size.try_into().expect(
- format!(
- "Conversion from usize ({}) to u64 failed",
- serialized_size
- )
- .as_str(),
- );
- bytes_read_u64 -= serialized_size_u64;
- trace!("{} bytes still unprocessed", bytes_read_u64);
-
- trace!("Sending deserialized message downstream");
- return Poll::Ready(Some(data.take_payload()));
+ let mut cursor = Cursor::new(buffer.as_mut());
+ match ConnectDatagram::decode(&mut cursor) {
+ Ok(data) => {
+ return match data.version() {
+ _ => {
+ let serialized_size = data.size();
+ trace!(
+ "Deserialized message of size {} bytes",
+ serialized_size
+ );
+
+ buffer.advance(serialized_size);
+ bytes_read -= serialized_size;
+ trace!("{} bytes still unprocessed", bytes_read);
+
+ trace!("Sending deserialized message downstream");
+ Poll::Ready(Some(data))
+ }
+ }
}
Err(err) => {
+++ /dev/null
-// This file is generated by rust-protobuf 2.20.0. Do not edit
-// @generated
-
-// https://github.com/rust-lang/rust-clippy/issues/702
-#![allow(unknown_lints)]
-#![allow(clippy::all)]
-
-#![allow(unused_attributes)]
-#![rustfmt::skip]
-
-#![allow(box_pointers)]
-#![allow(dead_code)]
-#![allow(missing_docs)]
-#![allow(non_camel_case_types)]
-#![allow(non_snake_case)]
-#![allow(non_upper_case_globals)]
-#![allow(trivial_casts)]
-#![allow(unused_imports)]
-#![allow(unused_results)]
-//! Generated file from `message.proto`
-
-/// Generated files are compatible only with the same version
-/// of protobuf runtime.
-// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_20_0;
-
-#[derive(PartialEq,Clone,Default)]
-pub struct ConnectionMessage {
- // message fields
- pub payload: ::protobuf::SingularPtrField<::protobuf::well_known_types::Any>,
- // special fields
- pub unknown_fields: ::protobuf::UnknownFields,
- pub cached_size: ::protobuf::CachedSize,
-}
-
-impl<'a> ::std::default::Default for &'a ConnectionMessage {
- fn default() -> &'a ConnectionMessage {
- <ConnectionMessage as ::protobuf::Message>::default_instance()
- }
-}
-
-impl ConnectionMessage {
- pub fn new() -> ConnectionMessage {
- ::std::default::Default::default()
- }
-
- // .google.protobuf.Any payload = 1;
-
-
- pub fn get_payload(&self) -> &::protobuf::well_known_types::Any {
- self.payload.as_ref().unwrap_or_else(|| <::protobuf::well_known_types::Any as ::protobuf::Message>::default_instance())
- }
- pub fn clear_payload(&mut self) {
- self.payload.clear();
- }
-
- pub fn has_payload(&self) -> bool {
- self.payload.is_some()
- }
-
- // Param is passed by value, moved
- pub fn set_payload(&mut self, v: ::protobuf::well_known_types::Any) {
- self.payload = ::protobuf::SingularPtrField::some(v);
- }
-
- // Mutable pointer to the field.
- // If field is not initialized, it is initialized with default value first.
- pub fn mut_payload(&mut self) -> &mut ::protobuf::well_known_types::Any {
- if self.payload.is_none() {
- self.payload.set_default();
- }
- self.payload.as_mut().unwrap()
- }
-
- // Take field
- pub fn take_payload(&mut self) -> ::protobuf::well_known_types::Any {
- self.payload.take().unwrap_or_else(|| ::protobuf::well_known_types::Any::new())
- }
-}
-
-impl ::protobuf::Message for ConnectionMessage {
- fn is_initialized(&self) -> bool {
- for v in &self.payload {
- if !v.is_initialized() {
- return false;
- }
- };
- true
- }
-
- fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
- while !is.eof()? {
- let (field_number, wire_type) = is.read_tag_unpack()?;
- match field_number {
- 1 => {
- ::protobuf::rt::read_singular_message_into(wire_type, is, &mut self.payload)?;
- },
- _ => {
- ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
- },
- };
- }
- ::std::result::Result::Ok(())
- }
-
- // Compute sizes of nested messages
- #[allow(unused_variables)]
- fn compute_size(&self) -> u32 {
- let mut my_size = 0;
- if let Some(ref v) = self.payload.as_ref() {
- let len = v.compute_size();
- my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len;
- }
- my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
- self.cached_size.set(my_size);
- my_size
- }
-
- fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
- if let Some(ref v) = self.payload.as_ref() {
- os.write_tag(1, ::protobuf::wire_format::WireTypeLengthDelimited)?;
- os.write_raw_varint32(v.get_cached_size())?;
- v.write_to_with_cached_sizes(os)?;
- }
- os.write_unknown_fields(self.get_unknown_fields())?;
- ::std::result::Result::Ok(())
- }
-
- fn get_cached_size(&self) -> u32 {
- self.cached_size.get()
- }
-
- fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
- &self.unknown_fields
- }
-
- fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
- &mut self.unknown_fields
- }
-
- fn as_any(&self) -> &dyn (::std::any::Any) {
- self as &dyn (::std::any::Any)
- }
- fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
- self as &mut dyn (::std::any::Any)
- }
- fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
- self
- }
-
- fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
- Self::descriptor_static()
- }
-
- fn new() -> ConnectionMessage {
- ConnectionMessage::new()
- }
-
- fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
- static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
- descriptor.get(|| {
- let mut fields = ::std::vec::Vec::new();
- fields.push(::protobuf::reflect::accessor::make_singular_ptr_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage<::protobuf::well_known_types::Any>>(
- "payload",
- |m: &ConnectionMessage| { &m.payload },
- |m: &mut ConnectionMessage| { &mut m.payload },
- ));
- ::protobuf::reflect::MessageDescriptor::new_pb_name::<ConnectionMessage>(
- "ConnectionMessage",
- fields,
- file_descriptor_proto()
- )
- })
- }
-
- fn default_instance() -> &'static ConnectionMessage {
- static instance: ::protobuf::rt::LazyV2<ConnectionMessage> = ::protobuf::rt::LazyV2::INIT;
- instance.get(ConnectionMessage::new)
- }
-}
-
-impl ::protobuf::Clear for ConnectionMessage {
- fn clear(&mut self) {
- self.payload.clear();
- self.unknown_fields.clear();
- }
-}
-
-impl ::std::fmt::Debug for ConnectionMessage {
- fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
- ::protobuf::text_format::fmt(self, f)
- }
-}
-
-impl ::protobuf::reflect::ProtobufValue for ConnectionMessage {
- fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
- ::protobuf::reflect::ReflectValueRef::Message(self)
- }
-}
-
-static file_descriptor_proto_data: &'static [u8] = b"\
- \n\rmessage.proto\x12\x07message\x1a\x19google/protobuf/any.proto\"G\n\
- \x11ConnectionMessage\x120\n\x07payload\x18\x01\x20\x01(\x0b2\x14.google\
- .protobuf.AnyR\x07payloadB\0:\0B\0b\x06proto3\
-";
-
-static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;
-
-fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
- ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap()
-}
-
-pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
- file_descriptor_proto_lazy.get(|| {
- parse_descriptor_proto()
- })
-}
+++ /dev/null
-mod message;
-
-pub use message::ConnectionMessage;
-use protobuf::well_known_types::Any;
-use protobuf::Message;
-
-impl ConnectionMessage {
- pub(crate) fn from_msg<M: Message>(msg: M) -> Self {
- let mut sm = Self::new();
- let payload = Any::pack(&msg).expect("Protobuf Message could not be packed into Any type");
-
- sm.set_payload(payload);
- return sm;
- }
-}
-use crate::schema::ConnectionMessage;
use async_channel::RecvError;
use async_std::net::SocketAddr;
use async_std::pin::Pin;
use futures::task::{Context, Poll};
use futures::{AsyncWrite, Sink};
use log::*;
-use protobuf::Message;
+use crate::protocol::ConnectDatagram;
pub use futures::SinkExt;
pub use futures::StreamExt;
pub fn is_closed(&self) -> bool {
self.closed
}
-}
-
-impl<M: Message> Sink<M> for ConnectionWriter {
- type Error = RecvError;
-
- fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- if self.is_closed() {
- trace!("Connection is closed, cannot send message");
- Poll::Ready(Err(RecvError))
- } else {
- trace!("Connection ready to send message");
- Poll::Ready(Ok(()))
- }
- }
- fn start_send(mut self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> {
- trace!("Preparing message to be sent next");
- let msg: ConnectionMessage = ConnectionMessage::from_msg(item);
-
- if let Ok(buffer) = msg.write_to_bytes() {
- let msg_size = buffer.len();
- trace!("Serialized pending message into {} bytes", msg_size);
-
- self.pending_writes.push(buffer);
-
- Ok(())
- } else {
- error!("Encountered error when serializing message to bytes");
- Err(RecvError)
- }
- }
-
- fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ pub(crate) fn write_pending_bytes(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), RecvError>> {
if self.pending_writes.len() > 0 {
let stream = self.write_stream.as_mut();
Poll::Ready(Ok(()))
}
}
+}
- fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- self.closed = true;
-
- let flush = if self.pending_writes.len() > 0 {
- let stream = self.write_stream.as_mut();
+impl Sink<ConnectDatagram> for ConnectionWriter {
+ type Error = RecvError;
- match stream.poll_flush(cx) {
- Poll::Pending => Poll::Pending,
+ fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ if self.is_closed() {
+ trace!("Connection is closed, cannot send message");
+ Poll::Ready(Err(RecvError))
+ } else {
+ trace!("Connection ready to send message");
+ Poll::Ready(Ok(()))
+ }
+ }
- Poll::Ready(Ok(_)) => {
- trace!("Sending pending bytes");
+ fn start_send(mut self: Pin<&mut Self>, item: ConnectDatagram) -> Result<(), Self::Error> {
+ trace!("Preparing message to be sent next");
- let pending = self.pending_writes.split_off(0);
- let writeable_vec: Vec<IoSlice> =
- pending.iter().map(|p| IoSlice::new(p)).collect();
+ let buffer = item.encode();
+ let msg_size = buffer.len();
+ trace!("Serialized pending message into {} bytes", msg_size);
- let stream = self.write_stream.as_mut();
- match stream.poll_write_vectored(cx, writeable_vec.as_slice()) {
- Poll::Pending => Poll::Pending,
+ self.pending_writes.push(buffer);
- Poll::Ready(Ok(bytes_written)) => {
- trace!("Wrote {} bytes to network stream", bytes_written);
- Poll::Ready(Ok(()))
- }
+ Ok(())
+ }
- Poll::Ready(Err(_e)) => {
- error!("Encountered error when writing to network stream");
- Poll::Ready(Err(RecvError))
- }
- }
- }
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.write_pending_bytes(cx)
+ }
- Poll::Ready(Err(_e)) => {
- error!("Encountered error when flushing network stream");
- Poll::Ready(Err(RecvError))
- }
- }
- } else {
- Poll::Ready(Ok(()))
- };
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.closed = true;
- match flush {
+ match self.write_pending_bytes(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(_)) => {