1use crate::common::{AccessRights, CliNodeComponents, CliNodeTypes, Environment, EnvironmentArgs};
6use alloy_eips::BlockHashOrNumber;
7use alloy_primitives::Sealable;
8use clap::Parser;
9use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
10use reth_cli::chainspec::ChainSpecParser;
11use reth_cli_runner::CliContext;
12use reth_cli_util::get_secret_key;
13use reth_config::config::{HashingConfig, SenderRecoveryConfig, TransactionLookupConfig};
14use reth_db_api::database_metrics::DatabaseMetrics;
15use reth_downloaders::{
16 bodies::bodies::BodiesDownloaderBuilder,
17 headers::reverse_headers::ReverseHeadersDownloaderBuilder,
18};
19use reth_eth_wire::NetPrimitivesFor;
20use reth_exex::ExExManagerHandle;
21use reth_network::BlockDownloaderProvider;
22use reth_network_p2p::HeadersClient;
23use reth_node_core::{
24 args::{NetworkArgs, StageEnum},
25 version::{
26 BUILD_PROFILE_NAME, CARGO_PKG_VERSION, VERGEN_BUILD_TIMESTAMP, VERGEN_CARGO_FEATURES,
27 VERGEN_CARGO_TARGET_TRIPLE, VERGEN_GIT_SHA,
28 },
29};
30use reth_node_metrics::{
31 chain::ChainSpecInfo,
32 hooks::Hooks,
33 server::{MetricServer, MetricServerConfig},
34 version::VersionInfo,
35};
36use reth_provider::{
37 writer::UnifiedStorageWriter, ChainSpecProvider, DatabaseProviderFactory,
38 StageCheckpointReader, StageCheckpointWriter, StaticFileProviderFactory,
39};
40use reth_stages::{
41 stages::{
42 AccountHashingStage, BodyStage, ExecutionStage, HeaderStage, IndexAccountHistoryStage,
43 IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage,
44 TransactionLookupStage,
45 },
46 ExecInput, ExecOutput, ExecutionStageThresholds, Stage, StageExt, UnwindInput, UnwindOutput,
47};
48use std::{any::Any, net::SocketAddr, sync::Arc, time::Instant};
49use tokio::sync::watch;
50use tracing::*;
51
52#[derive(Debug, Parser)]
54pub struct Command<C: ChainSpecParser> {
55 #[command(flatten)]
56 env: EnvironmentArgs<C>,
57
58 #[arg(long, value_name = "SOCKET")]
62 metrics: Option<SocketAddr>,
63
64 #[arg(value_enum)]
66 stage: StageEnum,
67
68 #[arg(long)]
70 from: u64,
71
72 #[arg(long, short)]
74 to: u64,
75
76 #[arg(long)]
78 batch_size: Option<u64>,
79
80 #[arg(long, short)]
86 skip_unwind: bool,
87
88 #[arg(long, short)]
94 commit: bool,
95
96 #[arg(long)]
98 checkpoints: bool,
99
100 #[command(flatten)]
101 network: NetworkArgs,
102}
103
104impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
105 pub async fn execute<N, Comp, F, P>(self, ctx: CliContext, components: F) -> eyre::Result<()>
107 where
108 N: CliNodeTypes<ChainSpec = C::ChainSpec>,
109 Comp: CliNodeComponents<N>,
110 F: FnOnce(Arc<C::ChainSpec>) -> Comp,
111 P: NetPrimitivesFor<N::Primitives>,
112 {
113 let _ = fdlimit::raise_fd_limit();
116
117 let Environment { provider_factory, config, data_dir } =
118 self.env.init::<N>(AccessRights::RW)?;
119
120 let mut provider_rw = provider_factory.database_provider_rw()?;
121 let components = components(provider_factory.chain_spec());
122
123 if let Some(listen_addr) = self.metrics {
124 info!(target: "reth::cli", "Starting metrics endpoint at {}", listen_addr);
125 let config = MetricServerConfig::new(
126 listen_addr,
127 VersionInfo {
128 version: CARGO_PKG_VERSION,
129 build_timestamp: VERGEN_BUILD_TIMESTAMP,
130 cargo_features: VERGEN_CARGO_FEATURES,
131 git_sha: VERGEN_GIT_SHA,
132 target_triple: VERGEN_CARGO_TARGET_TRIPLE,
133 build_profile: BUILD_PROFILE_NAME,
134 },
135 ChainSpecInfo { name: provider_factory.chain_spec().chain().to_string() },
136 ctx.task_executor,
137 Hooks::builder()
138 .with_hook({
139 let db = provider_factory.db_ref().clone();
140 move || db.report_metrics()
141 })
142 .with_hook({
143 let sfp = provider_factory.static_file_provider();
144 move || {
145 if let Err(error) = sfp.report_metrics() {
146 error!(%error, "Failed to report metrics from static file provider");
147 }
148 }
149 })
150 .build(),
151 );
152
153 MetricServer::new(config).serve().await?;
154 }
155
156 let batch_size = self.batch_size.unwrap_or(self.to.saturating_sub(self.from) + 1);
157
158 let etl_config = config.stages.etl.clone();
159 let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default();
160
161 let (mut exec_stage, mut unwind_stage): (Box<dyn Stage<_>>, Option<Box<dyn Stage<_>>>) =
162 match self.stage {
163 StageEnum::Headers => {
164 let consensus = Arc::new(components.consensus().clone());
165
166 let network_secret_path = self
167 .network
168 .p2p_secret_key
169 .clone()
170 .unwrap_or_else(|| data_dir.p2p_secret());
171 let p2p_secret_key = get_secret_key(&network_secret_path)?;
172
173 let default_peers_path = data_dir.known_peers();
174
175 let network = self
176 .network
177 .network_config::<P>(
178 &config,
179 provider_factory.chain_spec(),
180 p2p_secret_key,
181 default_peers_path,
182 )
183 .build(provider_factory.clone())
184 .start_network()
185 .await?;
186 let fetch_client = Arc::new(network.fetch_client().await?);
187
188 let tip: P::BlockHeader = loop {
190 match fetch_client.get_header(BlockHashOrNumber::Number(self.to)).await {
191 Ok(header) => {
192 if let Some(header) = header.into_data() {
193 break header
194 }
195 }
196 Err(error) if error.is_retryable() => {
197 warn!(target: "reth::cli", "Error requesting header: {error}. Retrying...")
198 }
199 Err(error) => return Err(error.into()),
200 }
201 };
202 let (_, rx) = watch::channel(tip.hash_slow());
203 (
204 Box::new(HeaderStage::new(
205 provider_factory.clone(),
206 ReverseHeadersDownloaderBuilder::new(config.stages.headers)
207 .build(fetch_client, consensus.clone()),
208 rx,
209 etl_config,
210 )),
211 None,
212 )
213 }
214 StageEnum::Bodies => {
215 let consensus = Arc::new(components.consensus().clone());
216
217 let mut config = config;
218 config.peers.trusted_nodes_only = self.network.trusted_only;
219 config.peers.trusted_nodes.extend(self.network.trusted_peers.clone());
220
221 let network_secret_path = self
222 .network
223 .p2p_secret_key
224 .clone()
225 .unwrap_or_else(|| data_dir.p2p_secret());
226 let p2p_secret_key = get_secret_key(&network_secret_path)?;
227
228 let default_peers_path = data_dir.known_peers();
229
230 let network = self
231 .network
232 .network_config::<P>(
233 &config,
234 provider_factory.chain_spec(),
235 p2p_secret_key,
236 default_peers_path,
237 )
238 .build(provider_factory.clone())
239 .start_network()
240 .await?;
241 let fetch_client = Arc::new(network.fetch_client().await?);
242
243 let stage = BodyStage::new(
244 BodiesDownloaderBuilder::default()
245 .with_stream_batch_size(batch_size as usize)
246 .with_request_limit(config.stages.bodies.downloader_request_limit)
247 .with_max_buffered_blocks_size_bytes(
248 config.stages.bodies.downloader_max_buffered_blocks_size_bytes,
249 )
250 .with_concurrent_requests_range(
251 config.stages.bodies.downloader_min_concurrent_requests..=
252 config.stages.bodies.downloader_max_concurrent_requests,
253 )
254 .build(fetch_client, consensus.clone(), provider_factory.clone()),
255 );
256 (Box::new(stage), None)
257 }
258 StageEnum::Senders => (
259 Box::new(SenderRecoveryStage::new(SenderRecoveryConfig {
260 commit_threshold: batch_size,
261 })),
262 None,
263 ),
264 StageEnum::Execution => (
265 Box::new(ExecutionStage::new(
266 components.evm_config().clone(),
267 Arc::new(components.consensus().clone()),
268 ExecutionStageThresholds {
269 max_blocks: Some(batch_size),
270 max_changes: None,
271 max_cumulative_gas: None,
272 max_duration: None,
273 },
274 config.stages.merkle.clean_threshold,
275 ExExManagerHandle::empty(),
276 )),
277 None,
278 ),
279 StageEnum::TxLookup => (
280 Box::new(TransactionLookupStage::new(
281 TransactionLookupConfig { chunk_size: batch_size },
282 etl_config,
283 prune_modes.transaction_lookup,
284 )),
285 None,
286 ),
287 StageEnum::AccountHashing => (
288 Box::new(AccountHashingStage::new(
289 HashingConfig { clean_threshold: 1, commit_threshold: batch_size },
290 etl_config,
291 )),
292 None,
293 ),
294 StageEnum::StorageHashing => (
295 Box::new(StorageHashingStage::new(
296 HashingConfig { clean_threshold: 1, commit_threshold: batch_size },
297 etl_config,
298 )),
299 None,
300 ),
301 StageEnum::Merkle => (
302 Box::new(MerkleStage::new_execution(config.stages.merkle.clean_threshold)),
303 Some(Box::new(MerkleStage::default_unwind())),
304 ),
305 StageEnum::AccountHistory => (
306 Box::new(IndexAccountHistoryStage::new(
307 config.stages.index_account_history,
308 etl_config,
309 prune_modes.account_history,
310 )),
311 None,
312 ),
313 StageEnum::StorageHistory => (
314 Box::new(IndexStorageHistoryStage::new(
315 config.stages.index_storage_history,
316 etl_config,
317 prune_modes.storage_history,
318 )),
319 None,
320 ),
321 _ => return Ok(()),
322 };
323 if let Some(unwind_stage) = &unwind_stage {
324 assert_eq!((*exec_stage).type_id(), (**unwind_stage).type_id());
325 }
326
327 let checkpoint = provider_rw.get_stage_checkpoint(exec_stage.id())?.unwrap_or_default();
328
329 let unwind_stage = unwind_stage.as_mut().unwrap_or(&mut exec_stage);
330
331 let mut unwind = UnwindInput {
332 checkpoint: checkpoint.with_block_number(self.to),
333 unwind_to: self.from,
334 bad_block: None,
335 };
336
337 if !self.skip_unwind {
338 while unwind.checkpoint.block_number > self.from {
339 let UnwindOutput { checkpoint } = unwind_stage.unwind(&provider_rw, unwind)?;
340 unwind.checkpoint = checkpoint;
341
342 if self.checkpoints {
343 provider_rw.save_stage_checkpoint(unwind_stage.id(), checkpoint)?;
344 }
345
346 if self.commit {
347 UnifiedStorageWriter::commit_unwind(provider_rw)?;
348 provider_rw = provider_factory.database_provider_rw()?;
349 }
350 }
351 }
352
353 let mut input = ExecInput {
354 target: Some(self.to),
355 checkpoint: Some(checkpoint.with_block_number(self.from)),
356 };
357
358 let start = Instant::now();
359 info!(target: "reth::cli", stage = %self.stage, "Executing stage");
360 loop {
361 exec_stage.execute_ready(input).await?;
362 let ExecOutput { checkpoint, done } = exec_stage.execute(&provider_rw, input)?;
363
364 input.checkpoint = Some(checkpoint);
365
366 if self.checkpoints {
367 provider_rw.save_stage_checkpoint(exec_stage.id(), checkpoint)?;
368 }
369 if self.commit {
370 UnifiedStorageWriter::commit(provider_rw)?;
371 provider_rw = provider_factory.database_provider_rw()?;
372 }
373
374 if done {
375 break
376 }
377 }
378 info!(target: "reth::cli", stage = %self.stage, time = ?start.elapsed(), "Finished stage");
379
380 Ok(())
381 }
382}
383
384impl<C: ChainSpecParser> Command<C> {
385 pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
387 Some(&self.env.chain)
388 }
389}