1use crate::server::connection::{IpcConn, JsonRpcStream};
4use futures::StreamExt;
5use futures_util::future::Either;
6use interprocess::local_socket::{
7 tokio::prelude::{LocalSocketListener, LocalSocketStream},
8 traits::tokio::{Listener, Stream},
9 GenericFilePath, ListenerOptions, ToFsName,
10};
11use jsonrpsee::{
12 core::TEN_MB_SIZE_BYTES,
13 server::{
14 middleware::rpc::{RpcLoggerLayer, RpcServiceT},
15 stop_channel, ConnectionGuard, ConnectionPermit, IdProvider, RandomIntegerIdProvider,
16 ServerHandle, StopHandle,
17 },
18 BoundedSubscriptions, MethodSink, Methods,
19};
20use std::{
21 future::Future,
22 io,
23 pin::{pin, Pin},
24 sync::Arc,
25 task::{Context, Poll},
26};
27use tokio::{
28 io::{AsyncRead, AsyncWrite, AsyncWriteExt},
29 sync::oneshot,
30};
31use tower::{layer::util::Identity, Layer, Service};
32use tracing::{debug, instrument, trace, warn, Instrument};
33use crate::{
35 server::{connection::IpcConnDriver, rpc_service::RpcServiceCfg},
36 stream_codec::StreamCodec,
37};
38use tokio::sync::mpsc;
39use tokio_stream::wrappers::ReceiverStream;
40use tower::layer::{util::Stack, LayerFn};
41
42mod connection;
43mod ipc;
44mod rpc_service;
45
46pub use rpc_service::RpcService;
47
/// JSON-RPC server that listens on a local socket identified by `endpoint`.
///
/// `HttpMiddleware` is the tower layer stack wrapped around each connection's
/// [`TowerServiceNoHttp`]; `RpcMiddleware` is the layer stack wrapped around the
/// [`RpcService`] built for each request. Both default to [`Identity`].
pub struct IpcServer<HttpMiddleware = Identity, RpcMiddleware = Identity> {
    /// Name the listener is created with (converted via `GenericFilePath`).
    endpoint: String,
    /// Id provider handed to the RPC service configuration of every connection.
    id_provider: Arc<dyn IdProvider>,
    /// Limits and runtime options for the server.
    cfg: Settings,
    /// Layers applied around the per-request [`RpcService`].
    rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
    /// Layers applied around the per-connection [`TowerServiceNoHttp`].
    http_middleware: tower::ServiceBuilder<HttpMiddleware>,
}
59
60impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware> {
61 pub fn endpoint(&self) -> String {
63 self.endpoint.clone()
64 }
65}
66
67impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware>
68where
69 RpcMiddleware: for<'a> Layer<RpcService, Service: RpcServiceT<'a>> + Clone + Send + 'static,
70 HttpMiddleware: Layer<
71 TowerServiceNoHttp<RpcMiddleware>,
72 Service: Service<
73 String,
74 Response = Option<String>,
75 Error = Box<dyn core::error::Error + Send + Sync + 'static>,
76 Future: Send + Unpin,
77 > + Send,
78 > + Send
79 + 'static,
80{
81 pub async fn start(
103 mut self,
104 methods: impl Into<Methods>,
105 ) -> Result<ServerHandle, IpcServerStartError> {
106 let methods = methods.into();
107
108 let (stop_handle, server_handle) = stop_channel();
109
110 let (tx, rx) = oneshot::channel();
112
113 match self.cfg.tokio_runtime.take() {
114 Some(rt) => rt.spawn(self.start_inner(methods, stop_handle, tx)),
115 None => tokio::spawn(self.start_inner(methods, stop_handle, tx)),
116 };
117 rx.await.expect("channel is open")?;
118
119 Ok(server_handle)
120 }
121
    /// Creates the listener, reports the outcome through `on_ready`, then accepts connections
    /// until `stop_handle` signals shutdown.
    ///
    /// Once the accept loop has ended, this waits until every spawned connection task has
    /// finished before returning.
    async fn start_inner(
        self,
        methods: Methods,
        stop_handle: StopHandle,
        on_ready: oneshot::Sender<Result<(), IpcServerStartError>>,
    ) {
        trace!(endpoint = ?self.endpoint, "starting ipc server");

        if cfg!(unix) {
            // Best effort: remove a file left at the endpoint path; a failure is ignored.
            if std::fs::remove_file(&self.endpoint).is_ok() {
                debug!(endpoint = ?self.endpoint, "removed existing IPC endpoint file");
            }
        }

        let listener = match self
            .endpoint
            .as_str()
            .to_fs_name::<GenericFilePath>()
            .and_then(|name| ListenerOptions::new().name(name).create_tokio())
        {
            Ok(listener) => listener,
            Err(err) => {
                // Report the failure to `start`; a dropped receiver is not an error here.
                on_ready
                    .send(Err(IpcServerStartError { endpoint: self.endpoint.clone(), source: err }))
                    .ok();
                return;
            }
        };

        // The listener exists: unblock the caller of `start`.
        on_ready.send(Ok(())).ok();

        // Connection ids are handed out sequentially and wrap around on overflow.
        let mut id: u32 = 0;
        let connection_guard = ConnectionGuard::new(self.cfg.max_connections as usize);

        let stopped = stop_handle.clone().shutdown();
        let mut stopped = pin!(stopped);

        // Every connection task holds a clone of the sender; once all clones are dropped,
        // `recv` yields `None`, which is how completion of all connections is detected below.
        let (drop_on_completion, mut process_connection_awaiter) = mpsc::channel::<()>(1);

        trace!("accepting ipc connections");
        loop {
            // `stopped` is moved into each attempt and handed back through the result so the
            // same shutdown future is reused on the next iteration.
            match try_accept_conn(&listener, stopped).await {
                AcceptConnection::Established { local_socket_stream, stop } => {
                    let Some(conn_permit) = connection_guard.try_acquire() else {
                        // No permit available: tell the client and let the stream drop.
                        let (_reader, mut writer) = local_socket_stream.split();
                        let _ = writer
                            .write_all(b"Too many connections. Please try again later.")
                            .await;
                        stopped = stop;
                        continue;
                    };

                    let max_conns = connection_guard.max_connections();
                    let curr_conns = max_conns - connection_guard.available_connections();
                    trace!("Accepting new connection {}/{}", curr_conns, max_conns);

                    let conn_permit = Arc::new(conn_permit);

                    process_connection(ProcessConnection {
                        http_middleware: &self.http_middleware,
                        rpc_middleware: self.rpc_middleware.clone(),
                        conn_permit,
                        conn_id: id,
                        server_cfg: self.cfg.clone(),
                        stop_handle: stop_handle.clone(),
                        drop_on_completion: drop_on_completion.clone(),
                        methods: methods.clone(),
                        id_provider: self.id_provider.clone(),
                        local_socket_stream,
                    });

                    id = id.wrapping_add(1);
                    stopped = stop;
                }
                AcceptConnection::Shutdown => {
                    break;
                }
                AcceptConnection::Err((err, stop)) => {
                    // An accept error is logged and the loop keeps accepting.
                    tracing::error!(%err, "Failed accepting a new IPC connection");
                    stopped = stop;
                }
            }
        }

        // Drop our own sender so only the connection tasks keep the channel open.
        drop(drop_on_completion);

        while process_connection_awaiter.recv().await.is_some() {
            // Nothing is ever sent on this channel; the loop ends when `recv` returns `None`,
            // i.e. when the last connection task has dropped its sender.
        }
    }
218}
219
/// Outcome of a single [`try_accept_conn`] attempt.
///
/// `S` is the shutdown future; it is handed back in every variant except `Shutdown` so the
/// caller can reuse it for the next attempt.
enum AcceptConnection<S> {
    /// The shutdown future completed before a connection was accepted.
    Shutdown,
    /// A client connected.
    Established { local_socket_stream: LocalSocketStream, stop: S },
    /// Accepting a connection failed with an I/O error.
    Err((io::Error, S)),
}
225
226async fn try_accept_conn<S>(listener: &LocalSocketListener, stopped: S) -> AcceptConnection<S>
227where
228 S: Future + Unpin,
229{
230 match futures_util::future::select(pin!(listener.accept()), stopped).await {
231 Either::Left((res, stop)) => match res {
232 Ok(local_socket_stream) => AcceptConnection::Established { local_socket_stream, stop },
233 Err(e) => AcceptConnection::Err((e, stop)),
234 },
235 Either::Right(_) => AcceptConnection::Shutdown,
236 }
237}
238
239impl std::fmt::Debug for IpcServer {
240 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241 f.debug_struct("IpcServer")
242 .field("endpoint", &self.endpoint)
243 .field("cfg", &self.cfg)
244 .field("id_provider", &self.id_provider)
245 .finish()
246 }
247}
248
/// Error returned by [`IpcServer::start`] when the listener could not be created.
#[derive(Debug, thiserror::Error)]
#[error("failed to listen on ipc endpoint `{endpoint}`: {source}")]
pub struct IpcServerStartError {
    /// Endpoint the server tried to listen on.
    endpoint: String,
    /// Underlying I/O error from creating the listener.
    #[source]
    source: io::Error,
}
257
/// Per-connection data carried by [`TowerServiceNoHttp`].
#[derive(Debug, Clone)]
// Not every field is read in this module.
#[allow(dead_code)]
pub(crate) struct ServiceData {
    /// Registered RPC methods.
    pub(crate) methods: Methods,
    /// Id provider passed into the RPC service configuration.
    pub(crate) id_provider: Arc<dyn IdProvider>,
    /// Handle used to observe server shutdown.
    pub(crate) stop_handle: StopHandle,
    /// Sequential id assigned to the connection by the accept loop.
    pub(crate) conn_id: u32,
    /// Permit obtained from the connection guard when the connection was accepted.
    pub(crate) conn_permit: Arc<ConnectionPermit>,
    /// Subscription limiter sized by `max_subscriptions_per_connection`.
    pub(crate) bounded_subscriptions: BoundedSubscriptions,
    /// Sink feeding the connection's outgoing message channel.
    pub(crate) method_sink: MethodSink,
    /// Copy of the server settings.
    pub(crate) server_cfg: Settings,
}
281
/// Builder for the RPC middleware stack; a thin newtype around [`tower::ServiceBuilder`]
/// that exposes only the layer-related operations defined on it in this module.
#[derive(Debug, Clone)]
pub struct RpcServiceBuilder<L>(tower::ServiceBuilder<L>);
286
287impl Default for RpcServiceBuilder<Identity> {
288 fn default() -> Self {
289 Self(tower::ServiceBuilder::new())
290 }
291}
292
293impl RpcServiceBuilder<Identity> {
294 pub fn new() -> Self {
296 Self(tower::ServiceBuilder::new())
297 }
298}
299
300impl<L> RpcServiceBuilder<L> {
301 pub fn option_layer<T>(
305 self,
306 layer: Option<T>,
307 ) -> RpcServiceBuilder<Stack<Either<T, Identity>, L>> {
308 let layer = if let Some(layer) = layer {
309 Either::Left(layer)
310 } else {
311 Either::Right(Identity::new())
312 };
313 self.layer(layer)
314 }
315
316 pub fn layer<T>(self, layer: T) -> RpcServiceBuilder<Stack<T, L>> {
320 RpcServiceBuilder(self.0.layer(layer))
321 }
322
323 pub fn layer_fn<F>(self, f: F) -> RpcServiceBuilder<Stack<LayerFn<F>, L>> {
328 RpcServiceBuilder(self.0.layer_fn(f))
329 }
330
331 pub fn rpc_logger(self, max_log_len: u32) -> RpcServiceBuilder<Stack<RpcLoggerLayer, L>> {
335 RpcServiceBuilder(self.0.layer(RpcLoggerLayer::new(max_log_len)))
336 }
337
338 pub(crate) fn service<S>(&self, service: S) -> L::Service
340 where
341 L: tower::Layer<S>,
342 {
343 self.0.service(service)
344 }
345}
346
/// Per-connection tower service that takes a raw request string and produces an optional
/// response string by running it through the RPC middleware stack.
#[derive(Debug, Clone)]
pub struct TowerServiceNoHttp<L> {
    /// Connection-specific data (methods, limits, sink, ...).
    inner: ServiceData,
    /// Layers applied around the [`RpcService`] built for each request.
    rpc_middleware: RpcServiceBuilder<L>,
}
356
357impl<RpcMiddleware> Service<String> for TowerServiceNoHttp<RpcMiddleware>
358where
359 RpcMiddleware: for<'a> Layer<RpcService>,
360 for<'a> <RpcMiddleware as Layer<RpcService>>::Service: Send + Sync + 'static + RpcServiceT<'a>,
361{
362 type Response = Option<String>;
368
369 type Error = Box<dyn core::error::Error + Send + Sync + 'static>;
370
371 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
372
373 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
375 Poll::Ready(Ok(()))
376 }
377
378 fn call(&mut self, request: String) -> Self::Future {
379 trace!("{:?}", request);
380
381 let cfg = RpcServiceCfg::CallsAndSubscriptions {
382 bounded_subscriptions: BoundedSubscriptions::new(
383 self.inner.server_cfg.max_subscriptions_per_connection,
384 ),
385 id_provider: self.inner.id_provider.clone(),
386 sink: self.inner.method_sink.clone(),
387 };
388
389 let max_response_body_size = self.inner.server_cfg.max_response_body_size as usize;
390 let max_request_body_size = self.inner.server_cfg.max_request_body_size as usize;
391 let conn = self.inner.conn_permit.clone();
392 let rpc_service = self.rpc_middleware.service(RpcService::new(
393 self.inner.methods.clone(),
394 max_response_body_size,
395 self.inner.conn_id.into(),
396 cfg,
397 ));
398 let f = tokio::task::spawn(async move {
405 ipc::call_with_service(
406 request,
407 rpc_service,
408 max_response_body_size,
409 max_request_body_size,
410 conn,
411 )
412 .await
413 });
414
415 Box::pin(async move { f.await.map_err(|err| err.into()) })
416 }
417}
418
/// Everything [`process_connection`] needs to set up and drive one accepted connection.
struct ProcessConnection<'a, HttpMiddleware, RpcMiddleware> {
    /// Server-wide layer stack, borrowed from the server.
    http_middleware: &'a tower::ServiceBuilder<HttpMiddleware>,
    /// RPC layer stack for this connection.
    rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
    /// Permit acquired from the connection guard for this connection.
    conn_permit: Arc<ConnectionPermit>,
    /// Sequential id assigned by the accept loop.
    conn_id: u32,
    /// Copy of the server settings.
    server_cfg: Settings,
    /// Handle used to observe server shutdown.
    stop_handle: StopHandle,
    /// Sender held until the connection task ends; dropping it signals completion.
    drop_on_completion: mpsc::Sender<()>,
    /// Registered RPC methods.
    methods: Methods,
    /// Id provider passed into the RPC service configuration.
    id_provider: Arc<dyn IdProvider>,
    /// The accepted client stream.
    local_socket_stream: LocalSocketStream,
}
431
432#[instrument(name = "connection", skip_all, fields(conn_id = %params.conn_id), level = "INFO")]
434fn process_connection<'b, RpcMiddleware, HttpMiddleware>(
435 params: ProcessConnection<'_, HttpMiddleware, RpcMiddleware>,
436) where
437 RpcMiddleware: Layer<RpcService> + Clone + Send + 'static,
438 for<'a> <RpcMiddleware as Layer<RpcService>>::Service: RpcServiceT<'a>,
439 HttpMiddleware: Layer<TowerServiceNoHttp<RpcMiddleware>> + Send + 'static,
440 <HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service: Send
441 + Service<
442 String,
443 Response = Option<String>,
444 Error = Box<dyn core::error::Error + Send + Sync + 'static>,
445 >,
446 <<HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service as Service<String>>::Future:
447 Send + Unpin,
448 {
449 let ProcessConnection {
450 http_middleware,
451 rpc_middleware,
452 conn_permit,
453 conn_id,
454 server_cfg,
455 stop_handle,
456 drop_on_completion,
457 id_provider,
458 methods,
459 local_socket_stream,
460 } = params;
461
462 let ipc = IpcConn(tokio_util::codec::Decoder::framed(
463 StreamCodec::stream_incoming(),
464 local_socket_stream,
465 ));
466
467 let (tx, rx) = mpsc::channel::<String>(server_cfg.message_buffer_capacity as usize);
468 let method_sink = MethodSink::new_with_limit(tx, server_cfg.max_response_body_size);
469 let tower_service = TowerServiceNoHttp {
470 inner: ServiceData {
471 methods,
472 id_provider,
473 stop_handle: stop_handle.clone(),
474 server_cfg: server_cfg.clone(),
475 conn_id,
476 conn_permit,
477 bounded_subscriptions: BoundedSubscriptions::new(
478 server_cfg.max_subscriptions_per_connection,
479 ),
480 method_sink,
481 },
482 rpc_middleware,
483 };
484
485 let service = http_middleware.service(tower_service);
486 tokio::spawn(async {
487 to_ipc_service(ipc, service, stop_handle, rx).in_current_span().await;
488 drop(drop_on_completion)
489 });
490}
491
492async fn to_ipc_service<S, T>(
493 ipc: IpcConn<JsonRpcStream<T>>,
494 service: S,
495 stop_handle: StopHandle,
496 rx: mpsc::Receiver<String>,
497) where
498 S: Service<String, Response = Option<String>> + Send + 'static,
499 S::Error: Into<Box<dyn core::error::Error + Send + Sync>>,
500 S::Future: Send + Unpin,
501 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
502{
503 let rx_item = ReceiverStream::new(rx);
504 let conn = IpcConnDriver {
505 conn: ipc,
506 service,
507 pending_calls: Default::default(),
508 items: Default::default(),
509 };
510 let stopped = stop_handle.shutdown();
511
512 let mut conn = pin!(conn);
513 let mut rx_item = pin!(rx_item);
514 let mut stopped = pin!(stopped);
515
516 loop {
517 tokio::select! {
518 _ = &mut conn => {
519 break
520 }
521 item = rx_item.next() => {
522 if let Some(item) = item {
523 conn.push_back(item);
524 }
525 }
526 _ = &mut stopped => {
527 break
529 }
530 }
531 }
532}
533
/// Limits and runtime options of the IPC server.
#[derive(Debug, Clone)]
pub struct Settings {
    /// Maximum request body size (default `TEN_MB_SIZE_BYTES`).
    max_request_body_size: u32,
    /// Maximum response body size (default `TEN_MB_SIZE_BYTES`).
    max_response_body_size: u32,
    /// Maximum log length.
    /// NOTE(review): stored but not read anywhere in this file — confirm where it is consumed.
    max_log_length: u32,
    /// Number of connections the connection guard hands out permits for.
    max_connections: u32,
    /// Size of the subscription limiter created for each connection.
    max_subscriptions_per_connection: u32,
    /// Capacity of each connection's outgoing message channel.
    message_buffer_capacity: u32,
    /// Runtime on which `start` spawns the accept loop; `None` uses the ambient runtime.
    tokio_runtime: Option<tokio::runtime::Handle>,
}
554
555impl Default for Settings {
556 fn default() -> Self {
557 Self {
558 max_request_body_size: TEN_MB_SIZE_BYTES,
559 max_response_body_size: TEN_MB_SIZE_BYTES,
560 max_log_length: 4096,
561 max_connections: 100,
562 max_subscriptions_per_connection: 1024,
563 message_buffer_capacity: 1024,
564 tokio_runtime: None,
565 }
566 }
567}
568
/// Builder used to configure and create an [`IpcServer`].
#[derive(Debug)]
pub struct Builder<HttpMiddleware, RpcMiddleware> {
    /// Limits and runtime options collected so far.
    settings: Settings,
    /// Id provider the server will be built with.
    id_provider: Arc<dyn IdProvider>,
    /// Layer stack applied around the per-request RPC service.
    rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
    /// Layer stack applied around the per-connection service.
    http_middleware: tower::ServiceBuilder<HttpMiddleware>,
}
578
579impl Default for Builder<Identity, Identity> {
580 fn default() -> Self {
581 Self {
582 settings: Settings::default(),
583 id_provider: Arc::new(RandomIntegerIdProvider),
584 rpc_middleware: RpcServiceBuilder::new(),
585 http_middleware: tower::ServiceBuilder::new(),
586 }
587 }
588}
589
impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
    /// Sets the maximum request body size (default `TEN_MB_SIZE_BYTES`).
    pub const fn max_request_body_size(mut self, size: u32) -> Self {
        self.settings.max_request_body_size = size;
        self
    }

    /// Sets the maximum response body size (default `TEN_MB_SIZE_BYTES`).
    pub const fn max_response_body_size(mut self, size: u32) -> Self {
        self.settings.max_response_body_size = size;
        self
    }

    /// Sets the maximum log length (default 4096).
    ///
    /// NOTE(review): the value is stored in the settings but not read in this file — confirm
    /// where it takes effect.
    pub const fn max_log_length(mut self, size: u32) -> Self {
        self.settings.max_log_length = size;
        self
    }

    /// Sets how many connections may hold a permit at the same time (default 100).
    ///
    /// A client that connects while no permit is available is sent a
    /// "Too many connections" message instead of being served.
    pub const fn max_connections(mut self, max: u32) -> Self {
        self.settings.max_connections = max;
        self
    }

    /// Sets the size of the subscription limiter created for each connection (default 1024).
    pub const fn max_subscriptions_per_connection(mut self, max: u32) -> Self {
        self.settings.max_subscriptions_per_connection = max;
        self
    }

    /// Sets the capacity of each connection's outgoing message channel (default 1024).
    ///
    /// NOTE(review): the channel is created with `mpsc::channel(c)`; tokio's bounded channel
    /// does not accept a capacity of zero — confirm callers never pass 0.
    pub const fn set_message_buffer_capacity(mut self, c: u32) -> Self {
        self.settings.message_buffer_capacity = c;
        self
    }

    /// Sets the tokio runtime on which the server's accept loop is spawned.
    ///
    /// Without it, the loop is spawned with `tokio::spawn` on the ambient runtime.
    pub fn custom_tokio_runtime(mut self, rt: tokio::runtime::Handle) -> Self {
        self.settings.tokio_runtime = Some(rt);
        self
    }

    /// Replaces the id provider (default [`RandomIntegerIdProvider`]).
    pub fn set_id_provider<I: IdProvider + 'static>(mut self, id_provider: I) -> Self {
        self.id_provider = Arc::new(id_provider);
        self
    }

    /// Replaces the layer stack applied around each connection's service.
    ///
    /// Changes the builder's `HttpMiddleware` type parameter; all other configuration is
    /// carried over.
    pub fn set_http_middleware<T>(
        self,
        service_builder: tower::ServiceBuilder<T>,
    ) -> Builder<T, RpcMiddleware> {
        Builder {
            settings: self.settings,
            id_provider: self.id_provider,
            http_middleware: service_builder,
            rpc_middleware: self.rpc_middleware,
        }
    }

    /// Replaces the layer stack applied around the per-request RPC service.
    ///
    /// Changes the builder's `RpcMiddleware` type parameter; all other configuration is
    /// carried over.
    pub fn set_rpc_middleware<T>(
        self,
        rpc_middleware: RpcServiceBuilder<T>,
    ) -> Builder<HttpMiddleware, T> {
        Builder {
            settings: self.settings,
            id_provider: self.id_provider,
            rpc_middleware,
            http_middleware: self.http_middleware,
        }
    }

    /// Finalizes the configuration into an [`IpcServer`] for `endpoint`.
    ///
    /// Nothing is bound here; the listener is created when the server is started.
    pub fn build(self, endpoint: String) -> IpcServer<HttpMiddleware, RpcMiddleware> {
        IpcServer {
            endpoint,
            cfg: self.settings,
            id_provider: self.id_provider,
            http_middleware: self.http_middleware,
            rpc_middleware: self.rpc_middleware,
        }
    }
}
791
/// Returns a randomized endpoint name for tests: a named-pipe path on Windows, a path under
/// `/tmp` elsewhere.
#[cfg(test)]
#[allow(missing_docs)]
pub fn dummy_name() -> String {
    let num = rand::Rng::gen::<u64>(&mut rand::thread_rng());

    if cfg!(windows) {
        return format!(r"\\.\pipe\my-pipe-{}", num);
    }
    format!(r"/tmp/my-uds-{}", num)
}
802
#[cfg(test)]
mod tests {
    use super::*;
    use crate::client::IpcClientBuilder;
    use futures::future::select;
    use jsonrpsee::{
        core::{
            client,
            client::{ClientT, Error, Subscription, SubscriptionClientT},
            params::BatchRequestBuilder,
        },
        rpc_params,
        types::Request,
        PendingSubscriptionSink, RpcModule, SubscriptionMessage,
    };
    use reth_tracing::init_test_tracing;
    use std::pin::pin;
    use tokio::sync::broadcast;
    use tokio_stream::wrappers::BroadcastStream;

