]> git.lizzy.rs Git - connect-rs.git/commitdiff
remove dependency on protobuf and introduce basic custom wire format
authorSachandhan Ganesh <sachan.ganesh@gmail.com>
Fri, 12 Feb 2021 23:52:39 +0000 (15:52 -0800)
committerSachandhan Ganesh <sachan.ganesh@gmail.com>
Fri, 12 Feb 2021 23:54:11 +0000 (15:54 -0800)
30 files changed:
Cargo.toml
build.rs [deleted file]
examples/tcp-client/Cargo.toml
examples/tcp-client/build.rs [deleted file]
examples/tcp-client/schema/hello_world.proto [deleted file]
examples/tcp-client/src/main.rs
examples/tcp-client/src/schema/hello_world.rs [deleted file]
examples/tcp-client/src/schema/mod.rs [deleted file]
examples/tcp-echo-server/Cargo.toml
examples/tcp-echo-server/build.rs [deleted file]
examples/tcp-echo-server/src/main.rs
examples/tcp-echo-server/src/schema/hello_world.rs [deleted file]
examples/tcp-echo-server/src/schema/mod.rs [deleted file]
examples/tls-client/Cargo.toml
examples/tls-client/build.rs [deleted file]
examples/tls-client/src/main.rs
examples/tls-client/src/schema/hello_world.rs [deleted file]
examples/tls-client/src/schema/mod.rs [deleted file]
examples/tls-echo-server/Cargo.toml
examples/tls-echo-server/build.rs [deleted file]
examples/tls-echo-server/src/main.rs
examples/tls-echo-server/src/schema/hello_world.rs [deleted file]
examples/tls-echo-server/src/schema/mod.rs [deleted file]
schema/message.proto [deleted file]
src/lib.rs
src/protocol.rs [new file with mode: 0644]
src/reader.rs
src/schema/message.rs [deleted file]
src/schema/mod.rs [deleted file]
src/writer.rs

index c15063640b392b76c1bb91f102e14e700502080d..999615a5e5931b3773f44f4f353fac01903b809c 100644 (file)
@@ -1,6 +1,6 @@
 [package]
 name = "connect"
-version = "0.1.2"
+version = "0.1.3"
 edition = "2018"
 authors = ["Sachandhan Ganesh <sachan.ganesh@gmail.com>"]
 description = "message queue abstraction over async network streams"
@@ -13,23 +13,16 @@ license = "Apache-2.0"
 
 [workspace]
 members = [
-    "examples/tcp-client",
-    "examples/tcp-echo-server",
-    "examples/tls-client",
-    "examples/tls-echo-server",
+    "examples/*",
 ]
 
 
 [dependencies]
 anyhow = "1.0.31"
 async-channel = "1.4.0"
-async-std = { version = "1.6.2", features = ["unstable"] }
+async-std = { version = "1.6.2", features = ["attributes", "unstable"] }
 async-tls = { version = "0.9.0", default-features = false, features = ["client", "server"]}
 bytes = "0.5.5"
 futures = "0.3.8"
 log = "0.4"
-protobuf = "2.18.1"
 rustls = "0.18.0"
-
-[build-dependencies]
-protobuf-codegen-pure = "2.18.1"
diff --git a/build.rs b/build.rs
deleted file mode 100644 (file)
index 31cc875..0000000
--- a/build.rs
+++ /dev/null
@@ -1,14 +0,0 @@
-use std::env;
-
-fn main() -> Result<(), Box<dyn std::error::Error>> {
-    if Ok("dev".to_owned()) == env::var("PROFILE") {
-        protobuf_codegen_pure::Codegen::new()
-            .out_dir("src/schema")
-            .inputs(&["schema/message.proto"])
-            .include("schema")
-            .run()
-            .expect("Codegen failed.");
-    }
-
-    Ok(())
-}
index 1fb8e2b06773903c59722b598a6ef900808c2746..40978097941f0ce7109ea16ef6733380a42e0990 100644 (file)
@@ -11,9 +11,5 @@ anyhow = "1.0.31"
 async-std = { version = "1.6.2", features = ["attributes"] }
 env_logger = "0.7"
 log = "0.4"
-protobuf = "2.18.1"
 
 connect = { path = "../../" }
-
-[build-dependencies]
-protobuf-codegen-pure = "2.18.1"
diff --git a/examples/tcp-client/build.rs b/examples/tcp-client/build.rs
deleted file mode 100644 (file)
index aa2ff90..0000000
+++ /dev/null
@@ -1,9 +0,0 @@
-fn main() -> Result<(), Box<dyn std::error::Error>> {
-    protobuf_codegen_pure::Codegen::new()
-        .out_dir("src/schema")
-        .inputs(&["schema/hello_world.proto"])
-        .include("schema")
-        .run()
-        .expect("Codegen failed.");
-    Ok(())
-}
diff --git a/examples/tcp-client/schema/hello_world.proto b/examples/tcp-client/schema/hello_world.proto
deleted file mode 100644 (file)
index 5ffe7b2..0000000
+++ /dev/null
@@ -1,6 +0,0 @@
-syntax = "proto3";
-package hello_world;
-
-message HelloWorld {
-  string message = 1;
-}
\ No newline at end of file
index c65a45548cb32894155e443b1409105b67237902..5e15adad32afb1e1733955535a2c61661c4b9447 100644 (file)
@@ -1,9 +1,5 @@
-pub mod schema;
-
-use crate::schema::hello_world::HelloWorld;
-use connect::{Connection, SinkExt, StreamExt};
+use connect::{ConnectDatagram, Connection, SinkExt, StreamExt};
 use log::*;
-use protobuf::well_known_types::Any;
 use std::env;
 
 #[async_std::main]
