From 2aae1c9971c0c2192ac9c3bf3d5fd2237f779d62 Mon Sep 17 00:00:00 2001 From: Sachandhan Ganesh Date: Sat, 13 Feb 2021 22:47:38 -0800 Subject: [PATCH] introduce new example program to test ordering of messages --- examples/tcp-client-blaster/Cargo.toml | 15 ++++++ examples/tcp-client-blaster/src/main.rs | 66 +++++++++++++++++++++++++ 2 files changed, 81 insertions(+) create mode 100644 examples/tcp-client-blaster/Cargo.toml create mode 100644 examples/tcp-client-blaster/src/main.rs diff --git a/examples/tcp-client-blaster/Cargo.toml b/examples/tcp-client-blaster/Cargo.toml new file mode 100644 index 0000000..16d45e2 --- /dev/null +++ b/examples/tcp-client-blaster/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "tcp-client-blaster" +version = "0.1.0" +authors = ["Sachandhan Ganesh "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0" +async-std = { version = "1.9.0", features = ["attributes"] } +env_logger = "0.7" +log = "0.4" + +connect = { path = "../../" } \ No newline at end of file diff --git a/examples/tcp-client-blaster/src/main.rs b/examples/tcp-client-blaster/src/main.rs new file mode 100644 index 0000000..d93b85e --- /dev/null +++ b/examples/tcp-client-blaster/src/main.rs @@ -0,0 +1,66 @@ +use connect::{ConnectDatagram, Connection, SinkExt, StreamExt}; +use log::*; +use std::env; + +type Number = u16; +const NUM_MESSAGES: Number = 10000; + +#[async_std::main] +async fn main() -> anyhow::Result<()> { + env_logger::init(); + + // Get ip address from cmd line args + let args: Vec = env::args().collect(); + let ip_address = match args.get(1) { + Some(addr) => addr, + None => { + error!("Need to pass IP address to connect to as command line argument"); + panic!(); + } + }; + + // create a client connection to the server + let conn = Connection::tcp_client(ip_address).await?; + let (mut reader, mut writer) = conn.split(); + + // receive messages + let read_task = async_std::task::spawn(async move { + let mut prev: Option = None; + + while let Some(mut reply) = reader.next().await { + let mut payload = reply.take_data().unwrap(); + + let mut data_bytes: [u8; 2] = [0; 2]; + for i in 0..payload.len() { + data_bytes[i] = payload.remove(0); + } + + let data = Number::from_be_bytes(data_bytes); + + if let Some(prev_num) = prev { + assert_eq!(prev_num + 1, data); + } else { + assert_eq!(0, data); + } + + prev = Some(data); + info!("Received message: {}", data); + + if prev.unwrap() + 1 == NUM_MESSAGES { + break; + } + } + }); + + // send messages to the server + for i in 0..NUM_MESSAGES { + // info!("Sending message: {}", i); + let data = i.to_be_bytes().to_vec(); + let envelope = ConnectDatagram::new(i, data)?; + writer.send(envelope).await?; + } + + read_task.await; + + Ok(()) +} -- 2.44.0