+use crate::protocol::ConnectDatagram;
use async_std::net::SocketAddr;
use async_std::pin::Pin;
use bytes::{Buf, BytesMut};
use futures::task::{Context, Poll};
use futures::{AsyncRead, Stream};
use log::*;
+use std::io::Cursor;
-use crate::protocol::ConnectDatagram;
pub use futures::SinkExt;
pub use futures::StreamExt;
-use std::io::Cursor;
/// A default buffer size to read in bytes and then deserialize as messages.
const BUFFER_SIZE: usize = 8192;
use crate::Connection;
-use async_std::net::{SocketAddr, TcpListener as AsyncListener, ToSocketAddrs};
+use async_std::net::{SocketAddr, TcpListener as AsyncListener, ToSocketAddrs, TcpStream};
use async_std::pin::Pin;
use async_std::task::{Context, Poll};
-use futures::{Stream, StreamExt};
+use async_stream::stream;
+use futures::Stream;
+use futures_lite::StreamExt;
use log::*;
/// Listens on a bound socket for incoming TCP connections to be handled as independent
#[allow(dead_code)]
pub struct TcpListener {
local_addrs: SocketAddr,
- listener: AsyncListener,
+ // listener: AsyncListener,
+ conn_stream: Pin<Box<dyn Stream<Item = Option<Result<TcpStream, std::io::Error>>> + Send + Sync>>,
}
impl TcpListener {
let listener = AsyncListener::bind(&ip_addrs).await?;
info!("Started TCP server at {}", &ip_addrs);
+ let local_addrs = listener.local_addr()?;
+
+ let stream = Box::pin(stream! {
+ loop {
+ yield listener.incoming().next().await;
+ }
+ });
+
Ok(Self {
- local_addrs: listener.local_addr()?,
- listener,
+ local_addrs,
+ // listener,
+ conn_stream: stream,
})
}
- /// Creates a [`Connection`] for the next `accept`ed TCP connection at the bound socket.
- ///
- /// # Example
- ///
- /// Basic usage:
- ///
- /// ```ignore
- /// let mut server = TcpListener::bind("127.0.0.1:3456").await?;
- /// while let Ok(mut conn) = server.accept().await? {
- /// // handle the connection
- /// }
- /// ```
- pub async fn accept(&self) -> anyhow::Result<Connection> {
- let (stream, ip_addr) = self.listener.accept().await?;
- debug!("Received connection attempt from {}", ip_addr);
-
- Ok(Connection::from(stream))
- }
+ // /// Creates a [`Connection`] for the next `accept`ed TCP connection at the bound socket.
+ // ///
+ // /// # Example
+ // ///
+ // /// Basic usage:
+ // ///
+ // /// ```ignore
+ // /// let mut server = TcpListener::bind("127.0.0.1:3456").await?;
+ // /// while let Ok(mut conn) = server.accept().await? {
+ // /// // handle the connection
+ // /// }
+ // /// ```
+ // pub async fn accept(&self) -> anyhow::Result<Connection> {
+ // let (stream, ip_addr) = self.listener.accept().await?;
+ // debug!("Received connection attempt from {}", ip_addr);
+ //
+ // Ok(Connection::from(stream))
+ // }
}
impl Stream for TcpListener {
type Item = Connection;
- fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- match futures::executor::block_on(self.listener.incoming().next()) {
- Some(Ok(tcp_stream)) => {
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ match self.conn_stream.poll_next(cx) {
+ Poll::Ready(Some(Some(Ok(tcp_stream)))) => {
let peer_addr = tcp_stream
.peer_addr()
.expect("Could not retrieve peer IP address");
Poll::Ready(Some(Connection::from(tcp_stream)))
}
- Some(Err(e)) => {
+ Poll::Ready(Some(Some(Err(err)))) => {
error!(
"Encountered error when trying to accept new connection {}",
- e
+ err
);
- Poll::Pending
+ Poll::Ready(None)
}
- None => Poll::Ready(None),
+ Poll::Ready(Some(None)) => Poll::Ready(None),
+
+ Poll::Ready(None) => Poll::Ready(None),
+
+ Poll::Pending => Poll::Pending,
}
}
}