1use crate::{
4 components::{NodeComponents, NodeComponentsBuilder},
5 hooks::OnComponentInitializedHook,
6 BuilderContext, NodeAdapter,
7};
8use alloy_eips::eip2124::Head;
9use alloy_primitives::{BlockNumber, B256};
10use eyre::{Context, OptionExt};
11use rayon::ThreadPoolBuilder;
12use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
13use reth_config::{config::EtlConfig, PruneConfig};
14use reth_consensus::noop::NoopConsensus;
15use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
16use reth_db_common::init::{init_genesis, InitStorageError};
17use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
18use reth_engine_local::MiningMode;
19use reth_engine_tree::tree::{InvalidBlockHook, InvalidBlockHooks, NoopInvalidBlockHook};
20use reth_evm::{noop::NoopEvmConfig, ConfigureEvm};
21use reth_fs_util as fs;
22use reth_invalid_block_hooks::InvalidBlockWitnessHook;
23use reth_network_p2p::headers::client::HeadersClient;
24use reth_node_api::{FullNodeTypes, NodeTypes, NodeTypesWithDB, NodeTypesWithDBAdapter};
25use reth_node_core::{
26 args::InvalidBlockHookType,
27 dirs::{ChainPath, DataDirPath},
28 node_config::NodeConfig,
29 primitives::BlockHeader,
30 version::{
31 BUILD_PROFILE_NAME, CARGO_PKG_VERSION, VERGEN_BUILD_TIMESTAMP, VERGEN_CARGO_FEATURES,
32 VERGEN_CARGO_TARGET_TRIPLE, VERGEN_GIT_SHA,
33 },
34};
35use reth_node_metrics::{
36 chain::ChainSpecInfo,
37 hooks::Hooks,
38 recorder::install_prometheus_recorder,
39 server::{MetricServer, MetricServerConfig},
40 version::VersionInfo,
41};
42use reth_provider::{
43 providers::{NodeTypesForProvider, ProviderNodeTypes, StaticFileProvider},
44 BlockHashReader, BlockNumReader, ChainSpecProvider, ProviderError, ProviderFactory,
45 ProviderResult, StageCheckpointReader, StateProviderFactory, StaticFileProviderFactory,
46};
47use reth_prune::{PruneModes, PrunerBuilder};
48use reth_rpc_api::clients::EthApiClient;
49use reth_rpc_builder::config::RethRpcServerConfig;
50use reth_rpc_layer::JwtSecret;
51use reth_stages::{sets::DefaultStages, MetricEvent, PipelineBuilder, PipelineTarget, StageId};
52use reth_static_file::StaticFileProducer;
53use reth_tasks::TaskExecutor;
54use reth_tracing::tracing::{debug, error, info, warn};
55use reth_transaction_pool::TransactionPool;
56use std::{sync::Arc, thread::available_parallelism};
57use tokio::sync::{
58 mpsc::{unbounded_channel, UnboundedSender},
59 oneshot, watch,
60};
61
62#[derive(Debug, Clone)]
66pub struct LaunchContext {
67 pub task_executor: TaskExecutor,
69 pub data_dir: ChainPath<DataDirPath>,
71}
72
73impl LaunchContext {
74 pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
76 Self { task_executor, data_dir }
77 }
78
79 pub const fn with<T>(self, attachment: T) -> LaunchContextWith<T> {
81 LaunchContextWith { inner: self, attachment }
82 }
83
84 pub fn with_loaded_toml_config<ChainSpec: EthChainSpec>(
89 self,
90 config: NodeConfig<ChainSpec>,
91 ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>> {
92 let toml_config = self.load_toml_config(&config)?;
93 Ok(self.with(WithConfigs { config, toml_config }))
94 }
95
96 pub fn load_toml_config<ChainSpec: EthChainSpec>(
101 &self,
102 config: &NodeConfig<ChainSpec>,
103 ) -> eyre::Result<reth_config::Config> {
104 let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());
105
106 let mut toml_config = reth_config::Config::from_path(&config_path)
107 .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;
108
109 Self::save_pruning_config_if_full_node(&mut toml_config, config, &config_path)?;
110
111 info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
112
113 toml_config.peers.trusted_nodes_only = config.network.trusted_only;
115
116 Ok(toml_config)
117 }
118
119 fn save_pruning_config_if_full_node<ChainSpec: EthChainSpec>(
121 reth_config: &mut reth_config::Config,
122 config: &NodeConfig<ChainSpec>,
123 config_path: impl AsRef<std::path::Path>,
124 ) -> eyre::Result<()> {
125 if reth_config.prune.is_none() {
126 if let Some(prune_config) = config.prune_config() {
127 reth_config.update_prune_config(prune_config);
128 info!(target: "reth::cli", "Saving prune config to toml file");
129 reth_config.save(config_path.as_ref())?;
130 }
131 } else if config.prune_config().is_none() {
132 warn!(target: "reth::cli", "Prune configs present in config file but --full not provided. Running as a Full node");
133 }
134 Ok(())
135 }
136
137 pub fn with_configured_globals(self, reserved_cpu_cores: usize) -> Self {
139 self.configure_globals(reserved_cpu_cores);
140 self
141 }
142
143 pub fn configure_globals(&self, reserved_cpu_cores: usize) {
150 match fdlimit::raise_fd_limit() {
153 Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
154 debug!(from, to, "Raised file descriptor limit");
155 }
156 Ok(fdlimit::Outcome::Unsupported) => {}
157 Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
158 }
159
160 let num_threads = available_parallelism()
164 .map_or(0, |num| num.get().saturating_sub(reserved_cpu_cores).max(1));
165 if let Err(err) = ThreadPoolBuilder::new()
166 .num_threads(num_threads)
167 .thread_name(|i| format!("reth-rayon-{i}"))
168 .build_global()
169 {
170 error!(%err, "Failed to build global thread pool")
171 }
172 }
173}
174
175#[derive(Debug, Clone)]
181pub struct LaunchContextWith<T> {
182 pub inner: LaunchContext,
184 pub attachment: T,
186}
187
188impl<T> LaunchContextWith<T> {
189 pub fn configure_globals(&self, reserved_cpu_cores: u64) {
194 self.inner.configure_globals(reserved_cpu_cores.try_into().unwrap());
195 }
196
197 pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
199 &self.inner.data_dir
200 }
201
202 pub const fn task_executor(&self) -> &TaskExecutor {
204 &self.inner.task_executor
205 }
206
207 pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
209 LaunchContextWith {
210 inner: self.inner,
211 attachment: Attached::new(self.attachment, attachment),
212 }
213 }
214
215 pub fn inspect<F>(self, f: F) -> Self
218 where
219 F: FnOnce(&Self),
220 {
221 f(&self);
222 self
223 }
224}
225
226impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
227 pub fn with_resolved_peers(mut self) -> eyre::Result<Self> {
229 if !self.attachment.config.network.trusted_peers.is_empty() {
230 info!(target: "reth::cli", "Adding trusted nodes");
231
232 self.attachment
233 .toml_config
234 .peers
235 .trusted_nodes
236 .extend(self.attachment.config.network.trusted_peers.clone());
237 }
238 Ok(self)
239 }
240}
241
242impl<L, R> LaunchContextWith<Attached<L, R>> {
243 pub const fn left(&self) -> &L {
245 &self.attachment.left
246 }
247
248 pub const fn right(&self) -> &R {
250 &self.attachment.right
251 }
252
253 pub const fn left_mut(&mut self) -> &mut L {
255 &mut self.attachment.left
256 }
257
258 pub const fn right_mut(&mut self) -> &mut R {
260 &mut self.attachment.right
261 }
262}
263impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
264 pub fn with_adjusted_configs(self) -> Self {
270 self.ensure_etl_datadir().with_adjusted_instance_ports()
271 }
272
273 pub fn ensure_etl_datadir(mut self) -> Self {
275 if self.toml_config_mut().stages.etl.dir.is_none() {
276 self.toml_config_mut().stages.etl.dir =
277 Some(EtlConfig::from_datadir(self.data_dir().data_dir()))
278 }
279
280 self
281 }
282
283 pub fn with_adjusted_instance_ports(mut self) -> Self {
285 self.node_config_mut().adjust_instance_ports();
286 self
287 }
288
289 pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
291 self.attachment.left()
292 }
293
294 pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
296 &self.left().config
297 }
298
299 pub const fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
301 &mut self.left_mut().config
302 }
303
304 pub const fn toml_config(&self) -> &reth_config::Config {
306 &self.left().toml_config
307 }
308
309 pub const fn toml_config_mut(&mut self) -> &mut reth_config::Config {
311 &mut self.left_mut().toml_config
312 }
313
314 pub fn chain_spec(&self) -> Arc<ChainSpec> {
316 self.node_config().chain.clone()
317 }
318
319 pub fn genesis_hash(&self) -> B256 {
321 self.node_config().chain.genesis_hash()
322 }
323
324 pub fn chain_id(&self) -> Chain {
326 self.node_config().chain.chain()
327 }
328
329 pub const fn is_dev(&self) -> bool {
331 self.node_config().dev.dev
332 }
333
334 pub fn prune_config(&self) -> Option<PruneConfig> {
338 let Some(mut node_prune_config) = self.node_config().prune_config() else {
339 return self.toml_config().prune.clone();
341 };
342
343 node_prune_config.merge(self.toml_config().prune.clone());
345 Some(node_prune_config)
346 }
347
348 pub fn prune_modes(&self) -> PruneModes {
350 self.prune_config().map(|config| config.segments).unwrap_or_default()
351 }
352
353 pub fn pruner_builder(&self) -> PrunerBuilder {
355 PrunerBuilder::new(self.prune_config().unwrap_or_default())
356 .delete_limit(self.chain_spec().prune_delete_limit())
357 .timeout(PrunerBuilder::DEFAULT_TIMEOUT)
358 }
359
360 pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
362 let default_jwt_path = self.data_dir().jwt();
363 let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
364 Ok(secret)
365 }
366
367 pub fn dev_mining_mode(&self, pool: impl TransactionPool) -> MiningMode {
369 if let Some(interval) = self.node_config().dev.block_time {
370 MiningMode::interval(interval)
371 } else {
372 MiningMode::instant(pool)
373 }
374 }
375}
376
377impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
378where
379 DB: Database + Clone + 'static,
380 ChainSpec: EthChainSpec + EthereumHardforks + 'static,
381{
382 pub async fn create_provider_factory<N, Evm>(&self) -> eyre::Result<ProviderFactory<N>>
386 where
387 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
388 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
389 {
390 let factory = ProviderFactory::new(
391 self.right().clone(),
392 self.chain_spec(),
393 StaticFileProvider::read_write(self.data_dir().static_files())?,
394 )
395 .with_prune_modes(self.prune_modes())
396 .with_static_files_metrics();
397
398 let has_receipt_pruning =
399 self.toml_config().prune.as_ref().is_some_and(|a| a.has_receipts_pruning());
400
401 if let Some(unwind_target) = factory
404 .static_file_provider()
405 .check_consistency(&factory.provider()?, has_receipt_pruning)?
406 {
407 assert_ne!(
410 unwind_target,
411 PipelineTarget::Unwind(0),
412 "A static file <> database inconsistency was found that would trigger an unwind to block 0"
413 );
414
415 info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");
416
417 let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
418
419 let pipeline = PipelineBuilder::default()
421 .add_stages(DefaultStages::new(
422 factory.clone(),
423 tip_rx,
424 Arc::new(NoopConsensus::default()),
425 NoopHeaderDownloader::default(),
426 NoopBodiesDownloader::default(),
427 NoopEvmConfig::<Evm>::default(),
428 self.toml_config().stages.clone(),
429 self.prune_modes(),
430 ))
431 .build(
432 factory.clone(),
433 StaticFileProducer::new(factory.clone(), self.prune_modes()),
434 );
435
436 let (tx, rx) = oneshot::channel();
438
439 self.task_executor().spawn_critical_blocking(
441 "pipeline task",
442 Box::pin(async move {
443 let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
444 let _ = tx.send(result);
445 }),
446 );
447 rx.await?.inspect_err(|err| {
448 error!(target: "reth::cli", unwind_target = %unwind_target, %err, "failed to run unwind")
449 })?;
450 }
451
452 Ok(factory)
453 }
454
455 pub async fn with_provider_factory<N, Evm>(
457 self,
458 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
459 where
460 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
461 Evm: ConfigureEvm<Primitives = N::Primitives> + 'static,
462 {
463 let factory = self.create_provider_factory::<N, Evm>().await?;
464 let ctx = LaunchContextWith {
465 inner: self.inner,
466 attachment: self.attachment.map_right(|_| factory),
467 };
468
469 Ok(ctx)
470 }
471}
472
473impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
474where
475 T: ProviderNodeTypes,
476{
477 pub const fn database(&self) -> &T::DB {
479 self.right().db_ref()
480 }
481
482 pub const fn provider_factory(&self) -> &ProviderFactory<T> {
484 self.right()
485 }
486
487 pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
489 self.right().static_file_provider()
490 }
491
492 pub async fn with_prometheus_server(self) -> eyre::Result<Self> {
496 self.start_prometheus_endpoint().await?;
497 Ok(self)
498 }
499
500 pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
502 install_prometheus_recorder().spawn_upkeep();
504
505 let listen_addr = self.node_config().metrics;
506 if let Some(addr) = listen_addr {
507 info!(target: "reth::cli", "Starting metrics endpoint at {}", addr);
508 let config = MetricServerConfig::new(
509 addr,
510 VersionInfo {
511 version: CARGO_PKG_VERSION,
512 build_timestamp: VERGEN_BUILD_TIMESTAMP,
513 cargo_features: VERGEN_CARGO_FEATURES,
514 git_sha: VERGEN_GIT_SHA,
515 target_triple: VERGEN_CARGO_TARGET_TRIPLE,
516 build_profile: BUILD_PROFILE_NAME,
517 },
518 ChainSpecInfo { name: self.left().config.chain.chain().to_string() },
519 self.task_executor().clone(),
520 Hooks::builder()
521 .with_hook({
522 let db = self.database().clone();
523 move || db.report_metrics()
524 })
525 .with_hook({
526 let sfp = self.static_file_provider();
527 move || {
528 if let Err(error) = sfp.report_metrics() {
529 error!(%error, "Failed to report metrics for the static file provider");
530 }
531 }
532 })
533 .build(),
534 );
535
536 MetricServer::new(config).serve().await?;
537 }
538
539 Ok(())
540 }
541
542 pub fn with_genesis(self) -> Result<Self, InitStorageError> {
544 init_genesis(self.provider_factory())?;
545 Ok(self)
546 }
547
548 pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
550 init_genesis(self.provider_factory())
551 }
552
553 pub fn with_metrics_task(
559 self,
560 ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
561 let (metrics_sender, metrics_receiver) = unbounded_channel();
562
563 let with_metrics =
564 WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };
565
566 debug!(target: "reth::cli", "Spawning stages metrics listener task");
567 let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
568 self.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);
569
570 LaunchContextWith {
571 inner: self.inner,
572 attachment: self.attachment.map_right(|_| with_metrics),
573 }
574 }
575}
576
577impl<N, DB>
578 LaunchContextWith<
579 Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<NodeTypesWithDBAdapter<N, DB>>>,
580 >
581where
582 N: NodeTypes,
583 DB: Database + DatabaseMetrics + Clone + Unpin + 'static,
584{
585 const fn provider_factory(&self) -> &ProviderFactory<NodeTypesWithDBAdapter<N, DB>> {
587 &self.right().provider_factory
588 }
589
590 fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
592 self.right().metrics_sender.clone()
593 }
594
595 #[expect(clippy::complexity)]
597 pub fn with_blockchain_db<T, F>(
598 self,
599 create_blockchain_provider: F,
600 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
601 where
602 T: FullNodeTypes<Types = N, DB = DB>,
603 F: FnOnce(ProviderFactory<NodeTypesWithDBAdapter<N, DB>>) -> eyre::Result<T::Provider>,
604 {
605 let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;
606
607 let metered_providers = WithMeteredProviders {
608 db_provider_container: WithMeteredProvider {
609 provider_factory: self.provider_factory().clone(),
610 metrics_sender: self.sync_metrics_tx(),
611 },
612 blockchain_db,
613 };
614
615 let ctx = LaunchContextWith {
616 inner: self.inner,
617 attachment: self.attachment.map_right(|_| metered_providers),
618 };
619
620 Ok(ctx)
621 }
622}
623
624impl<T>
625 LaunchContextWith<
626 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
627 >
628where
629 T: FullNodeTypes<Types: NodeTypesForProvider>,
630{
631 pub const fn database(&self) -> &T::DB {
633 self.provider_factory().db_ref()
634 }
635
636 pub const fn provider_factory(
638 &self,
639 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
640 &self.right().db_provider_container.provider_factory
641 }
642
643 pub fn lookup_head(&self) -> eyre::Result<Head> {
647 self.node_config()
648 .lookup_head(self.provider_factory())
649 .wrap_err("the head block is missing")
650 }
651
652 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
654 self.right().db_provider_container.metrics_sender.clone()
655 }
656
657 pub const fn blockchain_db(&self) -> &T::Provider {
659 &self.right().blockchain_db
660 }
661
662 pub async fn with_components<CB>(
664 self,
665 components_builder: CB,
666 on_component_initialized: Box<
667 dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
668 >,
669 ) -> eyre::Result<
670 LaunchContextWith<
671 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
672 >,
673 >
674 where
675 CB: NodeComponentsBuilder<T>,
676 {
677 let head = self.lookup_head()?;
679
680 let builder_ctx = BuilderContext::new(
681 head,
682 self.blockchain_db().clone(),
683 self.task_executor().clone(),
684 self.configs().clone(),
685 );
686
687 debug!(target: "reth::cli", "creating components");
688 let components = components_builder.build_components(&builder_ctx).await?;
689
690 let blockchain_db = self.blockchain_db().clone();
691
692 let node_adapter = NodeAdapter {
693 components,
694 task_executor: self.task_executor().clone(),
695 provider: blockchain_db,
696 };
697
698 debug!(target: "reth::cli", "calling on_component_initialized hook");
699 on_component_initialized.on_event(node_adapter.clone())?;
700
701 let components_container = WithComponents {
702 db_provider_container: WithMeteredProvider {
703 provider_factory: self.provider_factory().clone(),
704 metrics_sender: self.sync_metrics_tx(),
705 },
706 node_adapter,
707 head,
708 };
709
710 let ctx = LaunchContextWith {
711 inner: self.inner,
712 attachment: self.attachment.map_right(|_| components_container),
713 };
714
715 Ok(ctx)
716 }
717}
718
719impl<T, CB>
720 LaunchContextWith<
721 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
722 >
723where
724 T: FullNodeTypes<Types: NodeTypesForProvider>,
725 CB: NodeComponentsBuilder<T>,
726{
727 pub const fn provider_factory(
729 &self,
730 ) -> &ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>> {
731 &self.right().db_provider_container.provider_factory
732 }
733
734 pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
737 where
738 C: HeadersClient<Header: BlockHeader>,
739 {
740 self.node_config().max_block(client, self.provider_factory().clone()).await
741 }
742
743 pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
745 self.provider_factory().static_file_provider()
746 }
747
748 pub fn static_file_producer(
750 &self,
751 ) -> StaticFileProducer<ProviderFactory<NodeTypesWithDBAdapter<T::Types, T::DB>>> {
752 StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
753 }
754
755 pub const fn head(&self) -> Head {
757 self.right().head
758 }
759
760 pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
762 &self.right().node_adapter
763 }
764
765 pub const fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
767 &mut self.right_mut().node_adapter
768 }
769
770 pub const fn blockchain_db(&self) -> &T::Provider {
772 &self.node_adapter().provider
773 }
774
775 pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
781 let mut initial_target = self.node_config().debug.tip;
782
783 if initial_target.is_none() {
784 initial_target = self.check_pipeline_consistency()?;
785 }
786
787 Ok(initial_target)
788 }
789
790 pub const fn terminate_after_initial_backfill(&self) -> bool {
796 self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
797 }
798
799 fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
804 if self.chain_spec().is_optimism() &&
805 !self.is_dev() &&
806 self.chain_id() == Chain::optimism_mainnet()
807 {
808 let latest = self.blockchain_db().last_block_number()?;
809 if latest < 105235063 {
811 error!(
812 "Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended"
813 );
814 return Err(ProviderError::BestBlockNotFound)
815 }
816 }
817
818 Ok(())
819 }
820
821 pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
833 let first_stage_checkpoint = self
836 .blockchain_db()
837 .get_stage_checkpoint(*StageId::ALL.first().unwrap())?
838 .unwrap_or_default()
839 .block_number;
840
841 for stage_id in StageId::ALL.iter().skip(1) {
844 let stage_checkpoint = self
845 .blockchain_db()
846 .get_stage_checkpoint(*stage_id)?
847 .unwrap_or_default()
848 .block_number;
849
850 if stage_checkpoint < first_stage_checkpoint {
853 debug!(
854 target: "consensus::engine",
855 first_stage_checkpoint,
856 inconsistent_stage_id = %stage_id,
857 inconsistent_stage_checkpoint = stage_checkpoint,
858 "Pipeline sync progress is inconsistent"
859 );
860 return self.blockchain_db().block_hash(first_stage_checkpoint);
861 }
862 }
863
864 self.ensure_chain_specific_db_checks()?;
865
866 Ok(None)
867 }
868
869 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
871 self.right().db_provider_container.metrics_sender.clone()
872 }
873
874 pub const fn components(&self) -> &CB::Components {
876 &self.node_adapter().components
877 }
878}
879
880impl<T, CB>
881 LaunchContextWith<
882 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
883 >
884where
885 T: FullNodeTypes<
886 Provider: StateProviderFactory + ChainSpecProvider,
887 Types: NodeTypesForProvider,
888 >,
889 CB: NodeComponentsBuilder<T>,
890{
891 pub fn invalid_block_hook(
893 &self,
894 ) -> eyre::Result<Box<dyn InvalidBlockHook<<T::Types as NodeTypes>::Primitives>>> {
895 let Some(ref hook) = self.node_config().debug.invalid_block_hook else {
896 return Ok(Box::new(NoopInvalidBlockHook::default()))
897 };
898 let healthy_node_rpc_client = self.get_healthy_node_client()?;
899
900 let output_directory = self.data_dir().invalid_block_hooks();
901 let hooks = hook
902 .iter()
903 .copied()
904 .map(|hook| {
905 let output_directory = output_directory.join(hook.to_string());
906 fs::create_dir_all(&output_directory)?;
907
908 Ok(match hook {
909 InvalidBlockHookType::Witness => Box::new(InvalidBlockWitnessHook::new(
910 self.blockchain_db().clone(),
911 self.components().evm_config().clone(),
912 output_directory,
913 healthy_node_rpc_client.clone(),
914 )),
915 InvalidBlockHookType::PreState | InvalidBlockHookType::Opcode => {
916 eyre::bail!("invalid block hook {hook:?} is not implemented yet")
917 }
918 } as Box<dyn InvalidBlockHook<_>>)
919 })
920 .collect::<Result<_, _>>()?;
921
922 Ok(Box::new(InvalidBlockHooks(hooks)))
923 }
924
925 fn get_healthy_node_client(&self) -> eyre::Result<Option<jsonrpsee::http_client::HttpClient>> {
927 self.node_config()
928 .debug
929 .healthy_node_rpc_url
930 .as_ref()
931 .map(|url| {
932 let client = jsonrpsee::http_client::HttpClientBuilder::default().build(url)?;
933
934 let chain_id = futures::executor::block_on(async {
936 EthApiClient::<
937 alloy_rpc_types::Transaction,
938 alloy_rpc_types::Block,
939 alloy_rpc_types::Receipt,
940 alloy_rpc_types::Header,
941 >::chain_id(&client)
942 .await
943 })?
944 .ok_or_eyre("healthy node rpc client didn't return a chain id")?;
945 if chain_id.to::<u64>() != self.chain_id().id() {
946 eyre::bail!("invalid chain id for healthy node: {chain_id}")
947 }
948
949 Ok(client)
950 })
951 .transpose()
952 }
953}
954
955#[derive(Clone, Copy, Debug)]
957pub struct Attached<L, R> {
958 left: L,
959 right: R,
960}
961
962impl<L, R> Attached<L, R> {
963 pub const fn new(left: L, right: R) -> Self {
965 Self { left, right }
966 }
967
968 pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
970 where
971 F: FnOnce(L) -> T,
972 {
973 Attached::new(f(self.left), self.right)
974 }
975
976 pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
978 where
979 F: FnOnce(R) -> T,
980 {
981 Attached::new(self.left, f(self.right))
982 }
983
984 pub const fn left(&self) -> &L {
986 &self.left
987 }
988
989 pub const fn right(&self) -> &R {
991 &self.right
992 }
993
994 pub const fn left_mut(&mut self) -> &mut R {
996 &mut self.right
997 }
998
999 pub const fn right_mut(&mut self) -> &mut R {
1001 &mut self.right
1002 }
1003}
1004
1005#[derive(Debug)]
1008pub struct WithConfigs<ChainSpec> {
1009 pub config: NodeConfig<ChainSpec>,
1011 pub toml_config: reth_config::Config,
1013}
1014
1015impl<ChainSpec> Clone for WithConfigs<ChainSpec> {
1016 fn clone(&self) -> Self {
1017 Self { config: self.config.clone(), toml_config: self.toml_config.clone() }
1018 }
1019}
1020
1021#[derive(Debug, Clone)]
1024pub struct WithMeteredProvider<N: NodeTypesWithDB> {
1025 provider_factory: ProviderFactory<N>,
1026 metrics_sender: UnboundedSender<MetricEvent>,
1027}
1028
1029#[expect(missing_debug_implementations)]
1032pub struct WithMeteredProviders<T>
1033where
1034 T: FullNodeTypes,
1035{
1036 db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1037 blockchain_db: T::Provider,
1038}
1039
1040#[expect(missing_debug_implementations)]
1042pub struct WithComponents<T, CB>
1043where
1044 T: FullNodeTypes,
1045 CB: NodeComponentsBuilder<T>,
1046{
1047 db_provider_container: WithMeteredProvider<NodeTypesWithDBAdapter<T::Types, T::DB>>,
1048 node_adapter: NodeAdapter<T, CB::Components>,
1049 head: Head,
1050}
1051
1052#[cfg(test)]
1053mod tests {
1054 use super::{LaunchContext, NodeConfig};
1055 use reth_config::Config;
1056 use reth_node_core::args::PruningArgs;
1057
1058 const EXTENSION: &str = "toml";
1059
1060 fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
1061 let temp_dir = tempfile::tempdir().unwrap();
1062 let config_path = temp_dir.path().join(filename).with_extension(EXTENSION);
1063 proc(&config_path);
1064 temp_dir.close().unwrap()
1065 }
1066
1067 #[test]
1068 fn test_save_prune_config() {
1069 with_tempdir("prune-store-test", |config_path| {
1070 let mut reth_config = Config::default();
1071 let node_config = NodeConfig {
1072 pruning: PruningArgs {
1073 full: true,
1074 block_interval: None,
1075 sender_recovery_full: false,
1076 sender_recovery_distance: None,
1077 sender_recovery_before: None,
1078 transaction_lookup_full: false,
1079 transaction_lookup_distance: None,
1080 transaction_lookup_before: None,
1081 receipts_full: false,
1082 receipts_distance: None,
1083 receipts_before: None,
1084 account_history_full: false,
1085 account_history_distance: None,
1086 account_history_before: None,
1087 storage_history_full: false,
1088 storage_history_distance: None,
1089 storage_history_before: None,
1090 receipts_log_filter: None,
1091 },
1092 ..NodeConfig::test()
1093 };
1094 LaunchContext::save_pruning_config_if_full_node(
1095 &mut reth_config,
1096 &node_config,
1097 config_path,
1098 )
1099 .unwrap();
1100
1101 let loaded_config = Config::from_path(config_path).unwrap();
1102
1103 assert_eq!(reth_config, loaded_config);
1104 })
1105 }
1106}