]> git.lizzy.rs Git - connect-rs.git/blob - src/tcp/listener.rs
remove `block_on` in tcp-listener
[connect-rs.git] / src / tcp / listener.rs
1 use crate::Connection;
2 use async_std::net::{SocketAddr, TcpListener as AsyncListener, ToSocketAddrs, TcpStream};
3 use async_std::pin::Pin;
4 use async_std::task::{Context, Poll};
5 use async_stream::stream;
6 use futures::Stream;
7 use futures_lite::StreamExt;
8 use log::*;
9
10 /// Listens on a bound socket for incoming TCP connections to be handled as independent
11 /// [`Connection`]s.
12 ///
13 /// Implements the [`Stream`] trait to asynchronously accept incoming TCP connections.
14 ///
15 /// # Example
16 ///
17 /// Basic usage:
18 ///
19 /// ```ignore
20 /// let mut server = TcpListener::bind(ip_address).await?;
21 ///
22 /// // wait for a connection to come in and be accepted
23 /// while let Some(mut conn) = server.next().await {
24 ///     // do something with connection
25 /// }
26 /// ```
27 #[allow(dead_code)]
28 pub struct TcpListener {
29     local_addrs: SocketAddr,
30     // listener: AsyncListener,
31     conn_stream: Pin<Box<dyn Stream<Item = Option<Result<TcpStream, std::io::Error>>> + Send + Sync>>,
32 }
33
34 impl TcpListener {
35     /// Creates a [`TcpListener`] by binding to an IP address and port and listens for incoming TCP
36     /// connections.
37     ///
38     /// # Example
39     ///
40     /// Basic usage:
41     ///
42     /// ```ignore
43     /// let mut server = TcpListener::bind("127.0.0.1:3456").await?;
44     /// ```
45     pub async fn bind<A: ToSocketAddrs + std::fmt::Display>(ip_addrs: A) -> anyhow::Result<Self> {
46         let listener = AsyncListener::bind(&ip_addrs).await?;
47         info!("Started TCP server at {}", &ip_addrs);
48
49         let local_addrs = listener.local_addr()?;
50
51         let stream = Box::pin(stream! {
52             loop {
53                 yield listener.incoming().next().await;
54             }
55         });
56
57         Ok(Self {
58             local_addrs,
59             // listener,
60             conn_stream: stream,
61         })
62     }
63
64     // /// Creates a [`Connection`] for the next `accept`ed TCP connection at the bound socket.
65     // ///
66     // /// # Example
67     // ///
68     // /// Basic usage:
69     // ///
70     // /// ```ignore
71     // /// let mut server = TcpListener::bind("127.0.0.1:3456").await?;
72     // /// while let Ok(mut conn) = server.accept().await? {
73     // ///     // handle the connection
74     // /// }
75     // /// ```
76     // pub async fn accept(&self) -> anyhow::Result<Connection> {
77     //     let (stream, ip_addr) = self.listener.accept().await?;
78     //     debug!("Received connection attempt from {}", ip_addr);
79     //
80     //     Ok(Connection::from(stream))
81     // }
82 }
83
84 impl Stream for TcpListener {
85     type Item = Connection;
86
87     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
88         match self.conn_stream.poll_next(cx) {
89             Poll::Ready(Some(Some(Ok(tcp_stream)))) => {
90                 let peer_addr = tcp_stream
91                     .peer_addr()
92                     .expect("Could not retrieve peer IP address");
93                 debug!("Received connection attempt from {}", peer_addr);
94
95                 Poll::Ready(Some(Connection::from(tcp_stream)))
96             }
97
98             Poll::Ready(Some(Some(Err(err)))) => {
99                 error!(
100                     "Encountered error when trying to accept new connection {}",
101                     err
102                 );
103                 Poll::Ready(None)
104             }
105
106             Poll::Ready(Some(None)) => Poll::Ready(None),
107
108             Poll::Ready(None) => Poll::Ready(None),
109
110             Poll::Pending => Poll::Pending,
111         }
112     }
113 }