    /// Accepts the pending subscription and forwards every item of `stream` to it.
    ///
    /// Returns `Ok(())` when the sink is closed, the stream ends or a send fails, and an error
    /// when the stream yields one.
    async fn pipe_from_stream_with_bounded_buffer(
        pending: PendingSubscriptionSink,
        stream: BroadcastStream<usize>,
    ) -> Result<(), Box<dyn core::error::Error + Send + Sync>> {
        let sink = pending.accept().await.unwrap();
        let closed = sink.closed();

        let mut closed = pin!(closed);
        let mut stream = pin!(stream);

        loop {
            match select(closed, stream.next()).await {
                // The subscription was closed or the stream is exhausted.
                Either::Left((_, _)) | Either::Right((None, _)) => break Ok(()),

                // Received a new item from the stream.
                Either::Right((Some(Ok(item)), c)) => {
                    let notif = SubscriptionMessage::from_json(&item)?;

                    if sink.send(notif).await.is_err() {
                        // Sending failed; stop forwarding.
                        break Ok(());
                    }

                    // Reuse the still-pending `closed` future on the next iteration.
                    closed = c;
                }

                // The stream reported an error.
                Either::Right((Some(Err(e)), _)) => break Err(e.into()),
            }
        }
    }

    /// Sends the numbers 1..=100 on `tx`, sleeping 1 ms before each; send errors are ignored.
    fn produce_items(tx: broadcast::Sender<usize>) {
        for c in 1..=100 {
            std::thread::sleep(std::time::Duration::from_millis(1));
            let _ = tx.send(c);
        }
    }

