.map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
// create a server
- let server = TlsServer::new(ip_address, config.into()).await?;
+ let mut server = TlsServer::new(ip_address, config.into()).await?;
// handle server connections
// wait for a connection to come in and be accepted
loop {
- match server.accept().await {
- Ok(Some(mut conn)) => {
+ match server.next().await {
+ Some(mut conn) => {
info!("Handling connection from {}", conn.peer_addr());
task::spawn(async move {
});
}
- Ok(None) => (),
-
- Err(e) => {
- error!("Encountered error when accepting connection: {}", e);
+ None => {
break
}
}
use crate::Connection;
use async_std::net::{SocketAddr, TcpListener, ToSocketAddrs};
+use async_std::pin::Pin;
+use async_std::task::{Context, Poll};
+use futures::{Stream, StreamExt};
use log::*;
#[allow(dead_code)]
Ok(Connection::from(stream))
}
}
+
+impl Stream for TcpServer {
+ type Item = Connection;
+
+ fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ match futures::executor::block_on(self.listener.incoming().next()) {
+ Some(Ok(tcp_stream)) => {
+ let peer_addr = tcp_stream.peer_addr().expect("Could not retrieve peer IP address");
+ debug!("Received connection attempt from {}", peer_addr);
+
+ Poll::Ready(Some(Connection::from(tcp_stream)))
+ },
+
+ Some(Err(e)) => {
+ error!("Encountered error when trying to accept new connection {}", e);
+ Poll::Pending
+ }
+
+ None => Poll::Ready(None)
+ }
+ }
+}
use crate::tls::TlsConnectionMetadata;
use crate::Connection;
-use async_std::net::*;
+use async_std::net::{SocketAddr, TcpListener, ToSocketAddrs};
+use async_std::pin::Pin;
+use async_std::task::{Context, Poll};
use async_tls::TlsAcceptor;
+use futures::{Stream, StreamExt};
use log::*;
#[allow(dead_code)]
})
}
- pub async fn accept(&self) -> anyhow::Result<Option<Connection>> {
+ pub async fn accept(&self) -> anyhow::Result<Connection> {
let (tcp_stream, peer_addr) = self.listener.accept().await?;
debug!("Received connection attempt from {}", peer_addr);
- if let Ok(tls_stream) = self.acceptor.accept(tcp_stream).await {
- debug!("Completed TLS handshake with {}", peer_addr);
- Ok(Some(Connection::from(TlsConnectionMetadata::Server {
- local_addr: self.local_addrs.clone(),
- peer_addr,
- stream: tls_stream,
- })))
- } else {
- warn!("Could not encrypt connection with TLS from {}", peer_addr);
- Ok(None)
+ match self.acceptor.accept(tcp_stream).await {
+ Ok(tls_stream) => {
+ debug!("Completed TLS handshake with {}", peer_addr);
+ Ok(Connection::from(TlsConnectionMetadata::Server {
+ local_addr: self.local_addrs.clone(),
+ peer_addr,
+ stream: tls_stream,
+ }))
+ }
+
+ Err(e) => {
+ warn!("Could not encrypt connection with TLS from {}", peer_addr);
+ Err(anyhow::Error::new(e))
+ }
+ }
+ }
+}
+
+impl Stream for TlsServer {
+ type Item = Connection;
+
+ fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ match futures::executor::block_on(self.listener.incoming().next()) {
+ Some(Ok(tcp_stream)) => {
+ let peer_addr = tcp_stream.peer_addr().expect("Could not retrieve peer IP address");
+ debug!("Received connection attempt from {}", peer_addr);
+
+ match futures::executor::block_on(self.acceptor.accept(tcp_stream)) {
+ Ok(tls_stream) => {
+ debug!("Completed TLS handshake with {}", peer_addr);
+ Poll::Ready(Some(Connection::from(TlsConnectionMetadata::Server {
+ local_addr: self.local_addrs.clone(),
+ peer_addr,
+ stream: tls_stream,
+ })))
+ }
+
+ Err(_e) => {
+ warn!("Could not encrypt connection with TLS from {}", peer_addr);
+ Poll::Pending
+ }
+ }
+ },
+
+ Some(Err(e)) => {
+ error!("Encountered error when trying to accept new connection {}", e);
+ Poll::Pending
+ }
+
+ None => Poll::Ready(None)
}
}
}