@@ -24,21 +20,20 @@ async fn main() -> anyhow::Result<()> {
     let mut conn = Connection::tcp_client(ip_address).await?;
 
     // send a message to the server
-    let raw_msg = String::from("Hello world");
-
-    let mut msg = HelloWorld::new();
-    msg.set_message(raw_msg.clone());
+    let msg = String::from("Hello world");
+    info!("Sending message: {}", msg);
 
-    conn.writer().send(msg).await?;
-    info!("Sent message: {}", raw_msg);
+    let envelope = ConnectDatagram::new(65535, msg.into_bytes())?;
+    conn.writer().send(envelope).await?;
 
     // wait for the server to reply with an ack
-    while let Some(reply) = conn.reader().next().await {
+    if let Some(mut reply) = conn.reader().next().await {
         info!("Received message");
 
-        let msg: HelloWorld = Any::unpack(&reply)?.unwrap();
+        let data = reply.take_data().unwrap();
+        let msg = String::from_utf8(data)?;
 
-        info!("Unpacked reply: {}", msg.get_message());
+        info!("Unpacked reply: {}", msg);
     }
 
     Ok(())
diff --git a/examples/tcp-client/src/schema/hello_world.rs b/examples/tcp-client/src/schema/hello_world.rs
deleted file mode 100644 (file)
index 1af771c..0000000
+++ /dev/null
@@ -1,200 +0,0 @@
-// This file is generated by rust-protobuf 2.20.0. Do not edit
-// @generated
-
-// https://github.com/rust-lang/rust-clippy/issues/702
-#![allow(unknown_lints)]
-#![allow(clippy::all)]
-
-#![allow(unused_attributes)]
-#![rustfmt::skip]
-
-#![allow(box_pointers)]
-#![allow(dead_code)]
-#![allow(missing_docs)]
-#![allow(non_camel_case_types)]
-#![allow(non_snake_case)]
-#![allow(non_upper_case_globals)]
-#![allow(trivial_casts)]
-#![allow(unused_imports)]
-#![allow(unused_results)]
-//! Generated file from `hello_world.proto`
-
-/// Generated files are compatible only with the same version
-/// of protobuf runtime.
-// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_20_0;
-
-#[derive(PartialEq,Clone,Default)]
-pub struct HelloWorld {
-    // message fields
-    pub message: ::std::string::String,
-    // special fields
-    pub unknown_fields: ::protobuf::UnknownFields,
-    pub cached_size: ::protobuf::CachedSize,
-}
-
-impl<'a> ::std::default::Default for &'a HelloWorld {
-    fn default() -> &'a HelloWorld {
-        <HelloWorld as ::protobuf::Message>::default_instance()
-    }
-}
-
-impl HelloWorld {
-    pub fn new() -> HelloWorld {
-        ::std::default::Default::default()
-    }
-
-    // string message = 1;
-
-
-    pub fn get_message(&self) -> &str {
-        &self.message
-    }
-    pub fn clear_message(&mut self) {
-        self.message.clear();
-    }
-
-    // Param is passed by value, moved
-    pub fn set_message(&mut self, v: ::std::string::String) {
-        self.message = v;
-    }
-
-    // Mutable pointer to the field.
-    // If field is not initialized, it is initialized with default value first.
-    pub fn mut_message(&mut self) -> &mut ::std::string::String {
-        &mut self.message
-    }
-
-    // Take field
-    pub fn take_message(&mut self) -> ::std::string::String {
-        ::std::mem::replace(&mut self.message, ::std::string::String::new())
-    }
-}
-
-impl ::protobuf::Message for HelloWorld {
-    fn is_initialized(&self) -> bool {
-        true
-    }
-
-    fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
-        while !is.eof()? {
-            let (field_number, wire_type) = is.read_tag_unpack()?;
-            match field_number {
-                1 => {
-                    ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.message)?;
-                },
-                _ => {
-                    ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
-                },
-            };
-        }
-        ::std::result::Result::Ok(())
-    }
-
-    // Compute sizes of nested messages
-    #[allow(unused_variables)]
-    fn compute_size(&self) -> u32 {
-        let mut my_size = 0;
-        if !self.message.is_empty() {
-            my_size += ::protobuf::rt::string_size(1, &self.message);
-        }
-        my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
-        self.cached_size.set(my_size);
-        my_size
-    }
-
-    fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
-        if !self.message.is_empty() {
-            os.write_string(1, &self.message)?;
-        }
-        os.write_unknown_fields(self.get_unknown_fields())?;
-        ::std::result::Result::Ok(())
-    }
-
-    fn get_cached_size(&self) -> u32 {
-        self.cached_size.get()
-    }
-
-    fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
-        &self.unknown_fields
-    }
-
-    fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
-        &mut self.unknown_fields
-    }
-
-    fn as_any(&self) -> &dyn (::std::any::Any) {
-        self as &dyn (::std::any::Any)
-    }
-    fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
-        self as &mut dyn (::std::any::Any)
-    }
-    fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
-        self
-    }
-
-    fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
-        Self::descriptor_static()
-    }
-
-    fn new() -> HelloWorld {
-        HelloWorld::new()
-    }
-
-    fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
-        static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
-        descriptor.get(|| {
-            let mut fields = ::std::vec::Vec::new();
-            fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
-                "message",
-                |m: &HelloWorld| { &m.message },
-                |m: &mut HelloWorld| { &mut m.message },
-            ));
-            ::protobuf::reflect::MessageDescriptor::new_pb_name::<HelloWorld>(
-                "HelloWorld",
-                fields,
-                file_descriptor_proto()
-            )
-        })
-    }
-
-    fn default_instance() -> &'static HelloWorld {
-        static instance: ::protobuf::rt::LazyV2<HelloWorld> = ::protobuf::rt::LazyV2::INIT;
-        instance.get(HelloWorld::new)
-    }
-}
-
-impl ::protobuf::Clear for HelloWorld {
-    fn clear(&mut self) {
-        self.message.clear();
-        self.unknown_fields.clear();
-    }
-}
-
-impl ::std::fmt::Debug for HelloWorld {
-    fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
-        ::protobuf::text_format::fmt(self, f)
-    }
-}
-
-impl ::protobuf::reflect::ProtobufValue for HelloWorld {
-    fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
-        ::protobuf::reflect::ReflectValueRef::Message(self)
-    }
-}
-
-static file_descriptor_proto_data: &'static [u8] = b"\
-    \n\x11hello_world.proto\x12\x0bhello_world\"*\n\nHelloWorld\x12\x1a\n\
-    \x07message\x18\x01\x20\x01(\tR\x07messageB\0:\0B\0b\x06proto3\
-";
-
-static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;
-
-fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
-    ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap()
-}
-
-pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
-    file_descriptor_proto_lazy.get(|| {
-        parse_descriptor_proto()
-    })
-}
diff --git a/examples/tcp-client/src/schema/mod.rs b/examples/tcp-client/src/schema/mod.rs
deleted file mode 100644 (file)
index c6dbc18..0000000
+++ /dev/null
@@ -1 +0,0 @@
-pub mod hello_world;
index 29af62d1b1c3fbc95946c70cee8af68fdb722b57..e4417c72436d20c33ac6e13227cf100cffc652f2 100644 (file)
@@ -11,9 +11,5 @@ anyhow = "1.0.31"
 async-std = { version = "1.6.2", features = ["attributes"] }
 env_logger = "0.7"
 log = "0.4"
-protobuf = "2.18.1"
 
 connect = { path = "../../" }
-
-[build-dependencies]
-protobuf-codegen-pure = "2.18.1"
diff --git a/examples/tcp-echo-server/build.rs b/examples/tcp-echo-server/build.rs
deleted file mode 100644 (file)
index ae9222f..0000000
+++ /dev/null
@@ -1,9 +0,0 @@
-fn main() -> Result<(), Box<dyn std::error::Error>> {
-    protobuf_codegen_pure::Codegen::new()
-        .out_dir("src/schema")
-        .inputs(&["../tcp-client/schema/hello_world.proto"])
-        .include("../tcp-client/schema")
-        .run()
-        .expect("Codegen failed.");
-    Ok(())
-}
index 4d7bfded05aef2e4583715afe7bc3f57b0c05812..04e1e122cdf968e06f6c2c80a5ad160a68a7fd8c 100644 (file)
@@ -1,9 +1,6 @@
-mod schema;
-
-use crate::schema::hello_world::HelloWorld;
 use async_std::task;
 use connect::tcp::TcpListener;
-use connect::{SinkExt, StreamExt};
+use connect::{ConnectDatagram, SinkExt, StreamExt};
 use log::*;
 use std::env;
 
