1use crate::server::connection::{IpcConn, JsonRpcStream};
4use futures::StreamExt;
5use futures_util::future::Either;
6use interprocess::local_socket::{
7 tokio::prelude::{LocalSocketListener, LocalSocketStream},
8 traits::tokio::{Listener, Stream},
9 GenericFilePath, ListenerOptions, ToFsName,
10};
11use jsonrpsee::{
12 core::{middleware::layer::RpcLoggerLayer, JsonRawValue, TEN_MB_SIZE_BYTES},
13 server::{
14 middleware::rpc::RpcServiceT, stop_channel, ConnectionGuard, ConnectionPermit, IdProvider,
15 RandomIntegerIdProvider, ServerHandle, StopHandle,
16 },
17 BoundedSubscriptions, MethodResponse, MethodSink, Methods,
18};
19use std::{
20 future::Future,
21 io,
22 pin::{pin, Pin},
23 sync::Arc,
24 task::{Context, Poll},
25};
26use tokio::{
27 io::{AsyncRead, AsyncWrite, AsyncWriteExt},
28 sync::oneshot,
29};
30use tower::{layer::util::Identity, Layer, Service};
31use tracing::{debug, instrument, trace, warn, Instrument};
32use crate::{
34 server::{connection::IpcConnDriver, rpc_service::RpcServiceCfg},
35 stream_codec::StreamCodec,
36};
37use tokio::sync::mpsc;
38use tokio_stream::wrappers::ReceiverStream;
39use tower::layer::{util::Stack, LayerFn};
40
41mod connection;
42mod ipc;
43mod rpc_service;
44
45pub use rpc_service::RpcService;
46
47pub struct IpcServer<HttpMiddleware = Identity, RpcMiddleware = Identity> {
51 endpoint: String,
53 id_provider: Arc<dyn IdProvider>,
54 cfg: Settings,
55 rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
56 http_middleware: tower::ServiceBuilder<HttpMiddleware>,
57}
58
59impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware> {
60 pub fn endpoint(&self) -> String {
62 self.endpoint.clone()
63 }
64}
65
66impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware>
67where
68 RpcMiddleware: for<'a> Layer<RpcService, Service: RpcServiceT> + Clone + Send + 'static,
69 HttpMiddleware: Layer<
70 TowerServiceNoHttp<RpcMiddleware>,
71 Service: Service<
72 String,
73 Response = Option<String>,
74 Error = Box<dyn core::error::Error + Send + Sync + 'static>,
75 Future: Send + Unpin,
76 > + Send,
77 > + Send
78 + 'static,
79{
80 pub async fn start(
102 mut self,
103 methods: impl Into<Methods>,
104 ) -> Result<ServerHandle, IpcServerStartError> {
105 let methods = methods.into();
106
107 let (stop_handle, server_handle) = stop_channel();
108
109 let (tx, rx) = oneshot::channel();
111
112 match self.cfg.tokio_runtime.take() {
113 Some(rt) => rt.spawn(self.start_inner(methods, stop_handle, tx)),
114 None => tokio::spawn(self.start_inner(methods, stop_handle, tx)),
115 };
116 rx.await.expect("channel is open")?;
117
118 Ok(server_handle)
119 }
120
121 async fn start_inner(
122 self,
123 methods: Methods,
124 stop_handle: StopHandle,
125 on_ready: oneshot::Sender<Result<(), IpcServerStartError>>,
126 ) {
127 trace!(endpoint = ?self.endpoint, "starting ipc server");
128
129 if cfg!(unix) {
130 if std::fs::remove_file(&self.endpoint).is_ok() {
132 debug!(endpoint = ?self.endpoint, "removed existing IPC endpoint file");
133 }
134 }
135
136 let listener = match self
137 .endpoint
138 .as_str()
139 .to_fs_name::<GenericFilePath>()
140 .and_then(|name| ListenerOptions::new().name(name).create_tokio())
141 {
142 Ok(listener) => listener,
143 Err(err) => {
144 on_ready
145 .send(Err(IpcServerStartError { endpoint: self.endpoint.clone(), source: err }))
146 .ok();
147 return;
148 }
149 };
150
151 on_ready.send(Ok(())).ok();
153
154 let mut id: u32 = 0;
155 let connection_guard = ConnectionGuard::new(self.cfg.max_connections as usize);
156
157 let stopped = stop_handle.clone().shutdown();
158 let mut stopped = pin!(stopped);
159
160 let (drop_on_completion, mut process_connection_awaiter) = mpsc::channel::<()>(1);
161
162 trace!("accepting ipc connections");
163 loop {
164 match try_accept_conn(&listener, stopped).await {
165 AcceptConnection::Established { local_socket_stream, stop } => {
166 let Some(conn_permit) = connection_guard.try_acquire() else {
167 let (_reader, mut writer) = local_socket_stream.split();
168 let _ = writer
169 .write_all(b"Too many connections. Please try again later.")
170 .await;
171 stopped = stop;
172 continue;
173 };
174
175 let max_conns = connection_guard.max_connections();
176 let curr_conns = max_conns - connection_guard.available_connections();
177 trace!("Accepting new connection {}/{}", curr_conns, max_conns);
178
179 let conn_permit = Arc::new(conn_permit);
180
181 process_connection(ProcessConnection {
182 http_middleware: &self.http_middleware,
183 rpc_middleware: self.rpc_middleware.clone(),
184 conn_permit,
185 conn_id: id,
186 server_cfg: self.cfg.clone(),
187 stop_handle: stop_handle.clone(),
188 drop_on_completion: drop_on_completion.clone(),
189 methods: methods.clone(),
190 id_provider: self.id_provider.clone(),
191 local_socket_stream,
192 });
193
194 id = id.wrapping_add(1);
195 stopped = stop;
196 }
197 AcceptConnection::Shutdown => {
198 break;
199 }
200 AcceptConnection::Err((err, stop)) => {
201 tracing::error!(%err, "Failed accepting a new IPC connection");
202 stopped = stop;
203 }
204 }
205 }
206
207 drop(drop_on_completion);
209
210 while process_connection_awaiter.recv().await.is_some() {
213 }
216 }
217}
218
219enum AcceptConnection<S> {
220 Shutdown,
221 Established { local_socket_stream: LocalSocketStream, stop: S },
222 Err((io::Error, S)),
223}
224
225async fn try_accept_conn<S>(listener: &LocalSocketListener, stopped: S) -> AcceptConnection<S>
226where
227 S: Future + Unpin,
228{
229 match futures_util::future::select(pin!(listener.accept()), stopped).await {
230 Either::Left((res, stop)) => match res {
231 Ok(local_socket_stream) => AcceptConnection::Established { local_socket_stream, stop },
232 Err(e) => AcceptConnection::Err((e, stop)),
233 },
234 Either::Right(_) => AcceptConnection::Shutdown,
235 }
236}
237
238impl std::fmt::Debug for IpcServer {
239 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240 f.debug_struct("IpcServer")
241 .field("endpoint", &self.endpoint)
242 .field("cfg", &self.cfg)
243 .field("id_provider", &self.id_provider)
244 .finish()
245 }
246}
247
248#[derive(Debug, thiserror::Error)]
250#[error("failed to listen on ipc endpoint `{endpoint}`: {source}")]
251pub struct IpcServerStartError {
252 endpoint: String,
253 #[source]
254 source: io::Error,
255}
256
257#[derive(Debug, Clone)]
259#[allow(dead_code)]
260pub(crate) struct ServiceData {
261 pub(crate) methods: Methods,
263 pub(crate) id_provider: Arc<dyn IdProvider>,
265 pub(crate) stop_handle: StopHandle,
267 pub(crate) conn_id: u32,
269 pub(crate) conn_permit: Arc<ConnectionPermit>,
271 pub(crate) bounded_subscriptions: BoundedSubscriptions,
273 pub(crate) method_sink: MethodSink,
277 pub(crate) server_cfg: Settings,
279}
280
281#[derive(Debug, Clone)]
284pub struct RpcServiceBuilder<L>(tower::ServiceBuilder<L>);
285
286impl Default for RpcServiceBuilder<Identity> {
287 fn default() -> Self {
288 Self(tower::ServiceBuilder::new())
289 }
290}
291
292impl RpcServiceBuilder<Identity> {
293 pub const fn new() -> Self {
295 Self(tower::ServiceBuilder::new())
296 }
297}
298
299impl<L> RpcServiceBuilder<L> {
300 pub fn option_layer<T>(
304 self,
305 layer: Option<T>,
306 ) -> RpcServiceBuilder<Stack<Either<T, Identity>, L>> {
307 let layer = if let Some(layer) = layer {
308 Either::Left(layer)
309 } else {
310 Either::Right(Identity::new())
311 };
312 self.layer(layer)
313 }
314
315 pub fn layer<T>(self, layer: T) -> RpcServiceBuilder<Stack<T, L>> {
319 RpcServiceBuilder(self.0.layer(layer))
320 }
321
322 pub fn layer_fn<F>(self, f: F) -> RpcServiceBuilder<Stack<LayerFn<F>, L>> {
327 RpcServiceBuilder(self.0.layer_fn(f))
328 }
329
330 pub fn rpc_logger(self, max_log_len: u32) -> RpcServiceBuilder<Stack<RpcLoggerLayer, L>> {
334 RpcServiceBuilder(self.0.layer(RpcLoggerLayer::new(max_log_len)))
335 }
336
337 pub(crate) fn service<S>(&self, service: S) -> L::Service
339 where
340 L: tower::Layer<S>,
341 {
342 self.0.service(service)
343 }
344}
345
346#[derive(Debug, Clone)]
351pub struct TowerServiceNoHttp<L> {
352 inner: ServiceData,
353 rpc_middleware: RpcServiceBuilder<L>,
354}
355
356impl<RpcMiddleware> Service<String> for TowerServiceNoHttp<RpcMiddleware>
357where
358 RpcMiddleware: for<'a> Layer<RpcService>,
359 for<'a> <RpcMiddleware as Layer<RpcService>>::Service:
360 Send + Sync + 'static + RpcServiceT<MethodResponse = MethodResponse>,
361{
362 type Response = Option<String>;
368
369 type Error = Box<dyn core::error::Error + Send + Sync + 'static>;
370
371 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
372
373 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
375 Poll::Ready(Ok(()))
376 }
377
378 fn call(&mut self, request: String) -> Self::Future {
379 trace!("{:?}", request);
380
381 let cfg = RpcServiceCfg::CallsAndSubscriptions {
382 bounded_subscriptions: BoundedSubscriptions::new(
383 self.inner.server_cfg.max_subscriptions_per_connection,
384 ),
385 id_provider: self.inner.id_provider.clone(),
386 sink: self.inner.method_sink.clone(),
387 };
388
389 let max_response_body_size = self.inner.server_cfg.max_response_body_size as usize;
390 let max_request_body_size = self.inner.server_cfg.max_request_body_size as usize;
391 let conn = self.inner.conn_permit.clone();
392 let rpc_service = self.rpc_middleware.service(RpcService::new(
393 self.inner.methods.clone(),
394 max_response_body_size,
395 self.inner.conn_id.into(),
396 cfg,
397 ));
398 let f = tokio::task::spawn(async move {
405 ipc::call_with_service(
406 request,
407 rpc_service,
408 max_response_body_size,
409 max_request_body_size,
410 conn,
411 )
412 .await
413 });
414
415 Box::pin(async move { f.await.map_err(|err| err.into()) })
416 }
417}
418
419struct ProcessConnection<'a, HttpMiddleware, RpcMiddleware> {
420 http_middleware: &'a tower::ServiceBuilder<HttpMiddleware>,
421 rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
422 conn_permit: Arc<ConnectionPermit>,
423 conn_id: u32,
424 server_cfg: Settings,
425 stop_handle: StopHandle,
426 drop_on_completion: mpsc::Sender<()>,
427 methods: Methods,
428 id_provider: Arc<dyn IdProvider>,
429 local_socket_stream: LocalSocketStream,
430}
431
432#[instrument(name = "connection", skip_all, fields(conn_id = %params.conn_id), level = "INFO")]
434fn process_connection<'b, RpcMiddleware, HttpMiddleware>(
435 params: ProcessConnection<'_, HttpMiddleware, RpcMiddleware>,
436) where
437 RpcMiddleware: Layer<RpcService> + Clone + Send + 'static,
438 for<'a> <RpcMiddleware as Layer<RpcService>>::Service: RpcServiceT,
439 HttpMiddleware: Layer<TowerServiceNoHttp<RpcMiddleware>> + Send + 'static,
440 <HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service: Send
441 + Service<
442 String,
443 Response = Option<String>,
444 Error = Box<dyn core::error::Error + Send + Sync + 'static>,
445 >,
446 <<HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service as Service<String>>::Future:
447 Send + Unpin,
448 {
449 let ProcessConnection {
450 http_middleware,
451 rpc_middleware,
452 conn_permit,
453 conn_id,
454 server_cfg,
455 stop_handle,
456 drop_on_completion,
457 id_provider,
458 methods,
459 local_socket_stream,
460 } = params;
461
462 let ipc = IpcConn(tokio_util::codec::Decoder::framed(
463 StreamCodec::stream_incoming(),
464 local_socket_stream,
465 ));
466
467 let (tx, rx) = mpsc::channel::<Box<JsonRawValue>>(server_cfg.message_buffer_capacity as usize);
468 let method_sink = MethodSink::new_with_limit(tx, server_cfg.max_response_body_size);
469 let tower_service = TowerServiceNoHttp {
470 inner: ServiceData {
471 methods,
472 id_provider,
473 stop_handle: stop_handle.clone(),
474 server_cfg: server_cfg.clone(),
475 conn_id,
476 conn_permit,
477 bounded_subscriptions: BoundedSubscriptions::new(
478 server_cfg.max_subscriptions_per_connection,
479 ),
480 method_sink,
481 },
482 rpc_middleware,
483 };
484
485 let service = http_middleware.service(tower_service);
486 tokio::spawn(async {
487 to_ipc_service(ipc, service, stop_handle, rx).in_current_span().await;
488 drop(drop_on_completion)
489 });
490}
491
492async fn to_ipc_service<S, T>(
493 ipc: IpcConn<JsonRpcStream<T>>,
494 service: S,
495 stop_handle: StopHandle,
496 rx: mpsc::Receiver<Box<JsonRawValue>>,
497) where
498 S: Service<String, Response = Option<String>> + Send + 'static,
499 S::Error: Into<Box<dyn core::error::Error + Send + Sync>>,
500 S::Future: Send + Unpin,
501 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
502{
503 let rx_item = ReceiverStream::new(rx);
504 let conn = IpcConnDriver {
505 conn: ipc,
506 service,
507 pending_calls: Default::default(),
508 items: Default::default(),
509 };
510 let stopped = stop_handle.shutdown();
511
512 let mut conn = pin!(conn);
513 let mut rx_item = pin!(rx_item);
514 let mut stopped = pin!(stopped);
515
516 loop {
517 tokio::select! {
518 _ = &mut conn => {
519 break
520 }
521 item = rx_item.next() => {
522 if let Some(item) = item {
523 conn.push_back(item.to_string());
524 }
525 }
526 _ = &mut stopped => {
527 break
529 }
530 }
531 }
532}
533
534#[derive(Debug, Clone)]
536pub struct Settings {
537 max_request_body_size: u32,
539 max_response_body_size: u32,
541 max_log_length: u32,
545 max_connections: u32,
547 max_subscriptions_per_connection: u32,
549 message_buffer_capacity: u32,
551 tokio_runtime: Option<tokio::runtime::Handle>,
553}
554
555impl Default for Settings {
556 fn default() -> Self {
557 Self {
558 max_request_body_size: TEN_MB_SIZE_BYTES,
559 max_response_body_size: TEN_MB_SIZE_BYTES,
560 max_log_length: 4096,
561 max_connections: 100,
562 max_subscriptions_per_connection: 1024,
563 message_buffer_capacity: 1024,
564 tokio_runtime: None,
565 }
566 }
567}
568
569#[derive(Debug)]
571pub struct Builder<HttpMiddleware, RpcMiddleware> {
572 settings: Settings,
573 id_provider: Arc<dyn IdProvider>,
575 rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
576 http_middleware: tower::ServiceBuilder<HttpMiddleware>,
577}
578
579impl Default for Builder<Identity, Identity> {
580 fn default() -> Self {
581 Self {
582 settings: Settings::default(),
583 id_provider: Arc::new(RandomIntegerIdProvider),
584 rpc_middleware: RpcServiceBuilder::new(),
585 http_middleware: tower::ServiceBuilder::new(),
586 }
587 }
588}
589
590impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
591 pub const fn max_request_body_size(mut self, size: u32) -> Self {
593 self.settings.max_request_body_size = size;
594 self
595 }
596
597 pub const fn max_response_body_size(mut self, size: u32) -> Self {
599 self.settings.max_response_body_size = size;
600 self
601 }
602
603 pub const fn max_log_length(mut self, size: u32) -> Self {
605 self.settings.max_log_length = size;
606 self
607 }
608
609 pub const fn max_connections(mut self, max: u32) -> Self {
611 self.settings.max_connections = max;
612 self
613 }
614
615 pub const fn max_subscriptions_per_connection(mut self, max: u32) -> Self {
617 self.settings.max_subscriptions_per_connection = max;
618 self
619 }
620
621 pub const fn set_message_buffer_capacity(mut self, c: u32) -> Self {
639 self.settings.message_buffer_capacity = c;
640 self
641 }
642
643 pub fn custom_tokio_runtime(mut self, rt: tokio::runtime::Handle) -> Self {
647 self.settings.tokio_runtime = Some(rt);
648 self
649 }
650
651 pub fn set_id_provider<I: IdProvider + 'static>(mut self, id_provider: I) -> Self {
672 self.id_provider = Arc::new(id_provider);
673 self
674 }
675
676 pub fn set_http_middleware<T>(
693 self,
694 service_builder: tower::ServiceBuilder<T>,
695 ) -> Builder<T, RpcMiddleware> {
696 Builder {
697 settings: self.settings,
698 id_provider: self.id_provider,
699 http_middleware: service_builder,
700 rpc_middleware: self.rpc_middleware,
701 }
702 }
703
704 pub fn set_rpc_middleware<T>(
716 self,
717 rpc_middleware: RpcServiceBuilder<T>,
718 ) -> Builder<HttpMiddleware, T> {
719 Builder {
720 settings: self.settings,
721 id_provider: self.id_provider,
722 rpc_middleware,
723 http_middleware: self.http_middleware,
724 }
725 }
726
727 pub fn build(self, endpoint: String) -> IpcServer<HttpMiddleware, RpcMiddleware> {
729 IpcServer {
730 endpoint,
731 cfg: self.settings,
732 id_provider: self.id_provider,
733 http_middleware: self.http_middleware,
734 rpc_middleware: self.rpc_middleware,
735 }
736 }
737}
738
739#[cfg(test)]
740#[expect(missing_docs)]
741pub fn dummy_name() -> String {
742 use rand::Rng;
743 let num: u64 = rand::rng().random();
744 if cfg!(windows) {
745 format!(r"\\.\pipe\my-pipe-{num}")
746 } else {
747 format!(r"/tmp/my-uds-{num}")
748 }
749}
750
751#[cfg(test)]
752mod tests {
753 use super::*;
754 use crate::client::IpcClientBuilder;
755 use futures::future::select;
756 use jsonrpsee::{
757 core::{
758 client::{self, ClientT, Error, Subscription, SubscriptionClientT},
759 middleware::{Batch, BatchEntry, Notification},
760 params::BatchRequestBuilder,
761 },
762 rpc_params,
763 types::Request,
764 PendingSubscriptionSink, RpcModule, SubscriptionMessage,
765 };
766 use reth_tracing::init_test_tracing;
767 use std::pin::pin;
768 use tokio::sync::broadcast;
769 use tokio_stream::wrappers::BroadcastStream;
770
771 async fn pipe_from_stream_with_bounded_buffer(
772 pending: PendingSubscriptionSink,
773 stream: BroadcastStream<usize>,
774 ) -> Result<(), Box<dyn core::error::Error + Send + Sync>> {
775 let sink = pending.accept().await.unwrap();
776 let closed = sink.closed();
777
778 let mut closed = pin!(closed);
779 let mut stream = pin!(stream);
780
781 loop {
782 match select(closed, stream.next()).await {
783 Either::Left((_, _)) | Either::Right((None, _)) => break Ok(()),
785
786 Either::Right((Some(Ok(item)), c)) => {
788 let raw_value = serde_json::value::to_raw_value(&item)?;
789 let notif = SubscriptionMessage::from(raw_value);
790
791 if sink.send(notif).await.is_err() {
795 break Ok(());
796 }
797
798 closed = c;
799 }
800
801 Either::Right((Some(Err(e)), _)) => break Err(e.into()),
803 }
804 }
805 }
806
807 fn produce_items(tx: broadcast::Sender<usize>) {
809 for c in 1..=100 {
810 std::thread::sleep(std::time::Duration::from_millis(1));
811 let _ = tx.send(c);
812 }
813 }
814
815 #[tokio::test]
816 async fn can_set_the_max_response_body_size() {
817 let endpoint = &dummy_name();
819 let server = Builder::default().max_response_body_size(100).build(endpoint.clone());
820 let mut module = RpcModule::new(());
821 module.register_method("anything", |_, _, _| "a".repeat(101)).unwrap();
822 let handle = server.start(module).await.unwrap();
823 tokio::spawn(handle.stopped());
824
825 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
826 let response: Result<String, Error> = client.request("anything", rpc_params![]).await;
827 assert!(response.unwrap_err().to_string().contains("Exceeded max limit of"));
828 }
829
830 #[tokio::test]
831 async fn can_set_the_max_request_body_size() {
832 init_test_tracing();
833 let endpoint = &dummy_name();
834 let server = Builder::default().max_request_body_size(100).build(endpoint.clone());
835 let mut module = RpcModule::new(());
836 module.register_method("anything", |_, _, _| "succeed").unwrap();
837 let handle = server.start(module).await.unwrap();
838 tokio::spawn(handle.stopped());
839
840 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
841 let response: Result<String, Error> =
842 client.request("anything", rpc_params!["a".repeat(101)]).await;
843 assert!(response.is_err());
844 let mut batch_request_builder = BatchRequestBuilder::new();
845 let _ = batch_request_builder.insert("anything", rpc_params![]);
846 let _ = batch_request_builder.insert("anything", rpc_params![]);
847 let _ = batch_request_builder.insert("anything", rpc_params![]);
848 let response: Result<client::BatchResponse<'_, String>, Error> =
853 client.batch_request(batch_request_builder).await;
854 assert!(response.is_err());
855 }
856
857 #[tokio::test]
858 async fn can_set_max_connections() {
859 init_test_tracing();
860
861 let endpoint = &dummy_name();
862 let server = Builder::default().max_connections(2).build(endpoint.clone());
863 let mut module = RpcModule::new(());
864 module.register_method("anything", |_, _, _| "succeed").unwrap();
865 let handle = server.start(module).await.unwrap();
866 tokio::spawn(handle.stopped());
867
868 let client1 = IpcClientBuilder::default().build(endpoint).await.unwrap();
869 let client2 = IpcClientBuilder::default().build(endpoint).await.unwrap();
870 let client3 = IpcClientBuilder::default().build(endpoint).await.unwrap();
871
872 let response1: Result<String, Error> = client1.request("anything", rpc_params![]).await;
873 let response2: Result<String, Error> = client2.request("anything", rpc_params![]).await;
874 let response3: Result<String, Error> = client3.request("anything", rpc_params![]).await;
875
876 assert!(response1.is_ok());
877 assert!(response2.is_ok());
878 assert!(response3.is_err());
880
881 drop(client2);
883 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
884
885 let client4 = IpcClientBuilder::default().build(endpoint).await.unwrap();
887 let response4: Result<String, Error> = client4.request("anything", rpc_params![]).await;
888 assert!(response4.is_ok());
889 }
890
891 #[tokio::test]
892 async fn test_rpc_request() {
893 init_test_tracing();
894 let endpoint = &dummy_name();
895 let server = Builder::default().build(endpoint.clone());
896 let mut module = RpcModule::new(());
897 let msg = r#"{"jsonrpc":"2.0","id":83,"result":"0x7a69"}"#;
898 module.register_method("eth_chainId", move |_, _, _| msg).unwrap();
899 let handle = server.start(module).await.unwrap();
900 tokio::spawn(handle.stopped());
901
902 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
903 let response: String = client.request("eth_chainId", rpc_params![]).await.unwrap();
904 assert_eq!(response, msg);
905 }
906
907 #[tokio::test]
908 async fn test_batch_request() {
909 let endpoint = &dummy_name();
910 let server = Builder::default().build(endpoint.clone());
911 let mut module = RpcModule::new(());
912 module.register_method("anything", |_, _, _| "ok").unwrap();
913 let handle = server.start(module).await.unwrap();
914 tokio::spawn(handle.stopped());
915
916 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
917 let mut batch_request_builder = BatchRequestBuilder::new();
918 let _ = batch_request_builder.insert("anything", rpc_params![]);
919 let _ = batch_request_builder.insert("anything", rpc_params![]);
920 let _ = batch_request_builder.insert("anything", rpc_params![]);
921 let result = client
922 .batch_request(batch_request_builder)
923 .await
924 .unwrap()
925 .into_ok()
926 .unwrap()
927 .collect::<Vec<String>>();
928 assert_eq!(result, vec!["ok", "ok", "ok"]);
929 }
930
931 #[tokio::test]
932 async fn test_ipc_modules() {
933 reth_tracing::init_test_tracing();
934 let endpoint = &dummy_name();
935 let server = Builder::default().build(endpoint.clone());
936 let mut module = RpcModule::new(());
937 let msg = r#"{"admin":"1.0","debug":"1.0","engine":"1.0","eth":"1.0","ethash":"1.0","miner":"1.0","net":"1.0","rpc":"1.0","txpool":"1.0","web3":"1.0"}"#;
938 module.register_method("rpc_modules", move |_, _, _| msg).unwrap();
939 let handle = server.start(module).await.unwrap();
940 tokio::spawn(handle.stopped());
941
942 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
943 let response: String = client.request("rpc_modules", rpc_params![]).await.unwrap();
944 assert_eq!(response, msg);
945 }
946
947 #[tokio::test(flavor = "multi_thread")]
948 async fn test_rpc_subscription() {
949 let endpoint = &dummy_name();
950 let server = Builder::default().build(endpoint.clone());
951 let (tx, _rx) = broadcast::channel::<usize>(16);
952
953 let mut module = RpcModule::new(tx.clone());
954 std::thread::spawn(move || produce_items(tx));
955
956 module
957 .register_subscription(
958 "subscribe_hello",
959 "s_hello",
960 "unsubscribe_hello",
961 |_, pending, tx, _| async move {
962 let rx = tx.subscribe();
963 let stream = BroadcastStream::new(rx);
964 pipe_from_stream_with_bounded_buffer(pending, stream).await?;
965 Ok(())
966 },
967 )
968 .unwrap();
969
970 let handle = server.start(module).await.unwrap();
971 tokio::spawn(handle.stopped());
972
973 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
974 let sub: Subscription<usize> =
975 client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap();
976
977 let items = sub.take(16).collect::<Vec<_>>().await;
978 assert_eq!(items.len(), 16);
979 }
980
981 #[tokio::test]
982 async fn test_rpc_middleware() {
983 #[derive(Clone)]
984 struct ModifyRequestIf<S>(S);
985
986 impl<S> RpcServiceT for ModifyRequestIf<S>
987 where
988 S: Send + Sync + RpcServiceT,
989 {
990 type MethodResponse = S::MethodResponse;
991 type NotificationResponse = S::NotificationResponse;
992 type BatchResponse = S::BatchResponse;
993
994 fn call<'a>(
995 &self,
996 mut req: Request<'a>,
997 ) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
998 if req.method == "say_hello" {
1000 req.method = "say_goodbye".into();
1001 } else if req.method == "say_goodbye" {
1002 req.method = "say_hello".into();
1003 }
1004
1005 self.0.call(req)
1006 }
1007
1008 fn batch<'a>(
1009 &self,
1010 mut batch: Batch<'a>,
1011 ) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
1012 for call in batch.iter_mut() {
1013 match call {
1014 Ok(BatchEntry::Call(req)) => {
1015 if req.method == "say_hello" {
1016 req.method = "say_goodbye".into();
1017 } else if req.method == "say_goodbye" {
1018 req.method = "say_hello".into();
1019 }
1020 }
1021 Ok(BatchEntry::Notification(n)) => {
1022 if n.method == "say_hello" {
1023 n.method = "say_goodbye".into();
1024 } else if n.method == "say_goodbye" {
1025 n.method = "say_hello".into();
1026 }
1027 }
1028 Err(_err) => {}
1030 }
1031 }
1032
1033 self.0.batch(batch)
1034 }
1035
1036 fn notification<'a>(
1037 &self,
1038 mut n: Notification<'a>,
1039 ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
1040 if n.method == "say_hello" {
1041 n.method = "say_goodbye".into();
1042 } else if n.method == "say_goodbye" {
1043 n.method = "say_hello".into();
1044 }
1045 self.0.notification(n)
1046 }
1047 }
1048
1049 reth_tracing::init_test_tracing();
1050 let endpoint = &dummy_name();
1051
1052 let rpc_middleware = RpcServiceBuilder::new().layer_fn(ModifyRequestIf);
1053 let server = Builder::default().set_rpc_middleware(rpc_middleware).build(endpoint.clone());
1054
1055 let mut module = RpcModule::new(());
1056 let goodbye_msg = r#"{"jsonrpc":"2.0","id":1,"result":"goodbye"}"#;
1057 let hello_msg = r#"{"jsonrpc":"2.0","id":2,"result":"hello"}"#;
1058 module.register_method("say_hello", move |_, _, _| hello_msg).unwrap();
1059 module.register_method("say_goodbye", move |_, _, _| goodbye_msg).unwrap();
1060 let handle = server.start(module).await.unwrap();
1061 tokio::spawn(handle.stopped());
1062
1063 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
1064 let say_hello_response: String = client.request("say_hello", rpc_params![]).await.unwrap();
1065 let say_goodbye_response: String =
1066 client.request("say_goodbye", rpc_params![]).await.unwrap();
1067
1068 assert_eq!(say_hello_response, goodbye_msg);
1069 assert_eq!(say_goodbye_response, hello_msg);
1070 }
1071}