reth_node_builder/launch/common.rs

1//! Helper types that can be used by launchers.
2
3use std::{sync::Arc, thread::available_parallelism};
4
5use crate::{
6    components::{NodeComponents, NodeComponentsBuilder},
7    hooks::OnComponentInitializedHook,
8    BuilderContext, NodeAdapter,
9};
10use alloy_primitives::{BlockNumber, B256};
11use eyre::{Context, OptionExt};
12use rayon::ThreadPoolBuilder;
13use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
14use reth_config::{config::EtlConfig, PruneConfig};
15use reth_consensus::noop::NoopConsensus;
16use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
17use reth_db_common::init::{init_genesis, InitStorageError};
18use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
19use reth_engine_local::MiningMode;
20use reth_engine_tree::tree::{InvalidBlockHook, InvalidBlockHooks, NoopInvalidBlockHook};
21use reth_evm::noop::NoopBlockExecutorProvider;
22use reth_fs_util as fs;
23use reth_invalid_block_hooks::InvalidBlockWitnessHook;
24use reth_network_p2p::headers::client::HeadersClient;
25use reth_node_api::{
26    FullNodePrimitives, FullNodeTypes, NodePrimitives, NodeTypes, NodeTypesWithDB,
27};
28use reth_node_core::{
29    args::InvalidBlockHookType,
30    dirs::{ChainPath, DataDirPath},
31    node_config::NodeConfig,
32    primitives::BlockHeader,
33    version::{
34        BUILD_PROFILE_NAME, CARGO_PKG_VERSION, VERGEN_BUILD_TIMESTAMP, VERGEN_CARGO_FEATURES,
35        VERGEN_CARGO_TARGET_TRIPLE, VERGEN_GIT_SHA,
36    },
37};
38use reth_node_metrics::{
39    chain::ChainSpecInfo,
40    hooks::Hooks,
41    recorder::install_prometheus_recorder,
42    server::{MetricServer, MetricServerConfig},
43    version::VersionInfo,
44};
45use reth_primitives::{Head, TransactionSigned};
46use reth_provider::{
47    providers::{ProviderNodeTypes, StaticFileProvider},
48    BlockHashReader, BlockNumReader, ChainSpecProvider, ProviderError, ProviderFactory,
49    ProviderResult, StageCheckpointReader, StateProviderFactory, StaticFileProviderFactory,
50};
51use reth_prune::{PruneModes, PrunerBuilder};
52use reth_rpc_api::clients::EthApiClient;
53use reth_rpc_builder::config::RethRpcServerConfig;
54use reth_rpc_layer::JwtSecret;
55use reth_stages::{sets::DefaultStages, MetricEvent, PipelineBuilder, PipelineTarget, StageId};
56use reth_static_file::StaticFileProducer;
57use reth_tasks::TaskExecutor;
58use reth_tracing::tracing::{debug, error, info, warn};
59use reth_transaction_pool::TransactionPool;
60use tokio::sync::{
61    mpsc::{unbounded_channel, UnboundedSender},
62    oneshot, watch,
63};
64
65/// Reusable setup for launching a node.
66///
67/// This provides commonly used boilerplate for launching a node.
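///
/// # Example
///
/// A minimal sketch of how a launcher might start building the context; the `executor`,
/// `data_dir`, `node_config` and `database` values below are hypothetical placeholders.
///
/// ```ignore
/// let ctx = LaunchContext::new(executor, data_dir)
///     // raise the fd limit and configure the global rayon pool
///     .with_configured_globals()
///     // load reth.toml and pair it with the CLI-derived `NodeConfig`
///     .with_loaded_toml_config(node_config)?
///     // attach the database for the following launch steps
///     .attach(database);
/// ```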
68#[derive(Debug, Clone)]
69pub struct LaunchContext {
70    /// The task executor for the node.
71    pub task_executor: TaskExecutor,
72    /// The data directory for the node.
73    pub data_dir: ChainPath<DataDirPath>,
74}
75
76impl LaunchContext {
77    /// Create a new instance of the default node launcher.
78    pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
79        Self { task_executor, data_dir }
80    }
81
82    /// Attaches a database to the launch context.
83    pub const fn with<DB>(self, database: DB) -> LaunchContextWith<DB> {
84        LaunchContextWith { inner: self, attachment: database }
85    }
86
87    /// Loads the reth config with the configured `data_dir` and overrides settings according to the
88    /// `config`.
89    ///
90    /// Attaches both the `NodeConfig` and the loaded `reth.toml` config to the launch context.
91    pub fn with_loaded_toml_config<ChainSpec: EthChainSpec>(
92        self,
93        config: NodeConfig<ChainSpec>,
94    ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>> {
95        let toml_config = self.load_toml_config(&config)?;
96        Ok(self.with(WithConfigs { config, toml_config }))
97    }
98
99    /// Loads the reth config with the configured `data_dir` and overrides settings according to the
100    /// `config`.
101    ///
102    /// Trusted peers passed via the CLI are resolved and added separately by [`LaunchContextWith::with_resolved_peers`].
103    pub fn load_toml_config<ChainSpec: EthChainSpec>(
104        &self,
105        config: &NodeConfig<ChainSpec>,
106    ) -> eyre::Result<reth_config::Config> {
107        let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());
108
109        let mut toml_config = reth_config::Config::from_path(&config_path)
110            .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;
111
112        Self::save_pruning_config_if_full_node(&mut toml_config, config, &config_path)?;
113
114        info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
115
116        // Update the config with the command line arguments
117        toml_config.peers.trusted_nodes_only = config.network.trusted_only;
118
119        Ok(toml_config)
120    }
121
122    /// Save the prune config to the toml file if the node is a full node.
123    fn save_pruning_config_if_full_node<ChainSpec: EthChainSpec>(
124        reth_config: &mut reth_config::Config,
125        config: &NodeConfig<ChainSpec>,
126        config_path: impl AsRef<std::path::Path>,
127    ) -> eyre::Result<()> {
128        if reth_config.prune.is_none() {
129            if let Some(prune_config) = config.prune_config() {
130                reth_config.update_prune_config(prune_config);
131                info!(target: "reth::cli", "Saving prune config to toml file");
132                reth_config.save(config_path.as_ref())?;
133            }
134        } else if config.prune_config().is_none() {
135            warn!(target: "reth::cli", "Prune configs present in config file but --full not provided. Running as a Full node");
136        }
137        Ok(())
138    }
139
140    /// Convenience function for [`Self::configure_globals`].
141    pub fn with_configured_globals(self) -> Self {
142        self.configure_globals();
143        self
144    }
145
146    /// Configure global settings. This includes:
147    ///
148    /// - Raising the file descriptor limit
149    /// - Configuring the global rayon thread pool
150    pub fn configure_globals(&self) {
151        // Raise the fd limit of the process.
152        // Does not do anything on windows.
153        match fdlimit::raise_fd_limit() {
154            Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
155                debug!(from, to, "Raised file descriptor limit");
156            }
157            Ok(fdlimit::Outcome::Unsupported) => {}
158            Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
159        }
160
161        // Limit the global rayon thread pool, reserving 1 core for the rest of the system.
162        // If the system only has 1 core the pool will use it.
163        let num_threads =
164            available_parallelism().map_or(0, |num| num.get().saturating_sub(1).max(1));
165        if let Err(err) = ThreadPoolBuilder::new()
166            .num_threads(num_threads)
167            .thread_name(|i| format!("reth-rayon-{i}"))
168            .build_global()
169        {
170            error!(%err, "Failed to build global thread pool")
171        }
172    }
173}
174
175/// A [`LaunchContext`] along with an additional value.
176///
177/// This can be used to sequentially attach additional values to the type during the launch process.
178///
179/// The type provides common boilerplate for launching a node depending on the additional value.
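///
/// # Example
///
/// A sketch of attaching values one after another; `configs` and `database` are hypothetical
/// placeholders for the respective values.
///
/// ```ignore
/// // `with(..)` wraps the first value, `attach(..)` nests further values into an
/// // `Attached<L, R>` pair that can later be read via `left()` / `right()`.
/// let ctx = ctx.with(configs).attach(database);
/// let db = ctx.right();
/// ```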
180#[derive(Debug, Clone)]
181pub struct LaunchContextWith<T> {
182    /// The wrapped launch context.
183    pub inner: LaunchContext,
184    /// The additional attached value.
185    pub attachment: T,
186}
187
188impl<T> LaunchContextWith<T> {
189    /// Configure global settings. This includes:
190    ///
191    /// - Raising the file descriptor limit
192    /// - Configuring the global rayon thread pool
193    pub fn configure_globals(&self) {
194        self.inner.configure_globals();
195    }
196
197    /// Returns the data directory.
198    pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
199        &self.inner.data_dir
200    }
201
202    /// Returns the task executor.
203    pub const fn task_executor(&self) -> &TaskExecutor {
204        &self.inner.task_executor
205    }
206
207    /// Attaches another value to the launch context.
208    pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
209        LaunchContextWith {
210            inner: self.inner,
211            attachment: Attached::new(self.attachment, attachment),
212        }
213    }
214
215    /// Consumes the type and calls a function with a reference to the context.
216    /// Returns the context again.
217    pub fn inspect<F>(self, f: F) -> Self
218    where
219        F: FnOnce(&Self),
220    {
221        f(&self);
222        self
223    }
224}
225
226impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
227    /// Resolves the trusted peers and adds them to the toml config.
228    pub async fn with_resolved_peers(mut self) -> eyre::Result<Self> {
229        if !self.attachment.config.network.trusted_peers.is_empty() {
230            info!(target: "reth::cli", "Adding trusted nodes");
231
232            self.attachment
233                .toml_config
234                .peers
235                .trusted_nodes
236                .extend(self.attachment.config.network.trusted_peers.clone());
237        }
238        Ok(self)
239    }
240}
241
242impl<L, R> LaunchContextWith<Attached<L, R>> {
243    /// Get a reference to the left value.
244    pub const fn left(&self) -> &L {
245        &self.attachment.left
246    }
247
248    /// Get a reference to the right value.
249    pub const fn right(&self) -> &R {
250        &self.attachment.right
251    }
252
253    /// Get a mutable reference to the left value.
254    pub fn left_mut(&mut self) -> &mut L {
255        &mut self.attachment.left
256    }
257
258    /// Get a mutable reference to the right value.
259    pub fn right_mut(&mut self) -> &mut R {
260        &mut self.attachment.right
261    }
262}
263impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
264    /// Adjust certain settings in the config to make sure they are set correctly
265    ///
266    /// This includes:
267    /// - Making sure the ETL dir is set to the datadir
268    /// - RPC settings are adjusted to the correct port
269    pub fn with_adjusted_configs(self) -> Self {
270        self.ensure_etl_datadir().with_adjusted_instance_ports()
271    }
272
273    /// Make sure ETL doesn't default to /tmp/, but to whatever datadir is set to
274    pub fn ensure_etl_datadir(mut self) -> Self {
275        if self.toml_config_mut().stages.etl.dir.is_none() {
276            self.toml_config_mut().stages.etl.dir =
277                Some(EtlConfig::from_datadir(self.data_dir().data_dir()))
278        }
279
280        self
281    }
282
283    /// Change rpc port numbers based on the instance number.
284    pub fn with_adjusted_instance_ports(mut self) -> Self {
285        self.node_config_mut().adjust_instance_ports();
286        self
287    }
288
289    /// Returns the container for all config types
290    pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
291        self.attachment.left()
292    }
293
294    /// Returns the attached [`NodeConfig`].
295    pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
296        &self.left().config
297    }
298
299    /// Returns the attached [`NodeConfig`].
300    pub fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
301        &mut self.left_mut().config
302    }
303
304    /// Returns the attached toml config [`reth_config::Config`].
305    pub const fn toml_config(&self) -> &reth_config::Config {
306        &self.left().toml_config
307    }
308
309    /// Returns the attached toml config [`reth_config::Config`].
310    pub fn toml_config_mut(&mut self) -> &mut reth_config::Config {
311        &mut self.left_mut().toml_config
312    }
313
314    /// Returns the configured chain spec.
315    pub fn chain_spec(&self) -> Arc<ChainSpec> {
316        self.node_config().chain.clone()
317    }
318
319    /// Get the hash of the genesis block.
320    pub fn genesis_hash(&self) -> B256 {
321        self.node_config().chain.genesis_hash()
322    }
323
324    /// Returns the chain identifier of the node.
325    pub fn chain_id(&self) -> Chain {
326        self.node_config().chain.chain()
327    }
328
329    /// Returns true if the node is configured as --dev
330    pub const fn is_dev(&self) -> bool {
331        self.node_config().dev.dev
332    }
333
334    /// Returns the configured [`PruneConfig`].
335    /// Any configuration set via the CLI takes precedence over the values set in the toml file.
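    ///
    /// For example, if `--full` is passed on the CLI and `reth.toml` also contains a `[prune]`
    /// section, the CLI-derived config is used as the base and the remaining values are filled in
    /// from the toml config via [`PruneConfig::merge`] (a rough sketch of the precedence, not an
    /// exhaustive description of the merge rules):
    ///
    /// ```ignore
    /// // CLI:       Some(cli_prune_config)   // from `NodeConfig::prune_config()`
    /// // reth.toml: Some(toml_prune_config)  // from `reth_config::Config::prune`
    /// // result:    cli_prune_config merged with toml_prune_config
    /// let effective = ctx.prune_config();
    /// ```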
336    pub fn prune_config(&self) -> Option<PruneConfig> {
337        let Some(mut node_prune_config) = self.node_config().prune_config() else {
338            // No CLI config is set, use the toml config.
339            return self.toml_config().prune.clone();
340        };
341
342        // Otherwise, use the CLI configuration and merge with toml config.
343        node_prune_config.merge(self.toml_config().prune.clone());
344        Some(node_prune_config)
345    }
346
347    /// Returns the configured [`PruneModes`], returning the default if no config was available.
348    pub fn prune_modes(&self) -> PruneModes {
349        self.prune_config().map(|config| config.segments).unwrap_or_default()
350    }
351
352    /// Returns an initialized [`PrunerBuilder`] based on the configured [`PruneConfig`]
353    pub fn pruner_builder(&self) -> PrunerBuilder {
354        PrunerBuilder::new(self.prune_config().unwrap_or_default())
355            .delete_limit(self.chain_spec().prune_delete_limit())
356            .timeout(PrunerBuilder::DEFAULT_TIMEOUT)
357    }
358
359    /// Loads the JWT secret for the engine API
360    pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
361        let default_jwt_path = self.data_dir().jwt();
362        let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
363        Ok(secret)
364    }
365
366    /// Returns the [`MiningMode`] intended for --dev mode.
367    pub fn dev_mining_mode(&self, pool: impl TransactionPool) -> MiningMode {
368        if let Some(interval) = self.node_config().dev.block_time {
369            MiningMode::interval(interval)
370        } else {
371            MiningMode::instant(pool)
372        }
373    }
374}
375
376impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
377where
378    DB: Database + Clone + 'static,
379    ChainSpec: EthChainSpec + EthereumHardforks + 'static,
380{
381    /// Returns the [`ProviderFactory`] for the attached storage after executing a consistency check
382    /// between the database and static files. **It may execute a pipeline unwind if it fails this
383    /// check.**
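    ///
    /// A minimal usage sketch; the concrete node types `N` are assumed to be supplied by the
    /// launcher:
    ///
    /// ```ignore
    /// // may unwind the pipeline to the last consistent block before returning the factory
    /// let factory = ctx.create_provider_factory::<N>().await?;
    /// ```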
384    pub async fn create_provider_factory<N>(&self) -> eyre::Result<ProviderFactory<N>>
385    where
386        N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
387        N::Primitives: FullNodePrimitives<BlockHeader = reth_primitives::Header>,
388    {
389        let factory = ProviderFactory::new(
390            self.right().clone(),
391            self.chain_spec(),
392            StaticFileProvider::read_write(self.data_dir().static_files())?,
393        )
394        .with_prune_modes(self.prune_modes())
395        .with_static_files_metrics();
396
397        let has_receipt_pruning =
398            self.toml_config().prune.as_ref().is_some_and(|a| a.has_receipts_pruning());
399
400        // Check for consistency between database and static files. If it fails, it unwinds to
401        // the first block that's consistent between database and static files.
402        if let Some(unwind_target) = factory
403            .static_file_provider()
404            .check_consistency(&factory.provider()?, has_receipt_pruning)?
405        {
406            // Highly unlikely to happen, and given its destructive nature, it's better to panic
407            // instead.
408            assert_ne!(unwind_target, PipelineTarget::Unwind(0), "A static file <> database inconsistency was found that would trigger an unwind to block 0");
409
410            info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");
411
412            let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
413
414            // Builds an unwind-only pipeline
415            let pipeline = PipelineBuilder::default()
416                .add_stages(DefaultStages::new(
417                    factory.clone(),
418                    tip_rx,
419                    Arc::new(NoopConsensus::default()),
420                    NoopHeaderDownloader::default(),
421                    NoopBodiesDownloader::default(),
422                    NoopBlockExecutorProvider::<N::Primitives>::default(),
423                    self.toml_config().stages.clone(),
424                    self.prune_modes(),
425                ))
426                .build(
427                    factory.clone(),
428                    StaticFileProducer::new(factory.clone(), self.prune_modes()),
429                );
430
431            // Unwinds to the target block.
432            let (tx, rx) = oneshot::channel();
433
434            // Pipeline should be run as blocking and panic if it fails.
435            self.task_executor().spawn_critical_blocking(
436                "pipeline task",
437                Box::pin(async move {
438                    let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
439                    let _ = tx.send(result);
440                }),
441            );
442            rx.await??;
443        }
444
445        Ok(factory)
446    }
447
448    /// Creates a new [`ProviderFactory`] and attaches it to the launch context.
449    pub async fn with_provider_factory<N>(
450        self,
451    ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
452    where
453        N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
454        N::Primitives: FullNodePrimitives<BlockHeader = reth_primitives::Header>,
455    {
456        let factory = self.create_provider_factory().await?;
457        let ctx = LaunchContextWith {
458            inner: self.inner,
459            attachment: self.attachment.map_right(|_| factory),
460        };
461
462        Ok(ctx)
463    }
464}
465
466impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
467where
468    T: ProviderNodeTypes,
469{
470    /// Returns access to the underlying database.
471    pub const fn database(&self) -> &T::DB {
472        self.right().db_ref()
473    }
474
475    /// Returns the configured `ProviderFactory`.
476    pub const fn provider_factory(&self) -> &ProviderFactory<T> {
477        self.right()
478    }
479
480    /// Returns the static file provider to interact with the static files.
481    pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
482        self.right().static_file_provider()
483    }
484
485    /// This launches the prometheus endpoint.
486    ///
487    /// Convenience function for [`Self::start_prometheus_endpoint`].
488    pub async fn with_prometheus_server(self) -> eyre::Result<Self> {
489        self.start_prometheus_endpoint().await?;
490        Ok(self)
491    }
492
493    /// Starts the prometheus endpoint.
494    pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
495        // ensure recorder runs upkeep periodically
496        install_prometheus_recorder().spawn_upkeep();
497
498        let listen_addr = self.node_config().metrics;
499        if let Some(addr) = listen_addr {
500            info!(target: "reth::cli", "Starting metrics endpoint at {}", addr);
501            let config = MetricServerConfig::new(
502                addr,
503                VersionInfo {
504                    version: CARGO_PKG_VERSION,
505                    build_timestamp: VERGEN_BUILD_TIMESTAMP,
506                    cargo_features: VERGEN_CARGO_FEATURES,
507                    git_sha: VERGEN_GIT_SHA,
508                    target_triple: VERGEN_CARGO_TARGET_TRIPLE,
509                    build_profile: BUILD_PROFILE_NAME,
510                },
511                ChainSpecInfo { name: self.left().config.chain.chain().to_string() },
512                self.task_executor().clone(),
513                Hooks::builder()
514                    .with_hook({
515                        let db = self.database().clone();
516                        move || db.report_metrics()
517                    })
518                    .with_hook({
519                        let sfp = self.static_file_provider();
520                        move || {
521                            if let Err(error) = sfp.report_metrics() {
522                                error!(%error, "Failed to report metrics for the static file provider");
523                            }
524                        }
525                    })
526                    .build(),
527            );
528
529            MetricServer::new(config).serve().await?;
530        }
531
532        Ok(())
533    }
534
535    /// Convenience function for [`Self::init_genesis`].
536    pub fn with_genesis(self) -> Result<Self, InitStorageError> {
537        init_genesis(self.provider_factory())?;
538        Ok(self)
539    }
540
541    /// Write the genesis block and state if it has not already been written
542    pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
543        init_genesis(self.provider_factory())
544    }
545
546    /// Creates a new `WithMeteredProvider` container and attaches it to the
547    /// launch context.
548    ///
549    /// This spawns a metrics task that listens for metrics related events and updates metrics for
550    /// prometheus.
551    pub fn with_metrics_task(
552        self,
553    ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
554        let (metrics_sender, metrics_receiver) = unbounded_channel();
555
556        let with_metrics =
557            WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };
558
559        debug!(target: "reth::cli", "Spawning stages metrics listener task");
560        let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
561        self.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);
562
563        LaunchContextWith {
564            inner: self.inner,
565            attachment: self.attachment.map_right(|_| with_metrics),
566        }
567    }
568}
569
570impl<N> LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<N>>>
571where
572    N: NodeTypesWithDB,
573{
574    /// Returns the configured `ProviderFactory`.
575    const fn provider_factory(&self) -> &ProviderFactory<N> {
576        &self.right().provider_factory
577    }
578
579    /// Returns the metrics sender.
580    fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
581        self.right().metrics_sender.clone()
582    }
583
584    /// Creates a `BlockchainProvider` and attaches it to the launch context.
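    ///
    /// A sketch of how a launcher might call this; `MyNode` and `build_blockchain_provider` are
    /// hypothetical placeholders for the node's [`FullNodeTypes`] impl and whatever constructor
    /// produces its `T::Provider` from the [`ProviderFactory`]:
    ///
    /// ```ignore
    /// let ctx = ctx.with_blockchain_db::<MyNode, _>(|factory| {
    ///     Ok(build_blockchain_provider(factory)?)
    /// })?;
    /// ```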
585    #[allow(clippy::complexity)]
586    pub fn with_blockchain_db<T, F>(
587        self,
588        create_blockchain_provider: F,
589    ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
590    where
591        T: FullNodeTypes<Types = N>,
592        F: FnOnce(ProviderFactory<N>) -> eyre::Result<T::Provider>,
593    {
594        let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;
595
596        let metered_providers = WithMeteredProviders {
597            db_provider_container: WithMeteredProvider {
598                provider_factory: self.provider_factory().clone(),
599                metrics_sender: self.sync_metrics_tx(),
600            },
601            blockchain_db,
602        };
603
604        let ctx = LaunchContextWith {
605            inner: self.inner,
606            attachment: self.attachment.map_right(|_| metered_providers),
607        };
608
609        Ok(ctx)
610    }
611}
612
613impl<T>
614    LaunchContextWith<
615        Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
616    >
617where
618    T: FullNodeTypes<Types: ProviderNodeTypes>,
619{
620    /// Returns access to the underlying database.
621    pub const fn database(&self) -> &<T::Types as NodeTypesWithDB>::DB {
622        self.provider_factory().db_ref()
623    }
624
625    /// Returns the configured `ProviderFactory`.
626    pub const fn provider_factory(&self) -> &ProviderFactory<T::Types> {
627        &self.right().db_provider_container.provider_factory
628    }
629
630    /// Fetches the head block from the database.
631    ///
632    /// If the database is empty, returns the genesis block.
633    pub fn lookup_head(&self) -> eyre::Result<Head> {
634        self.node_config()
635            .lookup_head(self.provider_factory())
636            .wrap_err("the head block is missing")
637    }
638
639    /// Returns the metrics sender.
640    pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
641        self.right().db_provider_container.metrics_sender.clone()
642    }
643
644    /// Returns a reference to the blockchain provider.
645    pub const fn blockchain_db(&self) -> &T::Provider {
646        &self.right().blockchain_db
647    }
648
649    /// Creates a `NodeAdapter` and attaches it to the launch context.
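    ///
    /// A sketch of the call as a launcher might issue it; `components_builder` and the boxed
    /// `on_component_initialized` hook are hypothetical values supplied by the node builder:
    ///
    /// ```ignore
    /// let ctx = ctx
    ///     .with_components(components_builder, on_component_initialized)
    ///     .await?;
    /// ```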
650    pub async fn with_components<CB>(
651        self,
652        components_builder: CB,
653        on_component_initialized: Box<
654            dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
655        >,
656    ) -> eyre::Result<
657        LaunchContextWith<
658            Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
659        >,
660    >
661    where
662        CB: NodeComponentsBuilder<T>,
663    {
664        // fetch the head block from the database
665        let head = self.lookup_head()?;
666
667        let builder_ctx = BuilderContext::new(
668            head,
669            self.blockchain_db().clone(),
670            self.task_executor().clone(),
671            self.configs().clone(),
672        );
673
674        debug!(target: "reth::cli", "creating components");
675        let components = components_builder.build_components(&builder_ctx).await?;
676
677        let blockchain_db = self.blockchain_db().clone();
678
679        let node_adapter = NodeAdapter {
680            components,
681            task_executor: self.task_executor().clone(),
682            provider: blockchain_db,
683        };
684
685        debug!(target: "reth::cli", "calling on_component_initialized hook");
686        on_component_initialized.on_event(node_adapter.clone())?;
687
688        let components_container = WithComponents {
689            db_provider_container: WithMeteredProvider {
690                provider_factory: self.provider_factory().clone(),
691                metrics_sender: self.sync_metrics_tx(),
692            },
693            node_adapter,
694            head,
695        };
696
697        let ctx = LaunchContextWith {
698            inner: self.inner,
699            attachment: self.attachment.map_right(|_| components_container),
700        };
701
702        Ok(ctx)
703    }
704}
705
706impl<T, CB>
707    LaunchContextWith<
708        Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
709    >
710where
711    T: FullNodeTypes<Types: ProviderNodeTypes>,
712    CB: NodeComponentsBuilder<T>,
713{
714    /// Returns the configured `ProviderFactory`.
715    pub const fn provider_factory(&self) -> &ProviderFactory<T::Types> {
716        &self.right().db_provider_container.provider_factory
717    }
718
719    /// Returns the max block that the node should run to, looking it up from the network if
720    /// necessary
721    pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
722    where
723        C: HeadersClient<Header: BlockHeader>,
724    {
725        self.node_config().max_block(client, self.provider_factory().clone()).await
726    }
727
728    /// Returns the static file provider to interact with the static files.
729    pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
730        self.provider_factory().static_file_provider()
731    }
732
733    /// Creates a new [`StaticFileProducer`] with the attached database.
734    pub fn static_file_producer(&self) -> StaticFileProducer<ProviderFactory<T::Types>> {
735        StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
736    }
737
738    /// Returns the current head block.
739    pub const fn head(&self) -> Head {
740        self.right().head
741    }
742
743    /// Returns the configured `NodeAdapter`.
744    pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
745        &self.right().node_adapter
746    }
747
748    /// Returns mutable reference to the configured `NodeAdapter`.
749    pub fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
750        &mut self.right_mut().node_adapter
751    }
752
753    /// Returns a reference to the blockchain provider.
754    pub const fn blockchain_db(&self) -> &T::Provider {
755        &self.node_adapter().provider
756    }
757
758    /// Returns the initial backfill to sync to at launch.
759    ///
760    /// This returns the configured `debug.tip` if set; otherwise it checks whether backfill was
761    /// previously interrupted and, if so, returns the block hash of the last checkpoint. See also
762    /// [`Self::check_pipeline_consistency`].
763    pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
764        let mut initial_target = self.node_config().debug.tip;
765
766        if initial_target.is_none() {
767            initial_target = self.check_pipeline_consistency()?;
768        }
769
770        Ok(initial_target)
771    }
772
773    /// Returns true if the node should terminate after the initial backfill run.
774    ///
775    /// This is the case if any of these configs are set:
776    /// - `--debug.max-block`
777    /// - `--debug.terminate`
778    pub const fn terminate_after_initial_backfill(&self) -> bool {
779        self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
780    }
781
782    /// Ensures that the database matches chain-specific requirements.
783    ///
784    /// This checks for OP-Mainnet and ensures we have all the necessary data to progress (past
785    /// bedrock height)
786    fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
787        if self.chain_spec().is_optimism() &&
788            !self.is_dev() &&
789            self.chain_id() == Chain::optimism_mainnet()
790        {
791            let latest = self.blockchain_db().last_block_number()?;
792            // bedrock height
793            if latest < 105235063 {
794                error!("Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended");
795                return Err(ProviderError::BestBlockNotFound)
796            }
797        }
798
799        Ok(())
800    }
801
802    /// Check if the pipeline is consistent (all stages have checkpoint block numbers no less
803    /// than the checkpoint of the first stage).
804    ///
805    /// This will return the pipeline target if:
806    ///  * the pipeline was interrupted during its previous run
807    ///  * a new stage was added
808    ///  * stage data was dropped manually through `reth stage drop ...`
809    ///
810    /// # Returns
811    ///
812    /// A target block hash if the pipeline is inconsistent, otherwise `None`.
813    pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
814        // If no target was provided, check if the stages are congruent, i.e. whether the
815        // checkpoint of every stage is at least the checkpoint of the first stage.
816        let first_stage_checkpoint = self
817            .blockchain_db()
818            .get_stage_checkpoint(*StageId::ALL.first().unwrap())?
819            .unwrap_or_default()
820            .block_number;
821
822        // Skip the first stage as we've already retrieved it and are comparing all other checkpoints
823        // against it.
824        for stage_id in StageId::ALL.iter().skip(1) {
825            let stage_checkpoint = self
826                .blockchain_db()
827                .get_stage_checkpoint(*stage_id)?
828                .unwrap_or_default()
829                .block_number;
830
831            // If the checkpoint of any stage is less than the checkpoint of the first stage,
832            // retrieve and return the block hash of the latest header and use it as the target.
833            if stage_checkpoint < first_stage_checkpoint {
834                debug!(
835                    target: "consensus::engine",
836                    first_stage_checkpoint,
837                    inconsistent_stage_id = %stage_id,
838                    inconsistent_stage_checkpoint = stage_checkpoint,
839                    "Pipeline sync progress is inconsistent"
840                );
841                return self.blockchain_db().block_hash(first_stage_checkpoint);
842            }
843        }
844
845        self.ensure_chain_specific_db_checks()?;
846
847        Ok(None)
848    }
849
850    /// Returns the metrics sender.
851    pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
852        self.right().db_provider_container.metrics_sender.clone()
853    }
854
855    /// Returns the node adapter components.
856    pub const fn components(&self) -> &CB::Components {
857        &self.node_adapter().components
858    }
859}
860
861impl<T, CB>
862    LaunchContextWith<
863        Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
864    >
865where
866    T: FullNodeTypes<
867        Provider: StateProviderFactory + ChainSpecProvider,
868        Types: ProviderNodeTypes<Primitives: NodePrimitives<SignedTx = TransactionSigned>>,
869    >,
870    CB: NodeComponentsBuilder<T>,
871{
872    /// Returns the [`InvalidBlockHook`] to use for the node.
873    pub fn invalid_block_hook(
874        &self,
875    ) -> eyre::Result<Box<dyn InvalidBlockHook<<T::Types as NodeTypes>::Primitives>>> {
876        let Some(ref hook) = self.node_config().debug.invalid_block_hook else {
877            return Ok(Box::new(NoopInvalidBlockHook::default()))
878        };
879        let healthy_node_rpc_client = self.get_healthy_node_client()?;
880
881        let output_directory = self.data_dir().invalid_block_hooks();
882        let hooks = hook
883            .iter()
884            .copied()
885            .map(|hook| {
886                let output_directory = output_directory.join(hook.to_string());
887                fs::create_dir_all(&output_directory)?;
888
889                Ok(match hook {
890                    InvalidBlockHookType::Witness => Box::new(InvalidBlockWitnessHook::new(
891                        self.blockchain_db().clone(),
892                        self.components().evm_config().clone(),
893                        output_directory,
894                        healthy_node_rpc_client.clone(),
895                    )),
896                    InvalidBlockHookType::PreState | InvalidBlockHookType::Opcode => {
897                        eyre::bail!("invalid block hook {hook:?} is not implemented yet")
898                    }
899                } as Box<dyn InvalidBlockHook<_>>)
900            })
901            .collect::<Result<_, _>>()?;
902
903        Ok(Box::new(InvalidBlockHooks(hooks)))
904    }
905
906    /// Returns an RPC client for the healthy node, if configured in the node config.
907    fn get_healthy_node_client(&self) -> eyre::Result<Option<jsonrpsee::http_client::HttpClient>> {
908        self.node_config()
909            .debug
910            .healthy_node_rpc_url
911            .as_ref()
912            .map(|url| {
913                let client = jsonrpsee::http_client::HttpClientBuilder::default().build(url)?;
914
915                // Verify that the healthy node is running the same chain as the current node.
916                let chain_id = futures::executor::block_on(async {
917                    EthApiClient::<
918                        alloy_rpc_types::Transaction,
919                        alloy_rpc_types::Block,
920                        alloy_rpc_types::Receipt,
921                        alloy_rpc_types::Header,
922                    >::chain_id(&client)
923                    .await
924                })?
925                .ok_or_eyre("healthy node rpc client didn't return a chain id")?;
926                if chain_id.to::<u64>() != self.chain_id().id() {
927                    eyre::bail!("invalid chain id for healthy node: {chain_id}")
928                }
929
930                Ok(client)
931            })
932            .transpose()
933    }
934}
935
936/// Joins two attachments together.
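///
/// # Example
///
/// A small sketch of how the pair is used in this module (the values are illustrative):
///
/// ```ignore
/// let attached = Attached::new("left", 1u64);
/// assert_eq!(*attached.left(), "left");
/// // `map_right` is how the launch context swaps the right attachment for a new value.
/// let attached = attached.map_right(|n| n + 1);
/// assert_eq!(*attached.right(), 2);
/// ```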
937#[derive(Clone, Copy, Debug)]
938pub struct Attached<L, R> {
939    left: L,
940    right: R,
941}
942
943impl<L, R> Attached<L, R> {
944    /// Creates a new `Attached` with the given values.
945    pub const fn new(left: L, right: R) -> Self {
946        Self { left, right }
947    }
948
949    /// Maps the left value to a new value.
950    pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
951    where
952        F: FnOnce(L) -> T,
953    {
954        Attached::new(f(self.left), self.right)
955    }
956
957    /// Maps the right value to a new value.
958    pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
959    where
960        F: FnOnce(R) -> T,
961    {
962        Attached::new(self.left, f(self.right))
963    }
964
965    /// Get a reference to the left value.
966    pub const fn left(&self) -> &L {
967        &self.left
968    }
969
970    /// Get a reference to the right value.
971    pub const fn right(&self) -> &R {
972        &self.right
973    }
974
975    /// Get a mutable reference to the left value.
976    pub fn left_mut(&mut self) -> &mut L {
977        &mut self.left
978    }
979
980    /// Get a mutable reference to the right value.
981    pub fn right_mut(&mut self) -> &mut R {
982        &mut self.right
983    }
984}
985
986/// Helper container type to bundle the initial [`NodeConfig`] and the loaded settings from the
987/// reth.toml config
988#[derive(Debug, Clone)]
989pub struct WithConfigs<ChainSpec> {
990    /// The node config, usually derived from the CLI.
991    pub config: NodeConfig<ChainSpec>,
992    /// The loaded reth.toml config.
993    pub toml_config: reth_config::Config,
994}
995
996/// Helper container type to bundle the [`ProviderFactory`] and the metrics
997/// sender.
998#[derive(Debug, Clone)]
999pub struct WithMeteredProvider<N: NodeTypesWithDB> {
1000    provider_factory: ProviderFactory<N>,
1001    metrics_sender: UnboundedSender<MetricEvent>,
1002}
1003
1004/// Helper container to bundle the [`ProviderFactory`], [`FullNodeTypes::Provider`]
1005/// and a metrics sender.
1006#[allow(missing_debug_implementations)]
1007pub struct WithMeteredProviders<T>
1008where
1009    T: FullNodeTypes,
1010{
1011    db_provider_container: WithMeteredProvider<T::Types>,
1012    blockchain_db: T::Provider,
1013}
1014
1015/// Helper container to bundle the metered providers container and [`NodeAdapter`].
1016#[allow(missing_debug_implementations)]
1017pub struct WithComponents<T, CB>
1018where
1019    T: FullNodeTypes,
1020    CB: NodeComponentsBuilder<T>,
1021{
1022    db_provider_container: WithMeteredProvider<T::Types>,
1023    node_adapter: NodeAdapter<T, CB::Components>,
1024    head: Head,
1025}
1026
1027#[cfg(test)]
1028mod tests {
1029    use super::{LaunchContext, NodeConfig};
1030    use reth_config::Config;
1031    use reth_node_core::args::PruningArgs;
1032
1033    const EXTENSION: &str = "toml";
1034
1035    fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
1036        let temp_dir = tempfile::tempdir().unwrap();
1037        let config_path = temp_dir.path().join(filename).with_extension(EXTENSION);
1038        proc(&config_path);
1039        temp_dir.close().unwrap()
1040    }
1041
1042    #[test]
1043    fn test_save_prune_config() {
1044        with_tempdir("prune-store-test", |config_path| {
1045            let mut reth_config = Config::default();
1046            let node_config = NodeConfig {
1047                pruning: PruningArgs {
1048                    full: true,
1049                    block_interval: None,
1050                    sender_recovery_full: false,
1051                    sender_recovery_distance: None,
1052                    sender_recovery_before: None,
1053                    transaction_lookup_full: false,
1054                    transaction_lookup_distance: None,
1055                    transaction_lookup_before: None,
1056                    receipts_full: false,
1057                    receipts_distance: None,
1058                    receipts_before: None,
1059                    account_history_full: false,
1060                    account_history_distance: None,
1061                    account_history_before: None,
1062                    storage_history_full: false,
1063                    storage_history_distance: None,
1064                    storage_history_before: None,
1065                    receipts_log_filter: vec![],
1066                },
1067                ..NodeConfig::test()
1068            };
1069            LaunchContext::save_pruning_config_if_full_node(
1070                &mut reth_config,
1071                &node_config,
1072                config_path,
1073            )
1074            .unwrap();
1075
1076            let loaded_config = Config::from_path(config_path).unwrap();
1077
1078            assert_eq!(reth_config, loaded_config);
1079        })
1080    }
1081}