@@ -31,23 +28,36 @@ async fn main() -> anyhow::Result<()> {
         info!("Handling connection from {}", conn.peer_addr());
 
         task::spawn(async move {
-            while let Some(msg) = conn.reader().next().await {
-                if msg.is::<HelloWorld>() {
-                    if let Ok(Some(contents)) = msg.unpack::<HelloWorld>() {
-                        info!(
-                            "Received a message \"{}\" from {}",
-                            contents.get_message(),
-                            conn.peer_addr()
-                        );
-
-                        conn.writer()
-                            .send(contents)
-                            .await
-                            .expect("Could not send message back to source connection");
-                        info!("Sent message back to original sender");
-                    }
+            while let Some(mut envelope) = conn.reader().next().await {
+                // handle message based on intended recipient
+                if envelope.recipient() == 65535 {
+                    // if recipient is 65535, we do custom processing
+                    let data = envelope.take_data().unwrap();
+                    let msg =
+                        String::from_utf8(data).expect("could not build String from payload bytes");
+                    info!("Received a message \"{}\" from {}", msg, conn.peer_addr());
+
+                    let reply = ConnectDatagram::new(envelope.recipient(), msg.into_bytes())
+                        .expect("could not construct new datagram from built String");
+
+                    conn.writer()
+                        .send(reply)
+                        .await
+                        .expect("Could not send message back to source connection");
+                    info!("Sent message back to original sender");
                 } else {
-                    error!("Received a message of unknown type")
+                    // if recipient is anything else, we just send it back
+                    warn!(
+                        "Received a message for unknown recipient {} from {}",
+                        envelope.recipient(),
+                        conn.peer_addr()
+                    );
+
+                    conn.writer()
+                        .send(envelope)
+                        .await
+                        .expect("Could not send message back to source connection");
+                    info!("Sent message back to original sender");
                 }
             }
         });
diff --git a/examples/tcp-echo-server/src/schema/hello_world.rs b/examples/tcp-echo-server/src/schema/hello_world.rs
deleted file mode 100644 (file)
index 1af771c..0000000
+++ /dev/null
@@ -1,200 +0,0 @@
-// This file is generated by rust-protobuf 2.20.0. Do not edit
-// @generated
-
-// https://github.com/rust-lang/rust-clippy/issues/702
-#![allow(unknown_lints)]
-#![allow(clippy::all)]
-
-#![allow(unused_attributes)]
-#![rustfmt::skip]
-
-#![allow(box_pointers)]
-#![allow(dead_code)]
-#![allow(missing_docs)]
-#![allow(non_camel_case_types)]
-#![allow(non_snake_case)]
-#![allow(non_upper_case_globals)]
-#![allow(trivial_casts)]
-#![allow(unused_imports)]
-#![allow(unused_results)]
-//! Generated file from `hello_world.proto`
-
-/// Generated files are compatible only with the same version
-/// of protobuf runtime.
-// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_20_0;
-
-#[derive(PartialEq,Clone,Default)]
-pub struct HelloWorld {
-    // message fields
-    pub message: ::std::string::String,
-    // special fields
-    pub unknown_fields: ::protobuf::UnknownFields,
-    pub cached_size: ::protobuf::CachedSize,
-}
-
-impl<'a> ::std::default::Default for &'a HelloWorld {
-    fn default() -> &'a HelloWorld {
-        <HelloWorld as ::protobuf::Message>::default_instance()
-    }
-}
-
-impl HelloWorld {
-    pub fn new() -> HelloWorld {
-        ::std::default::Default::default()
-    }
-
-    // string message = 1;
-
-
-    pub fn get_message(&self) -> &str {
-        &self.message
-    }
-    pub fn clear_message(&mut self) {
-        self.message.clear();
-    }
-
-    // Param is passed by value, moved
-    pub fn set_message(&mut self, v: ::std::string::String) {
-        self.message = v;
-    }
-
-    // Mutable pointer to the field.
-    // If field is not initialized, it is initialized with default value first.
-    pub fn mut_message(&mut self) -> &mut ::std::string::String {
-        &mut self.message
-    }
-
-    // Take field
-    pub fn take_message(&mut self) -> ::std::string::String {
-        ::std::mem::replace(&mut self.message, ::std::string::String::new())
-    }
-}
-
-impl ::protobuf::Message for HelloWorld {
-    fn is_initialized(&self) -> bool {
-        true
-    }
-
-    fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
-        while !is.eof()? {
-            let (field_number, wire_type) = is.read_tag_unpack()?;
-            match field_number {
-                1 => {
-                    ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.message)?;
-                },
-                _ => {
-                    ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
-                },
-            };
-        }
-        ::std::result::Result::Ok(())
-    }
-
-    // Compute sizes of nested messages
-    #[allow(unused_variables)]
-    fn compute_size(&self) -> u32 {
-        let mut my_size = 0;
-        if !self.message.is_empty() {
-            my_size += ::protobuf::rt::string_size(1, &self.message);
-        }
-        my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
-        self.cached_size.set(my_size);
-        my_size
-    }
-
-    fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
-        if !self.message.is_empty() {
-            os.write_string(1, &self.message)?;
-        }
-        os.write_unknown_fields(self.get_unknown_fields())?;
-        ::std::result::Result::Ok(())
-    }
-
-    fn get_cached_size(&self) -> u32 {
-        self.cached_size.get()
-    }
-
-    fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
-        &self.unknown_fields
-    }
-
-    fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
-        &mut self.unknown_fields
-    }
-
-    fn as_any(&self) -> &dyn (::std::any::Any) {
-        self as &dyn (::std::any::Any)
-    }
-    fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
-        self as &mut dyn (::std::any::Any)
-    }
-    fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
-        self
-    }
-
-    fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
-        Self::descriptor_static()
-    }
-
-    fn new() -> HelloWorld {
-        HelloWorld::new()
-    }
-
-    fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
-        static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
-        descriptor.get(|| {
-            let mut fields = ::std::vec::Vec::new();
-            fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
-                "message",
-                |m: &HelloWorld| { &m.message },
-                |m: &mut HelloWorld| { &mut m.message },
-            ));
-            ::protobuf::reflect::MessageDescriptor::new_pb_name::<HelloWorld>(
-                "HelloWorld",
-                fields,
-                file_descriptor_proto()
-            )
-        })
-    }
-
-    fn default_instance() -> &'static HelloWorld {
-        static instance: ::protobuf::rt::LazyV2<HelloWorld> = ::protobuf::rt::LazyV2::INIT;
-        instance.get(HelloWorld::new)
-    }
-}
-
-impl ::protobuf::Clear for HelloWorld {
-    fn clear(&mut self) {
-        self.message.clear();
-        self.unknown_fields.clear();
-    }
-}
-
-impl ::std::fmt::Debug for HelloWorld {
-    fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
-        ::protobuf::text_format::fmt(self, f)
-    }
-}
-
-impl ::protobuf::reflect::ProtobufValue for HelloWorld {
-    fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
-        ::protobuf::reflect::ReflectValueRef::Message(self)
-    }
-}
-
-static file_descriptor_proto_data: &'static [u8] = b"\
-    \n\x11hello_world.proto\x12\x0bhello_world\"*\n\nHelloWorld\x12\x1a\n\
-    \x07message\x18\x01\x20\x01(\tR\x07messageB\0:\0B\0b\x06proto3\
-";
-
-static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;
-
-fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
-    ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap()
-}
-
-pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
-    file_descriptor_proto_lazy.get(|| {
-        parse_descriptor_proto()
-    })
-}
diff --git a/examples/tcp-echo-server/src/schema/mod.rs b/examples/tcp-echo-server/src/schema/mod.rs
deleted file mode 100644 (file)
index c6dbc18..0000000
+++ /dev/null
@@ -1 +0,0 @@
-pub mod hello_world;
index 407ccb60133ee8169bf9197e93ded213c224a759..e6cf99bfc9d0a3dcd7ea2d18208b0653b94162d8 100644 (file)
@@ -11,9 +11,5 @@ anyhow = "1.0.31"
 async-std = { version = "1.6.2", features = ["attributes"] }
 env_logger = "0.7"
 log = "0.4"
-protobuf = "2.18.1"
 
 connect = { path = "../../" }
-
-[build-dependencies]
-protobuf-codegen-pure = "2.18.1"
diff --git a/examples/tls-client/build.rs b/examples/tls-client/build.rs
deleted file mode 100644 (file)
index ae9222f..0000000
+++ /dev/null
@@ -1,9 +0,0 @@
-fn main() -> Result<(), Box<dyn std::error::Error>> {
-    protobuf_codegen_pure::Codegen::new()
-        .out_dir("src/schema")
-        .inputs(&["../tcp-client/schema/hello_world.proto"])
-        .include("../tcp-client/schema")
-        .run()
-        .expect("Codegen failed.");
-    Ok(())
-}
index d9198e8ca2d756c6a20e657919a3fb75e5b5d5ad..c0b9280a91b45947483a3c30d03ca171800582be 100644 (file)
@@ -1,10 +1,6 @@
-mod schema;
-
-use crate::schema::hello_world::HelloWorld;
 use connect::tls::rustls::ClientConfig;
