1use crate::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
6use alloy_eips::BlockHashOrNumber;
7use clap::Parser;
8use reth_beacon_consensus::EthBeaconConsensus;
9use reth_chainspec::{EthChainSpec, EthereumHardforks};
10use reth_cli::chainspec::ChainSpecParser;
11use reth_cli_runner::CliContext;
12use reth_cli_util::get_secret_key;
13use reth_config::config::{HashingConfig, SenderRecoveryConfig, TransactionLookupConfig};
14use reth_db_api::database_metrics::DatabaseMetrics;
15use reth_downloaders::{
16 bodies::bodies::BodiesDownloaderBuilder,
17 headers::reverse_headers::ReverseHeadersDownloaderBuilder,
18};
19use reth_evm::execute::BlockExecutorProvider;
20use reth_exex::ExExManagerHandle;
21use reth_network::BlockDownloaderProvider;
22use reth_network_p2p::HeadersClient;
23use reth_node_core::{
24 args::{NetworkArgs, StageEnum},
25 version::{
26 BUILD_PROFILE_NAME, CARGO_PKG_VERSION, VERGEN_BUILD_TIMESTAMP, VERGEN_CARGO_FEATURES,
27 VERGEN_CARGO_TARGET_TRIPLE, VERGEN_GIT_SHA,
28 },
29};
30use reth_node_metrics::{
31 chain::ChainSpecInfo,
32 hooks::Hooks,
33 server::{MetricServer, MetricServerConfig},
34 version::VersionInfo,
35};
36use reth_provider::{
37 writer::UnifiedStorageWriter, ChainSpecProvider, DatabaseProviderFactory,
38 StageCheckpointReader, StageCheckpointWriter, StaticFileProviderFactory,
39};
40use reth_stages::{
41 stages::{
42 AccountHashingStage, BodyStage, ExecutionStage, HeaderStage, IndexAccountHistoryStage,
43 IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage,
44 TransactionLookupStage,
45 },
46 ExecInput, ExecOutput, ExecutionStageThresholds, Stage, StageError, StageExt, UnwindInput,
47 UnwindOutput,
48};
49use std::{any::Any, net::SocketAddr, sync::Arc, time::Instant};
50use tokio::sync::watch;
51use tracing::*;
52
53#[derive(Debug, Parser)]
55pub struct Command<C: ChainSpecParser> {
56 #[command(flatten)]
57 env: EnvironmentArgs<C>,
58
59 #[arg(long, value_name = "SOCKET")]
63 metrics: Option<SocketAddr>,
64
65 #[arg(value_enum)]
67 stage: StageEnum,
68
69 #[arg(long)]
71 from: u64,
72
73 #[arg(long, short)]
75 to: u64,
76
77 #[arg(long)]
79 batch_size: Option<u64>,
80
81 #[arg(long, short)]
87 skip_unwind: bool,
88
89 #[arg(long, short)]
95 commit: bool,
96
97 #[arg(long)]
99 checkpoints: bool,
100
101 #[command(flatten)]
102 network: NetworkArgs,
103}
104
105impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C> {
106 pub async fn execute<N, E, F>(self, ctx: CliContext, executor: F) -> eyre::Result<()>
108 where
109 N: CliNodeTypes<ChainSpec = C::ChainSpec>,
110 E: BlockExecutorProvider<Primitives = N::Primitives>,
111 F: FnOnce(Arc<C::ChainSpec>) -> E,
112 {
113 let _ = fdlimit::raise_fd_limit();
116
117 let Environment { provider_factory, config, data_dir } =
118 self.env.init::<N>(AccessRights::RW)?;
119
120 let mut provider_rw = provider_factory.database_provider_rw()?;
121
122 if let Some(listen_addr) = self.metrics {
123 info!(target: "reth::cli", "Starting metrics endpoint at {}", listen_addr);
124 let config = MetricServerConfig::new(
125 listen_addr,
126 VersionInfo {
127 version: CARGO_PKG_VERSION,
128 build_timestamp: VERGEN_BUILD_TIMESTAMP,
129 cargo_features: VERGEN_CARGO_FEATURES,
130 git_sha: VERGEN_GIT_SHA,
131 target_triple: VERGEN_CARGO_TARGET_TRIPLE,
132 build_profile: BUILD_PROFILE_NAME,
133 },
134 ChainSpecInfo { name: provider_factory.chain_spec().chain().to_string() },
135 ctx.task_executor,
136 Hooks::builder()
137 .with_hook({
138 let db = provider_factory.db_ref().clone();
139 move || db.report_metrics()
140 })
141 .with_hook({
142 let sfp = provider_factory.static_file_provider();
143 move || {
144 if let Err(error) = sfp.report_metrics() {
145 error!(%error, "Failed to report metrics from static file provider");
146 }
147 }
148 })
149 .build(),
150 );
151
152 MetricServer::new(config).serve().await?;
153 }
154
155 let batch_size = self.batch_size.unwrap_or(self.to.saturating_sub(self.from) + 1);
156
157 let etl_config = config.stages.etl.clone();
158 let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default();
159
160 let (mut exec_stage, mut unwind_stage): (Box<dyn Stage<_>>, Option<Box<dyn Stage<_>>>) =
161 match self.stage {
162 StageEnum::Headers => {
163 let consensus =
164 Arc::new(EthBeaconConsensus::new(provider_factory.chain_spec()));
165
166 let network_secret_path = self
167 .network
168 .p2p_secret_key
169 .clone()
170 .unwrap_or_else(|| data_dir.p2p_secret());
171 let p2p_secret_key = get_secret_key(&network_secret_path)?;
172
173 let default_peers_path = data_dir.known_peers();
174
175 let network = self
176 .network
177 .network_config(
178 &config,
179 provider_factory.chain_spec(),
180 p2p_secret_key,
181 default_peers_path,
182 )
183 .build(provider_factory.clone())
184 .start_network()
185 .await?;
186 let fetch_client = Arc::new(network.fetch_client().await?);
187
188 let tip = fetch_client
190 .get_header(BlockHashOrNumber::Number(self.to))
191 .await?
192 .into_data()
193 .ok_or(StageError::MissingSyncGap)?;
194 let (_, rx) = watch::channel(tip.hash_slow());
195
196 (
197 Box::new(HeaderStage::new(
198 provider_factory.clone(),
199 ReverseHeadersDownloaderBuilder::new(config.stages.headers)
200 .build(fetch_client, consensus.clone()),
201 rx,
202 consensus,
203 etl_config,
204 )),
205 None,
206 )
207 }
208 StageEnum::Bodies => {
209 let consensus =
210 Arc::new(EthBeaconConsensus::new(provider_factory.chain_spec()));
211
212 let mut config = config;
213 config.peers.trusted_nodes_only = self.network.trusted_only;
214 config.peers.trusted_nodes.extend(self.network.trusted_peers.clone());
215
216 let network_secret_path = self
217 .network
218 .p2p_secret_key
219 .clone()
220 .unwrap_or_else(|| data_dir.p2p_secret());
221 let p2p_secret_key = get_secret_key(&network_secret_path)?;
222
223 let default_peers_path = data_dir.known_peers();
224
225 let network = self
226 .network
227 .network_config(
228 &config,
229 provider_factory.chain_spec(),
230 p2p_secret_key,
231 default_peers_path,
232 )
233 .build(provider_factory.clone())
234 .start_network()
235 .await?;
236 let fetch_client = Arc::new(network.fetch_client().await?);
237
238 let stage = BodyStage::new(
239 BodiesDownloaderBuilder::default()
240 .with_stream_batch_size(batch_size as usize)
241 .with_request_limit(config.stages.bodies.downloader_request_limit)
242 .with_max_buffered_blocks_size_bytes(
243 config.stages.bodies.downloader_max_buffered_blocks_size_bytes,
244 )
245 .with_concurrent_requests_range(
246 config.stages.bodies.downloader_min_concurrent_requests..=
247 config.stages.bodies.downloader_max_concurrent_requests,
248 )
249 .build(fetch_client, consensus.clone(), provider_factory.clone()),
250 );
251 (Box::new(stage), None)
252 }
253 StageEnum::Senders => (
254 Box::new(SenderRecoveryStage::new(SenderRecoveryConfig {
255 commit_threshold: batch_size,
256 })),
257 None,
258 ),
259 StageEnum::Execution => (
260 Box::new(ExecutionStage::new(
261 executor(provider_factory.chain_spec()),
262 ExecutionStageThresholds {
263 max_blocks: Some(batch_size),
264 max_changes: None,
265 max_cumulative_gas: None,
266 max_duration: None,
267 },
268 config.stages.merkle.clean_threshold,
269 prune_modes,
270 ExExManagerHandle::empty(),
271 )),
272 None,
273 ),
274 StageEnum::TxLookup => (
275 Box::new(TransactionLookupStage::new(
276 TransactionLookupConfig { chunk_size: batch_size },
277 etl_config,
278 prune_modes.transaction_lookup,
279 )),
280 None,
281 ),
282 StageEnum::AccountHashing => (
283 Box::new(AccountHashingStage::new(
284 HashingConfig { clean_threshold: 1, commit_threshold: batch_size },
285 etl_config,
286 )),
287 None,
288 ),
289 StageEnum::StorageHashing => (
290 Box::new(StorageHashingStage::new(
291 HashingConfig { clean_threshold: 1, commit_threshold: batch_size },
292 etl_config,
293 )),
294 None,
295 ),
296 StageEnum::Merkle => (
297 Box::new(MerkleStage::new_execution(config.stages.merkle.clean_threshold)),
298 Some(Box::new(MerkleStage::default_unwind())),
299 ),
300 StageEnum::AccountHistory => (
301 Box::new(IndexAccountHistoryStage::new(
302 config.stages.index_account_history,
303 etl_config,
304 prune_modes.account_history,
305 )),
306 None,
307 ),
308 StageEnum::StorageHistory => (
309 Box::new(IndexStorageHistoryStage::new(
310 config.stages.index_storage_history,
311 etl_config,
312 prune_modes.storage_history,
313 )),
314 None,
315 ),
316 _ => return Ok(()),
317 };
318 if let Some(unwind_stage) = &unwind_stage {
319 assert_eq!((*exec_stage).type_id(), (**unwind_stage).type_id());
320 }
321
322 let checkpoint = provider_rw.get_stage_checkpoint(exec_stage.id())?.unwrap_or_default();
323
324 let unwind_stage = unwind_stage.as_mut().unwrap_or(&mut exec_stage);
325
326 let mut unwind = UnwindInput {
327 checkpoint: checkpoint.with_block_number(self.to),
328 unwind_to: self.from,
329 bad_block: None,
330 };
331
332 if !self.skip_unwind {
333 while unwind.checkpoint.block_number > self.from {
334 let UnwindOutput { checkpoint } = unwind_stage.unwind(&provider_rw, unwind)?;
335 unwind.checkpoint = checkpoint;
336
337 if self.checkpoints {
338 provider_rw.save_stage_checkpoint(unwind_stage.id(), checkpoint)?;
339 }
340
341 if self.commit {
342 UnifiedStorageWriter::commit_unwind(provider_rw)?;
343 provider_rw = provider_factory.database_provider_rw()?;
344 }
345 }
346 }
347
348 let mut input = ExecInput {
349 target: Some(self.to),
350 checkpoint: Some(checkpoint.with_block_number(self.from)),
351 };
352
353 let start = Instant::now();
354 info!(target: "reth::cli", stage = %self.stage, "Executing stage");
355 loop {
356 exec_stage.execute_ready(input).await?;
357 let ExecOutput { checkpoint, done } = exec_stage.execute(&provider_rw, input)?;
358
359 input.checkpoint = Some(checkpoint);
360
361 if self.checkpoints {
362 provider_rw.save_stage_checkpoint(exec_stage.id(), checkpoint)?;
363 }
364 if self.commit {
365 UnifiedStorageWriter::commit(provider_rw)?;
366 provider_rw = provider_factory.database_provider_rw()?;
367 }
368
369 if done {
370 break
371 }
372 }
373 info!(target: "reth::cli", stage = %self.stage, time = ?start.elapsed(), "Finished stage");
374
375 Ok(())
376 }
377}