    /// A 101-character result must be rejected when the response limit is 100.
    #[tokio::test]
    async fn can_set_the_max_response_body_size() {
        let endpoint = &dummy_name();
        let server = Builder::default().max_response_body_size(100).build(endpoint.clone());
        let mut module = RpcModule::new(());
        module.register_method("anything", |_, _, _| "a".repeat(101)).unwrap();
        let handle = server.start(module).await.unwrap();
        tokio::spawn(handle.stopped());

        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
        let response: Result<String, Error> = client.request("anything", rpc_params![]).await;
        assert!(response.unwrap_err().to_string().contains("Exceeded max limit of"));
    }

    /// Requests larger than the configured request limit of 100 must fail.
    #[tokio::test]
    async fn can_set_the_max_request_body_size() {
        init_test_tracing();
        let endpoint = &dummy_name();
        let server = Builder::default().max_request_body_size(100).build(endpoint.clone());
        let mut module = RpcModule::new(());
        module.register_method("anything", |_, _, _| "succeed").unwrap();
        let handle = server.start(module).await.unwrap();
        tokio::spawn(handle.stopped());

        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
        let response: Result<String, Error> =
            client.request("anything", rpc_params!["a".repeat(101)]).await;
        assert!(response.is_err());
        let mut batch_request_builder = BatchRequestBuilder::new();
        let _ = batch_request_builder.insert("anything", rpc_params![]);
        let _ = batch_request_builder.insert("anything", rpc_params![]);
        let _ = batch_request_builder.insert("anything", rpc_params![]);
        // NOTE(review): the batch is expected to fail as well — presumably because the
        // serialized batch exceeds the 100-byte limit; confirm.
        let response: Result<client::BatchResponse<'_, String>, Error> =
            client.batch_request(batch_request_builder).await;
        assert!(response.is_err());
    }

