1use std::{sync::Arc, thread::available_parallelism};
4
5use crate::{
6 components::{NodeComponents, NodeComponentsBuilder},
7 hooks::OnComponentInitializedHook,
8 BuilderContext, NodeAdapter,
9};
10use alloy_primitives::{BlockNumber, B256};
11use eyre::{Context, OptionExt};
12use rayon::ThreadPoolBuilder;
13use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks};
14use reth_config::{config::EtlConfig, PruneConfig};
15use reth_consensus::noop::NoopConsensus;
16use reth_db_api::{database::Database, database_metrics::DatabaseMetrics};
17use reth_db_common::init::{init_genesis, InitStorageError};
18use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
19use reth_engine_local::MiningMode;
20use reth_engine_tree::tree::{InvalidBlockHook, InvalidBlockHooks, NoopInvalidBlockHook};
21use reth_evm::noop::NoopBlockExecutorProvider;
22use reth_fs_util as fs;
23use reth_invalid_block_hooks::InvalidBlockWitnessHook;
24use reth_network_p2p::headers::client::HeadersClient;
25use reth_node_api::{
26 FullNodePrimitives, FullNodeTypes, NodePrimitives, NodeTypes, NodeTypesWithDB,
27};
28use reth_node_core::{
29 args::InvalidBlockHookType,
30 dirs::{ChainPath, DataDirPath},
31 node_config::NodeConfig,
32 primitives::BlockHeader,
33 version::{
34 BUILD_PROFILE_NAME, CARGO_PKG_VERSION, VERGEN_BUILD_TIMESTAMP, VERGEN_CARGO_FEATURES,
35 VERGEN_CARGO_TARGET_TRIPLE, VERGEN_GIT_SHA,
36 },
37};
38use reth_node_metrics::{
39 chain::ChainSpecInfo,
40 hooks::Hooks,
41 recorder::install_prometheus_recorder,
42 server::{MetricServer, MetricServerConfig},
43 version::VersionInfo,
44};
45use reth_primitives::{Head, TransactionSigned};
46use reth_provider::{
47 providers::{ProviderNodeTypes, StaticFileProvider},
48 BlockHashReader, BlockNumReader, ChainSpecProvider, ProviderError, ProviderFactory,
49 ProviderResult, StageCheckpointReader, StateProviderFactory, StaticFileProviderFactory,
50};
51use reth_prune::{PruneModes, PrunerBuilder};
52use reth_rpc_api::clients::EthApiClient;
53use reth_rpc_builder::config::RethRpcServerConfig;
54use reth_rpc_layer::JwtSecret;
55use reth_stages::{sets::DefaultStages, MetricEvent, PipelineBuilder, PipelineTarget, StageId};
56use reth_static_file::StaticFileProducer;
57use reth_tasks::TaskExecutor;
58use reth_tracing::tracing::{debug, error, info, warn};
59use reth_transaction_pool::TransactionPool;
60use tokio::sync::{
61 mpsc::{unbounded_channel, UnboundedSender},
62 oneshot, watch,
63};
64
65#[derive(Debug, Clone)]
69pub struct LaunchContext {
70 pub task_executor: TaskExecutor,
72 pub data_dir: ChainPath<DataDirPath>,
74}
75
76impl LaunchContext {
77 pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
79 Self { task_executor, data_dir }
80 }
81
82 pub const fn with<DB>(self, database: DB) -> LaunchContextWith<DB> {
84 LaunchContextWith { inner: self, attachment: database }
85 }
86
87 pub fn with_loaded_toml_config<ChainSpec: EthChainSpec>(
92 self,
93 config: NodeConfig<ChainSpec>,
94 ) -> eyre::Result<LaunchContextWith<WithConfigs<ChainSpec>>> {
95 let toml_config = self.load_toml_config(&config)?;
96 Ok(self.with(WithConfigs { config, toml_config }))
97 }
98
99 pub fn load_toml_config<ChainSpec: EthChainSpec>(
104 &self,
105 config: &NodeConfig<ChainSpec>,
106 ) -> eyre::Result<reth_config::Config> {
107 let config_path = config.config.clone().unwrap_or_else(|| self.data_dir.config());
108
109 let mut toml_config = reth_config::Config::from_path(&config_path)
110 .wrap_err_with(|| format!("Could not load config file {config_path:?}"))?;
111
112 Self::save_pruning_config_if_full_node(&mut toml_config, config, &config_path)?;
113
114 info!(target: "reth::cli", path = ?config_path, "Configuration loaded");
115
116 toml_config.peers.trusted_nodes_only = config.network.trusted_only;
118
119 Ok(toml_config)
120 }
121
122 fn save_pruning_config_if_full_node<ChainSpec: EthChainSpec>(
124 reth_config: &mut reth_config::Config,
125 config: &NodeConfig<ChainSpec>,
126 config_path: impl AsRef<std::path::Path>,
127 ) -> eyre::Result<()> {
128 if reth_config.prune.is_none() {
129 if let Some(prune_config) = config.prune_config() {
130 reth_config.update_prune_config(prune_config);
131 info!(target: "reth::cli", "Saving prune config to toml file");
132 reth_config.save(config_path.as_ref())?;
133 }
134 } else if config.prune_config().is_none() {
135 warn!(target: "reth::cli", "Prune configs present in config file but --full not provided. Running as a Full node");
136 }
137 Ok(())
138 }
139
140 pub fn with_configured_globals(self) -> Self {
142 self.configure_globals();
143 self
144 }
145
146 pub fn configure_globals(&self) {
151 match fdlimit::raise_fd_limit() {
154 Ok(fdlimit::Outcome::LimitRaised { from, to }) => {
155 debug!(from, to, "Raised file descriptor limit");
156 }
157 Ok(fdlimit::Outcome::Unsupported) => {}
158 Err(err) => warn!(%err, "Failed to raise file descriptor limit"),
159 }
160
161 let num_threads =
164 available_parallelism().map_or(0, |num| num.get().saturating_sub(1).max(1));
165 if let Err(err) = ThreadPoolBuilder::new()
166 .num_threads(num_threads)
167 .thread_name(|i| format!("reth-rayon-{i}"))
168 .build_global()
169 {
170 error!(%err, "Failed to build global thread pool")
171 }
172 }
173}
174
175#[derive(Debug, Clone)]
181pub struct LaunchContextWith<T> {
182 pub inner: LaunchContext,
184 pub attachment: T,
186}
187
188impl<T> LaunchContextWith<T> {
189 pub fn configure_globals(&self) {
194 self.inner.configure_globals();
195 }
196
197 pub const fn data_dir(&self) -> &ChainPath<DataDirPath> {
199 &self.inner.data_dir
200 }
201
202 pub const fn task_executor(&self) -> &TaskExecutor {
204 &self.inner.task_executor
205 }
206
207 pub fn attach<A>(self, attachment: A) -> LaunchContextWith<Attached<T, A>> {
209 LaunchContextWith {
210 inner: self.inner,
211 attachment: Attached::new(self.attachment, attachment),
212 }
213 }
214
215 pub fn inspect<F>(self, f: F) -> Self
218 where
219 F: FnOnce(&Self),
220 {
221 f(&self);
222 self
223 }
224}
225
226impl<ChainSpec> LaunchContextWith<WithConfigs<ChainSpec>> {
227 pub async fn with_resolved_peers(mut self) -> eyre::Result<Self> {
229 if !self.attachment.config.network.trusted_peers.is_empty() {
230 info!(target: "reth::cli", "Adding trusted nodes");
231
232 self.attachment
233 .toml_config
234 .peers
235 .trusted_nodes
236 .extend(self.attachment.config.network.trusted_peers.clone());
237 }
238 Ok(self)
239 }
240}
241
242impl<L, R> LaunchContextWith<Attached<L, R>> {
243 pub const fn left(&self) -> &L {
245 &self.attachment.left
246 }
247
248 pub const fn right(&self) -> &R {
250 &self.attachment.right
251 }
252
253 pub fn left_mut(&mut self) -> &mut L {
255 &mut self.attachment.left
256 }
257
258 pub fn right_mut(&mut self) -> &mut R {
260 &mut self.attachment.right
261 }
262}
263impl<R, ChainSpec: EthChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, R>> {
264 pub fn with_adjusted_configs(self) -> Self {
270 self.ensure_etl_datadir().with_adjusted_instance_ports()
271 }
272
273 pub fn ensure_etl_datadir(mut self) -> Self {
275 if self.toml_config_mut().stages.etl.dir.is_none() {
276 self.toml_config_mut().stages.etl.dir =
277 Some(EtlConfig::from_datadir(self.data_dir().data_dir()))
278 }
279
280 self
281 }
282
283 pub fn with_adjusted_instance_ports(mut self) -> Self {
285 self.node_config_mut().adjust_instance_ports();
286 self
287 }
288
289 pub const fn configs(&self) -> &WithConfigs<ChainSpec> {
291 self.attachment.left()
292 }
293
294 pub const fn node_config(&self) -> &NodeConfig<ChainSpec> {
296 &self.left().config
297 }
298
299 pub fn node_config_mut(&mut self) -> &mut NodeConfig<ChainSpec> {
301 &mut self.left_mut().config
302 }
303
304 pub const fn toml_config(&self) -> &reth_config::Config {
306 &self.left().toml_config
307 }
308
309 pub fn toml_config_mut(&mut self) -> &mut reth_config::Config {
311 &mut self.left_mut().toml_config
312 }
313
314 pub fn chain_spec(&self) -> Arc<ChainSpec> {
316 self.node_config().chain.clone()
317 }
318
319 pub fn genesis_hash(&self) -> B256 {
321 self.node_config().chain.genesis_hash()
322 }
323
324 pub fn chain_id(&self) -> Chain {
326 self.node_config().chain.chain()
327 }
328
329 pub const fn is_dev(&self) -> bool {
331 self.node_config().dev.dev
332 }
333
334 pub fn prune_config(&self) -> Option<PruneConfig> {
337 let Some(mut node_prune_config) = self.node_config().prune_config() else {
338 return self.toml_config().prune.clone();
340 };
341
342 node_prune_config.merge(self.toml_config().prune.clone());
344 Some(node_prune_config)
345 }
346
347 pub fn prune_modes(&self) -> PruneModes {
349 self.prune_config().map(|config| config.segments).unwrap_or_default()
350 }
351
352 pub fn pruner_builder(&self) -> PrunerBuilder {
354 PrunerBuilder::new(self.prune_config().unwrap_or_default())
355 .delete_limit(self.chain_spec().prune_delete_limit())
356 .timeout(PrunerBuilder::DEFAULT_TIMEOUT)
357 }
358
359 pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
361 let default_jwt_path = self.data_dir().jwt();
362 let secret = self.node_config().rpc.auth_jwt_secret(default_jwt_path)?;
363 Ok(secret)
364 }
365
366 pub fn dev_mining_mode(&self, pool: impl TransactionPool) -> MiningMode {
368 if let Some(interval) = self.node_config().dev.block_time {
369 MiningMode::interval(interval)
370 } else {
371 MiningMode::instant(pool)
372 }
373 }
374}
375
376impl<DB, ChainSpec> LaunchContextWith<Attached<WithConfigs<ChainSpec>, DB>>
377where
378 DB: Database + Clone + 'static,
379 ChainSpec: EthChainSpec + EthereumHardforks + 'static,
380{
381 pub async fn create_provider_factory<N>(&self) -> eyre::Result<ProviderFactory<N>>
385 where
386 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
387 N::Primitives: FullNodePrimitives<BlockHeader = reth_primitives::Header>,
388 {
389 let factory = ProviderFactory::new(
390 self.right().clone(),
391 self.chain_spec(),
392 StaticFileProvider::read_write(self.data_dir().static_files())?,
393 )
394 .with_prune_modes(self.prune_modes())
395 .with_static_files_metrics();
396
397 let has_receipt_pruning =
398 self.toml_config().prune.as_ref().is_some_and(|a| a.has_receipts_pruning());
399
400 if let Some(unwind_target) = factory
403 .static_file_provider()
404 .check_consistency(&factory.provider()?, has_receipt_pruning)?
405 {
406 assert_ne!(unwind_target, PipelineTarget::Unwind(0), "A static file <> database inconsistency was found that would trigger an unwind to block 0");
409
410 info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");
411
412 let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
413
414 let pipeline = PipelineBuilder::default()
416 .add_stages(DefaultStages::new(
417 factory.clone(),
418 tip_rx,
419 Arc::new(NoopConsensus::default()),
420 NoopHeaderDownloader::default(),
421 NoopBodiesDownloader::default(),
422 NoopBlockExecutorProvider::<N::Primitives>::default(),
423 self.toml_config().stages.clone(),
424 self.prune_modes(),
425 ))
426 .build(
427 factory.clone(),
428 StaticFileProducer::new(factory.clone(), self.prune_modes()),
429 );
430
431 let (tx, rx) = oneshot::channel();
433
434 self.task_executor().spawn_critical_blocking(
436 "pipeline task",
437 Box::pin(async move {
438 let (_, result) = pipeline.run_as_fut(Some(unwind_target)).await;
439 let _ = tx.send(result);
440 }),
441 );
442 rx.await??;
443 }
444
445 Ok(factory)
446 }
447
448 pub async fn with_provider_factory<N>(
450 self,
451 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<ChainSpec>, ProviderFactory<N>>>>
452 where
453 N: ProviderNodeTypes<DB = DB, ChainSpec = ChainSpec>,
454 N::Primitives: FullNodePrimitives<BlockHeader = reth_primitives::Header>,
455 {
456 let factory = self.create_provider_factory().await?;
457 let ctx = LaunchContextWith {
458 inner: self.inner,
459 attachment: self.attachment.map_right(|_| factory),
460 };
461
462 Ok(ctx)
463 }
464}
465
466impl<T> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, ProviderFactory<T>>>
467where
468 T: ProviderNodeTypes,
469{
470 pub const fn database(&self) -> &T::DB {
472 self.right().db_ref()
473 }
474
475 pub const fn provider_factory(&self) -> &ProviderFactory<T> {
477 self.right()
478 }
479
480 pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
482 self.right().static_file_provider()
483 }
484
485 pub async fn with_prometheus_server(self) -> eyre::Result<Self> {
489 self.start_prometheus_endpoint().await?;
490 Ok(self)
491 }
492
493 pub async fn start_prometheus_endpoint(&self) -> eyre::Result<()> {
495 install_prometheus_recorder().spawn_upkeep();
497
498 let listen_addr = self.node_config().metrics;
499 if let Some(addr) = listen_addr {
500 info!(target: "reth::cli", "Starting metrics endpoint at {}", addr);
501 let config = MetricServerConfig::new(
502 addr,
503 VersionInfo {
504 version: CARGO_PKG_VERSION,
505 build_timestamp: VERGEN_BUILD_TIMESTAMP,
506 cargo_features: VERGEN_CARGO_FEATURES,
507 git_sha: VERGEN_GIT_SHA,
508 target_triple: VERGEN_CARGO_TARGET_TRIPLE,
509 build_profile: BUILD_PROFILE_NAME,
510 },
511 ChainSpecInfo { name: self.left().config.chain.chain().to_string() },
512 self.task_executor().clone(),
513 Hooks::builder()
514 .with_hook({
515 let db = self.database().clone();
516 move || db.report_metrics()
517 })
518 .with_hook({
519 let sfp = self.static_file_provider();
520 move || {
521 if let Err(error) = sfp.report_metrics() {
522 error!(%error, "Failed to report metrics for the static file provider");
523 }
524 }
525 })
526 .build(),
527 );
528
529 MetricServer::new(config).serve().await?;
530 }
531
532 Ok(())
533 }
534
535 pub fn with_genesis(self) -> Result<Self, InitStorageError> {
537 init_genesis(self.provider_factory())?;
538 Ok(self)
539 }
540
541 pub fn init_genesis(&self) -> Result<B256, InitStorageError> {
543 init_genesis(self.provider_factory())
544 }
545
546 pub fn with_metrics_task(
552 self,
553 ) -> LaunchContextWith<Attached<WithConfigs<T::ChainSpec>, WithMeteredProvider<T>>> {
554 let (metrics_sender, metrics_receiver) = unbounded_channel();
555
556 let with_metrics =
557 WithMeteredProvider { provider_factory: self.right().clone(), metrics_sender };
558
559 debug!(target: "reth::cli", "Spawning stages metrics listener task");
560 let sync_metrics_listener = reth_stages::MetricsListener::new(metrics_receiver);
561 self.task_executor().spawn_critical("stages metrics listener task", sync_metrics_listener);
562
563 LaunchContextWith {
564 inner: self.inner,
565 attachment: self.attachment.map_right(|_| with_metrics),
566 }
567 }
568}
569
570impl<N> LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProvider<N>>>
571where
572 N: NodeTypesWithDB,
573{
574 const fn provider_factory(&self) -> &ProviderFactory<N> {
576 &self.right().provider_factory
577 }
578
579 fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
581 self.right().metrics_sender.clone()
582 }
583
584 #[allow(clippy::complexity)]
586 pub fn with_blockchain_db<T, F>(
587 self,
588 create_blockchain_provider: F,
589 ) -> eyre::Result<LaunchContextWith<Attached<WithConfigs<N::ChainSpec>, WithMeteredProviders<T>>>>
590 where
591 T: FullNodeTypes<Types = N>,
592 F: FnOnce(ProviderFactory<N>) -> eyre::Result<T::Provider>,
593 {
594 let blockchain_db = create_blockchain_provider(self.provider_factory().clone())?;
595
596 let metered_providers = WithMeteredProviders {
597 db_provider_container: WithMeteredProvider {
598 provider_factory: self.provider_factory().clone(),
599 metrics_sender: self.sync_metrics_tx(),
600 },
601 blockchain_db,
602 };
603
604 let ctx = LaunchContextWith {
605 inner: self.inner,
606 attachment: self.attachment.map_right(|_| metered_providers),
607 };
608
609 Ok(ctx)
610 }
611}
612
613impl<T>
614 LaunchContextWith<
615 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithMeteredProviders<T>>,
616 >
617where
618 T: FullNodeTypes<Types: ProviderNodeTypes>,
619{
620 pub const fn database(&self) -> &<T::Types as NodeTypesWithDB>::DB {
622 self.provider_factory().db_ref()
623 }
624
625 pub const fn provider_factory(&self) -> &ProviderFactory<T::Types> {
627 &self.right().db_provider_container.provider_factory
628 }
629
630 pub fn lookup_head(&self) -> eyre::Result<Head> {
634 self.node_config()
635 .lookup_head(self.provider_factory())
636 .wrap_err("the head block is missing")
637 }
638
639 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
641 self.right().db_provider_container.metrics_sender.clone()
642 }
643
644 pub const fn blockchain_db(&self) -> &T::Provider {
646 &self.right().blockchain_db
647 }
648
649 pub async fn with_components<CB>(
651 self,
652 components_builder: CB,
653 on_component_initialized: Box<
654 dyn OnComponentInitializedHook<NodeAdapter<T, CB::Components>>,
655 >,
656 ) -> eyre::Result<
657 LaunchContextWith<
658 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
659 >,
660 >
661 where
662 CB: NodeComponentsBuilder<T>,
663 {
664 let head = self.lookup_head()?;
666
667 let builder_ctx = BuilderContext::new(
668 head,
669 self.blockchain_db().clone(),
670 self.task_executor().clone(),
671 self.configs().clone(),
672 );
673
674 debug!(target: "reth::cli", "creating components");
675 let components = components_builder.build_components(&builder_ctx).await?;
676
677 let blockchain_db = self.blockchain_db().clone();
678
679 let node_adapter = NodeAdapter {
680 components,
681 task_executor: self.task_executor().clone(),
682 provider: blockchain_db,
683 };
684
685 debug!(target: "reth::cli", "calling on_component_initialized hook");
686 on_component_initialized.on_event(node_adapter.clone())?;
687
688 let components_container = WithComponents {
689 db_provider_container: WithMeteredProvider {
690 provider_factory: self.provider_factory().clone(),
691 metrics_sender: self.sync_metrics_tx(),
692 },
693 node_adapter,
694 head,
695 };
696
697 let ctx = LaunchContextWith {
698 inner: self.inner,
699 attachment: self.attachment.map_right(|_| components_container),
700 };
701
702 Ok(ctx)
703 }
704}
705
706impl<T, CB>
707 LaunchContextWith<
708 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
709 >
710where
711 T: FullNodeTypes<Types: ProviderNodeTypes>,
712 CB: NodeComponentsBuilder<T>,
713{
714 pub const fn provider_factory(&self) -> &ProviderFactory<T::Types> {
716 &self.right().db_provider_container.provider_factory
717 }
718
719 pub async fn max_block<C>(&self, client: C) -> eyre::Result<Option<BlockNumber>>
722 where
723 C: HeadersClient<Header: BlockHeader>,
724 {
725 self.node_config().max_block(client, self.provider_factory().clone()).await
726 }
727
728 pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
730 self.provider_factory().static_file_provider()
731 }
732
733 pub fn static_file_producer(&self) -> StaticFileProducer<ProviderFactory<T::Types>> {
735 StaticFileProducer::new(self.provider_factory().clone(), self.prune_modes())
736 }
737
738 pub const fn head(&self) -> Head {
740 self.right().head
741 }
742
743 pub const fn node_adapter(&self) -> &NodeAdapter<T, CB::Components> {
745 &self.right().node_adapter
746 }
747
748 pub fn node_adapter_mut(&mut self) -> &mut NodeAdapter<T, CB::Components> {
750 &mut self.right_mut().node_adapter
751 }
752
753 pub const fn blockchain_db(&self) -> &T::Provider {
755 &self.node_adapter().provider
756 }
757
758 pub fn initial_backfill_target(&self) -> ProviderResult<Option<B256>> {
764 let mut initial_target = self.node_config().debug.tip;
765
766 if initial_target.is_none() {
767 initial_target = self.check_pipeline_consistency()?;
768 }
769
770 Ok(initial_target)
771 }
772
773 pub const fn terminate_after_initial_backfill(&self) -> bool {
779 self.node_config().debug.terminate || self.node_config().debug.max_block.is_some()
780 }
781
782 fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> {
787 if self.chain_spec().is_optimism() &&
788 !self.is_dev() &&
789 self.chain_id() == Chain::optimism_mainnet()
790 {
791 let latest = self.blockchain_db().last_block_number()?;
792 if latest < 105235063 {
794 error!("Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended");
795 return Err(ProviderError::BestBlockNotFound)
796 }
797 }
798
799 Ok(())
800 }
801
802 pub fn check_pipeline_consistency(&self) -> ProviderResult<Option<B256>> {
814 let first_stage_checkpoint = self
817 .blockchain_db()
818 .get_stage_checkpoint(*StageId::ALL.first().unwrap())?
819 .unwrap_or_default()
820 .block_number;
821
822 for stage_id in StageId::ALL.iter().skip(1) {
825 let stage_checkpoint = self
826 .blockchain_db()
827 .get_stage_checkpoint(*stage_id)?
828 .unwrap_or_default()
829 .block_number;
830
831 if stage_checkpoint < first_stage_checkpoint {
834 debug!(
835 target: "consensus::engine",
836 first_stage_checkpoint,
837 inconsistent_stage_id = %stage_id,
838 inconsistent_stage_checkpoint = stage_checkpoint,
839 "Pipeline sync progress is inconsistent"
840 );
841 return self.blockchain_db().block_hash(first_stage_checkpoint);
842 }
843 }
844
845 self.ensure_chain_specific_db_checks()?;
846
847 Ok(None)
848 }
849
850 pub fn sync_metrics_tx(&self) -> UnboundedSender<MetricEvent> {
852 self.right().db_provider_container.metrics_sender.clone()
853 }
854
855 pub const fn components(&self) -> &CB::Components {
857 &self.node_adapter().components
858 }
859}
860
861impl<T, CB>
862 LaunchContextWith<
863 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, WithComponents<T, CB>>,
864 >
865where
866 T: FullNodeTypes<
867 Provider: StateProviderFactory + ChainSpecProvider,
868 Types: ProviderNodeTypes<Primitives: NodePrimitives<SignedTx = TransactionSigned>>,
869 >,
870 CB: NodeComponentsBuilder<T>,
871{
872 pub fn invalid_block_hook(
874 &self,
875 ) -> eyre::Result<Box<dyn InvalidBlockHook<<T::Types as NodeTypes>::Primitives>>> {
876 let Some(ref hook) = self.node_config().debug.invalid_block_hook else {
877 return Ok(Box::new(NoopInvalidBlockHook::default()))
878 };
879 let healthy_node_rpc_client = self.get_healthy_node_client()?;
880
881 let output_directory = self.data_dir().invalid_block_hooks();
882 let hooks = hook
883 .iter()
884 .copied()
885 .map(|hook| {
886 let output_directory = output_directory.join(hook.to_string());
887 fs::create_dir_all(&output_directory)?;
888
889 Ok(match hook {
890 InvalidBlockHookType::Witness => Box::new(InvalidBlockWitnessHook::new(
891 self.blockchain_db().clone(),
892 self.components().evm_config().clone(),
893 output_directory,
894 healthy_node_rpc_client.clone(),
895 )),
896 InvalidBlockHookType::PreState | InvalidBlockHookType::Opcode => {
897 eyre::bail!("invalid block hook {hook:?} is not implemented yet")
898 }
899 } as Box<dyn InvalidBlockHook<_>>)
900 })
901 .collect::<Result<_, _>>()?;
902
903 Ok(Box::new(InvalidBlockHooks(hooks)))
904 }
905
906 fn get_healthy_node_client(&self) -> eyre::Result<Option<jsonrpsee::http_client::HttpClient>> {
908 self.node_config()
909 .debug
910 .healthy_node_rpc_url
911 .as_ref()
912 .map(|url| {
913 let client = jsonrpsee::http_client::HttpClientBuilder::default().build(url)?;
914
915 let chain_id = futures::executor::block_on(async {
917 EthApiClient::<
918 alloy_rpc_types::Transaction,
919 alloy_rpc_types::Block,
920 alloy_rpc_types::Receipt,
921 alloy_rpc_types::Header,
922 >::chain_id(&client)
923 .await
924 })?
925 .ok_or_eyre("healthy node rpc client didn't return a chain id")?;
926 if chain_id.to::<u64>() != self.chain_id().id() {
927 eyre::bail!("invalid chain id for healthy node: {chain_id}")
928 }
929
930 Ok(client)
931 })
932 .transpose()
933 }
934}
935
936#[derive(Clone, Copy, Debug)]
938pub struct Attached<L, R> {
939 left: L,
940 right: R,
941}
942
943impl<L, R> Attached<L, R> {
944 pub const fn new(left: L, right: R) -> Self {
946 Self { left, right }
947 }
948
949 pub fn map_left<F, T>(self, f: F) -> Attached<T, R>
951 where
952 F: FnOnce(L) -> T,
953 {
954 Attached::new(f(self.left), self.right)
955 }
956
957 pub fn map_right<F, T>(self, f: F) -> Attached<L, T>
959 where
960 F: FnOnce(R) -> T,
961 {
962 Attached::new(self.left, f(self.right))
963 }
964
965 pub const fn left(&self) -> &L {
967 &self.left
968 }
969
970 pub const fn right(&self) -> &R {
972 &self.right
973 }
974
975 pub fn left_mut(&mut self) -> &mut R {
977 &mut self.right
978 }
979
980 pub fn right_mut(&mut self) -> &mut R {
982 &mut self.right
983 }
984}
985
986#[derive(Debug, Clone)]
989pub struct WithConfigs<ChainSpec> {
990 pub config: NodeConfig<ChainSpec>,
992 pub toml_config: reth_config::Config,
994}
995
996#[derive(Debug, Clone)]
999pub struct WithMeteredProvider<N: NodeTypesWithDB> {
1000 provider_factory: ProviderFactory<N>,
1001 metrics_sender: UnboundedSender<MetricEvent>,
1002}
1003
1004#[allow(missing_debug_implementations)]
1007pub struct WithMeteredProviders<T>
1008where
1009 T: FullNodeTypes,
1010{
1011 db_provider_container: WithMeteredProvider<T::Types>,
1012 blockchain_db: T::Provider,
1013}
1014
1015#[allow(missing_debug_implementations)]
1017pub struct WithComponents<T, CB>
1018where
1019 T: FullNodeTypes,
1020 CB: NodeComponentsBuilder<T>,
1021{
1022 db_provider_container: WithMeteredProvider<T::Types>,
1023 node_adapter: NodeAdapter<T, CB::Components>,
1024 head: Head,
1025}
1026
1027#[cfg(test)]
1028mod tests {
1029 use super::{LaunchContext, NodeConfig};
1030 use reth_config::Config;
1031 use reth_node_core::args::PruningArgs;
1032
1033 const EXTENSION: &str = "toml";
1034
1035 fn with_tempdir(filename: &str, proc: fn(&std::path::Path)) {
1036 let temp_dir = tempfile::tempdir().unwrap();
1037 let config_path = temp_dir.path().join(filename).with_extension(EXTENSION);
1038 proc(&config_path);
1039 temp_dir.close().unwrap()
1040 }
1041
1042 #[test]
1043 fn test_save_prune_config() {
1044 with_tempdir("prune-store-test", |config_path| {
1045 let mut reth_config = Config::default();
1046 let node_config = NodeConfig {
1047 pruning: PruningArgs {
1048 full: true,
1049 block_interval: None,
1050 sender_recovery_full: false,
1051 sender_recovery_distance: None,
1052 sender_recovery_before: None,
1053 transaction_lookup_full: false,
1054 transaction_lookup_distance: None,
1055 transaction_lookup_before: None,
1056 receipts_full: false,
1057 receipts_distance: None,
1058 receipts_before: None,
1059 account_history_full: false,
1060 account_history_distance: None,
1061 account_history_before: None,
1062 storage_history_full: false,
1063 storage_history_distance: None,
1064 storage_history_before: None,
1065 receipts_log_filter: vec![],
1066 },
1067 ..NodeConfig::test()
1068 };
1069 LaunchContext::save_pruning_config_if_full_node(
1070 &mut reth_config,
1071 &node_config,
1072 config_path,
1073 )
1074 .unwrap();
1075
1076 let loaded_config = Config::from_path(config_path).unwrap();
1077
1078 assert_eq!(reth_config, loaded_config);
1079 })
1080 }
1081}