async-tls = { version = "0.9.0", default-features = false, features = ["client", "server"]}
bytes = "0.5.5"
futures = "0.3.8"
+futures-lite = "1.11.3"
log = "0.4"
protobuf = "2.18.1"
rustls = "0.18.0"
use async_std::net::{TcpStream, ToSocketAddrs};
impl Connection {
- pub fn tcp_client<A: ToSocketAddrs + std::fmt::Display>(ip_addrs: A) -> anyhow::Result<Self> {
- let stream = futures::executor::block_on(TcpStream::connect(&ip_addrs))?;
+ pub async fn tcp_client<A: ToSocketAddrs + std::fmt::Display>(ip_addrs: A) -> anyhow::Result<Self> {
+ let stream = TcpStream::connect(&ip_addrs).await?;
info!("Established client TCP connection to {}", ip_addrs);
stream.set_nodelay(true)?;
use async_std::net::{SocketAddr, TcpListener, ToSocketAddrs};
use async_std::pin::Pin;
use futures::task::{Context, Poll};
-use futures::{Stream, StreamExt};
+use futures::Stream;
+use futures_lite::stream::StreamExt;
use log::*;
#[allow(dead_code)]
}
impl TcpServer {
- pub fn new<A: ToSocketAddrs + std::fmt::Display>(ip_addrs: A) -> anyhow::Result<Self> {
- let listener = futures::executor::block_on(TcpListener::bind(&ip_addrs))?;
+ pub async fn new<A: ToSocketAddrs + std::fmt::Display>(ip_addrs: A) -> anyhow::Result<Self> {
+ let listener = TcpListener::bind(&ip_addrs).await?;
info!("Started TCP server at {}", &ip_addrs);
Ok(Self {
impl Stream for TcpServer {
type Item = Connection;
- fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- if let Some(Ok(conn)) = futures::executor::block_on(self.listener.incoming().next()) {
- debug!(
- "Received connection attempt from {}",
- conn.peer_addr()
- .expect("Peer address could not be retrieved")
- );
- Poll::Ready(Some(Connection::from(conn)))
- } else {
- info!("Shutting TCP server down at {}", self.local_addrs);
- Poll::Ready(None)
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ match self.listener.incoming().poll_next(cx) {
+ Poll::Pending => Poll::Pending,
+
+ Poll::Ready(Some(Ok(conn))) => {
+ debug!(
+ "Received connection attempt from {}",
+ conn.peer_addr()
+ .expect("Peer address could not be retrieved")
+ );
+
+ Poll::Ready(Some(Connection::from(conn)))
+ },
+
+ Poll::Ready(Some(Err(e))) => {
+ error!(
+ "Encountered error when accepting connection attempt: {}", e
+ );
+
+ Poll::Pending
+ },
+
+ Poll::Ready(None) => {
+ info!("Shutting TCP server down at {}", self.local_addrs);
+ Poll::Ready(None)
+ },
}
}
}
}
impl Connection {
- pub fn tls_client<A: ToSocketAddrs + std::fmt::Display>(
+ pub async fn tls_client<A: ToSocketAddrs + std::fmt::Display>(
ip_addrs: A,
domain: &str,
connector: TlsConnector,
) -> anyhow::Result<Self> {
- let stream = futures::executor::block_on(TcpStream::connect(&ip_addrs))?;
+ let stream = TcpStream::connect(&ip_addrs).await?;
info!("Established client TCP connection to {}", ip_addrs);
stream.set_nodelay(true)?;
let local_addr = stream.peer_addr()?;
let peer_addr = stream.peer_addr()?;
- let encrypted_stream: client::TlsStream<TcpStream> =
- futures::executor::block_on(connector.connect(domain, stream))?;
+ let encrypted_stream: client::TlsStream<TcpStream> = connector.connect(domain, stream).await?;
info!("Completed TLS handshake with {}", peer_addr);
Ok(Self::from(TlsConnectionMetadata::Client {
use crate::Connection;
use async_std::net::*;
use async_std::pin::Pin;
-use async_std::prelude::*;
use async_tls::TlsAcceptor;
+use futures::Stream;
use futures::task::{Context, Poll};
+use futures_lite::StreamExt;
use log::*;
#[allow(dead_code)]
}
impl TlsServer {
- pub fn new<A: ToSocketAddrs + std::fmt::Display>(
+ pub async fn new<A: ToSocketAddrs + std::fmt::Display>(
ip_addrs: A,
acceptor: TlsAcceptor,
) -> anyhow::Result<Self> {
- let listener = futures::executor::block_on(TcpListener::bind(ip_addrs))?;
+ let listener = TcpListener::bind(ip_addrs).await?;
info!("Started TLS server at {}", listener.local_addr()?);
Ok(Self {
impl Stream for TlsServer {
type Item = Connection;
- fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- if let Some(Ok(tcp_stream)) = futures::executor::block_on(self.listener.incoming().next()) {
- let local_addr = tcp_stream
- .local_addr()
- .expect("Local address could not be retrieved");
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ match self.listener.incoming().poll_next(cx) {
+ Poll::Pending => Poll::Pending,
- let peer_addr = tcp_stream
- .peer_addr()
- .expect("Peer address could not be retrieved");
- debug!("Received connection attempt from {}", peer_addr);
+ Poll::Ready(Some(Ok(tcp_stream))) => {
+ let local_addr = tcp_stream
+ .local_addr()
+ .expect("Local address could not be retrieved");
+
+ let peer_addr = tcp_stream
+ .peer_addr()
+ .expect("Peer address could not be retrieved");
+
+ debug!(
+ "Received connection attempt from {}", peer_addr
+ );
+
+ if let Ok(tls_stream) = futures::executor::block_on(self.acceptor.accept(tcp_stream)) {
+ debug!("Completed TLS handshake with {}", peer_addr);
+ Poll::Ready(Some(Connection::from(TlsConnectionMetadata::Server {
+ local_addr,
+ peer_addr,
+ stream: tls_stream,
+ })))
+ } else {
+ warn!("Could not encrypt connection with TLS from {}", peer_addr);
+ Poll::Pending
+ }
+ },
+
+ Poll::Ready(Some(Err(e))) => {
+ error!(
+ "Encountered error when accepting connection attempt: {}", e
+ );
- if let Ok(tls_stream) = futures::executor::block_on(self.acceptor.accept(tcp_stream)) {
- debug!("Completed TLS handshake with {}", peer_addr);
- Poll::Ready(Some(Connection::from(TlsConnectionMetadata::Server {
- local_addr,
- peer_addr,
- stream: tls_stream,
- })))
- } else {
- warn!("Could not encrypt connection with TLS from {}", peer_addr);
- // @otodo close the tcp-stream connection
Poll::Pending
}
- } else {
- info!("Shutting TLS server down at {}", self.local_addrs);
- Poll::Ready(None)
+
+ Poll::Ready(None) => {
+ info!("Shutting TLS server down at {}", self.local_addrs);
+ Poll::Ready(None)
+ },
}
}
}