    /// With `max_connections(2)` a third client is rejected until a slot is released.
    #[tokio::test]
    async fn can_set_max_connections() {
        init_test_tracing();

        let endpoint = &dummy_name();
        let server = Builder::default().max_connections(2).build(endpoint.clone());
        let mut module = RpcModule::new(());
        module.register_method("anything", |_, _, _| "succeed").unwrap();
        let handle = server.start(module).await.unwrap();
        tokio::spawn(handle.stopped());

        let client1 = IpcClientBuilder::default().build(endpoint).await.unwrap();
        let client2 = IpcClientBuilder::default().build(endpoint).await.unwrap();
        let client3 = IpcClientBuilder::default().build(endpoint).await.unwrap();

        let response1: Result<String, Error> = client1.request("anything", rpc_params![]).await;
        let response2: Result<String, Error> = client2.request("anything", rpc_params![]).await;
        let response3: Result<String, Error> = client3.request("anything", rpc_params![]).await;

        assert!(response1.is_ok());
        assert!(response2.is_ok());
        // The third connection exceeds the limit.
        assert!(response3.is_err());

        // Free a slot and give the server time to notice the disconnect.
        drop(client2);
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // A new client can now be served.
        let client4 = IpcClientBuilder::default().build(endpoint).await.unwrap();
        let response4: Result<String, Error> = client4.request("anything", rpc_params![]).await;
        assert!(response4.is_ok());
    }

