From b26c9c8b627388db15a182ff3a0b13340d3ccac0 Mon Sep 17 00:00:00 2001 From: Sachandhan Ganesh Date: Fri, 12 Feb 2021 15:52:39 -0800 Subject: [PATCH] remove dependency on protobuf and introduce basic custom wire format --- Cargo.toml | 13 +- build.rs | 14 -- examples/tcp-client/Cargo.toml | 4 - examples/tcp-client/build.rs | 9 - examples/tcp-client/schema/hello_world.proto | 6 - examples/tcp-client/src/main.rs | 23 +- examples/tcp-client/src/schema/hello_world.rs | 200 ---------------- examples/tcp-client/src/schema/mod.rs | 1 - examples/tcp-echo-server/Cargo.toml | 4 - examples/tcp-echo-server/build.rs | 9 - examples/tcp-echo-server/src/main.rs | 50 ++-- .../tcp-echo-server/src/schema/hello_world.rs | 200 ---------------- examples/tcp-echo-server/src/schema/mod.rs | 1 - examples/tls-client/Cargo.toml | 4 - examples/tls-client/build.rs | 9 - examples/tls-client/src/main.rs | 23 +- examples/tls-client/src/schema/hello_world.rs | 200 ---------------- examples/tls-client/src/schema/mod.rs | 1 - examples/tls-echo-server/Cargo.toml | 4 - examples/tls-echo-server/build.rs | 9 - examples/tls-echo-server/src/main.rs | 50 ++-- .../tls-echo-server/src/schema/hello_world.rs | 200 ---------------- examples/tls-echo-server/src/schema/mod.rs | 1 - schema/message.proto | 8 - src/lib.rs | 3 +- src/protocol.rs | 182 +++++++++++++++ src/reader.rs | 56 ++--- src/schema/message.rs | 216 ------------------ src/schema/mod.rs | 15 -- src/writer.rs | 99 +++----- 30 files changed, 320 insertions(+), 1294 deletions(-) delete mode 100644 build.rs delete mode 100644 examples/tcp-client/build.rs delete mode 100644 examples/tcp-client/schema/hello_world.proto delete mode 100644 examples/tcp-client/src/schema/hello_world.rs delete mode 100644 examples/tcp-client/src/schema/mod.rs delete mode 100644 examples/tcp-echo-server/build.rs delete mode 100644 examples/tcp-echo-server/src/schema/hello_world.rs delete mode 100644 examples/tcp-echo-server/src/schema/mod.rs delete mode 100644 examples/tls-client/build.rs delete mode 100644 examples/tls-client/src/schema/hello_world.rs delete mode 100644 examples/tls-client/src/schema/mod.rs delete mode 100644 examples/tls-echo-server/build.rs delete mode 100644 examples/tls-echo-server/src/schema/hello_world.rs delete mode 100644 examples/tls-echo-server/src/schema/mod.rs delete mode 100644 schema/message.proto create mode 100644 src/protocol.rs delete mode 100644 src/schema/message.rs delete mode 100644 src/schema/mod.rs diff --git a/Cargo.toml b/Cargo.toml index c150636..999615a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "connect" -version = "0.1.2" +version = "0.1.3" edition = "2018" authors = ["Sachandhan Ganesh "] description = "message queue abstraction over async network streams" @@ -13,23 +13,16 @@ license = "Apache-2.0" [workspace] members = [ - "examples/tcp-client", - "examples/tcp-echo-server", - "examples/tls-client", - "examples/tls-echo-server", + "examples/*", ] [dependencies] anyhow = "1.0.31" async-channel = "1.4.0" -async-std = { version = "1.6.2", features = ["unstable"] } +async-std = { version = "1.6.2", features = ["attributes", "unstable"] } async-tls = { version = "0.9.0", default-features = false, features = ["client", "server"]} bytes = "0.5.5" futures = "0.3.8" log = "0.4" -protobuf = "2.18.1" rustls = "0.18.0" - -[build-dependencies] -protobuf-codegen-pure = "2.18.1" diff --git a/build.rs b/build.rs deleted file mode 100644 index 31cc875..0000000 --- a/build.rs +++ /dev/null @@ -1,14 +0,0 @@ -use std::env; - -fn main() -> Result<(), Box> { - if Ok("dev".to_owned()) == env::var("PROFILE") { - protobuf_codegen_pure::Codegen::new() - .out_dir("src/schema") - .inputs(&["schema/message.proto"]) - .include("schema") - .run() - .expect("Codegen failed."); - } - - Ok(()) -} diff --git a/examples/tcp-client/Cargo.toml b/examples/tcp-client/Cargo.toml index 1fb8e2b..4097809 100644 --- a/examples/tcp-client/Cargo.toml +++ b/examples/tcp-client/Cargo.toml @@ -11,9 +11,5 @@ anyhow = "1.0.31" async-std = { version = "1.6.2", features = ["attributes"] } env_logger = "0.7" log = "0.4" -protobuf = "2.18.1" connect = { path = "../../" } - -[build-dependencies] -protobuf-codegen-pure = "2.18.1" diff --git a/examples/tcp-client/build.rs b/examples/tcp-client/build.rs deleted file mode 100644 index aa2ff90..0000000 --- a/examples/tcp-client/build.rs +++ /dev/null @@ -1,9 +0,0 @@ -fn main() -> Result<(), Box> { - protobuf_codegen_pure::Codegen::new() - .out_dir("src/schema") - .inputs(&["schema/hello_world.proto"]) - .include("schema") - .run() - .expect("Codegen failed."); - Ok(()) -} diff --git a/examples/tcp-client/schema/hello_world.proto b/examples/tcp-client/schema/hello_world.proto deleted file mode 100644 index 5ffe7b2..0000000 --- a/examples/tcp-client/schema/hello_world.proto +++ /dev/null @@ -1,6 +0,0 @@ -syntax = "proto3"; -package hello_world; - -message HelloWorld { - string message = 1; -} \ No newline at end of file diff --git a/examples/tcp-client/src/main.rs b/examples/tcp-client/src/main.rs index c65a455..5e15ada 100644 --- a/examples/tcp-client/src/main.rs +++ b/examples/tcp-client/src/main.rs @@ -1,9 +1,5 @@ -pub mod schema; - -use crate::schema::hello_world::HelloWorld; -use connect::{Connection, SinkExt, StreamExt}; +use connect::{ConnectDatagram, Connection, SinkExt, StreamExt}; use log::*; -use protobuf::well_known_types::Any; use std::env; #[async_std::main] @@ -24,21 +20,20 @@ async fn main() -> anyhow::Result<()> { let mut conn = Connection::tcp_client(ip_address).await?; // send a message to the server - let raw_msg = String::from("Hello world"); - - let mut msg = HelloWorld::new(); - msg.set_message(raw_msg.clone()); + let msg = String::from("Hello world"); + info!("Sending message: {}", msg); - conn.writer().send(msg).await?; - info!("Sent message: {}", raw_msg); + let envelope = ConnectDatagram::new(65535, msg.into_bytes())?; + conn.writer().send(envelope).await?; // wait for the server to reply with an ack - while let Some(reply) = conn.reader().next().await { + if let Some(mut reply) = conn.reader().next().await { info!("Received message"); - let msg: HelloWorld = Any::unpack(&reply)?.unwrap(); + let data = reply.take_data().unwrap(); + let msg = String::from_utf8(data)?; - info!("Unpacked reply: {}", msg.get_message()); + info!("Unpacked reply: {}", msg); } Ok(()) diff --git a/examples/tcp-client/src/schema/hello_world.rs b/examples/tcp-client/src/schema/hello_world.rs deleted file mode 100644 index 1af771c..0000000 --- a/examples/tcp-client/src/schema/hello_world.rs +++ /dev/null @@ -1,200 +0,0 @@ -// This file is generated by rust-protobuf 2.20.0. Do not edit -// @generated - -// https://github.com/rust-lang/rust-clippy/issues/702 -#![allow(unknown_lints)] -#![allow(clippy::all)] - -#![allow(unused_attributes)] -#![rustfmt::skip] - -#![allow(box_pointers)] -#![allow(dead_code)] -#![allow(missing_docs)] -#![allow(non_camel_case_types)] -#![allow(non_snake_case)] -#![allow(non_upper_case_globals)] -#![allow(trivial_casts)] -#![allow(unused_imports)] -#![allow(unused_results)] -//! Generated file from `hello_world.proto` - -/// Generated files are compatible only with the same version -/// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_20_0; - -#[derive(PartialEq,Clone,Default)] -pub struct HelloWorld { - // message fields - pub message: ::std::string::String, - // special fields - pub unknown_fields: ::protobuf::UnknownFields, - pub cached_size: ::protobuf::CachedSize, -} - -impl<'a> ::std::default::Default for &'a HelloWorld { - fn default() -> &'a HelloWorld { - ::default_instance() - } -} - -impl HelloWorld { - pub fn new() -> HelloWorld { - ::std::default::Default::default() - } - - // string message = 1; - - - pub fn get_message(&self) -> &str { - &self.message - } - pub fn clear_message(&mut self) { - self.message.clear(); - } - - // Param is passed by value, moved - pub fn set_message(&mut self, v: ::std::string::String) { - self.message = v; - } - - // Mutable pointer to the field. - // If field is not initialized, it is initialized with default value first. - pub fn mut_message(&mut self) -> &mut ::std::string::String { - &mut self.message - } - - // Take field - pub fn take_message(&mut self) -> ::std::string::String { - ::std::mem::replace(&mut self.message, ::std::string::String::new()) - } -} - -impl ::protobuf::Message for HelloWorld { - fn is_initialized(&self) -> bool { - true - } - - fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> { - while !is.eof()? { - let (field_number, wire_type) = is.read_tag_unpack()?; - match field_number { - 1 => { - ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.message)?; - }, - _ => { - ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; - }, - }; - } - ::std::result::Result::Ok(()) - } - - // Compute sizes of nested messages - #[allow(unused_variables)] - fn compute_size(&self) -> u32 { - let mut my_size = 0; - if !self.message.is_empty() { - my_size += ::protobuf::rt::string_size(1, &self.message); - } - my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); - self.cached_size.set(my_size); - my_size - } - - fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { - if !self.message.is_empty() { - os.write_string(1, &self.message)?; - } - os.write_unknown_fields(self.get_unknown_fields())?; - ::std::result::Result::Ok(()) - } - - fn get_cached_size(&self) -> u32 { - self.cached_size.get() - } - - fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { - &self.unknown_fields - } - - fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { - &mut self.unknown_fields - } - - fn as_any(&self) -> &dyn (::std::any::Any) { - self as &dyn (::std::any::Any) - } - fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { - self as &mut dyn (::std::any::Any) - } - fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { - self - } - - fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { - Self::descriptor_static() - } - - fn new() -> HelloWorld { - HelloWorld::new() - } - - fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { - static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; - descriptor.get(|| { - let mut fields = ::std::vec::Vec::new(); - fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( - "message", - |m: &HelloWorld| { &m.message }, - |m: &mut HelloWorld| { &mut m.message }, - )); - ::protobuf::reflect::MessageDescriptor::new_pb_name::( - "HelloWorld", - fields, - file_descriptor_proto() - ) - }) - } - - fn default_instance() -> &'static HelloWorld { - static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; - instance.get(HelloWorld::new) - } -} - -impl ::protobuf::Clear for HelloWorld { - fn clear(&mut self) { - self.message.clear(); - self.unknown_fields.clear(); - } -} - -impl ::std::fmt::Debug for HelloWorld { - fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { - ::protobuf::text_format::fmt(self, f) - } -} - -impl ::protobuf::reflect::ProtobufValue for HelloWorld { - fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { - ::protobuf::reflect::ReflectValueRef::Message(self) - } -} - -static file_descriptor_proto_data: &'static [u8] = b"\ - \n\x11hello_world.proto\x12\x0bhello_world\"*\n\nHelloWorld\x12\x1a\n\ - \x07message\x18\x01\x20\x01(\tR\x07messageB\0:\0B\0b\x06proto3\ -"; - -static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; - -fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto { - ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap() -} - -pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto { - file_descriptor_proto_lazy.get(|| { - parse_descriptor_proto() - }) -} diff --git a/examples/tcp-client/src/schema/mod.rs b/examples/tcp-client/src/schema/mod.rs deleted file mode 100644 index c6dbc18..0000000 --- a/examples/tcp-client/src/schema/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod hello_world; diff --git a/examples/tcp-echo-server/Cargo.toml b/examples/tcp-echo-server/Cargo.toml index 29af62d..e4417c7 100644 --- a/examples/tcp-echo-server/Cargo.toml +++ b/examples/tcp-echo-server/Cargo.toml @@ -11,9 +11,5 @@ anyhow = "1.0.31" async-std = { version = "1.6.2", features = ["attributes"] } env_logger = "0.7" log = "0.4" -protobuf = "2.18.1" connect = { path = "../../" } - -[build-dependencies] -protobuf-codegen-pure = "2.18.1" diff --git a/examples/tcp-echo-server/build.rs b/examples/tcp-echo-server/build.rs deleted file mode 100644 index ae9222f..0000000 --- a/examples/tcp-echo-server/build.rs +++ /dev/null @@ -1,9 +0,0 @@ -fn main() -> Result<(), Box> { - protobuf_codegen_pure::Codegen::new() - .out_dir("src/schema") - .inputs(&["../tcp-client/schema/hello_world.proto"]) - .include("../tcp-client/schema") - .run() - .expect("Codegen failed."); - Ok(()) -} diff --git a/examples/tcp-echo-server/src/main.rs b/examples/tcp-echo-server/src/main.rs index 4d7bfde..04e1e12 100644 --- a/examples/tcp-echo-server/src/main.rs +++ b/examples/tcp-echo-server/src/main.rs @@ -1,9 +1,6 @@ -mod schema; - -use crate::schema::hello_world::HelloWorld; use async_std::task; use connect::tcp::TcpListener; -use connect::{SinkExt, StreamExt}; +use connect::{ConnectDatagram, SinkExt, StreamExt}; use log::*; use std::env; @@ -31,23 +28,36 @@ async fn main() -> anyhow::Result<()> { info!("Handling connection from {}", conn.peer_addr()); task::spawn(async move { - while let Some(msg) = conn.reader().next().await { - if msg.is::() { - if let Ok(Some(contents)) = msg.unpack::() { - info!( - "Received a message \"{}\" from {}", - contents.get_message(), - conn.peer_addr() - ); - - conn.writer() - .send(contents) - .await - .expect("Could not send message back to source connection"); - info!("Sent message back to original sender"); - } + while let Some(mut envelope) = conn.reader().next().await { + // handle message based on intended recipient + if envelope.recipient() == 65535 { + // if recipient is 65535, we do custom processing + let data = envelope.take_data().unwrap(); + let msg = + String::from_utf8(data).expect("could not build String from payload bytes"); + info!("Received a message \"{}\" from {}", msg, conn.peer_addr()); + + let reply = ConnectDatagram::new(envelope.recipient(), msg.into_bytes()) + .expect("could not construct new datagram from built String"); + + conn.writer() + .send(reply) + .await + .expect("Could not send message back to source connection"); + info!("Sent message back to original sender"); } else { - error!("Received a message of unknown type") + // if recipient is anything else, we just send it back + warn!( + "Received a message for unknown recipient {} from {}", + envelope.recipient(), + conn.peer_addr() + ); + + conn.writer() + .send(envelope) + .await + .expect("Could not send message back to source connection"); + info!("Sent message back to original sender"); } } }); diff --git a/examples/tcp-echo-server/src/schema/hello_world.rs b/examples/tcp-echo-server/src/schema/hello_world.rs deleted file mode 100644 index 1af771c..0000000 --- a/examples/tcp-echo-server/src/schema/hello_world.rs +++ /dev/null @@ -1,200 +0,0 @@ -// This file is generated by rust-protobuf 2.20.0. Do not edit -// @generated - -// https://github.com/rust-lang/rust-clippy/issues/702 -#![allow(unknown_lints)] -#![allow(clippy::all)] - -#![allow(unused_attributes)] -#![rustfmt::skip] - -#![allow(box_pointers)] -#![allow(dead_code)] -#![allow(missing_docs)] -#![allow(non_camel_case_types)] -#![allow(non_snake_case)] -#![allow(non_upper_case_globals)] -#![allow(trivial_casts)] -#![allow(unused_imports)] -#![allow(unused_results)] -//! Generated file from `hello_world.proto` - -/// Generated files are compatible only with the same version -/// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_20_0; - -#[derive(PartialEq,Clone,Default)] -pub struct HelloWorld { - // message fields - pub message: ::std::string::String, - // special fields - pub unknown_fields: ::protobuf::UnknownFields, - pub cached_size: ::protobuf::CachedSize, -} - -impl<'a> ::std::default::Default for &'a HelloWorld { - fn default() -> &'a HelloWorld { - ::default_instance() - } -} - -impl HelloWorld { - pub fn new() -> HelloWorld { - ::std::default::Default::default() - } - - // string message = 1; - - - pub fn get_message(&self) -> &str { - &self.message - } - pub fn clear_message(&mut self) { - self.message.clear(); - } - - // Param is passed by value, moved - pub fn set_message(&mut self, v: ::std::string::String) { - self.message = v; - } - - // Mutable pointer to the field. - // If field is not initialized, it is initialized with default value first. - pub fn mut_message(&mut self) -> &mut ::std::string::String { - &mut self.message - } - - // Take field - pub fn take_message(&mut self) -> ::std::string::String { - ::std::mem::replace(&mut self.message, ::std::string::String::new()) - } -} - -impl ::protobuf::Message for HelloWorld { - fn is_initialized(&self) -> bool { - true - } - - fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> { - while !is.eof()? { - let (field_number, wire_type) = is.read_tag_unpack()?; - match field_number { - 1 => { - ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.message)?; - }, - _ => { - ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; - }, - }; - } - ::std::result::Result::Ok(()) - } - - // Compute sizes of nested messages - #[allow(unused_variables)] - fn compute_size(&self) -> u32 { - let mut my_size = 0; - if !self.message.is_empty() { - my_size += ::protobuf::rt::string_size(1, &self.message); - } - my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); - self.cached_size.set(my_size); - my_size - } - - fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { - if !self.message.is_empty() { - os.write_string(1, &self.message)?; - } - os.write_unknown_fields(self.get_unknown_fields())?; - ::std::result::Result::Ok(()) - } - - fn get_cached_size(&self) -> u32 { - self.cached_size.get() - } - - fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { - &self.unknown_fields - } - - fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { - &mut self.unknown_fields - } - - fn as_any(&self) -> &dyn (::std::any::Any) { - self as &dyn (::std::any::Any) - } - fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { - self as &mut dyn (::std::any::Any) - } - fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { - self - } - - fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { - Self::descriptor_static() - } - - fn new() -> HelloWorld { - HelloWorld::new() - } - - fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { - static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; - descriptor.get(|| { - let mut fields = ::std::vec::Vec::new(); - fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( - "message", - |m: &HelloWorld| { &m.message }, - |m: &mut HelloWorld| { &mut m.message }, - )); - ::protobuf::reflect::MessageDescriptor::new_pb_name::( - "HelloWorld", - fields, - file_descriptor_proto() - ) - }) - } - - fn default_instance() -> &'static HelloWorld { - static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; - instance.get(HelloWorld::new) - } -} - -impl ::protobuf::Clear for HelloWorld { - fn clear(&mut self) { - self.message.clear(); - self.unknown_fields.clear(); - } -} - -impl ::std::fmt::Debug for HelloWorld { - fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { - ::protobuf::text_format::fmt(self, f) - } -} - -impl ::protobuf::reflect::ProtobufValue for HelloWorld { - fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { - ::protobuf::reflect::ReflectValueRef::Message(self) - } -} - -static file_descriptor_proto_data: &'static [u8] = b"\ - \n\x11hello_world.proto\x12\x0bhello_world\"*\n\nHelloWorld\x12\x1a\n\ - \x07message\x18\x01\x20\x01(\tR\x07messageB\0:\0B\0b\x06proto3\ -"; - -static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; - -fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto { - ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap() -} - -pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto { - file_descriptor_proto_lazy.get(|| { - parse_descriptor_proto() - }) -} diff --git a/examples/tcp-echo-server/src/schema/mod.rs b/examples/tcp-echo-server/src/schema/mod.rs deleted file mode 100644 index c6dbc18..0000000 --- a/examples/tcp-echo-server/src/schema/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod hello_world; diff --git a/examples/tls-client/Cargo.toml b/examples/tls-client/Cargo.toml index 407ccb6..e6cf99b 100644 --- a/examples/tls-client/Cargo.toml +++ b/examples/tls-client/Cargo.toml @@ -11,9 +11,5 @@ anyhow = "1.0.31" async-std = { version = "1.6.2", features = ["attributes"] } env_logger = "0.7" log = "0.4" -protobuf = "2.18.1" connect = { path = "../../" } - -[build-dependencies] -protobuf-codegen-pure = "2.18.1" diff --git a/examples/tls-client/build.rs b/examples/tls-client/build.rs deleted file mode 100644 index ae9222f..0000000 --- a/examples/tls-client/build.rs +++ /dev/null @@ -1,9 +0,0 @@ -fn main() -> Result<(), Box> { - protobuf_codegen_pure::Codegen::new() - .out_dir("src/schema") - .inputs(&["../tcp-client/schema/hello_world.proto"]) - .include("../tcp-client/schema") - .run() - .expect("Codegen failed."); - Ok(()) -} diff --git a/examples/tls-client/src/main.rs b/examples/tls-client/src/main.rs index d9198e8..c0b9280 100644 --- a/examples/tls-client/src/main.rs +++ b/examples/tls-client/src/main.rs @@ -1,10 +1,6 @@ -mod schema; - -use crate::schema::hello_world::HelloWorld; use connect::tls::rustls::ClientConfig; -use connect::{Connection, SinkExt, StreamExt}; +use connect::{ConnectDatagram, Connection, SinkExt, StreamExt}; use log::*; -use protobuf::well_known_types::Any; use std::env; #[async_std::main] @@ -29,21 +25,20 @@ async fn main() -> anyhow::Result<()> { let mut conn = Connection::tls_client(ip_addr, &domain, client_config.into()).await?; // send a message to the server - let raw_msg = String::from("Hello world"); - info!("Sending message: {}", raw_msg); - - let mut msg = HelloWorld::new(); - msg.set_message(raw_msg); + let msg = String::from("Hello world"); + info!("Sending message: {}", msg); - conn.writer().send(msg).await?; + let envelope = ConnectDatagram::new(65535, msg.into_bytes())?; + conn.writer().send(envelope).await?; // wait for the server to reply with an ack - while let Some(reply) = conn.reader().next().await { + if let Some(mut reply) = conn.reader().next().await { info!("Received message"); - let msg: HelloWorld = Any::unpack(&reply)?.unwrap(); + let data = reply.take_data().unwrap(); + let msg = String::from_utf8(data)?; - info!("Unpacked reply: {}", msg.get_message()); + info!("Unpacked reply: {}", msg); } Ok(()) diff --git a/examples/tls-client/src/schema/hello_world.rs b/examples/tls-client/src/schema/hello_world.rs deleted file mode 100644 index 1af771c..0000000 --- a/examples/tls-client/src/schema/hello_world.rs +++ /dev/null @@ -1,200 +0,0 @@ -// This file is generated by rust-protobuf 2.20.0. Do not edit -// @generated - -// https://github.com/rust-lang/rust-clippy/issues/702 -#![allow(unknown_lints)] -#![allow(clippy::all)] - -#![allow(unused_attributes)] -#![rustfmt::skip] - -#![allow(box_pointers)] -#![allow(dead_code)] -#![allow(missing_docs)] -#![allow(non_camel_case_types)] -#![allow(non_snake_case)] -#![allow(non_upper_case_globals)] -#![allow(trivial_casts)] -#![allow(unused_imports)] -#![allow(unused_results)] -//! Generated file from `hello_world.proto` - -/// Generated files are compatible only with the same version -/// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_20_0; - -#[derive(PartialEq,Clone,Default)] -pub struct HelloWorld { - // message fields - pub message: ::std::string::String, - // special fields - pub unknown_fields: ::protobuf::UnknownFields, - pub cached_size: ::protobuf::CachedSize, -} - -impl<'a> ::std::default::Default for &'a HelloWorld { - fn default() -> &'a HelloWorld { - ::default_instance() - } -} - -impl HelloWorld { - pub fn new() -> HelloWorld { - ::std::default::Default::default() - } - - // string message = 1; - - - pub fn get_message(&self) -> &str { - &self.message - } - pub fn clear_message(&mut self) { - self.message.clear(); - } - - // Param is passed by value, moved - pub fn set_message(&mut self, v: ::std::string::String) { - self.message = v; - } - - // Mutable pointer to the field. - // If field is not initialized, it is initialized with default value first. - pub fn mut_message(&mut self) -> &mut ::std::string::String { - &mut self.message - } - - // Take field - pub fn take_message(&mut self) -> ::std::string::String { - ::std::mem::replace(&mut self.message, ::std::string::String::new()) - } -} - -impl ::protobuf::Message for HelloWorld { - fn is_initialized(&self) -> bool { - true - } - - fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> { - while !is.eof()? { - let (field_number, wire_type) = is.read_tag_unpack()?; - match field_number { - 1 => { - ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.message)?; - }, - _ => { - ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; - }, - }; - } - ::std::result::Result::Ok(()) - } - - // Compute sizes of nested messages - #[allow(unused_variables)] - fn compute_size(&self) -> u32 { - let mut my_size = 0; - if !self.message.is_empty() { - my_size += ::protobuf::rt::string_size(1, &self.message); - } - my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); - self.cached_size.set(my_size); - my_size - } - - fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { - if !self.message.is_empty() { - os.write_string(1, &self.message)?; - } - os.write_unknown_fields(self.get_unknown_fields())?; - ::std::result::Result::Ok(()) - } - - fn get_cached_size(&self) -> u32 { - self.cached_size.get() - } - - fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { - &self.unknown_fields - } - - fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { - &mut self.unknown_fields - } - - fn as_any(&self) -> &dyn (::std::any::Any) { - self as &dyn (::std::any::Any) - } - fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { - self as &mut dyn (::std::any::Any) - } - fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { - self - } - - fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { - Self::descriptor_static() - } - - fn new() -> HelloWorld { - HelloWorld::new() - } - - fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { - static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; - descriptor.get(|| { - let mut fields = ::std::vec::Vec::new(); - fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( - "message", - |m: &HelloWorld| { &m.message }, - |m: &mut HelloWorld| { &mut m.message }, - )); - ::protobuf::reflect::MessageDescriptor::new_pb_name::( - "HelloWorld", - fields, - file_descriptor_proto() - ) - }) - } - - fn default_instance() -> &'static HelloWorld { - static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; - instance.get(HelloWorld::new) - } -} - -impl ::protobuf::Clear for HelloWorld { - fn clear(&mut self) { - self.message.clear(); - self.unknown_fields.clear(); - } -} - -impl ::std::fmt::Debug for HelloWorld { - fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { - ::protobuf::text_format::fmt(self, f) - } -} - -impl ::protobuf::reflect::ProtobufValue for HelloWorld { - fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { - ::protobuf::reflect::ReflectValueRef::Message(self) - } -} - -static file_descriptor_proto_data: &'static [u8] = b"\ - \n\x11hello_world.proto\x12\x0bhello_world\"*\n\nHelloWorld\x12\x1a\n\ - \x07message\x18\x01\x20\x01(\tR\x07messageB\0:\0B\0b\x06proto3\ -"; - -static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; - -fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto { - ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap() -} - -pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto { - file_descriptor_proto_lazy.get(|| { - parse_descriptor_proto() - }) -} diff --git a/examples/tls-client/src/schema/mod.rs b/examples/tls-client/src/schema/mod.rs deleted file mode 100644 index c6dbc18..0000000 --- a/examples/tls-client/src/schema/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod hello_world; diff --git a/examples/tls-echo-server/Cargo.toml b/examples/tls-echo-server/Cargo.toml index 60a13bc..820c0c6 100644 --- a/examples/tls-echo-server/Cargo.toml +++ b/examples/tls-echo-server/Cargo.toml @@ -11,9 +11,5 @@ anyhow = "1.0.31" async-std = { version = "1.6.2", features = ["attributes"] } env_logger = "0.7" log = "0.4" -protobuf = "2.18.1" connect = { path = "../../" } - -[build-dependencies] -protobuf-codegen-pure = "2.18.1" diff --git a/examples/tls-echo-server/build.rs b/examples/tls-echo-server/build.rs deleted file mode 100644 index ae9222f..0000000 --- a/examples/tls-echo-server/build.rs +++ /dev/null @@ -1,9 +0,0 @@ -fn main() -> Result<(), Box> { - protobuf_codegen_pure::Codegen::new() - .out_dir("src/schema") - .inputs(&["../tcp-client/schema/hello_world.proto"]) - .include("../tcp-client/schema") - .run() - .expect("Codegen failed."); - Ok(()) -} diff --git a/examples/tls-echo-server/src/main.rs b/examples/tls-echo-server/src/main.rs index bf9d476..2a9abc8 100644 --- a/examples/tls-echo-server/src/main.rs +++ b/examples/tls-echo-server/src/main.rs @@ -1,11 +1,8 @@ -mod schema; - -use crate::schema::hello_world::HelloWorld; use async_std::{io, task}; use connect::tls::rustls::internal::pemfile::{certs, rsa_private_keys}; use connect::tls::rustls::{NoClientAuth, ServerConfig}; use connect::tls::TlsListener; -use connect::{SinkExt, StreamExt}; +use connect::{ConnectDatagram, SinkExt, StreamExt}; use log::*; use std::env; use std::fs::File; @@ -38,23 +35,36 @@ async fn main() -> anyhow::Result<()> { info!("Handling connection from {}", conn.peer_addr()); task::spawn(async move { - while let Some(msg) = conn.reader().next().await { - if msg.is::() { - if let Ok(Some(contents)) = msg.unpack::() { - info!( - "Received a message \"{}\" from {}", - contents.get_message(), - conn.peer_addr() - ); - - conn.writer() - .send(contents) - .await - .expect("Could not send message back to source connection"); - info!("Sent message back to original sender"); - } + while let Some(mut envelope) = conn.reader().next().await { + // handle message based on intended recipient + if envelope.recipient() == 65535 { + // if recipient is 65535, we do custom processing + let data = envelope.take_data().unwrap(); + let msg = + String::from_utf8(data).expect("could not build String from payload bytes"); + info!("Received a message \"{}\" from {}", msg, conn.peer_addr()); + + let reply = ConnectDatagram::new(envelope.recipient(), msg.into_bytes()) + .expect("could not construct new datagram from built String"); + + conn.writer() + .send(reply) + .await + .expect("Could not send message back to source connection"); + info!("Sent message back to original sender"); } else { - error!("Received a message of unknown type") + // if recipient is anything else, we just send it back + warn!( + "Received a message for unknown recipient {} from {}", + envelope.recipient(), + conn.peer_addr() + ); + + conn.writer() + .send(envelope) + .await + .expect("Could not send message back to source connection"); + info!("Sent message back to original sender"); } } }); diff --git a/examples/tls-echo-server/src/schema/hello_world.rs b/examples/tls-echo-server/src/schema/hello_world.rs deleted file mode 100644 index 1af771c..0000000 --- a/examples/tls-echo-server/src/schema/hello_world.rs +++ /dev/null @@ -1,200 +0,0 @@ -// This file is generated by rust-protobuf 2.20.0. Do not edit -// @generated - -// https://github.com/rust-lang/rust-clippy/issues/702 -#![allow(unknown_lints)] -#![allow(clippy::all)] - -#![allow(unused_attributes)] -#![rustfmt::skip] - -#![allow(box_pointers)] -#![allow(dead_code)] -#![allow(missing_docs)] -#![allow(non_camel_case_types)] -#![allow(non_snake_case)] -#![allow(non_upper_case_globals)] -#![allow(trivial_casts)] -#![allow(unused_imports)] -#![allow(unused_results)] -//! Generated file from `hello_world.proto` - -/// Generated files are compatible only with the same version -/// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_20_0; - -#[derive(PartialEq,Clone,Default)] -pub struct HelloWorld { - // message fields - pub message: ::std::string::String, - // special fields - pub unknown_fields: ::protobuf::UnknownFields, - pub cached_size: ::protobuf::CachedSize, -} - -impl<'a> ::std::default::Default for &'a HelloWorld { - fn default() -> &'a HelloWorld { - ::default_instance() - } -} - -impl HelloWorld { - pub fn new() -> HelloWorld { - ::std::default::Default::default() - } - - // string message = 1; - - - pub fn get_message(&self) -> &str { - &self.message - } - pub fn clear_message(&mut self) { - self.message.clear(); - } - - // Param is passed by value, moved - pub fn set_message(&mut self, v: ::std::string::String) { - self.message = v; - } - - // Mutable pointer to the field. - // If field is not initialized, it is initialized with default value first. - pub fn mut_message(&mut self) -> &mut ::std::string::String { - &mut self.message - } - - // Take field - pub fn take_message(&mut self) -> ::std::string::String { - ::std::mem::replace(&mut self.message, ::std::string::String::new()) - } -} - -impl ::protobuf::Message for HelloWorld { - fn is_initialized(&self) -> bool { - true - } - - fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> { - while !is.eof()? { - let (field_number, wire_type) = is.read_tag_unpack()?; - match field_number { - 1 => { - ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.message)?; - }, - _ => { - ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; - }, - }; - } - ::std::result::Result::Ok(()) - } - - // Compute sizes of nested messages - #[allow(unused_variables)] - fn compute_size(&self) -> u32 { - let mut my_size = 0; - if !self.message.is_empty() { - my_size += ::protobuf::rt::string_size(1, &self.message); - } - my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); - self.cached_size.set(my_size); - my_size - } - - fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { - if !self.message.is_empty() { - os.write_string(1, &self.message)?; - } - os.write_unknown_fields(self.get_unknown_fields())?; - ::std::result::Result::Ok(()) - } - - fn get_cached_size(&self) -> u32 { - self.cached_size.get() - } - - fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { - &self.unknown_fields - } - - fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { - &mut self.unknown_fields - } - - fn as_any(&self) -> &dyn (::std::any::Any) { - self as &dyn (::std::any::Any) - } - fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { - self as &mut dyn (::std::any::Any) - } - fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { - self - } - - fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { - Self::descriptor_static() - } - - fn new() -> HelloWorld { - HelloWorld::new() - } - - fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { - static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; - descriptor.get(|| { - let mut fields = ::std::vec::Vec::new(); - fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( - "message", - |m: &HelloWorld| { &m.message }, - |m: &mut HelloWorld| { &mut m.message }, - )); - ::protobuf::reflect::MessageDescriptor::new_pb_name::( - "HelloWorld", - fields, - file_descriptor_proto() - ) - }) - } - - fn default_instance() -> &'static HelloWorld { - static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; - instance.get(HelloWorld::new) - } -} - -impl ::protobuf::Clear for HelloWorld { - fn clear(&mut self) { - self.message.clear(); - self.unknown_fields.clear(); - } -} - -impl ::std::fmt::Debug for HelloWorld { - fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { - ::protobuf::text_format::fmt(self, f) - } -} - -impl ::protobuf::reflect::ProtobufValue for HelloWorld { - fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { - ::protobuf::reflect::ReflectValueRef::Message(self) - } -} - -static file_descriptor_proto_data: &'static [u8] = b"\ - \n\x11hello_world.proto\x12\x0bhello_world\"*\n\nHelloWorld\x12\x1a\n\ - \x07message\x18\x01\x20\x01(\tR\x07messageB\0:\0B\0b\x06proto3\ -"; - -static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; - -fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto { - ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap() -} - -pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto { - file_descriptor_proto_lazy.get(|| { - parse_descriptor_proto() - }) -} diff --git a/examples/tls-echo-server/src/schema/mod.rs b/examples/tls-echo-server/src/schema/mod.rs deleted file mode 100644 index c6dbc18..0000000 --- a/examples/tls-echo-server/src/schema/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod hello_world; diff --git a/schema/message.proto b/schema/message.proto deleted file mode 100644 index d1f6ee0..0000000 --- a/schema/message.proto +++ /dev/null @@ -1,8 +0,0 @@ -syntax = "proto3"; -package message; - -import "google/protobuf/any.proto"; - -message ConnectionMessage { - google.protobuf.Any payload = 1; -} diff --git a/src/lib.rs b/src/lib.rs index 4f1fe18..45f76da 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,12 +24,13 @@ //! Until the Rust ecosystem around reflection improves, the crate will use Protobuf to fill the //! void. +pub mod protocol; mod reader; -pub(crate) mod schema; pub mod tcp; pub mod tls; mod writer; +pub use crate::protocol::{ConnectDatagram, DatagramEmptyError}; pub use crate::reader::ConnectionReader; pub use crate::writer::ConnectionWriter; use async_std::{net::SocketAddr, pin::Pin}; diff --git a/src/protocol.rs b/src/protocol.rs new file mode 100644 index 0000000..0574d9f --- /dev/null +++ b/src/protocol.rs @@ -0,0 +1,182 @@ +use std::error::Error; +use std::io::Read; + +const VERSION: u8 = 1; + +#[derive(Debug, Clone)] +pub struct DatagramEmptyError; + +impl Error for DatagramEmptyError {} + +impl std::fmt::Display for DatagramEmptyError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + f, + "datagram cannot be constructed when provided payload is empty" + ) + } +} + +pub struct ConnectDatagram { + version: u8, + recipient: u16, + data: Option>, +} + +impl ConnectDatagram { + pub fn new(recipient: u16, data: Vec) -> Result { + if data.len() > 0 { + Ok(Self { + version: VERSION, + recipient, + data: Some(data), + }) + } else { + Err(DatagramEmptyError) + } + } + + pub fn version(&self) -> u8 { + self.version + } + + pub fn recipient(&self) -> u16 { + self.recipient + } + + pub fn data(&self) -> Option<&Vec> { + self.data.as_ref() + } + + pub fn take_data(&mut self) -> Option> { + self.data.take() + } + + pub fn size(&self) -> usize { + let data_len = if let Some(data) = self.data() { + data.len() + } else { + 0 + }; + + 3 + data_len + } + + pub fn bytes(&self) -> Vec { + let mut bytes = Vec::with_capacity(self.size()); + + bytes.extend(&self.version.to_be_bytes()); + bytes.extend(&self.recipient.to_be_bytes()); + + if let Some(data) = self.data() { + bytes.extend(data.as_slice()); + } + + return bytes; + } + + pub fn encode(&self) -> Vec { + let size: u32 = (self.size()) as u32; + + let mut bytes = Vec::from(size.to_be_bytes()); + bytes.extend(self.bytes()); + + return bytes; + } + + pub fn decode(source: &mut (dyn Read + Send + Sync)) -> anyhow::Result { + // payload size + let mut payload_size_bytes: [u8; 4] = [0; 4]; + source.read_exact(&mut payload_size_bytes)?; + let payload_size = u32::from_be_bytes(payload_size_bytes); + + // read whole payload + let mut payload_bytes = vec![0; payload_size as usize]; + source.read_exact(payload_bytes.as_mut_slice())?; + + // version + let version_bytes = payload_bytes.remove(0); + let version = u8::from_be(version_bytes); + + // recipient + let mut recipient_bytes: [u8; 2] = [0; 2]; + for i in 0..recipient_bytes.len() { + recipient_bytes[i] = payload_bytes.remove(0); + } + let recipient = u16::from_be_bytes(recipient_bytes); + + // data + let data = payload_bytes; + + if data.len() > 0 { + Ok(Self { + version, + recipient, + data: Some(data), + }) + } else { + Err(anyhow::Error::from(DatagramEmptyError)) + } + } +} + +#[cfg(test)] +mod tests { + use crate::protocol::ConnectDatagram; + use std::io::Cursor; + + #[test] + fn encoded_size() -> anyhow::Result<()> { + let mut data = Vec::new(); + for _ in 0..5 { + data.push(1); + } + assert_eq!(5, data.len()); + + let sample = ConnectDatagram::new(1, data)?; + assert_eq!(7 + 5, sample.encode().len()); + + Ok(()) + } + + #[test] + fn take_data() -> anyhow::Result<()> { + let mut data = Vec::new(); + for _ in 0..5 { + data.push(1); + } + + let mut sample = ConnectDatagram::new(1, data)?; + + let taken_data = sample.take_data().unwrap(); + assert!(sample.data().is_none()); + assert_eq!(5, taken_data.len()); + + Ok(()) + } + + #[async_std::test] + async fn encode_and_decode() -> anyhow::Result<()> { + let mut data = Vec::new(); + for _ in 0..5 { + data.push(1); + } + assert_eq!(5, data.len()); + + let sample = ConnectDatagram::new(1, data)?; + + let mut payload = sample.encode(); + assert_eq!(7 + 5, payload.len()); + + let mut cursor: Cursor<&mut [u8]> = Cursor::new(payload.as_mut()); + let sample_back_res = ConnectDatagram::decode(&mut cursor); + assert!(sample_back_res.is_ok()); + + let sample_back = sample_back_res.unwrap(); + assert_eq!(sample_back.version(), 1); + assert_eq!(sample_back.recipient(), 1); + assert_eq!(sample_back.data().unwrap().len(), 5); + + Ok(()) + } +} diff --git a/src/reader.rs b/src/reader.rs index 428f6d1..39b1048 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -1,16 +1,14 @@ -use crate::schema::ConnectionMessage; use async_std::net::SocketAddr; use async_std::pin::Pin; use bytes::{Buf, BytesMut}; use futures::task::{Context, Poll}; use futures::{AsyncRead, Stream}; use log::*; -use protobuf::Message; -use std::convert::TryInto; +use crate::protocol::ConnectDatagram; pub use futures::SinkExt; pub use futures::StreamExt; -use protobuf::well_known_types::Any; +use std::io::Cursor; /// A default buffer size to read in bytes and then deserialize as messages const BUFFER_SIZE: usize = 8192; @@ -81,7 +79,7 @@ impl ConnectionReader { } impl Stream for ConnectionReader { - type Item = Any; + type Item = ConnectDatagram; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut buffer = BytesMut::new(); @@ -111,36 +109,30 @@ impl Stream for ConnectionReader { buffer = pending_buf; } - let mut bytes_read_u64: u64 = bytes_read.try_into().expect( - format!("Conversion from usize ({}) to u64 failed", bytes_read).as_str(), - ); - while bytes_read_u64 > 0 { - trace!( - "{} bytes from network stream still unprocessed", - bytes_read_u64 - ); + while bytes_read > 0 { + trace!("{} bytes from network stream still unprocessed", bytes_read); buffer.resize(bytes_read, 0); - match ConnectionMessage::parse_from_bytes(buffer.as_ref()) { - Ok(mut data) => { - let serialized_size = data.compute_size(); - trace!("Deserialized message of size {} bytes", serialized_size); - - buffer.advance(serialized_size as usize); - - let serialized_size_u64: u64 = serialized_size.try_into().expect( - format!( - "Conversion from usize ({}) to u64 failed", - serialized_size - ) - .as_str(), - ); - bytes_read_u64 -= serialized_size_u64; - trace!("{} bytes still unprocessed", bytes_read_u64); - - trace!("Sending deserialized message downstream"); - return Poll::Ready(Some(data.take_payload())); + let mut cursor = Cursor::new(buffer.as_mut()); + match ConnectDatagram::decode(&mut cursor) { + Ok(data) => { + return match data.version() { + _ => { + let serialized_size = data.size(); + trace!( + "Deserialized message of size {} bytes", + serialized_size + ); + + buffer.advance(serialized_size); + bytes_read -= serialized_size; + trace!("{} bytes still unprocessed", bytes_read); + + trace!("Sending deserialized message downstream"); + Poll::Ready(Some(data)) + } + } } Err(err) => { diff --git a/src/schema/message.rs b/src/schema/message.rs deleted file mode 100644 index 265075f..0000000 --- a/src/schema/message.rs +++ /dev/null @@ -1,216 +0,0 @@ -// This file is generated by rust-protobuf 2.20.0. Do not edit -// @generated - -// https://github.com/rust-lang/rust-clippy/issues/702 -#![allow(unknown_lints)] -#![allow(clippy::all)] - -#![allow(unused_attributes)] -#![rustfmt::skip] - -#![allow(box_pointers)] -#![allow(dead_code)] -#![allow(missing_docs)] -#![allow(non_camel_case_types)] -#![allow(non_snake_case)] -#![allow(non_upper_case_globals)] -#![allow(trivial_casts)] -#![allow(unused_imports)] -#![allow(unused_results)] -//! Generated file from `message.proto` - -/// Generated files are compatible only with the same version -/// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_20_0; - -#[derive(PartialEq,Clone,Default)] -pub struct ConnectionMessage { - // message fields - pub payload: ::protobuf::SingularPtrField<::protobuf::well_known_types::Any>, - // special fields - pub unknown_fields: ::protobuf::UnknownFields, - pub cached_size: ::protobuf::CachedSize, -} - -impl<'a> ::std::default::Default for &'a ConnectionMessage { - fn default() -> &'a ConnectionMessage { - ::default_instance() - } -} - -impl ConnectionMessage { - pub fn new() -> ConnectionMessage { - ::std::default::Default::default() - } - - // .google.protobuf.Any payload = 1; - - - pub fn get_payload(&self) -> &::protobuf::well_known_types::Any { - self.payload.as_ref().unwrap_or_else(|| <::protobuf::well_known_types::Any as ::protobuf::Message>::default_instance()) - } - pub fn clear_payload(&mut self) { - self.payload.clear(); - } - - pub fn has_payload(&self) -> bool { - self.payload.is_some() - } - - // Param is passed by value, moved - pub fn set_payload(&mut self, v: ::protobuf::well_known_types::Any) { - self.payload = ::protobuf::SingularPtrField::some(v); - } - - // Mutable pointer to the field. - // If field is not initialized, it is initialized with default value first. - pub fn mut_payload(&mut self) -> &mut ::protobuf::well_known_types::Any { - if self.payload.is_none() { - self.payload.set_default(); - } - self.payload.as_mut().unwrap() - } - - // Take field - pub fn take_payload(&mut self) -> ::protobuf::well_known_types::Any { - self.payload.take().unwrap_or_else(|| ::protobuf::well_known_types::Any::new()) - } -} - -impl ::protobuf::Message for ConnectionMessage { - fn is_initialized(&self) -> bool { - for v in &self.payload { - if !v.is_initialized() { - return false; - } - }; - true - } - - fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> { - while !is.eof()? { - let (field_number, wire_type) = is.read_tag_unpack()?; - match field_number { - 1 => { - ::protobuf::rt::read_singular_message_into(wire_type, is, &mut self.payload)?; - }, - _ => { - ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; - }, - }; - } - ::std::result::Result::Ok(()) - } - - // Compute sizes of nested messages - #[allow(unused_variables)] - fn compute_size(&self) -> u32 { - let mut my_size = 0; - if let Some(ref v) = self.payload.as_ref() { - let len = v.compute_size(); - my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len; - } - my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); - self.cached_size.set(my_size); - my_size - } - - fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { - if let Some(ref v) = self.payload.as_ref() { - os.write_tag(1, ::protobuf::wire_format::WireTypeLengthDelimited)?; - os.write_raw_varint32(v.get_cached_size())?; - v.write_to_with_cached_sizes(os)?; - } - os.write_unknown_fields(self.get_unknown_fields())?; - ::std::result::Result::Ok(()) - } - - fn get_cached_size(&self) -> u32 { - self.cached_size.get() - } - - fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { - &self.unknown_fields - } - - fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { - &mut self.unknown_fields - } - - fn as_any(&self) -> &dyn (::std::any::Any) { - self as &dyn (::std::any::Any) - } - fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { - self as &mut dyn (::std::any::Any) - } - fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { - self - } - - fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { - Self::descriptor_static() - } - - fn new() -> ConnectionMessage { - ConnectionMessage::new() - } - - fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { - static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; - descriptor.get(|| { - let mut fields = ::std::vec::Vec::new(); - fields.push(::protobuf::reflect::accessor::make_singular_ptr_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage<::protobuf::well_known_types::Any>>( - "payload", - |m: &ConnectionMessage| { &m.payload }, - |m: &mut ConnectionMessage| { &mut m.payload }, - )); - ::protobuf::reflect::MessageDescriptor::new_pb_name::( - "ConnectionMessage", - fields, - file_descriptor_proto() - ) - }) - } - - fn default_instance() -> &'static ConnectionMessage { - static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; - instance.get(ConnectionMessage::new) - } -} - -impl ::protobuf::Clear for ConnectionMessage { - fn clear(&mut self) { - self.payload.clear(); - self.unknown_fields.clear(); - } -} - -impl ::std::fmt::Debug for ConnectionMessage { - fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { - ::protobuf::text_format::fmt(self, f) - } -} - -impl ::protobuf::reflect::ProtobufValue for ConnectionMessage { - fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { - ::protobuf::reflect::ReflectValueRef::Message(self) - } -} - -static file_descriptor_proto_data: &'static [u8] = b"\ - \n\rmessage.proto\x12\x07message\x1a\x19google/protobuf/any.proto\"G\n\ - \x11ConnectionMessage\x120\n\x07payload\x18\x01\x20\x01(\x0b2\x14.google\ - .protobuf.AnyR\x07payloadB\0:\0B\0b\x06proto3\ -"; - -static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; - -fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto { - ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap() -} - -pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto { - file_descriptor_proto_lazy.get(|| { - parse_descriptor_proto() - }) -} diff --git a/src/schema/mod.rs b/src/schema/mod.rs deleted file mode 100644 index 2774e64..0000000 --- a/src/schema/mod.rs +++ /dev/null @@ -1,15 +0,0 @@ -mod message; - -pub use message::ConnectionMessage; -use protobuf::well_known_types::Any; -use protobuf::Message; - -impl ConnectionMessage { - pub(crate) fn from_msg(msg: M) -> Self { - let mut sm = Self::new(); - let payload = Any::pack(&msg).expect("Protobuf Message could not be packed into Any type"); - - sm.set_payload(payload); - return sm; - } -} diff --git a/src/writer.rs b/src/writer.rs index 9a6f20a..4ee98f2 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -1,4 +1,3 @@ -use crate::schema::ConnectionMessage; use async_channel::RecvError; use async_std::net::SocketAddr; use async_std::pin::Pin; @@ -6,8 +5,8 @@ use futures::io::IoSlice; use futures::task::{Context, Poll}; use futures::{AsyncWrite, Sink}; use log::*; -use protobuf::Message; +use crate::protocol::ConnectDatagram; pub use futures::SinkExt; pub use futures::StreamExt; @@ -65,39 +64,11 @@ impl ConnectionWriter { pub fn is_closed(&self) -> bool { self.closed } -} - -impl Sink for ConnectionWriter { - type Error = RecvError; - - fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - if self.is_closed() { - trace!("Connection is closed, cannot send message"); - Poll::Ready(Err(RecvError)) - } else { - trace!("Connection ready to send message"); - Poll::Ready(Ok(())) - } - } - fn start_send(mut self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> { - trace!("Preparing message to be sent next"); - let msg: ConnectionMessage = ConnectionMessage::from_msg(item); - - if let Ok(buffer) = msg.write_to_bytes() { - let msg_size = buffer.len(); - trace!("Serialized pending message into {} bytes", msg_size); - - self.pending_writes.push(buffer); - - Ok(()) - } else { - error!("Encountered error when serializing message to bytes"); - Err(RecvError) - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + pub(crate) fn write_pending_bytes( + &mut self, + cx: &mut Context<'_>, + ) -> Poll> { if self.pending_writes.len() > 0 { let stream = self.write_stream.as_mut(); @@ -136,49 +107,41 @@ impl Sink for ConnectionWriter { Poll::Ready(Ok(())) } } +} - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.closed = true; - - let flush = if self.pending_writes.len() > 0 { - let stream = self.write_stream.as_mut(); +impl Sink for ConnectionWriter { + type Error = RecvError; - match stream.poll_flush(cx) { - Poll::Pending => Poll::Pending, + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + if self.is_closed() { + trace!("Connection is closed, cannot send message"); + Poll::Ready(Err(RecvError)) + } else { + trace!("Connection ready to send message"); + Poll::Ready(Ok(())) + } + } - Poll::Ready(Ok(_)) => { - trace!("Sending pending bytes"); + fn start_send(mut self: Pin<&mut Self>, item: ConnectDatagram) -> Result<(), Self::Error> { + trace!("Preparing message to be sent next"); - let pending = self.pending_writes.split_off(0); - let writeable_vec: Vec = - pending.iter().map(|p| IoSlice::new(p)).collect(); + let buffer = item.encode(); + let msg_size = buffer.len(); + trace!("Serialized pending message into {} bytes", msg_size); - let stream = self.write_stream.as_mut(); - match stream.poll_write_vectored(cx, writeable_vec.as_slice()) { - Poll::Pending => Poll::Pending, + self.pending_writes.push(buffer); - Poll::Ready(Ok(bytes_written)) => { - trace!("Wrote {} bytes to network stream", bytes_written); - Poll::Ready(Ok(())) - } + Ok(()) + } - Poll::Ready(Err(_e)) => { - error!("Encountered error when writing to network stream"); - Poll::Ready(Err(RecvError)) - } - } - } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.write_pending_bytes(cx) + } - Poll::Ready(Err(_e)) => { - error!("Encountered error when flushing network stream"); - Poll::Ready(Err(RecvError)) - } - } - } else { - Poll::Ready(Ok(())) - }; + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.closed = true; - match flush { + match self.write_pending_bytes(cx) { Poll::Pending => Poll::Pending, Poll::Ready(Ok(_)) => { -- 2.44.0