1 use crate::schema::StitchMessage;
2 use async_channel::RecvError;
3 use async_std::net::SocketAddr;
4 use async_std::pin::Pin;
5 use futures::task::{Context, Poll};
6 use futures::{AsyncWrite, AsyncWriteExt, Sink};
10 pub use futures::SinkExt;
11 pub use futures::StreamExt;
13 pub struct StitchConnectionWriter {
14 local_addr: SocketAddr,
15 peer_addr: SocketAddr,
16 write_stream: Box<dyn AsyncWrite + Send + Sync + Unpin>,
17 pending_write: Option<StitchMessage>,
20 impl StitchConnectionWriter {
22 local_addr: SocketAddr,
23 peer_addr: SocketAddr,
24 write_stream: Box<dyn AsyncWrite + Send + Sync + Unpin>,
34 pub fn local_addr(&self) -> SocketAddr {
35 self.local_addr.clone()
38 pub fn peer_addr(&self) -> SocketAddr {
39 self.peer_addr.clone()
43 impl<T: Message> Sink<T> for StitchConnectionWriter {
44 type Error = RecvError;
46 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
47 if self.pending_write.is_some() {
48 debug!("Connection not ready to send message yet, waiting for prior message");
51 debug!("Connection ready to send message");
56 fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
57 debug!("Preparing message to be sent next");
58 let stitch_msg: StitchMessage = StitchMessage::from_msg(item);
59 self.pending_write.replace(stitch_msg);
65 mut self: Pin<&mut Self>,
66 _cx: &mut Context<'_>,
67 ) -> Poll<Result<(), Self::Error>> {
68 if let Some(pending_msg) = self.pending_write.take() {
69 debug!("Send pending message");
70 if let Ok(buffer) = pending_msg.write_to_bytes() {
71 let msg_size = buffer.len();
72 debug!("{} bytes to be sent over network connection", msg_size);
74 debug!("{:?}", buffer.as_slice());
77 futures::executor::block_on(self.write_stream.write_all(buffer.as_slice()))
79 if let Ok(_) = futures::executor::block_on(self.write_stream.flush()) {
80 debug!("Sent message of {} bytes", msg_size);
83 debug!("Encountered error while flushing queued bytes to network stream");
84 Poll::Ready(Err(RecvError))
87 debug!("Encountered error when writing to network stream");
88 Poll::Ready(Err(RecvError))
91 debug!("Encountered error when serializing message to bytes");
92 return Poll::Ready(Err(RecvError));
95 debug!("No message to send over connection");
102 mut self: Pin<&mut Self>,
103 _cx: &mut Context<'_>,
104 ) -> Poll<Result<(), Self::Error>> {
105 if let Ok(_) = futures::executor::block_on(self.write_stream.close()) {
108 Poll::Ready(Err(RecvError))