    /// Round-trips a single request through a default server.
    #[tokio::test]
    async fn test_rpc_request() {
        init_test_tracing();
        let endpoint = &dummy_name();
        let server = Builder::default().build(endpoint.clone());
        let mut module = RpcModule::new(());
        let msg = r#"{"jsonrpc":"2.0","id":83,"result":"0x7a69"}"#;
        module.register_method("eth_chainId", move |_, _, _| msg).unwrap();
        let handle = server.start(module).await.unwrap();
        tokio::spawn(handle.stopped());

        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
        let response: String = client.request("eth_chainId", rpc_params![]).await.unwrap();
        assert_eq!(response, msg);
    }

    /// Sends a batch of three calls and expects three successful results.
    #[tokio::test]
    async fn test_batch_request() {
        let endpoint = &dummy_name();
        let server = Builder::default().build(endpoint.clone());
        let mut module = RpcModule::new(());
        module.register_method("anything", |_, _, _| "ok").unwrap();
        let handle = server.start(module).await.unwrap();
        tokio::spawn(handle.stopped());

        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
        let mut batch_request_builder = BatchRequestBuilder::new();
        let _ = batch_request_builder.insert("anything", rpc_params![]);
        let _ = batch_request_builder.insert("anything", rpc_params![]);
        let _ = batch_request_builder.insert("anything", rpc_params![]);
        let result = client
            .batch_request(batch_request_builder)
            .await
            .unwrap()
            .into_ok()
            .unwrap()
            .collect::<Vec<String>>();
        assert_eq!(result, vec!["ok", "ok", "ok"]);
    }

