2 use async_std::net::{SocketAddr, TcpListener as AsyncListener, TcpStream, ToSocketAddrs};
3 use async_std::pin::Pin;
4 use async_std::task::{Context, Poll};
5 use async_stream::stream;
7 use futures_lite::StreamExt;
10 /// Listens on a bound socket for incoming TCP connections to be handled as independent
13 /// Implements the [`Stream`] trait to asynchronously accept incoming TCP connections.
17 /// Please see the [tcp-echo-server](https://github.com/sachanganesh/connect-rs/blob/main/examples/tcp-echo-server/src/main.rs)
18 /// example program for a more thorough showcase.
23 /// let mut server = TcpListener::bind(ip_address).await?;
25 /// // wait for a connection to come in and be accepted
26 /// while let Some(mut conn) = server.next().await {
27 /// // do something with connection
// State held by the listener wrapper: a socket address plus a pinned, boxed stream type.
// NOTE(review): this excerpt is missing lines (the embedded numbering jumps from 33 to 35 and
// the closing brace is not shown), so only the visible lines are annotated here.
31 pub struct TcpListener {
// Socket address kept alongside the stream; presumably the value read from
// `listener.local_addr()` in `bind` — the constructor expression is not visible, TODO confirm.
32 local_addrs: SocketAddr,
// Field left commented out by the original author; no listener handle is stored directly.
33 // listener: AsyncListener,
// Type of a pinned, heap-allocated, `Send + Sync` stream whose items are
// `Option<Result<TcpStream, std::io::Error>>`. Presumably this is the type of the
// `conn_stream` field polled in `poll_next` — the field-name line is not visible, TODO confirm.
35 Pin<Box<dyn Stream<Item = Option<Result<TcpStream, std::io::Error>>> + Send + Sync>>,
39 /// Creates a [`TcpListener`] by binding to an IP address and port and listens for incoming TCP
47 /// let mut server = TcpListener::bind("127.0.0.1:3456").await?;
// Binds an async TCP listener to `ip_addrs`, reports the address through `info!`, reads the
// bound local address, and begins building a pinned, boxed stream over incoming connections.
// Errors: failures from `AsyncListener::bind` and `local_addr` are propagated with `?`
// through `anyhow::Result`.
// NOTE(review): the rest of the body (including the returned value) is not visible in this
// excerpt, and the embedded numbering skips line 56 between the `stream!` opener and `yield`.
49 pub async fn bind<A: ToSocketAddrs + std::fmt::Display>(ip_addrs: A) -> anyhow::Result<Self> {
// Bind the underlying listener; an error here returns early from `bind`.
50 let listener = AsyncListener::bind(&ip_addrs).await?;
// The `Display` bound on `A` is what allows the address to be formatted into this message.
51 info!("Started TCP server at {}", &ip_addrs);
// Ask the listener which address it reports; an error here also returns early.
53 let local_addrs = listener.local_addr()?;
// Pin and box the stream produced by the `stream!` macro.
55 let stream = Box::pin(stream! {
// Yield whatever the next poll of the listener's `incoming()` stream resolves to.
// NOTE(review): `incoming()` is invoked on every evaluation of this statement; the enclosing
// construct (presumably a loop on the missing line) is not visible — TODO confirm.
57 yield listener.incoming().next().await;
68 // /// Creates a [`Connection`] for the next `accept`ed TCP connection at the bound socket.
75 // /// let mut server = TcpListener::bind("127.0.0.1:3456").await?;
76 // /// while let Ok(mut conn) = server.accept().await? {
77 // /// // handle the connection
80 // pub async fn accept(&self) -> anyhow::Result<Connection> {
81 // let (stream, ip_addr) = self.listener.accept().await?;
82 // debug!("Received connection attempt from {}", ip_addr);
84 // Ok(Connection::from(stream))
88 impl Stream for TcpListener {
89 type Item = Connection;
91 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
92 match self.conn_stream.poll_next(cx) {
93 Poll::Ready(Some(Some(Ok(tcp_stream)))) => {
94 let peer_addr = tcp_stream
96 .expect("Could not retrieve peer IP address");
97 debug!("Received connection attempt from {}", peer_addr);
99 Poll::Ready(Some(Connection::from(tcp_stream)))
102 Poll::Ready(Some(Some(Err(err)))) => {
104 "Encountered error when trying to accept new connection {}",
110 Poll::Ready(Some(None)) => Poll::Ready(None),
112 Poll::Ready(None) => Poll::Ready(None),
114 Poll::Pending => Poll::Pending,