-use connect::{Connection, SinkExt, StreamExt};
+use connect::{ConnectDatagram, Connection, SinkExt, StreamExt};
 use log::*;
-use protobuf::well_known_types::Any;
 use std::env;
 
 #[async_std::main]
@@ -29,21 +25,20 @@ async fn main() -> anyhow::Result<()> {
     let mut conn = Connection::tls_client(ip_addr, &domain, client_config.into()).await?;
 
     // send a message to the server
-    let raw_msg = String::from("Hello world");
-    info!("Sending message: {}", raw_msg);
-
-    let mut msg = HelloWorld::new();
-    msg.set_message(raw_msg);
+    let msg = String::from("Hello world");
+    info!("Sending message: {}", msg);
 
-    conn.writer().send(msg).await?;
+    let envelope = ConnectDatagram::new(65535, msg.into_bytes())?;
+    conn.writer().send(envelope).await?;
 
     // wait for the server to reply with an ack
-    while let Some(reply) = conn.reader().next().await {
+    if let Some(mut reply) = conn.reader().next().await {
         info!("Received message");
 
-        let msg: HelloWorld = Any::unpack(&reply)?.unwrap();
+        let data = reply.take_data().unwrap();
+        let msg = String::from_utf8(data)?;
 
-        info!("Unpacked reply: {}", msg.get_message());
+        info!("Unpacked reply: {}", msg);
     }
 
     Ok(())
diff --git a/examples/tls-client/src/schema/hello_world.rs b/examples/tls-client/src/schema/hello_world.rs
deleted file mode 100644 (file)
index 1af771c..0000000
+++ /dev/null
@@ -1,200 +0,0 @@
-// This file is generated by rust-protobuf 2.20.0. Do not edit
-// @generated
-
-// https://github.com/rust-lang/rust-clippy/issues/702
-#![allow(unknown_lints)]
-#![allow(clippy::all)]
-
-#![allow(unused_attributes)]
-#![rustfmt::skip]
-
-#![allow(box_pointers)]
-#![allow(dead_code)]
-#![allow(missing_docs)]
-#![allow(non_camel_case_types)]
-#![allow(non_snake_case)]
-#![allow(non_upper_case_globals)]
-#![allow(trivial_casts)]
-#![allow(unused_imports)]
-#![allow(unused_results)]
-//! Generated file from `hello_world.proto`
-
-/// Generated files are compatible only with the same version
-/// of protobuf runtime.
-// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_20_0;
-
-#[derive(PartialEq,Clone,Default)]
-pub struct HelloWorld {
-    // message fields
-    pub message: ::std::string::String,
-    // special fields
-    pub unknown_fields: ::protobuf::UnknownFields,
-    pub cached_size: ::protobuf::CachedSize,
-}
-
-impl<'a> ::std::default::Default for &'a HelloWorld {
-    fn default() -> &'a HelloWorld {
-        <HelloWorld as ::protobuf::Message>::default_instance()
-    }
-}
-
-impl HelloWorld {
-    pub fn new() -> HelloWorld {
-        ::std::default::Default::default()
-    }
-
-    // string message = 1;
-
-
-    pub fn get_message(&self) -> &str {
-        &self.message
-    }
-    pub fn clear_message(&mut self) {
-        self.message.clear();
-    }
-
-    // Param is passed by value, moved
-    pub fn set_message(&mut self, v: ::std::string::String) {
-        self.message = v;
-    }
-
-    // Mutable pointer to the field.
-    // If field is not initialized, it is initialized with default value first.
-    pub fn mut_message(&mut self) -> &mut ::std::string::String {
-        &mut self.message
-    }
-
-    // Take field
-    pub fn take_message(&mut self) -> ::std::string::String {
-        ::std::mem::replace(&mut self.message, ::std::string::String::new())
-    }
-}
-
-impl ::protobuf::Message for HelloWorld {
-    fn is_initialized(&self) -> bool {
-        true
-    }
-
-    fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
-        while !is.eof()? {
-            let (field_number, wire_type) = is.read_tag_unpack()?;
-            match field_number {
-                1 => {
-                    ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.message)?;
-                },
-                _ => {
-                    ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
-                },
-            };
-        }
-        ::std::result::Result::Ok(())
-    }
-
-    // Compute sizes of nested messages
-    #[allow(unused_variables)]
-    fn compute_size(&self) -> u32 {
-        let mut my_size = 0;
-        if !self.message.is_empty() {
-            my_size += ::protobuf::rt::string_size(1, &self.message);
-        }
-        my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
-        self.cached_size.set(my_size);
-        my_size
-    }
-
-    fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
-        if !self.message.is_empty() {
-            os.write_string(1, &self.message)?;
-        }
-        os.write_unknown_fields(self.get_unknown_fields())?;
-        ::std::result::Result::Ok(())
-    }
-
-    fn get_cached_size(&self) -> u32 {
-        self.cached_size.get()
-    }
-
-    fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
-        &self.unknown_fields
-    }
-
-    fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
-        &mut self.unknown_fields
-    }
-
-    fn as_any(&self) -> &dyn (::std::any::Any) {
-        self as &dyn (::std::any::Any)
-    }
-    fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
-        self as &mut dyn (::std::any::Any)
-    }
-    fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
-        self
-    }
-
-    fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
-        Self::descriptor_static()
-    }
-
-    fn new() -> HelloWorld {
-        HelloWorld::new()
-    }
-
-    fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
-        static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
-        descriptor.get(|| {
-            let mut fields = ::std::vec::Vec::new();
-            fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
-                "message",
-                |m: &HelloWorld| { &m.message },
-                |m: &mut HelloWorld| { &mut m.message },
-            ));
-            ::protobuf::reflect::MessageDescriptor::new_pb_name::<HelloWorld>(
-                "HelloWorld",
-                fields,
-                file_descriptor_proto()
-            )
-        })
-    }
-
-    fn default_instance() -> &'static HelloWorld {
-        static instance: ::protobuf::rt::LazyV2<HelloWorld> = ::protobuf::rt::LazyV2::INIT;
-        instance.get(HelloWorld::new)
-    }
-}
-
-impl ::protobuf::Clear for HelloWorld {
-    fn clear(&mut self) {
-        self.message.clear();
-        self.unknown_fields.clear();
-    }
-}
-
-impl ::std::fmt::Debug for HelloWorld {
-    fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
-        ::protobuf::text_format::fmt(self, f)
-    }
-}
-
-impl ::protobuf::reflect::ProtobufValue for HelloWorld {
-    fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
-        ::protobuf::reflect::ReflectValueRef::Message(self)
-    }
-}
-
-static file_descriptor_proto_data: &'static [u8] = b"\
-    \n\x11hello_world.proto\x12\x0bhello_world\"*\n\nHelloWorld\x12\x1a\n\
-    \x07message\x18\x01\x20\x01(\tR\x07messageB\0:\0B\0b\x06proto3\
-";
-
-static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;
-
-fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
-    ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap()
-}
-
-pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
-    file_descriptor_proto_lazy.get(|| {
-        parse_descriptor_proto()
-    })
-}
diff --git a/examples/tls-client/src/schema/mod.rs b/examples/tls-client/src/schema/mod.rs
deleted file mode 100644 (file)
index c6dbc18..0000000
+++ /dev/null
@@ -1 +0,0 @@
-pub mod hello_world;
index 60a13bcd1d5421e67ab7d22c99e4c8b3f4f8e742..820c0c6295ba33dbd84ad0788ce6f4a0de7fbad0 100644 (file)
@@ -11,9 +11,5 @@ anyhow = "1.0.31"
 async-std = { version = "1.6.2", features = ["attributes"] }
 env_logger = "0.7"
 log = "0.4"