    /// Round-trips an `rpc_modules` request whose result is a JSON object string.
    #[tokio::test]
    async fn test_ipc_modules() {
        reth_tracing::init_test_tracing();
        let endpoint = &dummy_name();
        let server = Builder::default().build(endpoint.clone());
        let mut module = RpcModule::new(());
        let msg = r#"{"admin":"1.0","debug":"1.0","engine":"1.0","eth":"1.0","ethash":"1.0","miner":"1.0","net":"1.0","rpc":"1.0","txpool":"1.0","web3":"1.0"}"#;
        module.register_method("rpc_modules", move |_, _, _| msg).unwrap();
        let handle = server.start(module).await.unwrap();
        tokio::spawn(handle.stopped());

        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
        let response: String = client.request("rpc_modules", rpc_params![]).await.unwrap();
        assert_eq!(response, msg);
    }

    /// Subscribes to a broadcast-backed subscription and expects to receive 16 items.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_rpc_subscription() {
        let endpoint = &dummy_name();
        let server = Builder::default().build(endpoint.clone());
        let (tx, _rx) = broadcast::channel::<usize>(16);

        let mut module = RpcModule::new(tx.clone());
        // Items are produced on a separate OS thread because `produce_items` sleeps.
        std::thread::spawn(move || produce_items(tx));

