reth_ipc/client/
mod.rs

1//! [`jsonrpsee`] transport adapter implementation for IPC.
2
3use crate::stream_codec::StreamCodec;
4use futures::{StreamExt, TryFutureExt};
5use interprocess::local_socket::{
6    tokio::{prelude::*, RecvHalf, SendHalf},
7    GenericFilePath,
8};
9use jsonrpsee::{
10    async_client::{Client, ClientBuilder},
11    core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT},
12};
13use std::io;
14use tokio::io::AsyncWriteExt;
15use tokio_util::codec::FramedRead;
16
17/// Sending end of IPC transport.
18#[derive(Debug)]
19pub(crate) struct Sender {
20    inner: SendHalf,
21}
22
23#[async_trait::async_trait]
24impl TransportSenderT for Sender {
25    type Error = IpcError;
26
27    /// Sends out a request. Returns a Future that finishes when the request has been successfully
28    /// sent.
29    async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
30        Ok(self.inner.write_all(msg.as_bytes()).await?)
31    }
32
33    async fn send_ping(&mut self) -> Result<(), Self::Error> {
34        tracing::trace!("send ping - not implemented");
35        Err(IpcError::NotSupported)
36    }
37
38    /// Close the connection.
39    async fn close(&mut self) -> Result<(), Self::Error> {
40        Ok(())
41    }
42}
43
44/// Receiving end of IPC transport.
45#[derive(Debug)]
46pub(crate) struct Receiver {
47    pub(crate) inner: FramedRead<RecvHalf, StreamCodec>,
48}
49
50#[async_trait::async_trait]
51impl TransportReceiverT for Receiver {
52    type Error = IpcError;
53
54    /// Returns a Future resolving when the server sent us something back.
55    async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
56        self.inner.next().await.map_or(Err(IpcError::Closed), |val| Ok(ReceivedMessage::Text(val?)))
57    }
58}
59
/// Stateless builder that opens an IPC connection and hands back its [`Sender`] / [`Receiver`]
/// halves.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub(crate) struct IpcTransportClientBuilder;
64
65impl IpcTransportClientBuilder {
66    pub(crate) async fn build(self, path: &str) -> Result<(Sender, Receiver), IpcError> {
67        let conn = async { path.to_fs_name::<GenericFilePath>() }
68            .and_then(LocalSocketStream::connect)
69            .await
70            .map_err(|err| IpcError::FailedToConnect { path: path.to_string(), err })?;
71
72        let (recv, send) = conn.split();
73
74        Ok((
75            Sender { inner: send },
76            Receiver { inner: FramedRead::new(recv, StreamCodec::stream_incoming()) },
77        ))
78    }
79}
80
/// Builder that produces a [`Client`] communicating over an IPC transport.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct IpcClientBuilder;
85
86impl IpcClientBuilder {
87    /// Connects to a IPC socket
88    ///
89    /// ```
90    /// use jsonrpsee::{core::client::ClientT, rpc_params};
91    /// use reth_ipc::client::IpcClientBuilder;
92    ///
93    /// # async fn run_client() -> Result<(), Box<dyn core::error::Error +  Send + Sync>> {
94    /// let client = IpcClientBuilder::default().build("/tmp/my-uds").await?;
95    /// let response: String = client.request("say_hello", rpc_params![]).await?;
96    /// # Ok(()) }
97    /// ```
98    pub async fn build(self, name: &str) -> Result<Client, IpcError> {
99        let (tx, rx) = IpcTransportClientBuilder::default().build(name).await?;
100        Ok(self.build_with_tokio(tx, rx))
101    }
102
103    /// Uses the sender and receiver channels to connect to the socket.
104    pub fn build_with_tokio<S, R>(self, sender: S, receiver: R) -> Client
105    where
106        S: TransportSenderT + Send,
107        R: TransportReceiverT + Send,
108    {
109        ClientBuilder::default().build_with_tokio(sender, receiver)
110    }
111}
112
113/// Error variants that can happen in IPC transport.
114#[derive(Debug, thiserror::Error)]
115pub enum IpcError {
116    /// Operation not supported
117    #[error("operation not supported")]
118    NotSupported,
119    /// Stream was closed
120    #[error("stream closed")]
121    Closed,
122    /// Thrown when failed to establish a socket connection.
123    #[error("failed to connect to socket {path}: {err}")]
124    FailedToConnect {
125        /// The path of the socket.
126        #[doc(hidden)]
127        path: String,
128        /// The error occurred while connecting.
129        #[doc(hidden)]
130        err: io::Error,
131    },
132    /// Wrapped IO Error
133    #[error(transparent)]
134    Io(#[from] io::Error),
135}
136
#[cfg(test)]
mod tests {
    use super::*;
    use crate::server::dummy_name;
    use interprocess::local_socket::ListenerOptions;

    /// Binds a listener on the name returned by `dummy_name`, accepts one connection in a
    /// background task, and checks that the transport builder can connect to it and that the
    /// resulting pair can be wrapped into a client.
    #[tokio::test]
    async fn test_connect() {
        let socket_name = dummy_name();

        let fs_name = socket_name.as_str().to_fs_name::<GenericFilePath>().unwrap();
        let listener = ListenerOptions::new().name(fs_name).create_tokio().unwrap();
        tokio::spawn(async move {
            // Bind the accept result so it stays alive until the task finishes.
            let _accepted = listener.accept().await;
        });

        let (sender, receiver) =
            IpcTransportClientBuilder::default().build(&socket_name).await.unwrap();
        let _ = IpcClientBuilder::default().build_with_tokio(sender, receiver);
    }
}