reth_node_builder/
setup.rs

1//! Helpers for setting up parts of the node.
2
3use std::sync::Arc;
4
5use alloy_primitives::{BlockNumber, B256};
6use reth_config::{config::StageConfig, PruneConfig};
7use reth_consensus::Consensus;
8use reth_downloaders::{
9    bodies::bodies::BodiesDownloaderBuilder,
10    headers::reverse_headers::ReverseHeadersDownloaderBuilder,
11};
12use reth_evm::execute::BlockExecutorProvider;
13use reth_exex::ExExManagerHandle;
14use reth_network_p2p::{
15    bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader, BlockClient,
16};
17use reth_node_api::{BodyTy, HeaderTy};
18use reth_provider::{providers::ProviderNodeTypes, ProviderFactory};
19use reth_stages::{prelude::DefaultStages, stages::ExecutionStage, Pipeline, StageSet};
20use reth_static_file::StaticFileProducer;
21use reth_tasks::TaskExecutor;
22use reth_tracing::tracing::debug;
23use tokio::sync::watch;
24
25/// Constructs a [Pipeline] that's wired to the network
26#[allow(clippy::too_many_arguments)]
27pub fn build_networked_pipeline<N, Client, Executor>(
28    config: &StageConfig,
29    client: Client,
30    consensus: Arc<dyn Consensus<Client::Header, Client::Body>>,
31    provider_factory: ProviderFactory<N>,
32    task_executor: &TaskExecutor,
33    metrics_tx: reth_stages::MetricEventsSender,
34    prune_config: Option<PruneConfig>,
35    max_block: Option<BlockNumber>,
36    static_file_producer: StaticFileProducer<ProviderFactory<N>>,
37    executor: Executor,
38    exex_manager_handle: ExExManagerHandle<N::Primitives>,
39) -> eyre::Result<Pipeline<N>>
40where
41    N: ProviderNodeTypes,
42    Client: BlockClient<Header = HeaderTy<N>, Body = BodyTy<N>> + 'static,
43    Executor: BlockExecutorProvider<Primitives = N::Primitives>,
44{
45    // building network downloaders using the fetch client
46    let header_downloader = ReverseHeadersDownloaderBuilder::new(config.headers)
47        .build(client.clone(), consensus.clone().as_header_validator())
48        .into_task_with(task_executor);
49
50    let body_downloader = BodiesDownloaderBuilder::new(config.bodies)
51        .build(client, Arc::clone(&consensus), provider_factory.clone())
52        .into_task_with(task_executor);
53
54    let pipeline = build_pipeline(
55        provider_factory,
56        config,
57        header_downloader,
58        body_downloader,
59        consensus,
60        max_block,
61        metrics_tx,
62        prune_config,
63        static_file_producer,
64        executor,
65        exex_manager_handle,
66    )?;
67
68    Ok(pipeline)
69}
70
71/// Builds the [Pipeline] with the given [`ProviderFactory`] and downloaders.
72#[allow(clippy::too_many_arguments)]
73pub fn build_pipeline<N, H, B, Executor>(
74    provider_factory: ProviderFactory<N>,
75    stage_config: &StageConfig,
76    header_downloader: H,
77    body_downloader: B,
78    consensus: Arc<dyn Consensus<H::Header, B::Body>>,
79    max_block: Option<u64>,
80    metrics_tx: reth_stages::MetricEventsSender,
81    prune_config: Option<PruneConfig>,
82    static_file_producer: StaticFileProducer<ProviderFactory<N>>,
83    executor: Executor,
84    exex_manager_handle: ExExManagerHandle<N::Primitives>,
85) -> eyre::Result<Pipeline<N>>
86where
87    N: ProviderNodeTypes,
88    H: HeaderDownloader<Header = HeaderTy<N>> + 'static,
89    B: BodyDownloader<Header = HeaderTy<N>, Body = BodyTy<N>> + 'static,
90    Executor: BlockExecutorProvider<Primitives = N::Primitives>,
91{
92    let mut builder = Pipeline::<N>::builder();
93
94    if let Some(max_block) = max_block {
95        debug!(target: "reth::cli", max_block, "Configuring builder to use max block");
96        builder = builder.with_max_block(max_block)
97    }
98
99    let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
100
101    let prune_modes = prune_config.map(|prune| prune.segments).unwrap_or_default();
102
103    let pipeline = builder
104        .with_tip_sender(tip_tx)
105        .with_metrics_tx(metrics_tx)
106        .add_stages(
107            DefaultStages::new(
108                provider_factory.clone(),
109                tip_rx,
110                Arc::clone(&consensus),
111                header_downloader,
112                body_downloader,
113                executor.clone(),
114                stage_config.clone(),
115                prune_modes.clone(),
116            )
117            .set(ExecutionStage::new(
118                executor,
119                stage_config.execution.into(),
120                stage_config.execution_external_clean_threshold(),
121                prune_modes,
122                exex_manager_handle,
123            )),
124        )
125        .build(provider_factory, static_file_producer);
126
127    Ok(pipeline)
128}