1#![doc(
15 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
16 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
17 issue_tracker_base_url = "https://github.com/SeismicSystems/seismic-reth/issues/"
18)]
19#![cfg_attr(not(test), warn(unused_crate_dependencies))]
20#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
21
22use crate::{auth::AuthRpcModule, error::WsHttpSamePortError, metrics::RpcRequestMetrics};
23use alloy_provider::{fillers::RecommendedFillers, Provider, ProviderBuilder};
24use core::marker::PhantomData;
25use error::{ConflictingModules, RpcError, ServerKind};
26use http::{header::AUTHORIZATION, HeaderMap};
27use jsonrpsee::{
28 core::RegisterMethodError,
29 server::{
30 middleware::rpc::{RpcService, RpcServiceBuilder, RpcServiceT},
31 AlreadyStoppedError, IdProvider, ServerHandle,
32 },
33 MethodResponse, Methods, RpcModule,
34};
35use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
36use reth_consensus::{ConsensusError, FullConsensus};
37use reth_evm::ConfigureEvm;
38use reth_network_api::{noop::NoopNetwork, NetworkInfo, Peers};
39use reth_primitives_traits::NodePrimitives;
40use reth_rpc::{
41 AdminApi, DebugApi, EngineEthApi, EthApi, EthApiBuilder, EthBundle, MinerApi, NetApi,
42 OtterscanApi, RPCApi, RethApi, TraceApi, TxPoolApi, ValidationApiConfig, Web3Api,
43};
44use reth_rpc_api::servers::*;
45use reth_rpc_eth_api::{
46 helpers::{Call, EthApiSpec, EthTransactions, LoadPendingBlock, TraceExt},
47 EthApiServer, EthApiTypes, FullEthApiServer, RpcBlock, RpcHeader, RpcReceipt, RpcTransaction,
48};
49use reth_rpc_eth_types::{EthConfig, EthSubscriptionIdProvider};
50use reth_rpc_layer::{AuthLayer, Claims, CompressionLayer, JwtAuthValidator, JwtSecret};
51use reth_storage_api::{
52 AccountReader, BlockReader, BlockReaderIdExt, ChangeSetReader, FullRpcProvider, ProviderBlock,
53 StateProviderFactory,
54};
55use reth_tasks::{pool::BlockingTaskGuard, TaskSpawner, TokioTaskExecutor};
56use reth_transaction_pool::{noop::NoopTransactionPool, PoolTransaction, TransactionPool};
57use serde::{Deserialize, Serialize};
58use std::{
59 collections::HashMap,
60 fmt::Debug,
61 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
62 sync::Arc,
63 time::{Duration, SystemTime, UNIX_EPOCH},
64};
65use tower::Layer;
66use tower_http::cors::CorsLayer;
67
68pub use cors::CorsDomainError;
69
70pub use jsonrpsee::server::ServerBuilder;
72use jsonrpsee::server::ServerConfigBuilder;
73pub use reth_ipc::server::{
74 Builder as IpcServerBuilder, RpcServiceBuilder as IpcRpcServiceBuilder,
75};
76pub use reth_rpc_server_types::{constants, RethRpcModule, RpcModuleSelection};
77pub use tower::layer::util::{Identity, Stack};
78
79pub mod auth;
81
82pub mod config;
84
85mod cors;
87
88pub mod error;
90
91pub mod eth;
93pub use eth::EthHandlers;
94
95mod metrics;
97pub use metrics::{MeteredRequestFuture, RpcRequestMetricsService};
98use reth_chain_state::CanonStateSubscriptions;
99use reth_rpc::eth::sim_bundle::EthSimBundle;
100
101pub mod rate_limiter;
103
104#[expect(clippy::too_many_arguments)]
106pub async fn launch<N, Provider, Pool, Network, EvmConfig, EthApi>(
107 provider: Provider,
108 pool: Pool,
109 network: Network,
110 module_config: impl Into<TransportRpcModuleConfig>,
111 server_config: impl Into<RpcServerConfig>,
112 executor: Box<dyn TaskSpawner + 'static>,
113 evm_config: EvmConfig,
114 eth: EthApi,
115 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
116) -> Result<RpcServerHandle, RpcError>
117where
118 N: NodePrimitives,
119 Provider: FullRpcProvider<Block = N::Block, Receipt = N::Receipt, Header = N::BlockHeader>
120 + CanonStateSubscriptions<Primitives = N>
121 + AccountReader
122 + ChangeSetReader,
123 Pool: TransactionPool + 'static,
124 Network: NetworkInfo + Peers + Clone + 'static,
125 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
126 EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
127{
128 let module_config = module_config.into();
129 server_config
130 .into()
131 .start(
132 &RpcModuleBuilder::new(provider, pool, network, executor, evm_config, consensus)
133 .build(module_config, eth),
134 )
135 .await
136}
137
138#[derive(Debug, Clone)]
142pub struct RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, Consensus> {
143 provider: Provider,
145 pool: Pool,
147 network: Network,
149 executor: Box<dyn TaskSpawner + 'static>,
151 evm_config: EvmConfig,
153 consensus: Consensus,
155 _primitives: PhantomData<N>,
157}
158
159impl<N, Provider, Pool, Network, EvmConfig, Consensus>
162 RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, Consensus>
163where
164 N: NodePrimitives,
165 EvmConfig: Clone,
166{
167 pub const fn new(
169 provider: Provider,
170 pool: Pool,
171 network: Network,
172 executor: Box<dyn TaskSpawner + 'static>,
173 evm_config: EvmConfig,
174 consensus: Consensus,
175 ) -> Self {
176 Self { provider, pool, network, executor, evm_config, consensus, _primitives: PhantomData }
177 }
178
179 pub fn with_provider<P>(
181 self,
182 provider: P,
183 ) -> RpcModuleBuilder<N, P, Pool, Network, EvmConfig, Consensus>
184 where
185 P: BlockReader<Block = N::Block, Header = N::BlockHeader, Receipt = N::Receipt>
186 + StateProviderFactory
187 + 'static,
188 {
189 let Self { pool, network, executor, evm_config, consensus, _primitives, .. } = self;
190 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
191 }
192
193 pub fn with_pool<P>(
195 self,
196 pool: P,
197 ) -> RpcModuleBuilder<N, Provider, P, Network, EvmConfig, Consensus>
198 where
199 P: TransactionPool<Transaction: PoolTransaction<Consensus = N::SignedTx>> + 'static,
200 {
201 let Self { provider, network, executor, evm_config, consensus, _primitives, .. } = self;
202 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
203 }
204
205 pub fn with_noop_pool(
211 self,
212 ) -> RpcModuleBuilder<N, Provider, NoopTransactionPool, Network, EvmConfig, Consensus> {
213 let Self { provider, executor, network, evm_config, consensus, _primitives, .. } = self;
214 RpcModuleBuilder {
215 provider,
216 executor,
217 network,
218 evm_config,
219 pool: NoopTransactionPool::default(),
220 consensus,
221 _primitives,
222 }
223 }
224
225 pub fn with_network<Net>(
227 self,
228 network: Net,
229 ) -> RpcModuleBuilder<N, Provider, Pool, Net, EvmConfig, Consensus>
230 where
231 Net: NetworkInfo + Peers + 'static,
232 {
233 let Self { provider, pool, executor, evm_config, consensus, _primitives, .. } = self;
234 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
235 }
236
237 pub fn with_noop_network(
243 self,
244 ) -> RpcModuleBuilder<N, Provider, Pool, NoopNetwork, EvmConfig, Consensus> {
245 let Self { provider, pool, executor, evm_config, consensus, _primitives, .. } = self;
246 RpcModuleBuilder {
247 provider,
248 pool,
249 executor,
250 network: NoopNetwork::default(),
251 evm_config,
252 consensus,
253 _primitives,
254 }
255 }
256
257 pub fn with_executor(self, executor: Box<dyn TaskSpawner + 'static>) -> Self {
259 let Self { pool, network, provider, evm_config, consensus, _primitives, .. } = self;
260 Self { provider, network, pool, executor, evm_config, consensus, _primitives }
261 }
262
263 pub fn with_tokio_executor(self) -> Self {
268 let Self { pool, network, provider, evm_config, consensus, _primitives, .. } = self;
269 Self {
270 provider,
271 network,
272 pool,
273 executor: Box::new(TokioTaskExecutor::default()),
274 evm_config,
275 consensus,
276 _primitives,
277 }
278 }
279
280 pub fn with_evm_config<E>(
282 self,
283 evm_config: E,
284 ) -> RpcModuleBuilder<N, Provider, Pool, Network, E, Consensus>
285 where
286 EvmConfig: 'static,
287 E: ConfigureEvm + Clone,
288 {
289 let Self { provider, pool, executor, network, consensus, _primitives, .. } = self;
290 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
291 }
292
293 pub fn with_consensus<C>(
295 self,
296 consensus: C,
297 ) -> RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, C> {
298 let Self { provider, network, pool, executor, evm_config, _primitives, .. } = self;
299 RpcModuleBuilder { provider, network, pool, executor, evm_config, consensus, _primitives }
300 }
301
302 pub fn eth_api_builder(&self) -> EthApiBuilder<Provider, Pool, Network, EvmConfig>
304 where
305 Provider: BlockReaderIdExt + Clone,
306 Pool: Clone,
307 Network: Clone,
308 EvmConfig: Clone,
309 {
310 EthApiBuilder::new(
311 self.provider.clone(),
312 self.pool.clone(),
313 self.network.clone(),
314 self.evm_config.clone(),
315 )
316 }
317
318 pub fn bootstrap_eth_api(&self) -> EthApi<Provider, Pool, Network, EvmConfig>
324 where
325 Provider: BlockReaderIdExt<Block = N::Block, Header = N::BlockHeader, Receipt = N::Receipt>
326 + StateProviderFactory
327 + CanonStateSubscriptions<Primitives = N>
328 + ChainSpecProvider
329 + Clone
330 + Unpin
331 + 'static,
332 Pool: Clone,
333 EvmConfig: Clone,
334 Network: Clone,
335 {
336 self.eth_api_builder().build()
337 }
338}
339
340impl<N, Provider, Pool, Network, EvmConfig, Consensus>
341 RpcModuleBuilder<N, Provider, Pool, Network, EvmConfig, Consensus>
342where
343 N: NodePrimitives,
344 Provider: FullRpcProvider<Block = N::Block, Receipt = N::Receipt, Header = N::BlockHeader>
345 + CanonStateSubscriptions<Primitives = N>
346 + AccountReader
347 + ChangeSetReader,
348 Pool: TransactionPool + 'static,
349 Network: NetworkInfo + Peers + Clone + 'static,
350 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
351 Consensus: FullConsensus<N, Error = ConsensusError> + Clone + 'static,
352{
353 pub fn build_with_auth_server<EthApi>(
360 self,
361 module_config: TransportRpcModuleConfig,
362 engine: impl IntoEngineApiRpcModule,
363 eth: EthApi,
364 ) -> (
365 TransportRpcModules,
366 AuthRpcModule,
367 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>,
368 )
369 where
370 EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
371 {
372 let Self { provider, pool, network, executor, consensus, evm_config, .. } = self;
373
374 let config = module_config.config.clone().unwrap_or_default();
375
376 let mut registry = RpcRegistryInner::new(
377 provider, pool, network, executor, consensus, config, evm_config, eth,
378 );
379
380 let modules = registry.create_transport_rpc_modules(module_config);
381
382 let auth_module = registry.create_auth_module(engine);
383
384 (modules, auth_module, registry)
385 }
386
387 pub fn into_registry<EthApi>(
392 self,
393 config: RpcModuleConfig,
394 eth: EthApi,
395 ) -> RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
396 where
397 EthApi: EthApiTypes + 'static,
398 {
399 let Self { provider, pool, network, executor, consensus, evm_config, .. } = self;
400 RpcRegistryInner::new(provider, pool, network, executor, consensus, config, evm_config, eth)
401 }
402
403 pub fn build<EthApi>(
406 self,
407 module_config: TransportRpcModuleConfig,
408 eth: EthApi,
409 ) -> TransportRpcModules<()>
410 where
411 EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
412 {
413 let mut modules = TransportRpcModules::default();
414
415 let Self { provider, pool, network, executor, consensus, evm_config, .. } = self;
416
417 if !module_config.is_empty() {
418 let TransportRpcModuleConfig { http, ws, ipc, config } = module_config.clone();
419
420 let mut registry = RpcRegistryInner::new(
421 provider,
422 pool,
423 network,
424 executor,
425 consensus,
426 config.unwrap_or_default(),
427 evm_config,
428 eth,
429 );
430
431 modules.config = module_config;
432 modules.http = registry.maybe_module(http.as_ref());
433 modules.ws = registry.maybe_module(ws.as_ref());
434 modules.ipc = registry.maybe_module(ipc.as_ref());
435 }
436
437 modules
438 }
439}
440
441impl<N: NodePrimitives> Default for RpcModuleBuilder<N, (), (), (), (), ()> {
442 fn default() -> Self {
443 Self::new((), (), (), Box::new(TokioTaskExecutor::default()), (), ())
444 }
445}
446
447#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
449pub struct RpcModuleConfig {
450 eth: EthConfig,
452 flashbots: ValidationApiConfig,
454}
455
456impl RpcModuleConfig {
459 pub fn builder() -> RpcModuleConfigBuilder {
461 RpcModuleConfigBuilder::default()
462 }
463
464 pub const fn new(eth: EthConfig, flashbots: ValidationApiConfig) -> Self {
466 Self { eth, flashbots }
467 }
468
469 pub const fn eth(&self) -> &EthConfig {
471 &self.eth
472 }
473
474 pub const fn eth_mut(&mut self) -> &mut EthConfig {
476 &mut self.eth
477 }
478}
479
480#[derive(Clone, Debug, Default)]
482pub struct RpcModuleConfigBuilder {
483 eth: Option<EthConfig>,
484 flashbots: Option<ValidationApiConfig>,
485}
486
487impl RpcModuleConfigBuilder {
490 pub const fn eth(mut self, eth: EthConfig) -> Self {
492 self.eth = Some(eth);
493 self
494 }
495
496 pub fn flashbots(mut self, flashbots: ValidationApiConfig) -> Self {
498 self.flashbots = Some(flashbots);
499 self
500 }
501
502 pub fn build(self) -> RpcModuleConfig {
504 let Self { eth, flashbots } = self;
505 RpcModuleConfig { eth: eth.unwrap_or_default(), flashbots: flashbots.unwrap_or_default() }
506 }
507
508 pub const fn get_eth(&self) -> Option<&EthConfig> {
510 self.eth.as_ref()
511 }
512
513 pub const fn eth_mut(&mut self) -> &mut Option<EthConfig> {
515 &mut self.eth
516 }
517
518 pub fn eth_mut_or_default(&mut self) -> &mut EthConfig {
520 self.eth.get_or_insert_with(EthConfig::default)
521 }
522}
523
524#[derive(Debug, Clone)]
526#[expect(dead_code)] pub struct RpcRegistryInner<
528 Provider: BlockReader,
529 Pool,
530 Network,
531 EthApi: EthApiTypes,
532 EvmConfig,
533 Consensus,
534> {
535 provider: Provider,
536 pool: Pool,
537 network: Network,
538 executor: Box<dyn TaskSpawner + 'static>,
539 evm_config: EvmConfig,
540 consensus: Consensus,
541 eth: EthHandlers<EthApi>,
543 blocking_pool_guard: BlockingTaskGuard,
545 modules: HashMap<RethRpcModule, Methods>,
547 eth_config: EthConfig,
549}
550
551impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
554 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
555where
556 N: NodePrimitives,
557 Provider: StateProviderFactory
558 + CanonStateSubscriptions<Primitives = N>
559 + BlockReader<Block = N::Block, Receipt = N::Receipt>
560 + Clone
561 + Unpin
562 + 'static,
563 Pool: Send + Sync + Clone + 'static,
564 Network: Clone + 'static,
565 EthApi: EthApiTypes + 'static,
566 EvmConfig: ConfigureEvm<Primitives = N>,
567{
568 #[expect(clippy::too_many_arguments)]
570 pub fn new(
571 provider: Provider,
572 pool: Pool,
573 network: Network,
574 executor: Box<dyn TaskSpawner + 'static>,
575 consensus: Consensus,
576 config: RpcModuleConfig,
577 evm_config: EvmConfig,
578 eth_api: EthApi,
579 ) -> Self
580 where
581 EvmConfig: ConfigureEvm<Primitives = N>,
582 {
583 let blocking_pool_guard = BlockingTaskGuard::new(config.eth.max_tracing_requests);
584
585 let eth = EthHandlers::bootstrap(config.eth, executor.clone(), eth_api);
586
587 Self {
588 provider,
589 pool,
590 network,
591 eth,
592 executor,
593 consensus,
594 modules: Default::default(),
595 blocking_pool_guard,
596 eth_config: config.eth,
597 evm_config,
598 }
599 }
600}
601
602impl<Provider, Pool, Network, EthApi, BlockExecutor, Consensus>
603 RpcRegistryInner<Provider, Pool, Network, EthApi, BlockExecutor, Consensus>
604where
605 Provider: BlockReader,
606 EthApi: EthApiTypes,
607{
608 pub const fn eth_api(&self) -> &EthApi {
610 &self.eth.api
611 }
612
613 pub const fn eth_handlers(&self) -> &EthHandlers<EthApi> {
615 &self.eth
616 }
617
618 pub const fn pool(&self) -> &Pool {
620 &self.pool
621 }
622
623 pub const fn tasks(&self) -> &(dyn TaskSpawner + 'static) {
625 &*self.executor
626 }
627
628 pub const fn provider(&self) -> &Provider {
630 &self.provider
631 }
632
633 pub fn methods(&self) -> Vec<Methods> {
635 self.modules.values().cloned().collect()
636 }
637
638 pub fn module(&self) -> RpcModule<()> {
640 let mut module = RpcModule::new(());
641 for methods in self.modules.values().cloned() {
642 module.merge(methods).expect("No conflicts");
643 }
644 module
645 }
646}
647
648impl<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
649 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
650where
651 Network: NetworkInfo + Clone + 'static,
652 EthApi: EthApiTypes,
653 Provider: BlockReader + ChainSpecProvider<ChainSpec: EthereumHardforks>,
654 EvmConfig: ConfigureEvm,
655{
656 pub fn admin_api(&self) -> AdminApi<Network, Provider::ChainSpec>
658 where
659 Network: Peers,
660 {
661 AdminApi::new(self.network.clone(), self.provider.chain_spec())
662 }
663
664 pub fn web3_api(&self) -> Web3Api<Network> {
666 Web3Api::new(self.network.clone())
667 }
668
669 pub fn register_admin(&mut self) -> &mut Self
671 where
672 Network: Peers,
673 {
674 let adminapi = self.admin_api();
675 self.modules.insert(RethRpcModule::Admin, adminapi.into_rpc().into());
676 self
677 }
678
679 pub fn register_web3(&mut self) -> &mut Self {
681 let web3api = self.web3_api();
682 self.modules.insert(RethRpcModule::Web3, web3api.into_rpc().into());
683 self
684 }
685}
686
687impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
688 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
689where
690 N: NodePrimitives,
691 Provider: FullRpcProvider<
692 Header = N::BlockHeader,
693 Block = N::Block,
694 Receipt = N::Receipt,
695 Transaction = N::SignedTx,
696 > + AccountReader
697 + ChangeSetReader,
698 Network: NetworkInfo + Peers + Clone + 'static,
699 EthApi: EthApiServer<
700 RpcTransaction<EthApi::NetworkTypes>,
701 RpcBlock<EthApi::NetworkTypes>,
702 RpcReceipt<EthApi::NetworkTypes>,
703 RpcHeader<EthApi::NetworkTypes>,
704 > + EthApiTypes,
705 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
706{
707 pub fn register_eth(&mut self) -> &mut Self {
713 let eth_api = self.eth_api().clone();
714 self.modules.insert(RethRpcModule::Eth, eth_api.into_rpc().into());
715 self
716 }
717
718 pub fn register_ots(&mut self) -> &mut Self
724 where
725 EthApi: TraceExt + EthTransactions,
726 {
727 let otterscan_api = self.otterscan_api();
728 self.modules.insert(RethRpcModule::Ots, otterscan_api.into_rpc().into());
729 self
730 }
731
732 pub fn register_debug(&mut self) -> &mut Self
738 where
739 EthApi: EthApiSpec + EthTransactions + TraceExt,
740 EvmConfig::Primitives: NodePrimitives<Block = ProviderBlock<EthApi::Provider>>,
741 {
742 let debug_api = self.debug_api();
743 self.modules.insert(RethRpcModule::Debug, debug_api.into_rpc().into());
744 self
745 }
746
747 pub fn register_trace(&mut self) -> &mut Self
753 where
754 EthApi: TraceExt,
755 {
756 let trace_api = self.trace_api();
757 self.modules.insert(RethRpcModule::Trace, trace_api.into_rpc().into());
758 self
759 }
760
761 pub fn register_net(&mut self) -> &mut Self
769 where
770 EthApi: EthApiSpec + 'static,
771 {
772 let netapi = self.net_api();
773 self.modules.insert(RethRpcModule::Net, netapi.into_rpc().into());
774 self
775 }
776
777 pub fn register_reth(&mut self) -> &mut Self {
785 let rethapi = self.reth_api();
786 self.modules.insert(RethRpcModule::Reth, rethapi.into_rpc().into());
787 self
788 }
789
790 pub fn otterscan_api(&self) -> OtterscanApi<EthApi> {
796 let eth_api = self.eth_api().clone();
797 OtterscanApi::new(eth_api)
798 }
799}
800
801impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
802 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
803where
804 N: NodePrimitives,
805 Provider: FullRpcProvider<
806 Block = N::Block,
807 Header = N::BlockHeader,
808 Transaction = N::SignedTx,
809 Receipt = N::Receipt,
810 > + AccountReader
811 + ChangeSetReader,
812 Network: NetworkInfo + Peers + Clone + 'static,
813 EthApi: EthApiTypes,
814 EvmConfig: ConfigureEvm<Primitives = N>,
815{
816 pub fn trace_api(&self) -> TraceApi<EthApi>
822 where
823 EthApi: TraceExt,
824 {
825 TraceApi::new(self.eth_api().clone(), self.blocking_pool_guard.clone(), self.eth_config)
826 }
827
828 pub fn bundle_api(&self) -> EthBundle<EthApi>
834 where
835 EthApi: EthTransactions + LoadPendingBlock + Call,
836 {
837 let eth_api = self.eth_api().clone();
838 EthBundle::new(eth_api, self.blocking_pool_guard.clone())
839 }
840
841 pub fn debug_api(&self) -> DebugApi<EthApi, EvmConfig>
847 where
848 EthApi: EthApiSpec + EthTransactions + TraceExt,
849 EvmConfig::Primitives: NodePrimitives<Block = ProviderBlock<EthApi::Provider>>,
850 {
851 DebugApi::new(
852 self.eth_api().clone(),
853 self.blocking_pool_guard.clone(),
854 self.evm_config.clone(),
855 )
856 }
857
858 pub fn net_api(&self) -> NetApi<Network, EthApi>
864 where
865 EthApi: EthApiSpec + 'static,
866 {
867 let eth_api = self.eth_api().clone();
868 NetApi::new(self.network.clone(), eth_api)
869 }
870
871 pub fn reth_api(&self) -> RethApi<Provider> {
873 RethApi::new(self.provider.clone(), self.executor.clone())
874 }
875}
876
877impl<N, Provider, Pool, Network, EthApi, EvmConfig, Consensus>
878 RpcRegistryInner<Provider, Pool, Network, EthApi, EvmConfig, Consensus>
879where
880 N: NodePrimitives,
881 Provider: FullRpcProvider<Block = N::Block>
882 + CanonStateSubscriptions<Primitives = N>
883 + AccountReader
884 + ChangeSetReader,
885 Pool: TransactionPool + 'static,
886 Network: NetworkInfo + Peers + Clone + 'static,
887 EthApi: FullEthApiServer<Provider = Provider, Pool = Pool>,
888 EvmConfig: ConfigureEvm<Primitives = N> + 'static,
889 Consensus: FullConsensus<N, Error = ConsensusError> + Clone + 'static,
890{
891 pub fn create_auth_module(&self, engine_api: impl IntoEngineApiRpcModule) -> AuthRpcModule {
897 let mut module = engine_api.into_rpc_module();
898
899 let eth_handlers = self.eth_handlers();
901 let engine_eth = EngineEthApi::new(eth_handlers.api.clone(), eth_handlers.filter.clone());
902
903 module.merge(engine_eth.into_rpc()).expect("No conflicting methods");
904
905 AuthRpcModule { inner: module }
906 }
907
908 fn maybe_module(&mut self, config: Option<&RpcModuleSelection>) -> Option<RpcModule<()>> {
910 config.map(|config| self.module_for(config))
911 }
912
913 pub fn create_transport_rpc_modules(
917 &mut self,
918 config: TransportRpcModuleConfig,
919 ) -> TransportRpcModules<()> {
920 let mut modules = TransportRpcModules::default();
921 let http = self.maybe_module(config.http.as_ref());
922 let ws = self.maybe_module(config.ws.as_ref());
923 let ipc = self.maybe_module(config.ipc.as_ref());
924
925 modules.config = config;
926 modules.http = http;
927 modules.ws = ws;
928 modules.ipc = ipc;
929 modules
930 }
931
932 pub fn module_for(&mut self, config: &RpcModuleSelection) -> RpcModule<()> {
935 let mut module = RpcModule::new(());
936 let all_methods = self.reth_methods(config.iter_selection());
937 for methods in all_methods {
938 module.merge(methods).expect("No conflicts");
939 }
940 module
941 }
942
943 pub fn reth_methods(
952 &mut self,
953 namespaces: impl Iterator<Item = RethRpcModule>,
954 ) -> Vec<Methods> {
955 let EthHandlers { api: eth_api, filter: eth_filter, pubsub: eth_pubsub, .. } =
956 self.eth_handlers().clone();
957
958 let namespaces: Vec<_> = namespaces.collect();
960 namespaces
961 .iter()
962 .copied()
963 .map(|namespace| {
964 self.modules
965 .entry(namespace)
966 .or_insert_with(|| match namespace {
967 RethRpcModule::Admin => {
968 AdminApi::new(self.network.clone(), self.provider.chain_spec())
969 .into_rpc()
970 .into()
971 }
972 RethRpcModule::Debug => DebugApi::new(
973 eth_api.clone(),
974 self.blocking_pool_guard.clone(),
975 self.evm_config.clone(),
976 )
977 .into_rpc()
978 .into(),
979 RethRpcModule::Eth => {
980 let mut module = eth_api.clone().into_rpc();
982 module.merge(eth_filter.clone().into_rpc()).expect("No conflicts");
983 module.merge(eth_pubsub.clone().into_rpc()).expect("No conflicts");
984 module
985 .merge(
986 EthBundle::new(
987 eth_api.clone(),
988 self.blocking_pool_guard.clone(),
989 )
990 .into_rpc(),
991 )
992 .expect("No conflicts");
993
994 module.into()
995 }
996 RethRpcModule::Net => {
997 NetApi::new(self.network.clone(), eth_api.clone()).into_rpc().into()
998 }
999 RethRpcModule::Trace => TraceApi::new(
1000 eth_api.clone(),
1001 self.blocking_pool_guard.clone(),
1002 self.eth_config,
1003 )
1004 .into_rpc()
1005 .into(),
1006 RethRpcModule::Web3 => Web3Api::new(self.network.clone()).into_rpc().into(),
1007 RethRpcModule::Txpool => TxPoolApi::new(
1008 self.eth.api.pool().clone(),
1009 self.eth.api.tx_resp_builder().clone(),
1010 )
1011 .into_rpc()
1012 .into(),
1013 RethRpcModule::Rpc => RPCApi::new(
1014 namespaces
1015 .iter()
1016 .map(|module| (module.to_string(), "1.0".to_string()))
1017 .collect(),
1018 )
1019 .into_rpc()
1020 .into(),
1021 RethRpcModule::Ots => OtterscanApi::new(eth_api.clone()).into_rpc().into(),
1022 RethRpcModule::Reth => {
1023 RethApi::new(self.provider.clone(), self.executor.clone())
1024 .into_rpc()
1025 .into()
1026 }
1027 RethRpcModule::Flashbots => Default::default(),
1031 RethRpcModule::Miner => MinerApi::default().into_rpc().into(),
1032 RethRpcModule::Mev => {
1033 EthSimBundle::new(eth_api.clone(), self.blocking_pool_guard.clone())
1034 .into_rpc()
1035 .into()
1036 }
1037 })
1038 .clone()
1039 })
1040 .collect::<Vec<_>>()
1041 }
1042}
1043
1044#[derive(Debug)]
1056pub struct RpcServerConfig<RpcMiddleware = Identity> {
1057 http_server_config: Option<ServerConfigBuilder>,
1059 http_cors_domains: Option<String>,
1061 http_addr: Option<SocketAddr>,
1063 http_disable_compression: bool,
1065 ws_server_config: Option<ServerConfigBuilder>,
1067 ws_cors_domains: Option<String>,
1069 ws_addr: Option<SocketAddr>,
1071 ipc_server_config: Option<IpcServerBuilder<Identity, Identity>>,
1073 ipc_endpoint: Option<String>,
1075 jwt_secret: Option<JwtSecret>,
1077 rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
1079}
1080
1081impl Default for RpcServerConfig<Identity> {
1084 fn default() -> Self {
1086 Self {
1087 http_server_config: None,
1088 http_cors_domains: None,
1089 http_addr: None,
1090 http_disable_compression: false,
1091 ws_server_config: None,
1092 ws_cors_domains: None,
1093 ws_addr: None,
1094 ipc_server_config: None,
1095 ipc_endpoint: None,
1096 jwt_secret: None,
1097 rpc_middleware: RpcServiceBuilder::new(),
1098 }
1099 }
1100}
1101
1102impl RpcServerConfig {
1103 pub fn http(config: ServerConfigBuilder) -> Self {
1105 Self::default().with_http(config)
1106 }
1107
1108 pub fn ws(config: ServerConfigBuilder) -> Self {
1110 Self::default().with_ws(config)
1111 }
1112
1113 pub fn ipc(config: IpcServerBuilder<Identity, Identity>) -> Self {
1115 Self::default().with_ipc(config)
1116 }
1117
1118 pub fn with_http(mut self, config: ServerConfigBuilder) -> Self {
1123 self.http_server_config =
1124 Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
1125 self
1126 }
1127
1128 pub fn with_ws(mut self, config: ServerConfigBuilder) -> Self {
1133 self.ws_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
1134 self
1135 }
1136
1137 pub fn with_ipc(mut self, config: IpcServerBuilder<Identity, Identity>) -> Self {
1142 self.ipc_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
1143 self
1144 }
1145}
1146
1147impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
1148 pub fn set_rpc_middleware<T>(self, rpc_middleware: RpcServiceBuilder<T>) -> RpcServerConfig<T> {
1150 RpcServerConfig {
1151 http_server_config: self.http_server_config,
1152 http_cors_domains: self.http_cors_domains,
1153 http_addr: self.http_addr,
1154 http_disable_compression: self.http_disable_compression,
1155 ws_server_config: self.ws_server_config,
1156 ws_cors_domains: self.ws_cors_domains,
1157 ws_addr: self.ws_addr,
1158 ipc_server_config: self.ipc_server_config,
1159 ipc_endpoint: self.ipc_endpoint,
1160 jwt_secret: self.jwt_secret,
1161 rpc_middleware,
1162 }
1163 }
1164
1165 pub fn with_cors(self, cors_domain: Option<String>) -> Self {
1167 self.with_http_cors(cors_domain.clone()).with_ws_cors(cors_domain)
1168 }
1169
1170 pub fn with_ws_cors(mut self, cors_domain: Option<String>) -> Self {
1172 self.ws_cors_domains = cors_domain;
1173 self
1174 }
1175
1176 pub const fn with_http_disable_compression(mut self, http_disable_compression: bool) -> Self {
1178 self.http_disable_compression = http_disable_compression;
1179 self
1180 }
1181
1182 pub fn with_http_cors(mut self, cors_domain: Option<String>) -> Self {
1184 self.http_cors_domains = cors_domain;
1185 self
1186 }
1187
1188 pub const fn with_http_address(mut self, addr: SocketAddr) -> Self {
1193 self.http_addr = Some(addr);
1194 self
1195 }
1196
1197 pub const fn with_ws_address(mut self, addr: SocketAddr) -> Self {
1202 self.ws_addr = Some(addr);
1203 self
1204 }
1205
1206 pub fn with_id_provider<I>(mut self, id_provider: I) -> Self
1210 where
1211 I: IdProvider + Clone + 'static,
1212 {
1213 if let Some(config) = self.http_server_config {
1214 self.http_server_config = Some(config.set_id_provider(id_provider.clone()));
1215 }
1216 if let Some(config) = self.ws_server_config {
1217 self.ws_server_config = Some(config.set_id_provider(id_provider.clone()));
1218 }
1219 if let Some(ipc) = self.ipc_server_config {
1220 self.ipc_server_config = Some(ipc.set_id_provider(id_provider));
1221 }
1222
1223 self
1224 }
1225
1226 pub fn with_ipc_endpoint(mut self, path: impl Into<String>) -> Self {
1230 self.ipc_endpoint = Some(path.into());
1231 self
1232 }
1233
1234 pub const fn with_jwt_secret(mut self, secret: Option<JwtSecret>) -> Self {
1236 self.jwt_secret = secret;
1237 self
1238 }
1239
1240 pub const fn has_server(&self) -> bool {
1244 self.http_server_config.is_some() ||
1245 self.ws_server_config.is_some() ||
1246 self.ipc_server_config.is_some()
1247 }
1248
1249 pub const fn http_address(&self) -> Option<SocketAddr> {
1251 self.http_addr
1252 }
1253
1254 pub const fn ws_address(&self) -> Option<SocketAddr> {
1256 self.ws_addr
1257 }
1258
1259 pub fn ipc_endpoint(&self) -> Option<String> {
1261 self.ipc_endpoint.clone()
1262 }
1263
1264 fn maybe_cors_layer(cors: Option<String>) -> Result<Option<CorsLayer>, CorsDomainError> {
1266 cors.as_deref().map(cors::create_cors_layer).transpose()
1267 }
1268
1269 fn maybe_jwt_layer(jwt_secret: Option<JwtSecret>) -> Option<AuthLayer<JwtAuthValidator>> {
1271 jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret)))
1272 }
1273
1274 fn maybe_compression_layer(disable_compression: bool) -> Option<CompressionLayer> {
1277 if disable_compression {
1278 None
1279 } else {
1280 Some(CompressionLayer::new())
1281 }
1282 }
1283
/// Builds and starts the configured IPC, HTTP and WS servers and returns a handle to them.
///
/// The IPC server (if configured) is started first. If both an HTTP and a WS server are
/// configured and `http_addr == ws_addr`, a single server is built on the HTTP socket address
/// and its handle is used for both transports. Otherwise each configured transport gets its own
/// server.
///
/// # Errors
///
/// Returns an [`RpcError`] if
/// - the IPC server fails to start,
/// - HTTP and WS share an address but their CORS domains or module selections conflict,
/// - a CORS layer cannot be created from the configured domains,
/// - building a server or reading its local address fails.
///
/// # Panics
///
/// Panics if an IPC server is configured but `modules.ipc` is `None`, or, when HTTP and WS run
/// as separate servers, if a server is configured but its matching module in `modules` is
/// `None`.
pub async fn start(self, modules: &TransportRpcModules) -> Result<RpcServerHandle, RpcError>
where
    RpcMiddleware: Layer<RpcRequestMetricsService<RpcService>> + Clone + Send + 'static,
    for<'a> <RpcMiddleware as Layer<RpcRequestMetricsService<RpcService>>>::Service:
        Send
            + Sync
            + 'static
            + RpcServiceT<
                MethodResponse = MethodResponse,
                BatchResponse = MethodResponse,
                NotificationResponse = MethodResponse,
            >,
{
    // Handles start out empty and are filled in as the corresponding servers are started.
    let mut http_handle = None;
    let mut ws_handle = None;
    let mut ipc_handle = None;

    // Fall back to localhost with the default HTTP port when no HTTP address is configured.
    let http_socket_addr = self.http_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
        Ipv4Addr::LOCALHOST,
        constants::DEFAULT_HTTP_RPC_PORT,
    )));

    // Fall back to localhost with the default WS port when no WS address is configured.
    let ws_socket_addr = self.ws_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
        Ipv4Addr::LOCALHOST,
        constants::DEFAULT_WS_RPC_PORT,
    )));

    // Request metrics for the IPC server, derived from the IPC module when one is present.
    let metrics = modules.ipc.as_ref().map(RpcRequestMetrics::ipc).unwrap_or_default();
    // Use the default IPC endpoint when none is configured.
    let ipc_path =
        self.ipc_endpoint.clone().unwrap_or_else(|| constants::DEFAULT_IPC_ENDPOINT.into());

    // Start the IPC server first, if one is configured.
    if let Some(builder) = self.ipc_server_config {
        let ipc = builder
            .set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics))
            .build(ipc_path);
        // Panics if the IPC server is configured but no IPC module was provided.
        ipc_handle = Some(ipc.start(modules.ipc.clone().expect("ipc server error")).await?);
    }

    // Shared-address case: HTTP and WS are both configured and their addresses compare equal,
    // so both transports are served by one server.
    if self.http_addr == self.ws_addr &&
        self.http_server_config.is_some() &&
        self.ws_server_config.is_some()
    {
        let cors = match (self.ws_cors_domains.as_ref(), self.http_cors_domains.as_ref()) {
            (Some(ws_cors), Some(http_cors)) => {
                // With one server for both transports the CORS domains must agree; they are
                // compared after trimming surrounding whitespace.
                if ws_cors.trim() != http_cors.trim() {
                    return Err(WsHttpSamePortError::ConflictingCorsDomains {
                        http_cors_domains: Some(http_cors.clone()),
                        ws_cors_domains: Some(ws_cors.clone()),
                    }
                    .into());
                }
                Some(ws_cors)
            }
            // At most one of the two is set: use whichever is present.
            (a, b) => a.or(b),
        }
        .cloned();

        // The module selections for HTTP and WS must also be identical on a shared address.
        modules.config.ensure_ws_http_identical()?;

        // Always `Some` here (checked above); the `if let` moves the config out of `self`.
        if let Some(config) = self.http_server_config {
            let server = ServerBuilder::new()
                .set_http_middleware(
                    tower::ServiceBuilder::new()
                        .option_layer(Self::maybe_cors_layer(cors)?)
                        .option_layer(Self::maybe_jwt_layer(self.jwt_secret))
                        .option_layer(Self::maybe_compression_layer(
                            self.http_disable_compression,
                        )),
                )
                .set_rpc_middleware(
                    self.rpc_middleware.clone().layer(
                        // Metrics come from the HTTP module, falling back to the WS module.
                        modules
                            .http
                            .as_ref()
                            .or(modules.ws.as_ref())
                            .map(RpcRequestMetrics::same_port)
                            .unwrap_or_default(),
                    ),
                )
                .set_config(config.build())
                .build(http_socket_addr)
                .await
                .map_err(|err| {
                    RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr))
                })?;
            let addr = server.local_addr().map_err(|err| {
                RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr))
            })?;
            // Serve the HTTP module (or the WS module if no HTTP module exists). The same
            // handle is stored for both transports. If neither module exists the server is
            // not started and both handles stay `None`.
            if let Some(module) = modules.http.as_ref().or(modules.ws.as_ref()) {
                let handle = server.start(module.clone());
                http_handle = Some(handle.clone());
                ws_handle = Some(handle);
            }
            return Ok(RpcServerHandle {
                http_local_addr: Some(addr),
                ws_local_addr: Some(addr),
                http: http_handle,
                ws: ws_handle,
                ipc_endpoint: self.ipc_endpoint.clone(),
                ipc: ipc_handle,
                jwt_secret: self.jwt_secret,
            });
        }
    }

    // Separate-server case: each configured transport gets its own server.
    let mut ws_local_addr = None;
    let mut ws_server = None;
    let mut http_local_addr = None;
    let mut http_server = None;

    if let Some(config) = self.ws_server_config {
        // The WS server uses the WS CORS domains and the JWT layer; no compression layer is
        // installed for it.
        let server = ServerBuilder::new()
            .set_config(config.ws_only().build())
            .set_http_middleware(
                tower::ServiceBuilder::new()
                    .option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?)
                    .option_layer(Self::maybe_jwt_layer(self.jwt_secret)),
            )
            .set_rpc_middleware(
                self.rpc_middleware
                    .clone()
                    .layer(modules.ws.as_ref().map(RpcRequestMetrics::ws).unwrap_or_default()),
            )
            .build(ws_socket_addr)
            .await
            .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?;

        let addr = server
            .local_addr()
            .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?;

        ws_local_addr = Some(addr);
        ws_server = Some(server);
    }

    if let Some(config) = self.http_server_config {
        // The HTTP server uses the HTTP CORS domains, the JWT layer and (unless disabled) the
        // compression layer.
        let server = ServerBuilder::new()
            .set_config(config.http_only().build())
            .set_http_middleware(
                tower::ServiceBuilder::new()
                    .option_layer(Self::maybe_cors_layer(self.http_cors_domains.clone())?)
                    .option_layer(Self::maybe_jwt_layer(self.jwt_secret))
                    .option_layer(Self::maybe_compression_layer(self.http_disable_compression)),
            )
            .set_rpc_middleware(
                self.rpc_middleware.clone().layer(
                    modules.http.as_ref().map(RpcRequestMetrics::http).unwrap_or_default(),
                ),
            )
            .build(http_socket_addr)
            .await
            .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?;
        let local_addr = server
            .local_addr()
            .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?;
        http_local_addr = Some(local_addr);
        http_server = Some(server);
    }

    // Both servers are built before either one is started. Each `expect` panics if a server
    // was configured without a matching module.
    http_handle = http_server
        .map(|http_server| http_server.start(modules.http.clone().expect("http server error")));
    ws_handle = ws_server
        .map(|ws_server| ws_server.start(modules.ws.clone().expect("ws server error")));
    Ok(RpcServerHandle {
        http_local_addr,
        ws_local_addr,
        http: http_handle,
        ws: ws_handle,
        ipc_endpoint: self.ipc_endpoint.clone(),
        ipc: ipc_handle,
        jwt_secret: self.jwt_secret,
    })
}
1464}
1465
/// Holds the RPC module selection for each transport (HTTP, WS, IPC) together with an optional
/// [`RpcModuleConfig`].
///
/// All fields default to `None`.
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct TransportRpcModuleConfig {
    /// Modules selected for the HTTP transport, if any.
    http: Option<RpcModuleSelection>,
    /// Modules selected for the WS transport, if any.
    ws: Option<RpcModuleSelection>,
    /// Modules selected for the IPC transport, if any.
    ipc: Option<RpcModuleSelection>,
    /// Optional configuration for the modules.
    // NOTE(review): presumably shared by all transports; confirm against the builder.
    config: Option<RpcModuleConfig>,
}
1488
1489impl TransportRpcModuleConfig {
1492 pub fn set_http(http: impl Into<RpcModuleSelection>) -> Self {
1494 Self::default().with_http(http)
1495 }
1496
1497 pub fn set_ws(ws: impl Into<RpcModuleSelection>) -> Self {
1499 Self::default().with_ws(ws)
1500 }
1501
1502 pub fn set_ipc(ipc: impl Into<RpcModuleSelection>) -> Self {
1504 Self::default().with_ipc(ipc)
1505 }
1506
1507 pub fn with_http(mut self, http: impl Into<RpcModuleSelection>) -> Self {
1509 self.http = Some(http.into());
1510 self
1511 }
1512
1513 pub fn with_ws(mut self, ws: impl Into<RpcModuleSelection>) -> Self {
1515 self.ws = Some(ws.into());
1516 self
1517 }
1518
1519 pub fn with_ipc(mut self, ipc: impl Into<RpcModuleSelection>) -> Self {
1521 self.ipc = Some(ipc.into());
1522 self
1523 }
1524
1525 pub fn with_config(mut self, config: RpcModuleConfig) -> Self {
1527 self.config = Some(config);
1528 self
1529 }
1530
1531 pub const fn http_mut(&mut self) -> &mut Option<RpcModuleSelection> {
1533 &mut self.http
1534 }
1535
1536 pub const fn ws_mut(&mut self) -> &mut Option<RpcModuleSelection> {
1538 &mut self.ws
1539 }
1540
1541 pub const fn ipc_mut(&mut self) -> &mut Option<RpcModuleSelection> {
1543 &mut self.ipc
1544 }
1545
1546 pub const fn config_mut(&mut self) -> &mut Option<RpcModuleConfig> {
1548 &mut self.config
1549 }
1550
1551 pub const fn is_empty(&self) -> bool {
1553 self.http.is_none() && self.ws.is_none() && self.ipc.is_none()
1554 }
1555
1556 pub const fn http(&self) -> Option<&RpcModuleSelection> {
1558 self.http.as_ref()
1559 }
1560
1561 pub const fn ws(&self) -> Option<&RpcModuleSelection> {
1563 self.ws.as_ref()
1564 }
1565
1566 pub const fn ipc(&self) -> Option<&RpcModuleSelection> {
1568 self.ipc.as_ref()
1569 }
1570
1571 pub const fn config(&self) -> Option<&RpcModuleConfig> {
1573 self.config.as_ref()
1574 }
1575
1576 pub fn contains_any(&self, module: &RethRpcModule) -> bool {
1578 self.contains_http(module) || self.contains_ws(module) || self.contains_ipc(module)
1579 }
1580
1581 pub fn contains_http(&self, module: &RethRpcModule) -> bool {
1583 self.http.as_ref().is_some_and(|http| http.contains(module))
1584 }
1585
1586 pub fn contains_ws(&self, module: &RethRpcModule) -> bool {
1588 self.ws.as_ref().is_some_and(|ws| ws.contains(module))
1589 }
1590
1591 pub fn contains_ipc(&self, module: &RethRpcModule) -> bool {
1593 self.ipc.as_ref().is_some_and(|ipc| ipc.contains(module))
1594 }
1595
1596 fn ensure_ws_http_identical(&self) -> Result<(), WsHttpSamePortError> {
1599 if RpcModuleSelection::are_identical(self.http.as_ref(), self.ws.as_ref()) {
1600 Ok(())
1601 } else {
1602 let http_modules =
1603 self.http.as_ref().map(RpcModuleSelection::to_selection).unwrap_or_default();
1604 let ws_modules =
1605 self.ws.as_ref().map(RpcModuleSelection::to_selection).unwrap_or_default();
1606
1607 let http_not_ws = http_modules.difference(&ws_modules).copied().collect();
1608 let ws_not_http = ws_modules.difference(&http_modules).copied().collect();
1609 let overlap = http_modules.intersection(&ws_modules).copied().collect();
1610
1611 Err(WsHttpSamePortError::ConflictingModules(Box::new(ConflictingModules {
1612 overlap,
1613 http_not_ws,
1614 ws_not_http,
1615 })))
1616 }
1617 }
1618}
1619
/// Holds the installed [`RpcModule`] for each transport (HTTP, WS, IPC) together with the
/// [`TransportRpcModuleConfig`] describing which modules are selected per transport.
#[derive(Debug, Clone, Default)]
pub struct TransportRpcModules<Context = ()> {
    /// The per-transport module selection.
    config: TransportRpcModuleConfig,
    /// The module installed for the HTTP transport, if any.
    http: Option<RpcModule<Context>>,
    /// The module installed for the WS transport, if any.
    ws: Option<RpcModule<Context>>,
    /// The module installed for the IPC transport, if any.
    ipc: Option<RpcModule<Context>>,
}
1632
1633impl TransportRpcModules {
1636 pub fn with_config(mut self, config: TransportRpcModuleConfig) -> Self {
1639 self.config = config;
1640 self
1641 }
1642
1643 pub fn with_http(mut self, http: RpcModule<()>) -> Self {
1646 self.http = Some(http);
1647 self
1648 }
1649
1650 pub fn with_ws(mut self, ws: RpcModule<()>) -> Self {
1653 self.ws = Some(ws);
1654 self
1655 }
1656
1657 pub fn with_ipc(mut self, ipc: RpcModule<()>) -> Self {
1660 self.ipc = Some(ipc);
1661 self
1662 }
1663
1664 pub const fn module_config(&self) -> &TransportRpcModuleConfig {
1666 &self.config
1667 }
1668
1669 pub fn merge_if_module_configured(
1674 &mut self,
1675 module: RethRpcModule,
1676 other: impl Into<Methods>,
1677 ) -> Result<(), RegisterMethodError> {
1678 let other = other.into();
1679 if self.module_config().contains_http(&module) {
1680 self.merge_http(other.clone())?;
1681 }
1682 if self.module_config().contains_ws(&module) {
1683 self.merge_ws(other.clone())?;
1684 }
1685 if self.module_config().contains_ipc(&module) {
1686 self.merge_ipc(other)?;
1687 }
1688
1689 Ok(())
1690 }
1691
1692 pub fn merge_http(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1698 if let Some(ref mut http) = self.http {
1699 return http.merge(other.into()).map(|_| true)
1700 }
1701 Ok(false)
1702 }
1703
1704 pub fn merge_ws(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1710 if let Some(ref mut ws) = self.ws {
1711 return ws.merge(other.into()).map(|_| true)
1712 }
1713 Ok(false)
1714 }
1715
1716 pub fn merge_ipc(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1722 if let Some(ref mut ipc) = self.ipc {
1723 return ipc.merge(other.into()).map(|_| true)
1724 }
1725 Ok(false)
1726 }
1727
1728 pub fn merge_configured(
1732 &mut self,
1733 other: impl Into<Methods>,
1734 ) -> Result<(), RegisterMethodError> {
1735 let other = other.into();
1736 self.merge_http(other.clone())?;
1737 self.merge_ws(other.clone())?;
1738 self.merge_ipc(other)?;
1739 Ok(())
1740 }
1741
1742 pub fn methods_by_module<F>(&self, module: RethRpcModule) -> Methods {
1746 self.methods_by(|name| name.starts_with(module.as_str()))
1747 }
1748
1749 pub fn methods_by<F>(&self, mut filter: F) -> Methods
1753 where
1754 F: FnMut(&str) -> bool,
1755 {
1756 let mut methods = Methods::new();
1757
1758 let mut f =
1760 |name: &str, mm: &Methods| filter(name) && !mm.method_names().any(|m| m == name);
1761
1762 if let Some(m) = self.http_methods(|name| f(name, &methods)) {
1763 let _ = methods.merge(m);
1764 }
1765 if let Some(m) = self.ws_methods(|name| f(name, &methods)) {
1766 let _ = methods.merge(m);
1767 }
1768 if let Some(m) = self.ipc_methods(|name| f(name, &methods)) {
1769 let _ = methods.merge(m);
1770 }
1771 methods
1772 }
1773
1774 pub fn http_methods<F>(&self, filter: F) -> Option<Methods>
1778 where
1779 F: FnMut(&str) -> bool,
1780 {
1781 self.http.as_ref().map(|module| methods_by(module, filter))
1782 }
1783
1784 pub fn ws_methods<F>(&self, filter: F) -> Option<Methods>
1788 where
1789 F: FnMut(&str) -> bool,
1790 {
1791 self.ws.as_ref().map(|module| methods_by(module, filter))
1792 }
1793
1794 pub fn ipc_methods<F>(&self, filter: F) -> Option<Methods>
1798 where
1799 F: FnMut(&str) -> bool,
1800 {
1801 self.ipc.as_ref().map(|module| methods_by(module, filter))
1802 }
1803
1804 pub fn remove_http_method(&mut self, method_name: &'static str) -> bool {
1812 if let Some(http_module) = &mut self.http {
1813 http_module.remove_method(method_name).is_some()
1814 } else {
1815 false
1816 }
1817 }
1818
1819 pub fn remove_http_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
1821 for name in methods {
1822 self.remove_http_method(name);
1823 }
1824 }
1825
1826 pub fn remove_ws_method(&mut self, method_name: &'static str) -> bool {
1834 if let Some(ws_module) = &mut self.ws {
1835 ws_module.remove_method(method_name).is_some()
1836 } else {
1837 false
1838 }
1839 }
1840
1841 pub fn remove_ws_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
1843 for name in methods {
1844 self.remove_ws_method(name);
1845 }
1846 }
1847
1848 pub fn remove_ipc_method(&mut self, method_name: &'static str) -> bool {
1856 if let Some(ipc_module) = &mut self.ipc {
1857 ipc_module.remove_method(method_name).is_some()
1858 } else {
1859 false
1860 }
1861 }
1862
1863 pub fn remove_ipc_methods(&mut self, methods: impl IntoIterator<Item = &'static str>) {
1865 for name in methods {
1866 self.remove_ipc_method(name);
1867 }
1868 }
1869
1870 pub fn remove_method_from_configured(&mut self, method_name: &'static str) -> bool {
1874 let http_removed = self.remove_http_method(method_name);
1875 let ws_removed = self.remove_ws_method(method_name);
1876 let ipc_removed = self.remove_ipc_method(method_name);
1877
1878 http_removed || ws_removed || ipc_removed
1879 }
1880
1881 pub fn rename(
1885 &mut self,
1886 old_name: &'static str,
1887 new_method: impl Into<Methods>,
1888 ) -> Result<(), RegisterMethodError> {
1889 self.remove_method_from_configured(old_name);
1891
1892 self.merge_configured(new_method)
1894 }
1895
1896 pub fn replace_http(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1903 let other = other.into();
1904 self.remove_http_methods(other.method_names());
1905 self.merge_http(other)
1906 }
1907
1908 pub fn replace_ipc(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1915 let other = other.into();
1916 self.remove_ipc_methods(other.method_names());
1917 self.merge_ipc(other)
1918 }
1919
1920 pub fn replace_ws(&mut self, other: impl Into<Methods>) -> Result<bool, RegisterMethodError> {
1927 let other = other.into();
1928 self.remove_ws_methods(other.method_names());
1929 self.merge_ws(other)
1930 }
1931
1932 pub fn replace_configured(
1936 &mut self,
1937 other: impl Into<Methods>,
1938 ) -> Result<bool, RegisterMethodError> {
1939 let other = other.into();
1940 self.replace_http(other.clone())?;
1941 self.replace_ws(other.clone())?;
1942 self.replace_ipc(other)?;
1943 Ok(true)
1944 }
1945}
1946
1947fn methods_by<T, F>(module: &RpcModule<T>, mut filter: F) -> Methods
1949where
1950 F: FnMut(&str) -> bool,
1951{
1952 let mut methods = Methods::new();
1953 let method_names = module.method_names().filter(|name| filter(name));
1954
1955 for name in method_names {
1956 if let Some(matched_method) = module.method(name).cloned() {
1957 let _ = methods.verify_and_insert(name, matched_method);
1958 }
1959 }
1960
1961 methods
1962}
1963
/// A handle to the started RPC servers (HTTP, WS, IPC).
///
/// As the `must_use` message states, the servers stop when this handle is dropped.
#[derive(Clone, Debug)]
#[must_use = "Server stops if dropped"]
pub struct RpcServerHandle {
    /// Local address of the HTTP server, if one was started.
    http_local_addr: Option<SocketAddr>,
    /// Local address of the WS server, if one was started.
    ws_local_addr: Option<SocketAddr>,
    /// Handle of the HTTP server, if one was started.
    http: Option<ServerHandle>,
    /// Handle of the WS server, if one was started.
    ws: Option<ServerHandle>,
    /// The configured IPC endpoint, if any.
    ipc_endpoint: Option<String>,
    /// Handle of the IPC server, if one was started.
    ipc: Option<jsonrpsee::server::ServerHandle>,
    /// JWT secret used to build the bearer token for clients created from this handle.
    jwt_secret: Option<JwtSecret>,
}
1980
1981impl RpcServerHandle {
1984 fn bearer_token(&self) -> Option<String> {
1986 self.jwt_secret.as_ref().map(|secret| {
1987 format!(
1988 "Bearer {}",
1989 secret
1990 .encode(&Claims {
1991 iat: (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() +
1992 Duration::from_secs(60))
1993 .as_secs(),
1994 exp: None,
1995 })
1996 .unwrap()
1997 )
1998 })
1999 }
2000 pub const fn http_local_addr(&self) -> Option<SocketAddr> {
2002 self.http_local_addr
2003 }
2004
2005 pub const fn ws_local_addr(&self) -> Option<SocketAddr> {
2007 self.ws_local_addr
2008 }
2009
2010 pub fn stop(self) -> Result<(), AlreadyStoppedError> {
2012 if let Some(handle) = self.http {
2013 handle.stop()?
2014 }
2015
2016 if let Some(handle) = self.ws {
2017 handle.stop()?
2018 }
2019
2020 if let Some(handle) = self.ipc {
2021 handle.stop()?
2022 }
2023
2024 Ok(())
2025 }
2026
2027 pub fn ipc_endpoint(&self) -> Option<String> {
2029 self.ipc_endpoint.clone()
2030 }
2031
2032 pub fn http_url(&self) -> Option<String> {
2034 self.http_local_addr.map(|addr| format!("http://{addr}"))
2035 }
2036
2037 pub fn ws_url(&self) -> Option<String> {
2039 self.ws_local_addr.map(|addr| format!("ws://{addr}"))
2040 }
2041
2042 pub fn http_client(&self) -> Option<jsonrpsee::http_client::HttpClient> {
2044 let url = self.http_url()?;
2045
2046 let client = if let Some(token) = self.bearer_token() {
2047 jsonrpsee::http_client::HttpClientBuilder::default()
2048 .set_headers(HeaderMap::from_iter([(AUTHORIZATION, token.parse().unwrap())]))
2049 .build(url)
2050 } else {
2051 jsonrpsee::http_client::HttpClientBuilder::default().build(url)
2052 };
2053
2054 client.expect("failed to create http client").into()
2055 }
2056
2057 pub async fn ws_client(&self) -> Option<jsonrpsee::ws_client::WsClient> {
2059 let url = self.ws_url()?;
2060 let mut builder = jsonrpsee::ws_client::WsClientBuilder::default();
2061
2062 if let Some(token) = self.bearer_token() {
2063 let headers = HeaderMap::from_iter([(AUTHORIZATION, token.parse().unwrap())]);
2064 builder = builder.set_headers(headers);
2065 }
2066
2067 let client = builder.build(url).await.expect("failed to create ws client");
2068 Some(client)
2069 }
2070
2071 pub fn eth_http_provider(
2073 &self,
2074 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static> {
2075 self.new_http_provider_for()
2076 }
2077
2078 pub fn new_http_provider_for<N>(&self) -> Option<impl Provider<N> + Clone + Unpin + 'static>
2083 where
2084 N: RecommendedFillers<RecommendedFillers: Unpin>,
2085 {
2086 let rpc_url = self.http_url()?;
2087 let provider = ProviderBuilder::default()
2088 .with_recommended_fillers()
2089 .connect_http(rpc_url.parse().expect("valid url"));
2090 Some(provider)
2091 }
2092
2093 pub async fn eth_ws_provider(
2095 &self,
2096 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static> {
2097 self.new_ws_provider_for().await
2098 }
2099
2100 pub async fn new_ws_provider_for<N>(&self) -> Option<impl Provider<N> + Clone + Unpin + 'static>
2105 where
2106 N: RecommendedFillers<RecommendedFillers: Unpin>,
2107 {
2108 let rpc_url = self.ws_url()?;
2109 let provider = ProviderBuilder::default()
2110 .with_recommended_fillers()
2111 .connect(&rpc_url)
2112 .await
2113 .expect("failed to create ws client");
2114 Some(provider)
2115 }
2116
2117 pub async fn eth_ipc_provider(
2119 &self,
2120 ) -> Option<impl Provider<alloy_network::Ethereum> + Clone + Unpin + 'static> {
2121 self.new_ipc_provider_for().await
2122 }
2123
2124 pub async fn new_ipc_provider_for<N>(
2129 &self,
2130 ) -> Option<impl Provider<N> + Clone + Unpin + 'static>
2131 where
2132 N: RecommendedFillers<RecommendedFillers: Unpin>,
2133 {
2134 let rpc_url = self.ipc_endpoint()?;
2135 let provider = ProviderBuilder::default()
2136 .with_recommended_fillers()
2137 .connect(&rpc_url)
2138 .await
2139 .expect("failed to create ipc client");
2140 Some(provider)
2141 }
2142}
2143
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a module exposing a single method named `anything`.
    fn create_test_module() -> RpcModule<()> {
        let mut module = RpcModule::new(());
        module.register_method("anything", |_, _, _| "succeed").unwrap();
        module
    }

    /// Builds a module exposing a single method named `something`.
    fn create_other_module() -> RpcModule<()> {
        let mut module = RpcModule::new(());
        module.register_method("something", |_, _, _| "fails").unwrap();
        module
    }

    /// Returns whether the installed module exposes `name`; panics if no module is installed.
    fn has_method(module: Option<&RpcModule<()>>, name: &str) -> bool {
        module.unwrap().method(name).is_some()
    }

    #[test]
    fn parse_eth_call_bundle_selection() {
        let selection: RpcModuleSelection = "eth,admin,debug".parse().unwrap();
        let expected = RpcModuleSelection::Selection(
            [RethRpcModule::Eth, RethRpcModule::Admin, RethRpcModule::Debug].into(),
        );
        assert_eq!(selection, expected);
    }

    #[test]
    fn parse_rpc_module_selection() {
        let selection: RpcModuleSelection = "all".parse().unwrap();
        assert_eq!(selection, RpcModuleSelection::All);
    }

    #[test]
    fn parse_rpc_module_selection_none() {
        let selection: RpcModuleSelection = "none".parse().unwrap();
        assert_eq!(selection, RpcModuleSelection::Selection(Default::default()));
    }

    #[test]
    fn parse_rpc_unique_module_selection() {
        // The duplicated `eth` entry must collapse into a single module.
        let selection: RpcModuleSelection = "eth,admin,eth,net".parse().unwrap();
        let expected = RpcModuleSelection::Selection(
            [RethRpcModule::Eth, RethRpcModule::Admin, RethRpcModule::Net].into(),
        );
        assert_eq!(selection, expected);
    }

    #[test]
    fn identical_selection() {
        let all = RpcModuleSelection::All;
        let standard = RpcModuleSelection::Standard;
        let standard_expanded = RpcModuleSelection::Selection(standard.to_selection());
        let eth_only = || RpcModuleSelection::Selection([RethRpcModule::Eth].into());
        let empty = RpcModuleSelection::Selection(Default::default());

        assert!(RpcModuleSelection::are_identical(Some(&all), Some(&all)));
        assert!(!RpcModuleSelection::are_identical(Some(&all), Some(&standard)));
        assert!(RpcModuleSelection::are_identical(Some(&standard_expanded), Some(&standard)));
        assert!(RpcModuleSelection::are_identical(Some(&eth_only()), Some(&eth_only())));
        // A missing selection is identical to an explicitly empty one, in either position.
        assert!(RpcModuleSelection::are_identical(None, Some(&empty)));
        assert!(RpcModuleSelection::are_identical(Some(&empty), None));
        assert!(RpcModuleSelection::are_identical(None, None));
    }

    #[test]
    fn test_rpc_module_str() {
        let cases = [
            ("admin", RethRpcModule::Admin),
            ("debug", RethRpcModule::Debug),
            ("eth", RethRpcModule::Eth),
            ("net", RethRpcModule::Net),
            ("trace", RethRpcModule::Trace),
            ("web3", RethRpcModule::Web3),
            ("rpc", RethRpcModule::Rpc),
            ("ots", RethRpcModule::Ots),
            ("reth", RethRpcModule::Reth),
        ];

        // Each name must parse to its variant and print back to the same name.
        for (name, expected) in cases {
            let parsed: RethRpcModule = name.parse().unwrap();
            assert_eq!(parsed, expected);
            assert_eq!(parsed.to_string().as_str(), name);
        }
    }

    #[test]
    fn test_default_selection() {
        let selection = RpcModuleSelection::Standard.to_selection();
        assert_eq!(selection, [RethRpcModule::Eth, RethRpcModule::Net, RethRpcModule::Web3].into())
    }

    #[test]
    fn test_create_rpc_module_config() {
        let names = vec!["eth", "admin"];
        let config = RpcModuleSelection::try_from_selection(names).unwrap();
        let expected =
            RpcModuleSelection::Selection([RethRpcModule::Eth, RethRpcModule::Admin].into());
        assert_eq!(config, expected);
    }

    #[test]
    fn test_configure_transport_config() {
        let config = TransportRpcModuleConfig::default()
            .with_http([RethRpcModule::Eth, RethRpcModule::Admin]);
        let expected = TransportRpcModuleConfig {
            http: Some(RpcModuleSelection::Selection(
                [RethRpcModule::Eth, RethRpcModule::Admin].into(),
            )),
            ..Default::default()
        };
        assert_eq!(config, expected)
    }

    #[test]
    fn test_configure_transport_config_none() {
        let config = TransportRpcModuleConfig::default().with_http(Vec::<RethRpcModule>::new());
        let expected = TransportRpcModuleConfig {
            http: Some(RpcModuleSelection::Selection(Default::default())),
            ..Default::default()
        };
        assert_eq!(config, expected)
    }

    #[test]
    fn test_remove_http_method() {
        let mut modules =
            TransportRpcModules { http: Some(create_test_module()), ..Default::default() };

        // An existing method is removed; an unknown one is reported as not removed.
        assert!(modules.remove_http_method("anything"));
        assert!(!modules.remove_http_method("non_existent_method"));

        assert!(!has_method(modules.http.as_ref(), "anything"));
    }

    #[test]
    fn test_remove_ws_method() {
        let mut modules =
            TransportRpcModules { ws: Some(create_test_module()), ..Default::default() };

        // An existing method is removed; an unknown one is reported as not removed.
        assert!(modules.remove_ws_method("anything"));
        assert!(!modules.remove_ws_method("non_existent_method"));

        assert!(!has_method(modules.ws.as_ref(), "anything"));
    }

    #[test]
    fn test_remove_ipc_method() {
        let mut modules =
            TransportRpcModules { ipc: Some(create_test_module()), ..Default::default() };

        // An existing method is removed; an unknown one is reported as not removed.
        assert!(modules.remove_ipc_method("anything"));
        assert!(!modules.remove_ipc_method("non_existent_method"));

        assert!(!has_method(modules.ipc.as_ref(), "anything"));
    }

    #[test]
    fn test_remove_method_from_configured() {
        let mut modules = TransportRpcModules {
            http: Some(create_test_module()),
            ws: Some(create_test_module()),
            ipc: Some(create_test_module()),
            ..Default::default()
        };

        // The first removal succeeds; repeating it, or removing an unknown name, does not.
        assert!(modules.remove_method_from_configured("anything"));
        assert!(!modules.remove_method_from_configured("anything"));
        assert!(!modules.remove_method_from_configured("non_existent_method"));

        assert!(!has_method(modules.http.as_ref(), "anything"));
        assert!(!has_method(modules.ws.as_ref(), "anything"));
        assert!(!has_method(modules.ipc.as_ref(), "anything"));
    }

    #[test]
    fn test_transport_rpc_module_rename() {
        let mut modules = TransportRpcModules {
            http: Some(create_test_module()),
            ws: Some(create_test_module()),
            ipc: Some(create_test_module()),
            ..Default::default()
        };

        // Before the rename every transport exposes `anything` and none exposes `something`.
        for module in [&modules.http, &modules.ws, &modules.ipc] {
            assert!(has_method(module.as_ref(), "anything"));
        }
        for module in [&modules.http, &modules.ws, &modules.ipc] {
            assert!(!has_method(module.as_ref(), "something"));
        }

        modules.rename("anything", create_other_module()).expect("rename failed");

        // After the rename the situation is reversed on every transport.
        for module in [&modules.http, &modules.ws, &modules.ipc] {
            assert!(!has_method(module.as_ref(), "anything"));
        }
        for module in [&modules.http, &modules.ws, &modules.ipc] {
            assert!(has_method(module.as_ref(), "something"));
        }
    }

    #[test]
    fn test_replace_http_method() {
        let mut modules =
            TransportRpcModules { http: Some(create_test_module()), ..Default::default() };
        let mut other_module = create_other_module();

        assert!(modules.replace_http(other_module.clone()).unwrap());
        assert!(has_method(modules.http.as_ref(), "something"));

        // Replacing with a module that also defines `anything` must succeed.
        other_module.register_method("anything", |_, _, _| "fails").unwrap();
        assert!(modules.replace_http(other_module.clone()).unwrap());
        assert!(has_method(modules.http.as_ref(), "anything"));
    }

    #[test]
    fn test_replace_ipc_method() {
        let mut modules =
            TransportRpcModules { ipc: Some(create_test_module()), ..Default::default() };
        let mut other_module = create_other_module();

        assert!(modules.replace_ipc(other_module.clone()).unwrap());
        assert!(has_method(modules.ipc.as_ref(), "something"));

        // Replacing with a module that also defines `anything` must succeed.
        other_module.register_method("anything", |_, _, _| "fails").unwrap();
        assert!(modules.replace_ipc(other_module.clone()).unwrap());
        assert!(has_method(modules.ipc.as_ref(), "anything"));
    }

    #[test]
    fn test_replace_ws_method() {
        let mut modules =
            TransportRpcModules { ws: Some(create_test_module()), ..Default::default() };
        let mut other_module = create_other_module();

        assert!(modules.replace_ws(other_module.clone()).unwrap());
        assert!(has_method(modules.ws.as_ref(), "something"));

        // Replacing with a module that also defines `anything` must succeed.
        other_module.register_method("anything", |_, _, _| "fails").unwrap();
        assert!(modules.replace_ws(other_module.clone()).unwrap());
        assert!(has_method(modules.ws.as_ref(), "anything"));
    }

    #[test]
    fn test_replace_configured() {
        let mut modules = TransportRpcModules {
            http: Some(create_test_module()),
            ws: Some(create_test_module()),
            ipc: Some(create_test_module()),
            ..Default::default()
        };

        assert!(modules.replace_configured(create_other_module()).unwrap());

        // The new method is installed on every transport.
        assert!(has_method(modules.http.as_ref(), "something"));
        assert!(has_method(modules.ipc.as_ref(), "something"));
        assert!(has_method(modules.ws.as_ref(), "something"));

        // The pre-existing method is untouched on every transport.
        assert!(has_method(modules.http.as_ref(), "anything"));
        assert!(has_method(modules.ipc.as_ref(), "anything"));
        assert!(has_method(modules.ws.as_ref(), "anything"));
    }
}