        module
            .register_subscription(
                "subscribe_hello",
                "s_hello",
                "unsubscribe_hello",
                |_, pending, tx, _| async move {
                    let rx = tx.subscribe();
                    let stream = BroadcastStream::new(rx);
                    pipe_from_stream_with_bounded_buffer(pending, stream).await?;
                    Ok(())
                },
            )
            .unwrap();

        let handle = server.start(module).await.unwrap();
        tokio::spawn(handle.stopped());

        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
        let sub: Subscription<usize> =
            client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap();

        let items = sub.take(16).collect::<Vec<_>>().await;
        assert_eq!(items.len(), 16);
    }

    /// An RPC middleware that swaps two method names must cause the swapped handler to run.
    #[tokio::test]
    async fn test_rpc_middleware() {
        /// Middleware that rewrites `say_hello` to `say_goodbye` and vice versa.
        #[derive(Clone)]
        struct ModifyRequestIf<S>(S);

        impl<'a, S> RpcServiceT<'a> for ModifyRequestIf<S>
        where
            S: Send + Sync + RpcServiceT<'a>,
        {
            type Future = S::Future;

            fn call(&self, mut req: Request<'a>) -> Self::Future {
                if req.method == "say_hello" {
                    // Redirect to the other method.
                    req.method = "say_goodbye".into();
                } else if req.method == "say_goodbye" {
                    req.method = "say_hello".into();
                }

                self.0.call(req)
            }
        }

        reth_tracing::init_test_tracing();
        let endpoint = &dummy_name();

        let rpc_middleware = RpcServiceBuilder::new().layer_fn(ModifyRequestIf);
        let server = Builder::default().set_rpc_middleware(rpc_middleware).build(endpoint.clone());

        let mut module = RpcModule::new(());
        let goodbye_msg = r#"{"jsonrpc":"2.0","id":1,"result":"goodbye"}"#;
        let hello_msg = r#"{"jsonrpc":"2.0","id":2,"result":"hello"}"#;
        module.register_method("say_hello", move |_, _, _| hello_msg).unwrap();
        module.register_method("say_goodbye", move |_, _, _| goodbye_msg).unwrap();
        let handle = server.start(module).await.unwrap();
        tokio::spawn(handle.stopped());

        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
        let say_hello_response: String = client.request("say_hello", rpc_params![]).await.unwrap();
        let say_goodbye_response: String =
            client.request("say_goodbye", rpc_params![]).await.unwrap();

        // Each call is answered by the other method's handler.
        assert_eq!(say_hello_response, goodbye_msg);
        assert_eq!(say_goodbye_response, hello_msg);
    }
}