reth_ipc/client/
mod.rs

1//! [`jsonrpsee`] transport adapter implementation for IPC.
2
3use crate::stream_codec::StreamCodec;
4use futures::{StreamExt, TryFutureExt};
5use interprocess::local_socket::{
6    tokio::{prelude::*, RecvHalf, SendHalf},
7    GenericFilePath,
8};
9use jsonrpsee::{
10    async_client::{Client, ClientBuilder},
11    core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT},
12};
13use std::{io, time::Duration};
14use tokio::io::AsyncWriteExt;
15use tokio_util::codec::FramedRead;
16
17/// Sending end of IPC transport.
18#[derive(Debug)]
19pub(crate) struct Sender {
20    inner: SendHalf,
21}
22
23impl TransportSenderT for Sender {
24    type Error = IpcError;
25
26    /// Sends out a request. Returns a Future that finishes when the request has been successfully
27    /// sent.
28    async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
29        Ok(self.inner.write_all(msg.as_bytes()).await?)
30    }
31
32    async fn send_ping(&mut self) -> Result<(), Self::Error> {
33        tracing::trace!("send ping - not implemented");
34        Err(IpcError::NotSupported)
35    }
36
37    /// Close the connection.
38    async fn close(&mut self) -> Result<(), Self::Error> {
39        Ok(())
40    }
41}
42
43/// Receiving end of IPC transport.
44#[derive(Debug)]
45pub(crate) struct Receiver {
46    pub(crate) inner: FramedRead<RecvHalf, StreamCodec>,
47}
48
49impl TransportReceiverT for Receiver {
50    type Error = IpcError;
51
52    /// Returns a Future resolving when the server sent us something back.
53    async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
54        self.inner.next().await.map_or(Err(IpcError::Closed), |val| Ok(ReceivedMessage::Text(val?)))
55    }
56}
57
58/// Builder for IPC transport [`Sender`] and [`Receiver`] pair.
59#[derive(Debug, Clone, Default)]
60#[non_exhaustive]
61pub(crate) struct IpcTransportClientBuilder;
62
63impl IpcTransportClientBuilder {
64    pub(crate) async fn build(self, path: &str) -> Result<(Sender, Receiver), IpcError> {
65        let conn = async { path.to_fs_name::<GenericFilePath>() }
66            .and_then(LocalSocketStream::connect)
67            .await
68            .map_err(|err| IpcError::FailedToConnect { path: path.to_string(), err })?;
69
70        let (recv, send) = conn.split();
71
72        Ok((
73            Sender { inner: send },
74            Receiver { inner: FramedRead::new(recv, StreamCodec::stream_incoming()) },
75        ))
76    }
77}
78
79/// Builder type for [`Client`]
80#[derive(Clone, Debug)]
81#[non_exhaustive]
82pub struct IpcClientBuilder {
83    request_timeout: Duration,
84}
85
86impl Default for IpcClientBuilder {
87    fn default() -> Self {
88        Self { request_timeout: Duration::from_secs(60) }
89    }
90}
91
92impl IpcClientBuilder {
93    /// Connects to an IPC socket
94    ///
95    /// ```
96    /// use jsonrpsee::{core::client::ClientT, rpc_params};
97    /// use reth_ipc::client::IpcClientBuilder;
98    ///
99    /// # async fn run_client() -> Result<(), Box<dyn core::error::Error +  Send + Sync>> {
100    /// let client = IpcClientBuilder::default().build("/tmp/my-uds").await?;
101    /// let response: String = client.request("say_hello", rpc_params![]).await?;
102    /// # Ok(()) }
103    /// ```
104    pub async fn build(self, name: &str) -> Result<Client, IpcError> {
105        let (tx, rx) = IpcTransportClientBuilder::default().build(name).await?;
106        Ok(self.build_with_tokio(tx, rx))
107    }
108
109    /// Uses the sender and receiver channels to connect to the socket.
110    pub fn build_with_tokio<S, R>(self, sender: S, receiver: R) -> Client
111    where
112        S: TransportSenderT + Send,
113        R: TransportReceiverT + Send,
114    {
115        ClientBuilder::default()
116            .request_timeout(self.request_timeout)
117            .build_with_tokio(sender, receiver)
118    }
119
120    /// Set request timeout (default is 60 seconds).
121    pub const fn request_timeout(mut self, timeout: Duration) -> Self {
122        self.request_timeout = timeout;
123        self
124    }
125}
126
127/// Error variants that can happen in IPC transport.
128#[derive(Debug, thiserror::Error)]
129pub enum IpcError {
130    /// Operation not supported
131    #[error("operation not supported")]
132    NotSupported,
133    /// Stream was closed
134    #[error("stream closed")]
135    Closed,
136    /// Thrown when failed to establish a socket connection.
137    #[error("failed to connect to socket {path}: {err}")]
138    FailedToConnect {
139        /// The path of the socket.
140        #[doc(hidden)]
141        path: String,
142        /// The error occurred while connecting.
143        #[doc(hidden)]
144        err: io::Error,
145    },
146    /// Wrapped IO Error
147    #[error(transparent)]
148    Io(#[from] io::Error),
149}
150
151#[cfg(test)]
152mod tests {
153    use super::*;
154    use crate::server::dummy_name;
155    use interprocess::local_socket::ListenerOptions;
156
157    #[tokio::test]
158    async fn test_connect() {
159        let name = &dummy_name();
160
161        let binding = ListenerOptions::new()
162            .name(name.as_str().to_fs_name::<GenericFilePath>().unwrap())
163            .create_tokio()
164            .unwrap();
165        tokio::spawn(async move {
166            let _x = binding.accept().await;
167        });
168
169        let (tx, rx) = IpcTransportClientBuilder::default().build(name).await.unwrap();
170        let _ = IpcClientBuilder::default().build_with_tokio(tx, rx);
171    }
172}