1use crate::{args::NetworkArgs, utils::get_single_header};
4use alloy_eips::BlockHashOrNumber;
5use alloy_primitives::{BlockNumber, B256};
6use clap::Parser;
7use futures::StreamExt;
8use reth_beacon_consensus::EthBeaconConsensus;
9use reth_chainspec::ChainSpec;
10use reth_cli::chainspec::ChainSpecParser;
11use reth_cli_commands::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
12use reth_cli_runner::CliContext;
13use reth_cli_util::get_secret_key;
14use reth_config::Config;
15use reth_consensus::Consensus;
16use reth_db::DatabaseEnv;
17use reth_downloaders::{
18 bodies::bodies::BodiesDownloaderBuilder,
19 headers::reverse_headers::ReverseHeadersDownloaderBuilder,
20};
21use reth_exex::ExExManagerHandle;
22use reth_network::{BlockDownloaderProvider, NetworkHandle};
23use reth_network_api::NetworkInfo;
24use reth_network_p2p::{headers::client::HeadersClient, EthBlockClient};
25use reth_node_api::NodeTypesWithDBAdapter;
26use reth_node_ethereum::EthExecutorProvider;
27use reth_provider::{
28 providers::ProviderNodeTypes, ChainSpecProvider, ProviderFactory, StageCheckpointReader,
29};
30use reth_prune::PruneModes;
31use reth_stages::{
32 sets::DefaultStages, stages::ExecutionStage, ExecutionStageThresholds, Pipeline, StageId,
33 StageSet,
34};
35use reth_static_file::StaticFileProducer;
36use reth_tasks::TaskExecutor;
37use std::{path::PathBuf, sync::Arc};
38use tokio::sync::watch;
39use tracing::*;
40
41#[derive(Debug, Parser)]
43pub struct Command<C: ChainSpecParser> {
44 #[command(flatten)]
45 env: EnvironmentArgs<C>,
46
47 #[command(flatten)]
48 network: NetworkArgs,
49
50 #[arg(long)]
52 pub to: u64,
53
54 #[arg(long, default_value = "1000")]
57 pub interval: u64,
58}
59
60impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
61 fn build_pipeline<N: ProviderNodeTypes<ChainSpec = C::ChainSpec> + CliNodeTypes, Client>(
62 &self,
63 config: &Config,
64 client: Client,
65 consensus: Arc<dyn Consensus>,
66 provider_factory: ProviderFactory<N>,
67 task_executor: &TaskExecutor,
68 static_file_producer: StaticFileProducer<ProviderFactory<N>>,
69 ) -> eyre::Result<Pipeline<N>>
70 where
71 Client: EthBlockClient + 'static,
72 {
73 let header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers)
75 .build(client.clone(), consensus.clone().as_header_validator())
76 .into_task_with(task_executor);
77
78 let body_downloader = BodiesDownloaderBuilder::new(config.stages.bodies)
79 .build(client, Arc::clone(&consensus), provider_factory.clone())
80 .into_task_with(task_executor);
81
82 let stage_conf = &config.stages;
83 let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default();
84
85 let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
86 let executor = EthExecutorProvider::ethereum(provider_factory.chain_spec());
87
88 let pipeline = Pipeline::<N>::builder()
89 .with_tip_sender(tip_tx)
90 .add_stages(
91 DefaultStages::new(
92 provider_factory.clone(),
93 tip_rx,
94 Arc::clone(&consensus),
95 header_downloader,
96 body_downloader,
97 executor.clone(),
98 stage_conf.clone(),
99 prune_modes.clone(),
100 )
101 .set(ExecutionStage::new(
102 executor,
103 ExecutionStageThresholds {
104 max_blocks: None,
105 max_changes: None,
106 max_cumulative_gas: None,
107 max_duration: None,
108 },
109 stage_conf.execution_external_clean_threshold(),
110 prune_modes,
111 ExExManagerHandle::empty(),
112 )),
113 )
114 .build(provider_factory, static_file_producer);
115
116 Ok(pipeline)
117 }
118
119 async fn build_network<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(
120 &self,
121 config: &Config,
122 task_executor: TaskExecutor,
123 provider_factory: ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
124 network_secret_path: PathBuf,
125 default_peers_path: PathBuf,
126 ) -> eyre::Result<NetworkHandle> {
127 let secret_key = get_secret_key(&network_secret_path)?;
128 let network = self
129 .network
130 .network_config(config, provider_factory.chain_spec(), secret_key, default_peers_path)
131 .with_task_executor(Box::new(task_executor))
132 .build(provider_factory)
133 .start_network()
134 .await?;
135 info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
136 debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID");
137 Ok(network)
138 }
139
140 async fn fetch_block_hash<Client>(
141 &self,
142 client: Client,
143 block: BlockNumber,
144 ) -> eyre::Result<B256>
145 where
146 Client: HeadersClient<Header: reth_primitives_traits::BlockHeader>,
147 {
148 info!(target: "reth::cli", ?block, "Fetching block from the network.");
149 loop {
150 match get_single_header(&client, BlockHashOrNumber::Number(block)).await {
151 Ok(tip_header) => {
152 info!(target: "reth::cli", ?block, "Successfully fetched block");
153 return Ok(tip_header.hash())
154 }
155 Err(error) => {
156 error!(target: "reth::cli", ?block, %error, "Failed to fetch the block. Retrying...");
157 }
158 }
159 }
160 }
161
162 pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(
164 self,
165 ctx: CliContext,
166 ) -> eyre::Result<()> {
167 let Environment { provider_factory, config, data_dir } =
168 self.env.init::<N>(AccessRights::RW)?;
169
170 let consensus: Arc<dyn Consensus> =
171 Arc::new(EthBeaconConsensus::new(provider_factory.chain_spec()));
172
173 let network_secret_path =
175 self.network.p2p_secret_key.clone().unwrap_or_else(|| data_dir.p2p_secret());
176 let network = self
177 .build_network(
178 &config,
179 ctx.task_executor.clone(),
180 provider_factory.clone(),
181 network_secret_path,
182 data_dir.known_peers(),
183 )
184 .await?;
185
186 let static_file_producer =
187 StaticFileProducer::new(provider_factory.clone(), PruneModes::default());
188
189 let fetch_client = network.fetch_client().await?;
191 let mut pipeline = self.build_pipeline(
192 &config,
193 fetch_client.clone(),
194 Arc::clone(&consensus),
195 provider_factory.clone(),
196 &ctx.task_executor,
197 static_file_producer,
198 )?;
199
200 let provider = provider_factory.provider()?;
201
202 let latest_block_number =
203 provider.get_stage_checkpoint(StageId::Finish)?.map(|ch| ch.block_number);
204 if latest_block_number.unwrap_or_default() >= self.to {
205 info!(target: "reth::cli", latest = latest_block_number, "Nothing to run");
206 return Ok(())
207 }
208
209 ctx.task_executor.spawn_critical(
210 "events task",
211 reth_node_events::node::handle_events(
212 Some(Box::new(network)),
213 latest_block_number,
214 pipeline.events().map(Into::into),
215 ),
216 );
217
218 let mut current_max_block = latest_block_number.unwrap_or_default();
219 while current_max_block < self.to {
220 let next_block = current_max_block + 1;
221 let target_block = self.to.min(current_max_block + self.interval);
222 let target_block_hash =
223 self.fetch_block_hash(fetch_client.clone(), target_block).await?;
224
225 info!(target: "reth::cli", from = next_block, to = target_block, tip = ?target_block_hash, "Starting pipeline");
227 pipeline.set_tip(target_block_hash);
228 let result = pipeline.run_loop().await?;
229 trace!(target: "reth::cli", from = next_block, to = target_block, tip = ?target_block_hash, ?result, "Pipeline finished");
230
231 provider_factory.provider_rw()?.unwind_trie_state_range(next_block..=target_block)?;
233
234 current_max_block = target_block;
236 }
237
238 Ok(())
239 }
240}