1use crate::stream_codec::StreamCodec;
4use futures::{StreamExt, TryFutureExt};
5use interprocess::local_socket::{
6 tokio::{prelude::*, RecvHalf, SendHalf},
7 GenericFilePath,
8};
9use jsonrpsee::{
10 async_client::{Client, ClientBuilder},
11 core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT},
12};
13use std::{io, time::Duration};
14use tokio::io::AsyncWriteExt;
15use tokio_util::codec::FramedRead;
16
17#[derive(Debug)]
19pub(crate) struct Sender {
20 inner: SendHalf,
21}
22
23impl TransportSenderT for Sender {
24 type Error = IpcError;
25
26 async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
29 Ok(self.inner.write_all(msg.as_bytes()).await?)
30 }
31
32 async fn send_ping(&mut self) -> Result<(), Self::Error> {
33 tracing::trace!("send ping - not implemented");
34 Err(IpcError::NotSupported)
35 }
36
37 async fn close(&mut self) -> Result<(), Self::Error> {
39 Ok(())
40 }
41}
42
43#[derive(Debug)]
45pub(crate) struct Receiver {
46 pub(crate) inner: FramedRead<RecvHalf, StreamCodec>,
47}
48
49impl TransportReceiverT for Receiver {
50 type Error = IpcError;
51
52 async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
54 self.inner.next().await.map_or(Err(IpcError::Closed), |val| Ok(ReceivedMessage::Text(val?)))
55 }
56}
57
58#[derive(Debug, Clone, Default)]
60#[non_exhaustive]
61pub(crate) struct IpcTransportClientBuilder;
62
63impl IpcTransportClientBuilder {
64 pub(crate) async fn build(self, path: &str) -> Result<(Sender, Receiver), IpcError> {
65 let conn = async { path.to_fs_name::<GenericFilePath>() }
66 .and_then(LocalSocketStream::connect)
67 .await
68 .map_err(|err| IpcError::FailedToConnect { path: path.to_string(), err })?;
69
70 let (recv, send) = conn.split();
71
72 Ok((
73 Sender { inner: send },
74 Receiver { inner: FramedRead::new(recv, StreamCodec::stream_incoming()) },
75 ))
76 }
77}
78
79#[derive(Clone, Debug)]
81#[non_exhaustive]
82pub struct IpcClientBuilder {
83 request_timeout: Duration,
84}
85
86impl Default for IpcClientBuilder {
87 fn default() -> Self {
88 Self { request_timeout: Duration::from_secs(60) }
89 }
90}
91
92impl IpcClientBuilder {
93 pub async fn build(self, name: &str) -> Result<Client, IpcError> {
105 let (tx, rx) = IpcTransportClientBuilder::default().build(name).await?;
106 Ok(self.build_with_tokio(tx, rx))
107 }
108
109 pub fn build_with_tokio<S, R>(self, sender: S, receiver: R) -> Client
111 where
112 S: TransportSenderT + Send,
113 R: TransportReceiverT + Send,
114 {
115 ClientBuilder::default()
116 .request_timeout(self.request_timeout)
117 .build_with_tokio(sender, receiver)
118 }
119
120 pub const fn request_timeout(mut self, timeout: Duration) -> Self {
122 self.request_timeout = timeout;
123 self
124 }
125}
126
127#[derive(Debug, thiserror::Error)]
129pub enum IpcError {
130 #[error("operation not supported")]
132 NotSupported,
133 #[error("stream closed")]
135 Closed,
136 #[error("failed to connect to socket {path}: {err}")]
138 FailedToConnect {
139 #[doc(hidden)]
141 path: String,
142 #[doc(hidden)]
144 err: io::Error,
145 },
146 #[error(transparent)]
148 Io(#[from] io::Error),
149}
150
151#[cfg(test)]
152mod tests {
153 use super::*;
154 use crate::server::dummy_name;
155 use interprocess::local_socket::ListenerOptions;
156
157 #[tokio::test]
158 async fn test_connect() {
159 let name = &dummy_name();
160
161 let binding = ListenerOptions::new()
162 .name(name.as_str().to_fs_name::<GenericFilePath>().unwrap())
163 .create_tokio()
164 .unwrap();
165 tokio::spawn(async move {
166 let _x = binding.accept().await;
167 });
168
169 let (tx, rx) = IpcTransportClientBuilder::default().build(name).await.unwrap();
170 let _ = IpcClientBuilder::default().build_with_tokio(tx, rx);
171 }
172}