1use crate::stream_codec::StreamCodec;
4use futures::{StreamExt, TryFutureExt};
5use interprocess::local_socket::{
6 tokio::{prelude::*, RecvHalf, SendHalf},
7 GenericFilePath,
8};
9use jsonrpsee::{
10 async_client::{Client, ClientBuilder},
11 core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT},
12};
13use std::io;
14use tokio::io::AsyncWriteExt;
15use tokio_util::codec::FramedRead;
16
17#[derive(Debug)]
19pub(crate) struct Sender {
20 inner: SendHalf,
21}
22
23#[async_trait::async_trait]
24impl TransportSenderT for Sender {
25 type Error = IpcError;
26
27 async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
30 Ok(self.inner.write_all(msg.as_bytes()).await?)
31 }
32
33 async fn send_ping(&mut self) -> Result<(), Self::Error> {
34 tracing::trace!("send ping - not implemented");
35 Err(IpcError::NotSupported)
36 }
37
38 async fn close(&mut self) -> Result<(), Self::Error> {
40 Ok(())
41 }
42}
43
44#[derive(Debug)]
46pub(crate) struct Receiver {
47 pub(crate) inner: FramedRead<RecvHalf, StreamCodec>,
48}
49
50#[async_trait::async_trait]
51impl TransportReceiverT for Receiver {
52 type Error = IpcError;
53
54 async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
56 self.inner.next().await.map_or(Err(IpcError::Closed), |val| Ok(ReceivedMessage::Text(val?)))
57 }
58}
59
60#[derive(Debug, Clone, Default)]
62#[non_exhaustive]
63pub(crate) struct IpcTransportClientBuilder;
64
65impl IpcTransportClientBuilder {
66 pub(crate) async fn build(self, path: &str) -> Result<(Sender, Receiver), IpcError> {
67 let conn = async { path.to_fs_name::<GenericFilePath>() }
68 .and_then(LocalSocketStream::connect)
69 .await
70 .map_err(|err| IpcError::FailedToConnect { path: path.to_string(), err })?;
71
72 let (recv, send) = conn.split();
73
74 Ok((
75 Sender { inner: send },
76 Receiver { inner: FramedRead::new(recv, StreamCodec::stream_incoming()) },
77 ))
78 }
79}
80
81#[derive(Clone, Default, Debug)]
83#[non_exhaustive]
84pub struct IpcClientBuilder;
85
86impl IpcClientBuilder {
87 pub async fn build(self, name: &str) -> Result<Client, IpcError> {
99 let (tx, rx) = IpcTransportClientBuilder::default().build(name).await?;
100 Ok(self.build_with_tokio(tx, rx))
101 }
102
103 pub fn build_with_tokio<S, R>(self, sender: S, receiver: R) -> Client
105 where
106 S: TransportSenderT + Send,
107 R: TransportReceiverT + Send,
108 {
109 ClientBuilder::default().build_with_tokio(sender, receiver)
110 }
111}
112
113#[derive(Debug, thiserror::Error)]
115pub enum IpcError {
116 #[error("operation not supported")]
118 NotSupported,
119 #[error("stream closed")]
121 Closed,
122 #[error("failed to connect to socket {path}: {err}")]
124 FailedToConnect {
125 #[doc(hidden)]
127 path: String,
128 #[doc(hidden)]
130 err: io::Error,
131 },
132 #[error(transparent)]
134 Io(#[from] io::Error),
135}
136
137#[cfg(test)]
138mod tests {
139 use super::*;
140 use crate::server::dummy_name;
141 use interprocess::local_socket::ListenerOptions;
142
143 #[tokio::test]
144 async fn test_connect() {
145 let name = &dummy_name();
146
147 let binding = ListenerOptions::new()
148 .name(name.as_str().to_fs_name::<GenericFilePath>().unwrap())
149 .create_tokio()
150 .unwrap();
151 tokio::spawn(async move {
152 let _x = binding.accept().await;
153 });
154
155 let (tx, rx) = IpcTransportClientBuilder::default().build(name).await.unwrap();
156 let _ = IpcClientBuilder::default().build_with_tokio(tx, rx);
157 }
158}