-protobuf = "2.18.1"
 
 connect = { path = "../../" }
-
-[build-dependencies]
-protobuf-codegen-pure = "2.18.1"
diff --git a/examples/tls-echo-server/build.rs b/examples/tls-echo-server/build.rs
deleted file mode 100644 (file)
index ae9222f..0000000
+++ /dev/null
@@ -1,9 +0,0 @@
-fn main() -> Result<(), Box<dyn std::error::Error>> {
-    protobuf_codegen_pure::Codegen::new()
-        .out_dir("src/schema")
-        .inputs(&["../tcp-client/schema/hello_world.proto"])
-        .include("../tcp-client/schema")
-        .run()
-        .expect("Codegen failed.");
-    Ok(())
-}
index bf9d476f2579363c4b35d231e22c0a89c0f9a94e..2a9abc8eb79afe229c583232973ede8d0f64e1a5 100644 (file)
@@ -1,11 +1,8 @@
-mod schema;
-
-use crate::schema::hello_world::HelloWorld;
 use async_std::{io, task};
 use connect::tls::rustls::internal::pemfile::{certs, rsa_private_keys};
 use connect::tls::rustls::{NoClientAuth, ServerConfig};
 use connect::tls::TlsListener;
-use connect::{SinkExt, StreamExt};
+use connect::{ConnectDatagram, SinkExt, StreamExt};
 use log::*;
 use std::env;
 use std::fs::File;
@@ -38,23 +35,36 @@ async fn main() -> anyhow::Result<()> {
         info!("Handling connection from {}", conn.peer_addr());
 
         task::spawn(async move {
-            while let Some(msg) = conn.reader().next().await {
-                if msg.is::<HelloWorld>() {
-                    if let Ok(Some(contents)) = msg.unpack::<HelloWorld>() {
-                        info!(
-                            "Received a message \"{}\" from {}",
-                            contents.get_message(),
-                            conn.peer_addr()
-                        );
-
-                        conn.writer()
-                            .send(contents)
-                            .await
-                            .expect("Could not send message back to source connection");
-                        info!("Sent message back to original sender");
-                    }
+            while let Some(mut envelope) = conn.reader().next().await {
+                // handle message based on intended recipient
+                if envelope.recipient() == 65535 {
+                    // if recipient is 65535, we do custom processing
+                    let data = envelope.take_data().unwrap();
+                    let msg =
+                        String::from_utf8(data).expect("could not build String from payload bytes");
+                    info!("Received a message \"{}\" from {}", msg, conn.peer_addr());
+
+                    let reply = ConnectDatagram::new(envelope.recipient(), msg.into_bytes())
+                        .expect("could not construct new datagram from built String");
+
+                    conn.writer()
+                        .send(reply)
+                        .await
+                        .expect("Could not send message back to source connection");
+                    info!("Sent message back to original sender");
                 } else {
-                    error!("Received a message of unknown type")
+                    // if recipient is anything else, we just send it back
+                    warn!(
+                        "Received a message for unknown recipient {} from {}",
+                        envelope.recipient(),
+                        conn.peer_addr()
+                    );
+
+                    conn.writer()
+                        .send(envelope)
+                        .await
+                        .expect("Could not send message back to source connection");
+                    info!("Sent message back to original sender");
                 }
             }
         });
diff --git a/examples/tls-echo-server/src/schema/hello_world.rs b/examples/tls-echo-server/src/schema/hello_world.rs
deleted file mode 100644 (file)
index 1af771c..0000000
+++ /dev/null
@@ -1,200 +0,0 @@
-// This file is generated by rust-protobuf 2.20.0. Do not edit
-// @generated
-
-// https://github.com/rust-lang/rust-clippy/issues/702
-#![allow(unknown_lints)]
-#![allow(clippy::all)]
-
-#![allow(unused_attributes)]
-#![rustfmt::skip]
-
-#![allow(box_pointers)]
-#![allow(dead_code)]
-#![allow(missing_docs)]
-#![allow(non_camel_case_types)]
-#![allow(non_snake_case)]
-#![allow(non_upper_case_globals)]
-#![allow(trivial_casts)]
-#![allow(unused_imports)]
-#![allow(unused_results)]
-//! Generated file from `hello_world.proto`
-
-/// Generated files are compatible only with the same version
-/// of protobuf runtime.
-// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_20_0;
-
-#[derive(PartialEq,Clone,Default)]
-pub struct HelloWorld {
-    // message fields
-    pub message: ::std::string::String,
-    // special fields
-    pub unknown_fields: ::protobuf::UnknownFields,
-    pub cached_size: ::protobuf::CachedSize,
-}
-
-impl<'a> ::std::default::Default for &'a HelloWorld {
-    fn default() -> &'a HelloWorld {
-        <HelloWorld as ::protobuf::Message>::default_instance()
-    }
-}
-
-impl HelloWorld {
-    pub fn new() -> HelloWorld {
-        ::std::default::Default::default()
-    }
-
-    // string message = 1;
-
-
-    pub fn get_message(&self) -> &str {
-        &self.message
-    }
-    pub fn clear_message(&mut self) {
-        self.message.clear();
-    }
-
-    // Param is passed by value, moved
-    pub fn set_message(&mut self, v: ::std::string::String) {
-        self.message = v;
-    }
-
-    // Mutable pointer to the field.
-    // If field is not initialized, it is initialized with default value first.
-    pub fn mut_message(&mut self) -> &mut ::std::string::String {
-        &mut self.message
-    }
-
-    // Take field
-    pub fn take_message(&mut self) -> ::std::string::String {
-        ::std::mem::replace(&mut self.message, ::std::string::String::new())
-    }
-}
-
-impl ::protobuf::Message for HelloWorld {
-    fn is_initialized(&self) -> bool {
-        true
-    }
-
-    fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
-        while !is.eof()? {
-            let (field_number, wire_type) = is.read_tag_unpack()?;
-            match field_number {
-                1 => {
-                    ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.message)?;
-                },
-                _ => {
-                    ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
-                },
-            };
-        }
-        ::std::result::Result::Ok(())
-    }
-
-    // Compute sizes of nested messages
-    #[allow(unused_variables)]
-    fn compute_size(&self) -> u32 {
-        let mut my_size = 0;
-        if !self.message.is_empty() {
-            my_size += ::protobuf::rt::string_size(1, &self.message);
-        }
-        my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
-        self.cached_size.set(my_size);
-        my_size
-    }
-
-    fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
-        if !self.message.is_empty() {
-            os.write_string(1, &self.message)?;
-        }
-        os.write_unknown_fields(self.get_unknown_fields())?;
-        ::std::result::Result::Ok(())
-    }
-
-    fn get_cached_size(&self) -> u32 {
-        self.cached_size.get()
-    }
-
-    fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
-        &self.unknown_fields
-    }
-
-    fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
-        &mut self.unknown_fields
-    }
-
-    fn as_any(&self) -> &dyn (::std::any::Any) {
-        self as &dyn (::std::any::Any)
-    }
-    fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
-        self as &mut dyn (::std::any::Any)
-    }
-    fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
-        self
-    }
-
-    fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
-        Self::descriptor_static()
-    }
-
-    fn new() -> HelloWorld {
-        HelloWorld::new()
-    }
-
-    fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
-        static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
-        descriptor.get(|| {
-            let mut fields = ::std::vec::Vec::new();
-            fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
-                "message",
-                |m: &HelloWorld| { &m.message },
-                |m: &mut HelloWorld| { &mut m.message },
-            ));
-            ::protobuf::reflect::MessageDescriptor::new_pb_name::<HelloWorld>(
-                "HelloWorld",
-                fields,
-                file_descriptor_proto()
-            )
-        })
-    }
-
-    fn default_instance() -> &'static HelloWorld {
-        static instance: ::protobuf::rt::LazyV2<HelloWorld> = ::protobuf::rt::LazyV2::INIT;
-        instance.get(HelloWorld::new)
-    }
-}
-
-impl ::protobuf::Clear for HelloWorld {
-    fn clear(&mut self) {
-        self.message.clear();
-        self.unknown_fields.clear();
-    }
-}
-
-impl ::std::fmt::Debug for HelloWorld {
-    fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
-        ::protobuf::text_format::fmt(self, f)
-    }
-}
-
-impl ::protobuf::reflect::ProtobufValue for HelloWorld {
-    fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
-        ::protobuf::reflect::ReflectValueRef::Message(self)
-    }
-}
-
-static file_descriptor_proto_data: &'static [u8] = b"\
-    \n\x11hello_world.proto\x12\x0bhello_world\"*\n\nHelloWorld\x12\x1a\n\
-    \x07message\x18\x01\x20\x01(\tR\x07messageB\0:\0B\0b\x06proto3\
-";
-
-static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;
-
-fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
-    ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap()
-}
-
-pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
-    file_descriptor_proto_lazy.get(|| {
-        parse_descriptor_proto()
-    })
-}
diff --git a/examples/tls-echo-server/src/schema/mod.rs b/examples/tls-echo-server/src/schema/mod.rs
deleted file mode 100644 (file)
index c6dbc18..0000000
+++ /dev/null
@@ -1 +0,0 @@
-pub mod hello_world;
diff --git a/schema/message.proto b/schema/message.proto
deleted file mode 100644 (file)
index d1f6ee0..0000000
+++ /dev/null
@@ -1,8 +0,0 @@
-syntax = "proto3";
-package message;
-
-import "google/protobuf/any.proto";
-
-message ConnectionMessage {
-    google.protobuf.Any payload = 1;
-}
index 4f1fe18ac1a8b8506465600a1c1c77b65065aa95..45f76da53ee4caff795accb05202e55d90c82df5 100644 (file)
 //! Until the Rust ecosystem around reflection improves, the crate will use Protobuf to fill the
 //! void.
 
