reth_ipc/server/
mod.rs

1//! JSON-RPC IPC server implementation
2
3use crate::server::connection::{IpcConn, JsonRpcStream};
4use futures::StreamExt;
5use futures_util::future::Either;
6use interprocess::local_socket::{
7    tokio::prelude::{LocalSocketListener, LocalSocketStream},
8    traits::tokio::{Listener, Stream},
9    GenericFilePath, ListenerOptions, ToFsName,
10};
11use jsonrpsee::{
12    core::{middleware::layer::RpcLoggerLayer, JsonRawValue, TEN_MB_SIZE_BYTES},
13    server::{
14        middleware::rpc::RpcServiceT, stop_channel, ConnectionGuard, ConnectionPermit, IdProvider,
15        RandomIntegerIdProvider, ServerHandle, StopHandle,
16    },
17    BoundedSubscriptions, MethodResponse, MethodSink, Methods,
18};
19use std::{
20    future::Future,
21    io,
22    pin::{pin, Pin},
23    sync::Arc,
24    task::{Context, Poll},
25};
26use tokio::{
27    io::{AsyncRead, AsyncWrite, AsyncWriteExt},
28    sync::oneshot,
29};
30use tower::{layer::util::Identity, Layer, Service};
31use tracing::{debug, instrument, trace, warn, Instrument};
32// re-export so can be used during builder setup
33use crate::{
34    server::{connection::IpcConnDriver, rpc_service::RpcServiceCfg},
35    stream_codec::StreamCodec,
36};
37use tokio::sync::mpsc;
38use tokio_stream::wrappers::ReceiverStream;
39use tower::layer::{util::Stack, LayerFn};
40
41mod connection;
42mod ipc;
43mod rpc_service;
44
45pub use rpc_service::RpcService;
46
/// Ipc Server implementation
///
/// This is an adapted `jsonrpsee` Server, but for `Ipc` connections.
///
/// The two type parameters are the middleware stacks stored in `http_middleware` and
/// `rpc_middleware`; both default to [`Identity`].
pub struct IpcServer<HttpMiddleware = Identity, RpcMiddleware = Identity> {
    /// The endpoint on which we listen for incoming connections
    endpoint: String,
    /// Subscription ID provider; a clone is handed to every accepted connection.
    id_provider: Arc<dyn IdProvider>,
    /// Server settings (limits and an optional custom tokio runtime handle).
    cfg: Settings,
    /// Middleware that is wrapped around the [`RpcService`] built for each request.
    rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
    /// Middleware that is wrapped around the [`TowerServiceNoHttp`] built for each connection.
    http_middleware: tower::ServiceBuilder<HttpMiddleware>,
}
58
59impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware> {
60    /// Returns the configured endpoint
61    pub fn endpoint(&self) -> String {
62        self.endpoint.clone()
63    }
64}
65
66impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware>
67where
68    RpcMiddleware: for<'a> Layer<RpcService, Service: RpcServiceT> + Clone + Send + 'static,
69    HttpMiddleware: Layer<
70            TowerServiceNoHttp<RpcMiddleware>,
71            Service: Service<
72                String,
73                Response = Option<String>,
74                Error = Box<dyn core::error::Error + Send + Sync + 'static>,
75                Future: Send + Unpin,
76            > + Send,
77        > + Send
78        + 'static,
79{
80    /// Start responding to connections requests.
81    ///
82    /// This will run on the tokio runtime until the server is stopped or the `ServerHandle` is
83    /// dropped.
84    ///
85    /// ```
86    /// use jsonrpsee::RpcModule;
87    /// use reth_ipc::server::Builder;
88    /// async fn run_server() -> Result<(), Box<dyn core::error::Error + Send + Sync>> {
89    ///     let server = Builder::default().build("/tmp/my-uds".into());
90    ///     let mut module = RpcModule::new(());
91    ///     module.register_method("say_hello", |_, _, _| "lo")?;
92    ///     let handle = server.start(module).await?;
93    ///
94    ///     // In this example we don't care about doing shutdown so let's it run forever.
95    ///     // You may use the `ServerHandle` to shut it down or manage it yourself.
96    ///     let server = tokio::spawn(handle.stopped());
97    ///     server.await.unwrap();
98    ///     Ok(())
99    /// }
100    /// ```
101    pub async fn start(
102        mut self,
103        methods: impl Into<Methods>,
104    ) -> Result<ServerHandle, IpcServerStartError> {
105        let methods = methods.into();
106
107        let (stop_handle, server_handle) = stop_channel();
108
109        // use a signal channel to wait until we're ready to accept connections
110        let (tx, rx) = oneshot::channel();
111
112        match self.cfg.tokio_runtime.take() {
113            Some(rt) => rt.spawn(self.start_inner(methods, stop_handle, tx)),
114            None => tokio::spawn(self.start_inner(methods, stop_handle, tx)),
115        };
116        rx.await.expect("channel is open")?;
117
118        Ok(server_handle)
119    }
120
121    async fn start_inner(
122        self,
123        methods: Methods,
124        stop_handle: StopHandle,
125        on_ready: oneshot::Sender<Result<(), IpcServerStartError>>,
126    ) {
127        trace!(endpoint = ?self.endpoint, "starting ipc server");
128
129        if cfg!(unix) {
130            // ensure the file does not exist
131            if std::fs::remove_file(&self.endpoint).is_ok() {
132                debug!(endpoint = ?self.endpoint, "removed existing IPC endpoint file");
133            }
134        }
135
136        let listener = match self
137            .endpoint
138            .as_str()
139            .to_fs_name::<GenericFilePath>()
140            .and_then(|name| ListenerOptions::new().name(name).create_tokio())
141        {
142            Ok(listener) => listener,
143            Err(err) => {
144                on_ready
145                    .send(Err(IpcServerStartError { endpoint: self.endpoint.clone(), source: err }))
146                    .ok();
147                return;
148            }
149        };
150
151        // signal that we're ready to accept connections
152        on_ready.send(Ok(())).ok();
153
154        let mut id: u32 = 0;
155        let connection_guard = ConnectionGuard::new(self.cfg.max_connections as usize);
156
157        let stopped = stop_handle.clone().shutdown();
158        let mut stopped = pin!(stopped);
159
160        let (drop_on_completion, mut process_connection_awaiter) = mpsc::channel::<()>(1);
161
162        trace!("accepting ipc connections");
163        loop {
164            match try_accept_conn(&listener, stopped).await {
165                AcceptConnection::Established { local_socket_stream, stop } => {
166                    let Some(conn_permit) = connection_guard.try_acquire() else {
167                        let (_reader, mut writer) = local_socket_stream.split();
168                        let _ = writer
169                            .write_all(b"Too many connections. Please try again later.")
170                            .await;
171                        stopped = stop;
172                        continue;
173                    };
174
175                    let max_conns = connection_guard.max_connections();
176                    let curr_conns = max_conns - connection_guard.available_connections();
177                    trace!("Accepting new connection {}/{}", curr_conns, max_conns);
178
179                    let conn_permit = Arc::new(conn_permit);
180
181                    process_connection(ProcessConnection {
182                        http_middleware: &self.http_middleware,
183                        rpc_middleware: self.rpc_middleware.clone(),
184                        conn_permit,
185                        conn_id: id,
186                        server_cfg: self.cfg.clone(),
187                        stop_handle: stop_handle.clone(),
188                        drop_on_completion: drop_on_completion.clone(),
189                        methods: methods.clone(),
190                        id_provider: self.id_provider.clone(),
191                        local_socket_stream,
192                    });
193
194                    id = id.wrapping_add(1);
195                    stopped = stop;
196                }
197                AcceptConnection::Shutdown => {
198                    break;
199                }
200                AcceptConnection::Err((err, stop)) => {
201                    tracing::error!(%err, "Failed accepting a new IPC connection");
202                    stopped = stop;
203                }
204            }
205        }
206
207        // Drop the last Sender
208        drop(drop_on_completion);
209
210        // Once this channel is closed it is safe to assume that all connections have been
211        // gracefully shutdown
212        while process_connection_awaiter.recv().await.is_some() {
213            // Generally, messages should not be sent across this channel,
214            // but we'll loop here to wait for `None` just to be on the safe side
215        }
216    }
217}
218
/// Outcome of a single [`try_accept_conn`] call.
///
/// `S` is the shutdown future that was raced against the accept. It is handed back in every
/// variant except [`AcceptConnection::Shutdown`] so the caller can pass it to the next call.
enum AcceptConnection<S> {
    /// The shutdown future resolved before a connection was accepted.
    Shutdown,
    /// A new connection was accepted.
    Established { local_socket_stream: LocalSocketStream, stop: S },
    /// Accepting a connection failed with an I/O error.
    Err((io::Error, S)),
}
224
225async fn try_accept_conn<S>(listener: &LocalSocketListener, stopped: S) -> AcceptConnection<S>
226where
227    S: Future + Unpin,
228{
229    match futures_util::future::select(pin!(listener.accept()), stopped).await {
230        Either::Left((res, stop)) => match res {
231            Ok(local_socket_stream) => AcceptConnection::Established { local_socket_stream, stop },
232            Err(e) => AcceptConnection::Err((e, stop)),
233        },
234        Either::Right(_) => AcceptConnection::Shutdown,
235    }
236}
237
238impl std::fmt::Debug for IpcServer {
239    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240        f.debug_struct("IpcServer")
241            .field("endpoint", &self.endpoint)
242            .field("cfg", &self.cfg)
243            .field("id_provider", &self.id_provider)
244            .finish()
245    }
246}
247
/// Error thrown when server couldn't be started.
///
/// Produced when turning the endpoint into a socket name or creating the listener fails.
#[derive(Debug, thiserror::Error)]
#[error("failed to listen on ipc endpoint `{endpoint}`: {source}")]
pub struct IpcServerStartError {
    /// The endpoint the server tried to listen on.
    endpoint: String,
    /// The underlying I/O error.
    #[source]
    source: io::Error,
}
256
/// Data required by the server to handle requests received via an IPC connection
///
/// One instance is built for every accepted connection.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) struct ServiceData {
    /// Registered server methods.
    pub(crate) methods: Methods,
    /// Subscription ID provider.
    pub(crate) id_provider: Arc<dyn IdProvider>,
    /// Stop handle.
    pub(crate) stop_handle: StopHandle,
    /// Connection ID
    pub(crate) conn_id: u32,
    /// Connection Permit.
    pub(crate) conn_permit: Arc<ConnectionPermit>,
    /// Limits the number of subscriptions for this connection
    pub(crate) bounded_subscriptions: BoundedSubscriptions,
    /// Sink that is used to send back responses to the connection.
    ///
    /// This is used for subscriptions.
    pub(crate) method_sink: MethodSink,
    /// Server settings this connection was created with.
    pub(crate) server_cfg: Settings,
}
280
/// Similar to [`tower::ServiceBuilder`] but doesn't
/// support any tower middleware implementations.
///
/// This is a newtype around a [`tower::ServiceBuilder`] that only exposes layer composition.
#[derive(Debug, Clone)]
pub struct RpcServiceBuilder<L>(tower::ServiceBuilder<L>);
285
286impl Default for RpcServiceBuilder<Identity> {
287    fn default() -> Self {
288        Self(tower::ServiceBuilder::new())
289    }
290}
291
292impl RpcServiceBuilder<Identity> {
293    /// Create a new [`RpcServiceBuilder`].
294    pub const fn new() -> Self {
295        Self(tower::ServiceBuilder::new())
296    }
297}
298
299impl<L> RpcServiceBuilder<L> {
300    /// Optionally add a new layer `T` to the [`RpcServiceBuilder`].
301    ///
302    /// See the documentation for [`tower::ServiceBuilder::option_layer`] for more details.
303    pub fn option_layer<T>(
304        self,
305        layer: Option<T>,
306    ) -> RpcServiceBuilder<Stack<Either<T, Identity>, L>> {
307        let layer = if let Some(layer) = layer {
308            Either::Left(layer)
309        } else {
310            Either::Right(Identity::new())
311        };
312        self.layer(layer)
313    }
314
315    /// Add a new layer `T` to the [`RpcServiceBuilder`].
316    ///
317    /// See the documentation for [`tower::ServiceBuilder::layer`] for more details.
318    pub fn layer<T>(self, layer: T) -> RpcServiceBuilder<Stack<T, L>> {
319        RpcServiceBuilder(self.0.layer(layer))
320    }
321
322    /// Add a [`tower::Layer`] built from a function that accepts a service and returns another
323    /// service.
324    ///
325    /// See the documentation for [`tower::ServiceBuilder::layer_fn`] for more details.
326    pub fn layer_fn<F>(self, f: F) -> RpcServiceBuilder<Stack<LayerFn<F>, L>> {
327        RpcServiceBuilder(self.0.layer_fn(f))
328    }
329
330    /// Add a logging layer to [`RpcServiceBuilder`]
331    ///
332    /// This logs each request and response for every call.
333    pub fn rpc_logger(self, max_log_len: u32) -> RpcServiceBuilder<Stack<RpcLoggerLayer, L>> {
334        RpcServiceBuilder(self.0.layer(RpcLoggerLayer::new(max_log_len)))
335    }
336
337    /// Wrap the service `S` with the middleware.
338    pub(crate) fn service<S>(&self, service: S) -> L::Service
339    where
340        L: tower::Layer<S>,
341    {
342        self.0.service(service)
343    }
344}
345
/// `JsonRPSee` service compatible with `tower`.
///
/// # Note
/// This is similar to [`hyper::service::service_fn`](https://docs.rs/hyper/latest/hyper/service/fn.service_fn.html).
#[derive(Debug, Clone)]
pub struct TowerServiceNoHttp<L> {
    /// Per-connection data (methods, sinks, limits) used to serve each request.
    inner: ServiceData,
    /// Middleware that is wrapped around the [`RpcService`] built for each request.
    rpc_middleware: RpcServiceBuilder<L>,
}
355
356impl<RpcMiddleware> Service<String> for TowerServiceNoHttp<RpcMiddleware>
357where
358    RpcMiddleware: for<'a> Layer<RpcService>,
359    for<'a> <RpcMiddleware as Layer<RpcService>>::Service:
360        Send + Sync + 'static + RpcServiceT<MethodResponse = MethodResponse>,
361{
362    /// The response of a handled RPC call
363    ///
364    /// This is an `Option` because subscriptions and call responses are handled differently.
365    /// This will be `Some` for calls, and `None` for subscriptions, because the subscription
366    /// response will be emitted via the `method_sink`.
367    type Response = Option<String>;
368
369    type Error = Box<dyn core::error::Error + Send + Sync + 'static>;
370
371    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
372
373    /// Opens door for back pressure implementation.
374    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
375        Poll::Ready(Ok(()))
376    }
377
378    fn call(&mut self, request: String) -> Self::Future {
379        trace!("{:?}", request);
380
381        let cfg = RpcServiceCfg::CallsAndSubscriptions {
382            bounded_subscriptions: BoundedSubscriptions::new(
383                self.inner.server_cfg.max_subscriptions_per_connection,
384            ),
385            id_provider: self.inner.id_provider.clone(),
386            sink: self.inner.method_sink.clone(),
387        };
388
389        let max_response_body_size = self.inner.server_cfg.max_response_body_size as usize;
390        let max_request_body_size = self.inner.server_cfg.max_request_body_size as usize;
391        let conn = self.inner.conn_permit.clone();
392        let rpc_service = self.rpc_middleware.service(RpcService::new(
393            self.inner.methods.clone(),
394            max_response_body_size,
395            self.inner.conn_id.into(),
396            cfg,
397        ));
398        // an ipc connection needs to handle read+write concurrently
399        // even if the underlying rpc handler spawns the actual work or is does a lot of async any
400        // additional overhead performed by `handle_request` can result in I/O latencies, for
401        // example tracing calls are relatively CPU expensive on serde::serialize alone, moving this
402        // work to a separate task takes the pressure off the connection so all concurrent responses
403        // are also serialized concurrently and the connection can focus on read+write
404        let f = tokio::task::spawn(async move {
405            ipc::call_with_service(
406                request,
407                rpc_service,
408                max_response_body_size,
409                max_request_body_size,
410                conn,
411            )
412            .await
413        });
414
415        Box::pin(async move { f.await.map_err(|err| err.into()) })
416    }
417}
418
/// Everything [`process_connection`] needs to serve one accepted connection.
struct ProcessConnection<'a, HttpMiddleware, RpcMiddleware> {
    /// Middleware wrapped around the connection's [`TowerServiceNoHttp`].
    http_middleware: &'a tower::ServiceBuilder<HttpMiddleware>,
    /// Middleware wrapped around the [`RpcService`] built for each request.
    rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
    /// Permit obtained from the connection guard for this connection.
    conn_permit: Arc<ConnectionPermit>,
    /// Connection ID
    conn_id: u32,
    /// Server settings.
    server_cfg: Settings,
    /// Stop handle.
    stop_handle: StopHandle,
    /// Sender that is dropped once the connection task has finished.
    drop_on_completion: mpsc::Sender<()>,
    /// Registered server methods.
    methods: Methods,
    /// Subscription ID provider.
    id_provider: Arc<dyn IdProvider>,
    /// The accepted stream.
    local_socket_stream: LocalSocketStream,
}
431
432/// Spawns the IPC connection onto a new task
433#[instrument(name = "connection", skip_all, fields(conn_id = %params.conn_id), level = "INFO")]
434fn process_connection<'b, RpcMiddleware, HttpMiddleware>(
435    params: ProcessConnection<'_, HttpMiddleware, RpcMiddleware>,
436) where
437    RpcMiddleware: Layer<RpcService> + Clone + Send + 'static,
438    for<'a> <RpcMiddleware as Layer<RpcService>>::Service: RpcServiceT,
439    HttpMiddleware: Layer<TowerServiceNoHttp<RpcMiddleware>> + Send + 'static,
440    <HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service: Send
441    + Service<
442        String,
443        Response = Option<String>,
444        Error = Box<dyn core::error::Error + Send + Sync + 'static>,
445    >,
446    <<HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service as Service<String>>::Future:
447    Send + Unpin,
448 {
449    let ProcessConnection {
450        http_middleware,
451        rpc_middleware,
452        conn_permit,
453        conn_id,
454        server_cfg,
455        stop_handle,
456        drop_on_completion,
457        id_provider,
458        methods,
459        local_socket_stream,
460    } = params;
461
462    let ipc = IpcConn(tokio_util::codec::Decoder::framed(
463        StreamCodec::stream_incoming(),
464        local_socket_stream,
465    ));
466
467    let (tx, rx) = mpsc::channel::<Box<JsonRawValue>>(server_cfg.message_buffer_capacity as usize);
468    let method_sink = MethodSink::new_with_limit(tx, server_cfg.max_response_body_size);
469    let tower_service = TowerServiceNoHttp {
470        inner: ServiceData {
471            methods,
472            id_provider,
473            stop_handle: stop_handle.clone(),
474            server_cfg: server_cfg.clone(),
475            conn_id,
476            conn_permit,
477            bounded_subscriptions: BoundedSubscriptions::new(
478                server_cfg.max_subscriptions_per_connection,
479            ),
480            method_sink,
481        },
482        rpc_middleware,
483    };
484
485    let service = http_middleware.service(tower_service);
486    tokio::spawn(async {
487        to_ipc_service(ipc, service, stop_handle, rx).in_current_span().await;
488        drop(drop_on_completion)
489    });
490}
491
492async fn to_ipc_service<S, T>(
493    ipc: IpcConn<JsonRpcStream<T>>,
494    service: S,
495    stop_handle: StopHandle,
496    rx: mpsc::Receiver<Box<JsonRawValue>>,
497) where
498    S: Service<String, Response = Option<String>> + Send + 'static,
499    S::Error: Into<Box<dyn core::error::Error + Send + Sync>>,
500    S::Future: Send + Unpin,
501    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
502{
503    let rx_item = ReceiverStream::new(rx);
504    let conn = IpcConnDriver {
505        conn: ipc,
506        service,
507        pending_calls: Default::default(),
508        items: Default::default(),
509    };
510    let stopped = stop_handle.shutdown();
511
512    let mut conn = pin!(conn);
513    let mut rx_item = pin!(rx_item);
514    let mut stopped = pin!(stopped);
515
516    loop {
517        tokio::select! {
518            _ = &mut conn => {
519               break
520            }
521            item = rx_item.next() => {
522                if let Some(item) = item {
523                    conn.push_back(item.to_string());
524                }
525            }
526            _ = &mut stopped => {
527                // shutdown
528                break
529            }
530        }
531    }
532}
533
/// JSON-RPC IPC server settings.
#[derive(Debug, Clone)]
pub struct Settings {
    /// Maximum size in bytes of a request.
    max_request_body_size: u32,
    /// Maximum size in bytes of a response.
    max_response_body_size: u32,
    /// Max length for logging for requests and responses
    ///
    /// Logs bigger than this limit will be truncated.
    max_log_length: u32,
    /// Maximum number of incoming connections allowed.
    max_connections: u32,
    /// Maximum number of subscriptions per connection.
    max_subscriptions_per_connection: u32,
    /// Number of messages that the server is allowed to buffer per connection until
    /// backpressure kicks in.
    message_buffer_capacity: u32,
    /// Custom tokio runtime to run the server on.
    tokio_runtime: Option<tokio::runtime::Handle>,
}
554
555impl Default for Settings {
556    fn default() -> Self {
557        Self {
558            max_request_body_size: TEN_MB_SIZE_BYTES,
559            max_response_body_size: TEN_MB_SIZE_BYTES,
560            max_log_length: 4096,
561            max_connections: 100,
562            max_subscriptions_per_connection: 1024,
563            message_buffer_capacity: 1024,
564            tokio_runtime: None,
565        }
566    }
567}
568
/// Builder to configure and create a JSON-RPC server
#[derive(Debug)]
pub struct Builder<HttpMiddleware, RpcMiddleware> {
    /// Settings that are handed to the built server.
    settings: Settings,
    /// Subscription ID provider.
    id_provider: Arc<dyn IdProvider>,
    /// Middleware applied to the RPC service.
    rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
    /// Tower middleware stack handed to the built server.
    http_middleware: tower::ServiceBuilder<HttpMiddleware>,
}
578
579impl Default for Builder<Identity, Identity> {
580    fn default() -> Self {
581        Self {
582            settings: Settings::default(),
583            id_provider: Arc::new(RandomIntegerIdProvider),
584            rpc_middleware: RpcServiceBuilder::new(),
585            http_middleware: tower::ServiceBuilder::new(),
586        }
587    }
588}
589
590impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
591    /// Set the maximum size of a request body in bytes. Default is 10 MiB.
592    pub const fn max_request_body_size(mut self, size: u32) -> Self {
593        self.settings.max_request_body_size = size;
594        self
595    }
596
597    /// Set the maximum size of a response body in bytes. Default is 10 MiB.
598    pub const fn max_response_body_size(mut self, size: u32) -> Self {
599        self.settings.max_response_body_size = size;
600        self
601    }
602
603    /// Set the maximum size of a log
604    pub const fn max_log_length(mut self, size: u32) -> Self {
605        self.settings.max_log_length = size;
606        self
607    }
608
609    /// Set the maximum number of connections allowed. Default is 100.
610    pub const fn max_connections(mut self, max: u32) -> Self {
611        self.settings.max_connections = max;
612        self
613    }
614
615    /// Set the maximum number of subscriptions per connection. Default is 1024.
616    pub const fn max_subscriptions_per_connection(mut self, max: u32) -> Self {
617        self.settings.max_subscriptions_per_connection = max;
618        self
619    }
620
621    /// The server enforces backpressure which means that
622    /// `n` messages can be buffered and if the client
623    /// can't keep up with the server.
624    ///
625    /// This `capacity` is applied per connection and
626    /// applies globally on the connection which implies
627    /// all JSON-RPC messages.
628    ///
629    /// For example if a subscription produces plenty of new items
630    /// and the client can't keep up then no new messages are handled.
631    ///
632    /// If this limit is exceeded then the server will "back-off"
633    /// and only accept new messages once the client reads pending messages.
634    ///
635    /// # Panics
636    ///
637    /// Panics if the buffer capacity is 0.
638    pub const fn set_message_buffer_capacity(mut self, c: u32) -> Self {
639        self.settings.message_buffer_capacity = c;
640        self
641    }
642
643    /// Configure a custom [`tokio::runtime::Handle`] to run the server on.
644    ///
645    /// Default: [`tokio::spawn`]
646    pub fn custom_tokio_runtime(mut self, rt: tokio::runtime::Handle) -> Self {
647        self.settings.tokio_runtime = Some(rt);
648        self
649    }
650
651    /// Configure custom `subscription ID` provider for the server to use
652    /// to when getting new subscription calls.
653    ///
654    /// You may choose static dispatch or dynamic dispatch because
655    /// `IdProvider` is implemented for `Box<T>`.
656    ///
657    /// Default: [`RandomIntegerIdProvider`].
658    ///
659    /// # Examples
660    ///
661    /// ```rust
662    /// use jsonrpsee::server::RandomStringIdProvider;
663    /// use reth_ipc::server::Builder;
664    ///
665    /// // static dispatch
666    /// let builder1 = Builder::default().set_id_provider(RandomStringIdProvider::new(16));
667    ///
668    /// // or dynamic dispatch
669    /// let builder2 = Builder::default().set_id_provider(Box::new(RandomStringIdProvider::new(16)));
670    /// ```
671    pub fn set_id_provider<I: IdProvider + 'static>(mut self, id_provider: I) -> Self {
672        self.id_provider = Arc::new(id_provider);
673        self
674    }
675
676    /// Configure a custom [`tower::ServiceBuilder`] middleware for composing layers to be applied
677    /// to the RPC service.
678    ///
679    /// Default: No tower layers are applied to the RPC service.
680    ///
681    /// # Examples
682    ///
683    /// ```rust
684    /// #[tokio::main]
685    /// async fn main() {
686    ///     let builder = tower::ServiceBuilder::new();
687    ///     let server = reth_ipc::server::Builder::default()
688    ///         .set_http_middleware(builder)
689    ///         .build("/tmp/my-uds".into());
690    /// }
691    /// ```
692    pub fn set_http_middleware<T>(
693        self,
694        service_builder: tower::ServiceBuilder<T>,
695    ) -> Builder<T, RpcMiddleware> {
696        Builder {
697            settings: self.settings,
698            id_provider: self.id_provider,
699            http_middleware: service_builder,
700            rpc_middleware: self.rpc_middleware,
701        }
702    }
703
704    /// Enable middleware that is invoked on every JSON-RPC call.
705    ///
706    /// The middleware itself is very similar to the `tower middleware` but
707    /// it has a different service trait which takes &self instead &mut self
708    /// which means that you can't use built-in middleware from tower.
709    ///
710    /// Another consequence of `&self` is that you must wrap any of the middleware state in
711    /// a type which is Send and provides interior mutability such `Arc<Mutex>`.
712    ///
713    /// The builder itself exposes a similar API as the [`tower::ServiceBuilder`]
714    /// where it is possible to compose layers to the middleware.
715    pub fn set_rpc_middleware<T>(
716        self,
717        rpc_middleware: RpcServiceBuilder<T>,
718    ) -> Builder<HttpMiddleware, T> {
719        Builder {
720            settings: self.settings,
721            id_provider: self.id_provider,
722            rpc_middleware,
723            http_middleware: self.http_middleware,
724        }
725    }
726
727    /// Finalize the configuration of the server. Consumes the [`Builder`].
728    pub fn build(self, endpoint: String) -> IpcServer<HttpMiddleware, RpcMiddleware> {
729        IpcServer {
730            endpoint,
731            cfg: self.settings,
732            id_provider: self.id_provider,
733            http_middleware: self.http_middleware,
734            rpc_middleware: self.rpc_middleware,
735        }
736    }
737}
738
/// Returns a randomized endpoint name for tests: a named pipe on windows, a path under `/tmp`
/// everywhere else.
#[cfg(test)]
#[expect(missing_docs)]
pub fn dummy_name() -> String {
    use rand::Rng;
    let num = rand::rng().random::<u64>();
    if !cfg!(windows) {
        return format!(r"/tmp/my-uds-{num}");
    }
    format!(r"\\.\pipe\my-pipe-{num}")
}
750
751#[cfg(test)]
752mod tests {
753    use super::*;
754    use crate::client::IpcClientBuilder;
755    use futures::future::select;
756    use jsonrpsee::{
757        core::{
758            client::{self, ClientT, Error, Subscription, SubscriptionClientT},
759            middleware::{Batch, BatchEntry, Notification},
760            params::BatchRequestBuilder,
761        },
762        rpc_params,
763        types::Request,
764        PendingSubscriptionSink, RpcModule, SubscriptionMessage,
765    };
766    use reth_tracing::init_test_tracing;
767    use std::pin::pin;
768    use tokio::sync::broadcast;
769    use tokio_stream::wrappers::BroadcastStream;
770
    /// Accepts the pending subscription and forwards every item of `stream` to it.
    ///
    /// Returns `Ok(())` when the subscription is closed, the stream ends, or sending to the
    /// sink fails; returns the error if the stream yields one.
    async fn pipe_from_stream_with_bounded_buffer(
        pending: PendingSubscriptionSink,
        stream: BroadcastStream<usize>,
    ) -> Result<(), Box<dyn core::error::Error + Send + Sync>> {
        let sink = pending.accept().await.unwrap();
        let closed = sink.closed();

        let mut closed = pin!(closed);
        let mut stream = pin!(stream);

        loop {
            match select(closed, stream.next()).await {
                // subscription closed or stream is closed.
                Either::Left((_, _)) | Either::Right((None, _)) => break Ok(()),

                // received new item from the stream.
                Either::Right((Some(Ok(item)), c)) => {
                    let raw_value = serde_json::value::to_raw_value(&item)?;
                    let notif = SubscriptionMessage::from(raw_value);

                    // NOTE: this will block until there is a spot in the queue
                    // and you might want to do something smarter if it's
                    // critical that "the most recent item" must be sent when it is produced.
                    if sink.send(notif).await.is_err() {
                        break Ok(());
                    }

                    // `select` consumed the `closed` future; take back the still-pending one
                    // for the next iteration.
                    closed = c;
                }

                // Send back the error.
                Either::Right((Some(Err(e)), _)) => break Err(e.into()),
            }
        }
    }
806
807    // Naive example that broadcasts the produced values to all active subscribers.
808    fn produce_items(tx: broadcast::Sender<usize>) {
809        for c in 1..=100 {
810            std::thread::sleep(std::time::Duration::from_millis(1));
811            let _ = tx.send(c);
812        }
813    }
814
815    #[tokio::test]
816    async fn can_set_the_max_response_body_size() {
817        // init_test_tracing();
818        let endpoint = &dummy_name();
819        let server = Builder::default().max_response_body_size(100).build(endpoint.clone());
820        let mut module = RpcModule::new(());
821        module.register_method("anything", |_, _, _| "a".repeat(101)).unwrap();
822        let handle = server.start(module).await.unwrap();
823        tokio::spawn(handle.stopped());
824
825        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
826        let response: Result<String, Error> = client.request("anything", rpc_params![]).await;
827        assert!(response.unwrap_err().to_string().contains("Exceeded max limit of"));
828    }
829
830    #[tokio::test]
831    async fn can_set_the_max_request_body_size() {
832        init_test_tracing();
833        let endpoint = &dummy_name();
834        let server = Builder::default().max_request_body_size(100).build(endpoint.clone());
835        let mut module = RpcModule::new(());
836        module.register_method("anything", |_, _, _| "succeed").unwrap();
837        let handle = server.start(module).await.unwrap();
838        tokio::spawn(handle.stopped());
839
840        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
841        let response: Result<String, Error> =
842            client.request("anything", rpc_params!["a".repeat(101)]).await;
843        assert!(response.is_err());
844        let mut batch_request_builder = BatchRequestBuilder::new();
845        let _ = batch_request_builder.insert("anything", rpc_params![]);
846        let _ = batch_request_builder.insert("anything", rpc_params![]);
847        let _ = batch_request_builder.insert("anything", rpc_params![]);
848        // the raw request string is:
849        //  [{"jsonrpc":"2.0","id":0,"method":"anything"},{"jsonrpc":"2.0","id":1, \
850        //    "method":"anything"},{"jsonrpc":"2.0","id":2,"method":"anything"}]"
851        // which is 136 bytes, more than 100 bytes.
852        let response: Result<client::BatchResponse<'_, String>, Error> =
853            client.batch_request(batch_request_builder).await;
854        assert!(response.is_err());
855    }
856
857    #[tokio::test]
858    async fn can_set_max_connections() {
859        init_test_tracing();
860
861        let endpoint = &dummy_name();
862        let server = Builder::default().max_connections(2).build(endpoint.clone());
863        let mut module = RpcModule::new(());
864        module.register_method("anything", |_, _, _| "succeed").unwrap();
865        let handle = server.start(module).await.unwrap();
866        tokio::spawn(handle.stopped());
867
868        let client1 = IpcClientBuilder::default().build(endpoint).await.unwrap();
869        let client2 = IpcClientBuilder::default().build(endpoint).await.unwrap();
870        let client3 = IpcClientBuilder::default().build(endpoint).await.unwrap();
871
872        let response1: Result<String, Error> = client1.request("anything", rpc_params![]).await;
873        let response2: Result<String, Error> = client2.request("anything", rpc_params![]).await;
874        let response3: Result<String, Error> = client3.request("anything", rpc_params![]).await;
875
876        assert!(response1.is_ok());
877        assert!(response2.is_ok());
878        // Third connection is rejected
879        assert!(response3.is_err());
880
881        // Decrement connection count
882        drop(client2);
883        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
884
885        // Can connect again
886        let client4 = IpcClientBuilder::default().build(endpoint).await.unwrap();
887        let response4: Result<String, Error> = client4.request("anything", rpc_params![]).await;
888        assert!(response4.is_ok());
889    }
890
891    #[tokio::test]
892    async fn test_rpc_request() {
893        init_test_tracing();
894        let endpoint = &dummy_name();
895        let server = Builder::default().build(endpoint.clone());
896        let mut module = RpcModule::new(());
897        let msg = r#"{"jsonrpc":"2.0","id":83,"result":"0x7a69"}"#;
898        module.register_method("eth_chainId", move |_, _, _| msg).unwrap();
899        let handle = server.start(module).await.unwrap();
900        tokio::spawn(handle.stopped());
901
902        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
903        let response: String = client.request("eth_chainId", rpc_params![]).await.unwrap();
904        assert_eq!(response, msg);
905    }
906
907    #[tokio::test]
908    async fn test_batch_request() {
909        let endpoint = &dummy_name();
910        let server = Builder::default().build(endpoint.clone());
911        let mut module = RpcModule::new(());
912        module.register_method("anything", |_, _, _| "ok").unwrap();
913        let handle = server.start(module).await.unwrap();
914        tokio::spawn(handle.stopped());
915
916        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
917        let mut batch_request_builder = BatchRequestBuilder::new();
918        let _ = batch_request_builder.insert("anything", rpc_params![]);
919        let _ = batch_request_builder.insert("anything", rpc_params![]);
920        let _ = batch_request_builder.insert("anything", rpc_params![]);
921        let result = client
922            .batch_request(batch_request_builder)
923            .await
924            .unwrap()
925            .into_ok()
926            .unwrap()
927            .collect::<Vec<String>>();
928        assert_eq!(result, vec!["ok", "ok", "ok"]);
929    }
930
931    #[tokio::test]
932    async fn test_ipc_modules() {
933        reth_tracing::init_test_tracing();
934        let endpoint = &dummy_name();
935        let server = Builder::default().build(endpoint.clone());
936        let mut module = RpcModule::new(());
937        let msg = r#"{"admin":"1.0","debug":"1.0","engine":"1.0","eth":"1.0","ethash":"1.0","miner":"1.0","net":"1.0","rpc":"1.0","txpool":"1.0","web3":"1.0"}"#;
938        module.register_method("rpc_modules", move |_, _, _| msg).unwrap();
939        let handle = server.start(module).await.unwrap();
940        tokio::spawn(handle.stopped());
941
942        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
943        let response: String = client.request("rpc_modules", rpc_params![]).await.unwrap();
944        assert_eq!(response, msg);
945    }
946
947    #[tokio::test(flavor = "multi_thread")]
948    async fn test_rpc_subscription() {
949        let endpoint = &dummy_name();
950        let server = Builder::default().build(endpoint.clone());
951        let (tx, _rx) = broadcast::channel::<usize>(16);
952
953        let mut module = RpcModule::new(tx.clone());
954        std::thread::spawn(move || produce_items(tx));
955
956        module
957            .register_subscription(
958                "subscribe_hello",
959                "s_hello",
960                "unsubscribe_hello",
961                |_, pending, tx, _| async move {
962                    let rx = tx.subscribe();
963                    let stream = BroadcastStream::new(rx);
964                    pipe_from_stream_with_bounded_buffer(pending, stream).await?;
965                    Ok(())
966                },
967            )
968            .unwrap();
969
970        let handle = server.start(module).await.unwrap();
971        tokio::spawn(handle.stopped());
972
973        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
974        let sub: Subscription<usize> =
975            client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap();
976
977        let items = sub.take(16).collect::<Vec<_>>().await;
978        assert_eq!(items.len(), 16);
979    }
980
981    #[tokio::test]
982    async fn test_rpc_middleware() {
983        #[derive(Clone)]
984        struct ModifyRequestIf<S>(S);
985
986        impl<S> RpcServiceT for ModifyRequestIf<S>
987        where
988            S: Send + Sync + RpcServiceT,
989        {
990            type MethodResponse = S::MethodResponse;
991            type NotificationResponse = S::NotificationResponse;
992            type BatchResponse = S::BatchResponse;
993
994            fn call<'a>(
995                &self,
996                mut req: Request<'a>,
997            ) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
998                // Re-direct all calls that isn't `say_hello` to `say_goodbye`
999                if req.method == "say_hello" {
1000                    req.method = "say_goodbye".into();
1001                } else if req.method == "say_goodbye" {
1002                    req.method = "say_hello".into();
1003                }
1004
1005                self.0.call(req)
1006            }
1007
1008            fn batch<'a>(
1009                &self,
1010                mut batch: Batch<'a>,
1011            ) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
1012                for call in batch.iter_mut() {
1013                    match call {
1014                        Ok(BatchEntry::Call(req)) => {
1015                            if req.method == "say_hello" {
1016                                req.method = "say_goodbye".into();
1017                            } else if req.method == "say_goodbye" {
1018                                req.method = "say_hello".into();
1019                            }
1020                        }
1021                        Ok(BatchEntry::Notification(n)) => {
1022                            if n.method == "say_hello" {
1023                                n.method = "say_goodbye".into();
1024                            } else if n.method == "say_goodbye" {
1025                                n.method = "say_hello".into();
1026                            }
1027                        }
1028                        // Invalid request, we don't care about it.
1029                        Err(_err) => {}
1030                    }
1031                }
1032
1033                self.0.batch(batch)
1034            }
1035
1036            fn notification<'a>(
1037                &self,
1038                mut n: Notification<'a>,
1039            ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
1040                if n.method == "say_hello" {
1041                    n.method = "say_goodbye".into();
1042                } else if n.method == "say_goodbye" {
1043                    n.method = "say_hello".into();
1044                }
1045                self.0.notification(n)
1046            }
1047        }
1048
1049        reth_tracing::init_test_tracing();
1050        let endpoint = &dummy_name();
1051
1052        let rpc_middleware = RpcServiceBuilder::new().layer_fn(ModifyRequestIf);
1053        let server = Builder::default().set_rpc_middleware(rpc_middleware).build(endpoint.clone());
1054
1055        let mut module = RpcModule::new(());
1056        let goodbye_msg = r#"{"jsonrpc":"2.0","id":1,"result":"goodbye"}"#;
1057        let hello_msg = r#"{"jsonrpc":"2.0","id":2,"result":"hello"}"#;
1058        module.register_method("say_hello", move |_, _, _| hello_msg).unwrap();
1059        module.register_method("say_goodbye", move |_, _, _| goodbye_msg).unwrap();
1060        let handle = server.start(module).await.unwrap();
1061        tokio::spawn(handle.stopped());
1062
1063        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
1064        let say_hello_response: String = client.request("say_hello", rpc_params![]).await.unwrap();
1065        let say_goodbye_response: String =
1066            client.request("say_goodbye", rpc_params![]).await.unwrap();
1067
1068        assert_eq!(say_hello_response, goodbye_msg);
1069        assert_eq!(say_goodbye_response, hello_msg);
1070    }
1071}