reth_rpc_builder/
eth.rs

1use reth_evm::ConfigureEvm;
2use reth_primitives::NodePrimitives;
3use reth_provider::{BlockReader, CanonStateSubscriptions, EvmEnvProvider, StateProviderFactory};
4use reth_rpc::{EthFilter, EthPubSub};
5use reth_rpc_eth_api::EthApiTypes;
6use reth_rpc_eth_types::{
7    cache::cache_new_blocks_task, EthApiBuilderCtx, EthConfig, EthStateCache,
8};
9use reth_tasks::TaskSpawner;
10
11/// Alias for `eth` namespace API builder.
12pub type DynEthApiBuilder<Provider, Pool, EvmConfig, Network, Tasks, Events, EthApi> =
13    Box<dyn FnOnce(&EthApiBuilderCtx<Provider, Pool, EvmConfig, Network, Tasks, Events>) -> EthApi>;
14
15/// Handlers for core, filter and pubsub `eth` namespace APIs.
16#[derive(Debug, Clone)]
17pub struct EthHandlers<Provider: BlockReader, Events, EthApi: EthApiTypes> {
18    /// Main `eth_` request handler
19    pub api: EthApi,
20    /// The async caching layer used by the eth handlers
21    pub cache: EthStateCache<Provider::Block, Provider::Receipt>,
22    /// Polling based filter handler available on all transports
23    pub filter: EthFilter<EthApi>,
24    /// Handler for subscriptions only available for transports that support it (ws, ipc)
25    pub pubsub: EthPubSub<EthApi, Events>,
26}
27
28impl<Provider, Events, EthApi> EthHandlers<Provider, Events, EthApi>
29where
30    Provider: StateProviderFactory
31        + BlockReader<
32            Block = <Events::Primitives as NodePrimitives>::Block,
33            Receipt = <Events::Primitives as NodePrimitives>::Receipt,
34        > + EvmEnvProvider
35        + Clone
36        + Unpin
37        + 'static,
38    Events: CanonStateSubscriptions + Clone + 'static,
39    EthApi: EthApiTypes + 'static,
40{
41    /// Returns a new instance with handlers for `eth` namespace.
42    ///
43    /// This will spawn all necessary tasks for the handlers.
44    #[allow(clippy::too_many_arguments)]
45    pub fn bootstrap<EvmConfig, Tasks, Pool, Network>(
46        provider: Provider,
47        pool: Pool,
48        network: Network,
49        evm_config: EvmConfig,
50        config: EthConfig,
51        executor: Tasks,
52        events: Events,
53        eth_api_builder: DynEthApiBuilder<
54            Provider,
55            Pool,
56            EvmConfig,
57            Network,
58            Tasks,
59            Events,
60            EthApi,
61        >,
62    ) -> Self
63    where
64        EvmConfig: ConfigureEvm<Header = Provider::Header>,
65        Tasks: TaskSpawner + Clone + 'static,
66    {
67        let cache = EthStateCache::spawn_with(provider.clone(), config.cache, executor.clone());
68
69        let new_canonical_blocks = events.canonical_state_stream();
70        let c = cache.clone();
71        executor.spawn_critical(
72            "cache canonical blocks task",
73            Box::pin(async move {
74                cache_new_blocks_task(c, new_canonical_blocks).await;
75            }),
76        );
77
78        let ctx = EthApiBuilderCtx {
79            provider,
80            pool,
81            network,
82            evm_config,
83            config,
84            executor,
85            events,
86            cache,
87        };
88
89        let api = eth_api_builder(&ctx);
90
91        let filter =
92            EthFilter::new(api.clone(), ctx.config.filter_config(), Box::new(ctx.executor.clone()));
93
94        let pubsub = EthPubSub::with_spawner(
95            api.clone(),
96            ctx.events.clone(),
97            Box::new(ctx.executor.clone()),
98        );
99
100        Self { api, cache: ctx.cache, filter, pubsub }
101    }
102}