reth_node_builder/
setup.rs1use std::sync::Arc;
4
5use alloy_primitives::{BlockNumber, B256};
6use reth_config::{config::StageConfig, PruneConfig};
7use reth_consensus::Consensus;
8use reth_downloaders::{
9 bodies::bodies::BodiesDownloaderBuilder,
10 headers::reverse_headers::ReverseHeadersDownloaderBuilder,
11};
12use reth_evm::execute::BlockExecutorProvider;
13use reth_exex::ExExManagerHandle;
14use reth_network_p2p::{
15 bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader, BlockClient,
16};
17use reth_node_api::{BodyTy, HeaderTy};
18use reth_provider::{providers::ProviderNodeTypes, ProviderFactory};
19use reth_stages::{prelude::DefaultStages, stages::ExecutionStage, Pipeline, StageSet};
20use reth_static_file::StaticFileProducer;
21use reth_tasks::TaskExecutor;
22use reth_tracing::tracing::debug;
23use tokio::sync::watch;
24
25#[allow(clippy::too_many_arguments)]
27pub fn build_networked_pipeline<N, Client, Executor>(
28 config: &StageConfig,
29 client: Client,
30 consensus: Arc<dyn Consensus<Client::Header, Client::Body>>,
31 provider_factory: ProviderFactory<N>,
32 task_executor: &TaskExecutor,
33 metrics_tx: reth_stages::MetricEventsSender,
34 prune_config: Option<PruneConfig>,
35 max_block: Option<BlockNumber>,
36 static_file_producer: StaticFileProducer<ProviderFactory<N>>,
37 executor: Executor,
38 exex_manager_handle: ExExManagerHandle<N::Primitives>,
39) -> eyre::Result<Pipeline<N>>
40where
41 N: ProviderNodeTypes,
42 Client: BlockClient<Header = HeaderTy<N>, Body = BodyTy<N>> + 'static,
43 Executor: BlockExecutorProvider<Primitives = N::Primitives>,
44{
45 let header_downloader = ReverseHeadersDownloaderBuilder::new(config.headers)
47 .build(client.clone(), consensus.clone().as_header_validator())
48 .into_task_with(task_executor);
49
50 let body_downloader = BodiesDownloaderBuilder::new(config.bodies)
51 .build(client, Arc::clone(&consensus), provider_factory.clone())
52 .into_task_with(task_executor);
53
54 let pipeline = build_pipeline(
55 provider_factory,
56 config,
57 header_downloader,
58 body_downloader,
59 consensus,
60 max_block,
61 metrics_tx,
62 prune_config,
63 static_file_producer,
64 executor,
65 exex_manager_handle,
66 )?;
67
68 Ok(pipeline)
69}
70
71#[allow(clippy::too_many_arguments)]
73pub fn build_pipeline<N, H, B, Executor>(
74 provider_factory: ProviderFactory<N>,
75 stage_config: &StageConfig,
76 header_downloader: H,
77 body_downloader: B,
78 consensus: Arc<dyn Consensus<H::Header, B::Body>>,
79 max_block: Option<u64>,
80 metrics_tx: reth_stages::MetricEventsSender,
81 prune_config: Option<PruneConfig>,
82 static_file_producer: StaticFileProducer<ProviderFactory<N>>,
83 executor: Executor,
84 exex_manager_handle: ExExManagerHandle<N::Primitives>,
85) -> eyre::Result<Pipeline<N>>
86where
87 N: ProviderNodeTypes,
88 H: HeaderDownloader<Header = HeaderTy<N>> + 'static,
89 B: BodyDownloader<Header = HeaderTy<N>, Body = BodyTy<N>> + 'static,
90 Executor: BlockExecutorProvider<Primitives = N::Primitives>,
91{
92 let mut builder = Pipeline::<N>::builder();
93
94 if let Some(max_block) = max_block {
95 debug!(target: "reth::cli", max_block, "Configuring builder to use max block");
96 builder = builder.with_max_block(max_block)
97 }
98
99 let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
100
101 let prune_modes = prune_config.map(|prune| prune.segments).unwrap_or_default();
102
103 let pipeline = builder
104 .with_tip_sender(tip_tx)
105 .with_metrics_tx(metrics_tx)
106 .add_stages(
107 DefaultStages::new(
108 provider_factory.clone(),
109 tip_rx,
110 Arc::clone(&consensus),
111 header_downloader,
112 body_downloader,
113 executor.clone(),
114 stage_config.clone(),
115 prune_modes.clone(),
116 )
117 .set(ExecutionStage::new(
118 executor,
119 stage_config.execution.into(),
120 stage_config.execution_external_clean_threshold(),
121 prune_modes,
122 exex_manager_handle,
123 )),
124 )
125 .build(provider_factory, static_file_producer);
126
127 Ok(pipeline)
128}