+pub mod protocol;
 mod reader;
-pub(crate) mod schema;
 pub mod tcp;
 pub mod tls;
 mod writer;
 
+pub use crate::protocol::{ConnectDatagram, DatagramEmptyError};
 pub use crate::reader::ConnectionReader;
 pub use crate::writer::ConnectionWriter;
 use async_std::{net::SocketAddr, pin::Pin};
diff --git a/src/protocol.rs b/src/protocol.rs
new file mode 100644 (file)
index 0000000..0574d9f
--- /dev/null
@@ -0,0 +1,182 @@
+use std::error::Error;
+use std::io::Read;
+
+const VERSION: u8 = 1;
+
+#[derive(Debug, Clone)]
+pub struct DatagramEmptyError;
+
+impl Error for DatagramEmptyError {}
+
+impl std::fmt::Display for DatagramEmptyError {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(
+            f,
+            "datagram cannot be constructed when provided payload is empty"
+        )
+    }
+}
+
+pub struct ConnectDatagram {
+    version: u8,
+    recipient: u16,
+    data: Option<Vec<u8>>,
+}
+
+impl ConnectDatagram {
+    pub fn new(recipient: u16, data: Vec<u8>) -> Result<Self, DatagramEmptyError> {
+        if data.len() > 0 {
+            Ok(Self {
+                version: VERSION,
+                recipient,
+                data: Some(data),
+            })
+        } else {
+            Err(DatagramEmptyError)
+        }
+    }
+
+    pub fn version(&self) -> u8 {
+        self.version
+    }
+
+    pub fn recipient(&self) -> u16 {
+        self.recipient
+    }
+
+    pub fn data(&self) -> Option<&Vec<u8>> {
+        self.data.as_ref()
+    }
+
+    pub fn take_data(&mut self) -> Option<Vec<u8>> {
+        self.data.take()
+    }
+
+    pub fn size(&self) -> usize {
+        let data_len = if let Some(data) = self.data() {
+            data.len()
+        } else {
+            0
+        };
+
+        3 + data_len
+    }
+
+    pub fn bytes(&self) -> Vec<u8> {
+        let mut bytes = Vec::with_capacity(self.size());
+
+        bytes.extend(&self.version.to_be_bytes());
+        bytes.extend(&self.recipient.to_be_bytes());
+
+        if let Some(data) = self.data() {
+            bytes.extend(data.as_slice());
+        }
+
+        return bytes;
+    }
+
+    pub fn encode(&self) -> Vec<u8> {
+        let size: u32 = (self.size()) as u32;
+
+        let mut bytes = Vec::from(size.to_be_bytes());
+        bytes.extend(self.bytes());
+
+        return bytes;
+    }
+
+    pub fn decode(source: &mut (dyn Read + Send + Sync)) -> anyhow::Result<Self> {
+        // payload size
+        let mut payload_size_bytes: [u8; 4] = [0; 4];
+        source.read_exact(&mut payload_size_bytes)?;
+        let payload_size = u32::from_be_bytes(payload_size_bytes);
+
+        // read whole payload
+        let mut payload_bytes = vec![0; payload_size as usize];
+        source.read_exact(payload_bytes.as_mut_slice())?;
+
+        // version
+        let version_bytes = payload_bytes.remove(0);
+        let version = u8::from_be(version_bytes);
+
+        // recipient
+        let mut recipient_bytes: [u8; 2] = [0; 2];
+        for i in 0..recipient_bytes.len() {
+            recipient_bytes[i] = payload_bytes.remove(0);
+        }
+        let recipient = u16::from_be_bytes(recipient_bytes);
+
+        // data
+        let data = payload_bytes;
+
+        if data.len() > 0 {
+            Ok(Self {
+                version,
+                recipient,
+                data: Some(data),
+            })
+        } else {
+            Err(anyhow::Error::from(DatagramEmptyError))
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::protocol::ConnectDatagram;
+    use std::io::Cursor;
+
+    #[test]
+    fn encoded_size() -> anyhow::Result<()> {
+        let mut data = Vec::new();
+        for _ in 0..5 {
+            data.push(1);
+        }
+        assert_eq!(5, data.len());
+
+        let sample = ConnectDatagram::new(1, data)?;
+        assert_eq!(7 + 5, sample.encode().len());
+
+        Ok(())
+    }
+
+    #[test]
+    fn take_data() -> anyhow::Result<()> {
+        let mut data = Vec::new();
+        for _ in 0..5 {
+            data.push(1);
+        }
+
+        let mut sample = ConnectDatagram::new(1, data)?;
+
+        let taken_data = sample.take_data().unwrap();
+        assert!(sample.data().is_none());
+        assert_eq!(5, taken_data.len());
+
+        Ok(())
+    }
+
+    #[async_std::test]
+    async fn encode_and_decode() -> anyhow::Result<()> {
+        let mut data = Vec::new();
+        for _ in 0..5 {
+            data.push(1);
+        }
+        assert_eq!(5, data.len());
+
+        let sample = ConnectDatagram::new(1, data)?;
+
+        let mut payload = sample.encode();
+        assert_eq!(7 + 5, payload.len());
+
+        let mut cursor: Cursor<&mut [u8]> = Cursor::new(payload.as_mut());
+        let sample_back_res = ConnectDatagram::decode(&mut cursor);
+        assert!(sample_back_res.is_ok());
+
+        let sample_back = sample_back_res.unwrap();
+        assert_eq!(sample_back.version(), 1);
+        assert_eq!(sample_back.recipient(), 1);
+        assert_eq!(sample_back.data().unwrap().len(), 5);
+
+        Ok(())
+    }
+}
index 428f6d19902f4c7afc614e8b1f66704d39a117e0..39b1048f896fca459f491d4b566cf74c4acd11ad 100644 (file)
@@ -1,16 +1,14 @@
-use crate::schema::ConnectionMessage;
 use async_std::net::SocketAddr;
 use async_std::pin::Pin;
 use bytes::{Buf, BytesMut};
 use futures::task::{Context, Poll};
 use futures::{AsyncRead, Stream};
 use log::*;
-use protobuf::Message;
-use std::convert::TryInto;
 
+use crate::protocol::ConnectDatagram;
 pub use futures::SinkExt;
 pub use futures::StreamExt;
-use protobuf::well_known_types::Any;
+use std::io::Cursor;
 
 /// A default buffer size to read in bytes and then deserialize as messages
 const BUFFER_SIZE: usize = 8192;
@@ -81,7 +79,7 @@ impl ConnectionReader {
 }
 
 impl Stream for ConnectionReader {
-    type Item = Any;
+    type Item = ConnectDatagram;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let mut buffer = BytesMut::new();
@@ -111,36 +109,30 @@ impl Stream for ConnectionReader {
                         buffer = pending_buf;
                     }
 
-                    let mut bytes_read_u64: u64 = bytes_read.try_into().expect(
-                        format!("Conversion from usize ({}) to u64 failed", bytes_read).as_str(),
-                    );
-                    while bytes_read_u64 > 0 {
-                        trace!(
-                            "{} bytes from network stream still unprocessed",
-                            bytes_read_u64
-                        );
+                    while bytes_read > 0 {
+                        trace!("{} bytes from network stream still unprocessed", bytes_read);
 
                         buffer.resize(bytes_read, 0);
 
-                        match ConnectionMessage::parse_from_bytes(buffer.as_ref()) {
-                            Ok(mut data) => {
-                                let serialized_size = data.compute_size();
-                                trace!("Deserialized message of size {} bytes", serialized_size);
-
-                                buffer.advance(serialized_size as usize);
-
-                                let serialized_size_u64: u64 = serialized_size.try_into().expect(
-                                    format!(
-                                        "Conversion from usize ({}) to u64 failed",
-                                        serialized_size
-                                    )
-                                    .as_str(),
-                                );
-                                bytes_read_u64 -= serialized_size_u64;
-                                trace!("{} bytes still unprocessed", bytes_read_u64);
-
-                                trace!("Sending deserialized message downstream");
-                                return Poll::Ready(Some(data.take_payload()));
+                        let mut cursor = Cursor::new(buffer.as_mut());
+                        match ConnectDatagram::decode(&mut cursor) {
+                            Ok(data) => {
+                                return match data.version() {
+                                    _ => {
+                                        let serialized_size = data.size();
+                                        trace!(
+                                            "Deserialized message of size {} bytes",
+                                            serialized_size
+                                        );
+
+                                        buffer.advance(serialized_size);
+                                        bytes_read -= serialized_size;
+                                        trace!("{} bytes still unprocessed", bytes_read);
+
+                                        trace!("Sending deserialized message downstream");
+                                        Poll::Ready(Some(data))
+                                    }
+                                }
                             }
 
                             Err(err) => {
diff --git a/src/schema/message.rs b/src/schema/message.rs
deleted file mode 100644 (file)
index 265075f..0000000
+++ /dev/null
@@ -1,216 +0,0 @@
-// This file is generated by rust-protobuf 2.20.0. Do not edit
-// @generated
-
-// https://github.com/rust-lang/rust-clippy/issues/702
-#![allow(unknown_lints)]
-#![allow(clippy::all)]
-
-#![allow(unused_attributes)]
-#![rustfmt::skip]
-
-#![allow(box_pointers)]
-#![allow(dead_code)]
-#![allow(missing_docs)]
-#![allow(non_camel_case_types)]
-#![allow(non_snake_case)]
-#![allow(non_upper_case_globals)]
-#![allow(trivial_casts)]
-#![allow(unused_imports)]
-#![allow(unused_results)]
-//! Generated file from `message.proto`
-
-/// Generated files are compatible only with the same version
-/// of protobuf runtime.
-// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_20_0;
-
-#[derive(PartialEq,Clone,Default)]
-pub struct ConnectionMessage {
-    // message fields
-    pub payload: ::protobuf::SingularPtrField<::protobuf::well_known_types::Any>,
-    // special fields
-    pub unknown_fields: ::protobuf::UnknownFields,
-    pub cached_size: ::protobuf::CachedSize,
-}
-
-impl<'a> ::std::default::Default for &'a ConnectionMessage {
-    fn default() -> &'a ConnectionMessage {
-        <ConnectionMessage as ::protobuf::Message>::default_instance()
-    }
-}
-
-impl ConnectionMessage {
-    pub fn new() -> ConnectionMessage {
-        ::std::default::Default::default()
-    }
-
-    // .google.protobuf.Any payload = 1;
-
-
-    pub fn get_payload(&self) -> &::protobuf::well_known_types::Any {
-        self.payload.as_ref().unwrap_or_else(|| <::protobuf::well_known_types::Any as ::protobuf::Message>::default_instance())
-    }
-    pub fn clear_payload(&mut self) {
-        self.payload.clear();
-    }
-
-    pub fn has_payload(&self) -> bool {
-        self.payload.is_some()
-    }
-
-    // Param is passed by value, moved
-    pub fn set_payload(&mut self, v: ::protobuf::well_known_types::Any) {
-        self.payload = ::protobuf::SingularPtrField::some(v);
-    }
-
-    // Mutable pointer to the field.
-    // If field is not initialized, it is initialized with default value first.
-    pub fn mut_payload(&mut self) -> &mut ::protobuf::well_known_types::Any {
-        if self.payload.is_none() {
-            self.payload.set_default();
-        }
-        self.payload.as_mut().unwrap()
-    }
-
-    // Take field
-    pub fn take_payload(&mut self) -> ::protobuf::well_known_types::Any {
-        self.payload.take().unwrap_or_else(|| ::protobuf::well_known_types::Any::new())
-    }
-}
-
-impl ::protobuf::Message for ConnectionMessage {
-    fn is_initialized(&self) -> bool {
-        for v in &self.payload {
-            if !v.is_initialized() {
-                return false;
-            }
-        };
-        true
-    }
-
-    fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
-        while !is.eof()? {
-            let (field_number, wire_type) = is.read_tag_unpack()?;
-            match field_number {
-                1 => {
-                    ::protobuf::rt::read_singular_message_into(wire_type, is, &mut self.payload)?;
-                },
-                _ => {
-                    ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
-                },
-            };
-        }
-        ::std::result::Result::Ok(())
-    }
-
-    // Compute sizes of nested messages
-    #[allow(unused_variables)]
-    fn compute_size(&self) -> u32 {
-        let mut my_size = 0;
-        if let Some(ref v) = self.payload.as_ref() {
-            let len = v.compute_size();
-            my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len;
-        }
-        my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
-        self.cached_size.set(my_size);
-        my_size
-    }
-
-    fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
-        if let Some(ref v) = self.payload.as_ref() {
-            os.write_tag(1, ::protobuf::wire_format::WireTypeLengthDelimited)?;
-            os.write_raw_varint32(v.get_cached_size())?;
-            v.write_to_with_cached_sizes(os)?;
-        }
-        os.write_unknown_fields(self.get_unknown_fields())?;
-        ::std::result::Result::Ok(())
-    }
-
-    fn get_cached_size(&self) -> u32 {
-        self.cached_size.get()
-    }
-
-    fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
-        &self.unknown_fields
-    }
-
-    fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
-        &mut self.unknown_fields
-    }
-
-    fn as_any(&self) -> &dyn (::std::any::Any) {
-        self as &dyn (::std::any::Any)
-    }
-    fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
-        self as &mut dyn (::std::any::Any)
-    }
-    fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
-        self
-    }
-
-    fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
-        Self::descriptor_static()
-    }
-
-    fn new() -> ConnectionMessage {
-        ConnectionMessage::new()
-    }
-
-    fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
-        static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
-        descriptor.get(|| {
-            let mut fields = ::std::vec::Vec::new();
-            fields.push(::protobuf::reflect::accessor::make_singular_ptr_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage<::protobuf::well_known_types::Any>>(
-                "payload",
-                |m: &ConnectionMessage| { &m.payload },
-                |m: &mut ConnectionMessage| { &mut m.payload },
-            ));
-            ::protobuf::reflect::MessageDescriptor::new_pb_name::<ConnectionMessage>(
-                "ConnectionMessage",
-                fields,
-                file_descriptor_proto()
-            )
-        })
-    }
-
-    fn default_instance() -> &'static ConnectionMessage {
-        static instance: ::protobuf::rt::LazyV2<ConnectionMessage> = ::protobuf::rt::LazyV2::INIT;
-        instance.get(ConnectionMessage::new)
-    }
-}
-
-impl ::protobuf::Clear for ConnectionMessage {
-    fn clear(&mut self) {
-        self.payload.clear();
-        self.unknown_fields.clear();
-    }
-}
-
-impl ::std::fmt::Debug for ConnectionMessage {
-    fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
-        ::protobuf::text_format::fmt(self, f)
-    }
-}
-
-impl ::protobuf::reflect::ProtobufValue for ConnectionMessage {
-    fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
-        ::protobuf::reflect::ReflectValueRef::Message(self)
-    }
-}
-
-static file_descriptor_proto_data: &'static [u8] = b"\
-    \n\rmessage.proto\x12\x07message\x1a\x19google/protobuf/any.proto\"G\n\
-    \x11ConnectionMessage\x120\n\x07payload\x18\x01\x20\x01(\x0b2\x14.google\
-    .protobuf.AnyR\x07payloadB\0:\0B\0b\x06proto3\
-";
-
-static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;
-
-fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
-    ::protobuf::Message::parse_from_bytes(file_descriptor_proto_data).unwrap()
-}
-
-pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
-    file_descriptor_proto_lazy.get(|| {
-        parse_descriptor_proto()
-    })
-}
diff --git a/src/schema/mod.rs b/src/schema/mod.rs
deleted file mode 100644 (file)
index 2774e64..0000000
+++ /dev/null
@@ -1,15 +0,0 @@
-mod message;
-
-pub use message::ConnectionMessage;
-use protobuf::well_known_types::Any;
-use protobuf::Message;
-
-impl ConnectionMessage {
-    pub(crate) fn from_msg<M: Message>(msg: M) -> Self {
-        let mut sm = Self::new();
-        let payload = Any::pack(&msg).expect("Protobuf Message could not be packed into Any type");
-
-        sm.set_payload(payload);
-        return sm;
-    }
-}
index 9a6f20af85ab3e3bc43243b94e29a325141bc4a1..4ee98f29456002ee9df3d1ebce11b605b0fece6f 100644 (file)
@@ -1,4 +1,3 @@
-use crate::schema::ConnectionMessage;
 use async_channel::RecvError;
 use async_std::net::SocketAddr;
 use async_std::pin::Pin;
