]> git.lizzy.rs Git - connect-rs.git/blob - src/tcp/listener.rs
remove `block_on` in tls-listener
[connect-rs.git] / src / tcp / listener.rs
1 use crate::Connection;
2 use async_std::net::{SocketAddr, TcpListener as AsyncListener, TcpStream, ToSocketAddrs};
3 use async_std::pin::Pin;
4 use async_std::task::{Context, Poll};
5 use async_stream::stream;
6 use futures::Stream;
7 use futures_lite::StreamExt;
8 use log::*;
9
10 /// Listens on a bound socket for incoming TCP connections to be handled as independent
11 /// [`Connection`]s.
12 ///
13 /// Implements the [`Stream`] trait to asynchronously accept incoming TCP connections.
14 ///
15 /// # Example
16 ///
17 /// Please see the [tcp-echo-server](https://github.com/sachanganesh/connect-rs/blob/main/examples/tcp-echo-server/src/main.rs)
18 /// example program for a more thorough showcase.
19 ///
20 /// Basic usage:
21 ///
22 /// ```ignore
23 /// let mut server = TcpListener::bind(ip_address).await?;
24 ///
25 /// // wait for a connection to come in and be accepted
26 /// while let Some(mut conn) = server.next().await {
27 ///     // do something with connection
28 /// }
29 /// ```
30 #[allow(dead_code)]
31 pub struct TcpListener {
32     local_addrs: SocketAddr,
33     // listener: AsyncListener,
34     conn_stream:
35         Pin<Box<dyn Stream<Item = Option<Result<TcpStream, std::io::Error>>> + Send + Sync>>,
36 }
37
38 impl TcpListener {
39     /// Creates a [`TcpListener`] by binding to an IP address and port and listens for incoming TCP
40     /// connections.
41     ///
42     /// # Example
43     ///
44     /// Basic usage:
45     ///
46     /// ```ignore
47     /// let mut server = TcpListener::bind("127.0.0.1:3456").await?;
48     /// ```
49     pub async fn bind<A: ToSocketAddrs + std::fmt::Display>(ip_addrs: A) -> anyhow::Result<Self> {
50         let listener = AsyncListener::bind(&ip_addrs).await?;
51         info!("Started TCP server at {}", &ip_addrs);
52
53         let local_addrs = listener.local_addr()?;
54
55         let stream = Box::pin(stream! {
56             loop {
57                 yield listener.incoming().next().await;
58             }
59         });
60
61         Ok(Self {
62             local_addrs,
63             // listener,
64             conn_stream: stream,
65         })
66     }
67
68     // /// Creates a [`Connection`] for the next `accept`ed TCP connection at the bound socket.
69     // ///
70     // /// # Example
71     // ///
72     // /// Basic usage:
73     // ///
74     // /// ```ignore
75     // /// let mut server = TcpListener::bind("127.0.0.1:3456").await?;
76     // /// while let Ok(mut conn) = server.accept().await? {
77     // ///     // handle the connection
78     // /// }
79     // /// ```
80     // pub async fn accept(&self) -> anyhow::Result<Connection> {
81     //     let (stream, ip_addr) = self.listener.accept().await?;
82     //     debug!("Received connection attempt from {}", ip_addr);
83     //
84     //     Ok(Connection::from(stream))
85     // }
86 }
87
88 impl Stream for TcpListener {
89     type Item = Connection;
90
91     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
92         match self.conn_stream.poll_next(cx) {
93             Poll::Ready(Some(Some(Ok(tcp_stream)))) => {
94                 let peer_addr = tcp_stream
95                     .peer_addr()
96                     .expect("Could not retrieve peer IP address");
97                 debug!("Received connection attempt from {}", peer_addr);
98
99                 Poll::Ready(Some(Connection::from(tcp_stream)))
100             }
101
102             Poll::Ready(Some(Some(Err(err)))) => {
103                 error!(
104                     "Encountered error when trying to accept new connection {}",
105                     err
106                 );
107                 Poll::Pending
108             }
109
110             Poll::Ready(Some(None)) => Poll::Ready(None),
111
112             Poll::Ready(None) => Poll::Ready(None),
113
114             Poll::Pending => Poll::Pending,
115         }
116     }
117 }