@@ -6,8 +5,8 @@ use futures::io::IoSlice;
 use futures::task::{Context, Poll};
 use futures::{AsyncWrite, Sink};
 use log::*;
-use protobuf::Message;
 
+use crate::protocol::ConnectDatagram;
 pub use futures::SinkExt;
 pub use futures::StreamExt;
 
@@ -65,39 +64,11 @@ impl ConnectionWriter {
     pub fn is_closed(&self) -> bool {
         self.closed
     }
-}
-
-impl<M: Message> Sink<M> for ConnectionWriter {
-    type Error = RecvError;
-
-    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-        if self.is_closed() {
-            trace!("Connection is closed, cannot send message");
-            Poll::Ready(Err(RecvError))
-        } else {
-            trace!("Connection ready to send message");
-            Poll::Ready(Ok(()))
-        }
-    }
 
-    fn start_send(mut self: Pin<&mut Self>, item: M) -> Result<(), Self::Error> {
-        trace!("Preparing message to be sent next");
-        let msg: ConnectionMessage = ConnectionMessage::from_msg(item);
-
-        if let Ok(buffer) = msg.write_to_bytes() {
-            let msg_size = buffer.len();
-            trace!("Serialized pending message into {} bytes", msg_size);
-
-            self.pending_writes.push(buffer);
-
-            Ok(())
-        } else {
-            error!("Encountered error when serializing message to bytes");
-            Err(RecvError)
-        }
-    }
-
-    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+    pub(crate) fn write_pending_bytes(
+        &mut self,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<(), RecvError>> {
         if self.pending_writes.len() > 0 {
             let stream = self.write_stream.as_mut();
 
@@ -136,49 +107,41 @@ impl<M: Message> Sink<M> for ConnectionWriter {
             Poll::Ready(Ok(()))
         }
     }
+}
 
-    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-        self.closed = true;
-
-        let flush = if self.pending_writes.len() > 0 {
-            let stream = self.write_stream.as_mut();
+impl Sink<ConnectDatagram> for ConnectionWriter {
+    type Error = RecvError;
 
-            match stream.poll_flush(cx) {
-                Poll::Pending => Poll::Pending,
+    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        if self.is_closed() {
+            trace!("Connection is closed, cannot send message");
+            Poll::Ready(Err(RecvError))
+        } else {
+            trace!("Connection ready to send message");
+            Poll::Ready(Ok(()))
+        }
+    }
 
-                Poll::Ready(Ok(_)) => {
-                    trace!("Sending pending bytes");
+    fn start_send(mut self: Pin<&mut Self>, item: ConnectDatagram) -> Result<(), Self::Error> {
+        trace!("Preparing message to be sent next");
 
-                    let pending = self.pending_writes.split_off(0);
-                    let writeable_vec: Vec<IoSlice> =
-                        pending.iter().map(|p| IoSlice::new(p)).collect();
+        let buffer = item.encode();
+        let msg_size = buffer.len();
+        trace!("Serialized pending message into {} bytes", msg_size);
 
-                    let stream = self.write_stream.as_mut();
-                    match stream.poll_write_vectored(cx, writeable_vec.as_slice()) {
-                        Poll::Pending => Poll::Pending,
+        self.pending_writes.push(buffer);
 
-                        Poll::Ready(Ok(bytes_written)) => {
-                            trace!("Wrote {} bytes to network stream", bytes_written);
-                            Poll::Ready(Ok(()))
-                        }
+        Ok(())
+    }
 
-                        Poll::Ready(Err(_e)) => {
-                            error!("Encountered error when writing to network stream");
-                            Poll::Ready(Err(RecvError))
-                        }
-                    }
-                }
+    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        self.write_pending_bytes(cx)
+    }
 
-                Poll::Ready(Err(_e)) => {
-                    error!("Encountered error when flushing network stream");
-                    Poll::Ready(Err(RecvError))
-                }
-            }
-        } else {
-            Poll::Ready(Ok(()))
-        };
+    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        self.closed = true;
 
-        match flush {
+        match self.write_pending_bytes(cx) {
             Poll::Pending => Poll::Pending,
 
             Poll::Ready(Ok(_)) => {