reth_cli_commands/stage/
run.rs

//! Main `stage` command
//!
//! Stage debugging tool
4
5use crate::common::{AccessRights, CliNodeComponents, CliNodeTypes, Environment, EnvironmentArgs};
6use alloy_eips::BlockHashOrNumber;
7use alloy_primitives::Sealable;
8use clap::Parser;
9use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
10use reth_cli::chainspec::ChainSpecParser;
11use reth_cli_runner::CliContext;
12use reth_cli_util::get_secret_key;
13use reth_config::config::{HashingConfig, SenderRecoveryConfig, TransactionLookupConfig};
14use reth_db_api::database_metrics::DatabaseMetrics;
15use reth_downloaders::{
16    bodies::bodies::BodiesDownloaderBuilder,
17    headers::reverse_headers::ReverseHeadersDownloaderBuilder,
18};
19use reth_eth_wire::NetPrimitivesFor;
20use reth_exex::ExExManagerHandle;
21use reth_network::BlockDownloaderProvider;
22use reth_network_p2p::HeadersClient;
23use reth_node_core::{
24    args::{NetworkArgs, StageEnum},
25    version::{
26        BUILD_PROFILE_NAME, CARGO_PKG_VERSION, VERGEN_BUILD_TIMESTAMP, VERGEN_CARGO_FEATURES,
27        VERGEN_CARGO_TARGET_TRIPLE, VERGEN_GIT_SHA,
28    },
29};
30use reth_node_metrics::{
31    chain::ChainSpecInfo,
32    hooks::Hooks,
33    server::{MetricServer, MetricServerConfig},
34    version::VersionInfo,
35};
36use reth_provider::{
37    writer::UnifiedStorageWriter, ChainSpecProvider, DatabaseProviderFactory,
38    StageCheckpointReader, StageCheckpointWriter, StaticFileProviderFactory,
39};
40use reth_stages::{
41    stages::{
42        AccountHashingStage, BodyStage, ExecutionStage, HeaderStage, IndexAccountHistoryStage,
43        IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage,
44        TransactionLookupStage,
45    },
46    ExecInput, ExecOutput, ExecutionStageThresholds, Stage, StageExt, UnwindInput, UnwindOutput,
47};
48use std::{any::Any, net::SocketAddr, sync::Arc, time::Instant};
49use tokio::sync::watch;
50use tracing::*;
51
52/// `reth stage` command
53#[derive(Debug, Parser)]
54pub struct Command<C: ChainSpecParser> {
55    #[command(flatten)]
56    env: EnvironmentArgs<C>,
57
58    /// Enable Prometheus metrics.
59    ///
60    /// The metrics will be served at the given interface and port.
61    #[arg(long, value_name = "SOCKET")]
62    metrics: Option<SocketAddr>,
63
64    /// The name of the stage to run
65    #[arg(value_enum)]
66    stage: StageEnum,
67
68    /// The height to start at
69    #[arg(long)]
70    from: u64,
71
72    /// The end of the stage
73    #[arg(long, short)]
74    to: u64,
75
76    /// Batch size for stage execution and unwind
77    #[arg(long)]
78    batch_size: Option<u64>,
79
80    /// Normally, running the stage requires unwinding for stages that already
81    /// have been run, in order to not rewrite to the same database slots.
82    ///
83    /// You can optionally skip the unwinding phase if you're syncing a block
84    /// range that has not been synced before.
85    #[arg(long, short)]
86    skip_unwind: bool,
87
88    /// Commits the changes in the database. WARNING: potentially destructive.
89    ///
90    /// Useful when you want to run diagnostics on the database.
91    // TODO: We should consider allowing to run hooks at the end of the stage run,
92    // e.g. query the DB size, or any table data.
93    #[arg(long, short)]
94    commit: bool,
95
96    /// Save stage checkpoints
97    #[arg(long)]
98    checkpoints: bool,
99
100    #[command(flatten)]
101    network: NetworkArgs,
102}
103
104impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
105    /// Execute `stage` command
106    pub async fn execute<N, Comp, F, P>(self, ctx: CliContext, components: F) -> eyre::Result<()>
107    where
108        N: CliNodeTypes<ChainSpec = C::ChainSpec>,
109        Comp: CliNodeComponents<N>,
110        F: FnOnce(Arc<C::ChainSpec>) -> Comp,
111        P: NetPrimitivesFor<N::Primitives>,
112    {
113        // Raise the fd limit of the process.
114        // Does not do anything on windows.
115        let _ = fdlimit::raise_fd_limit();
116
117        let Environment { provider_factory, config, data_dir } =
118            self.env.init::<N>(AccessRights::RW)?;
119
120        let mut provider_rw = provider_factory.database_provider_rw()?;
121        let components = components(provider_factory.chain_spec());
122
123        if let Some(listen_addr) = self.metrics {
124            info!(target: "reth::cli", "Starting metrics endpoint at {}", listen_addr);
125            let config = MetricServerConfig::new(
126                listen_addr,
127                VersionInfo {
128                    version: CARGO_PKG_VERSION,
129                    build_timestamp: VERGEN_BUILD_TIMESTAMP,
130                    cargo_features: VERGEN_CARGO_FEATURES,
131                    git_sha: VERGEN_GIT_SHA,
132                    target_triple: VERGEN_CARGO_TARGET_TRIPLE,
133                    build_profile: BUILD_PROFILE_NAME,
134                },
135                ChainSpecInfo { name: provider_factory.chain_spec().chain().to_string() },
136                ctx.task_executor,
137                Hooks::builder()
138                    .with_hook({
139                        let db = provider_factory.db_ref().clone();
140                        move || db.report_metrics()
141                    })
142                    .with_hook({
143                        let sfp = provider_factory.static_file_provider();
144                        move || {
145                            if let Err(error) = sfp.report_metrics() {
146                                error!(%error, "Failed to report metrics from static file provider");
147                            }
148                        }
149                    })
150                    .build(),
151            );
152
153            MetricServer::new(config).serve().await?;
154        }
155
156        let batch_size = self.batch_size.unwrap_or(self.to.saturating_sub(self.from) + 1);
157
158        let etl_config = config.stages.etl.clone();
159        let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default();
160
161        let (mut exec_stage, mut unwind_stage): (Box<dyn Stage<_>>, Option<Box<dyn Stage<_>>>) =
162            match self.stage {
163                StageEnum::Headers => {
164                    let consensus = Arc::new(components.consensus().clone());
165
166                    let network_secret_path = self
167                        .network
168                        .p2p_secret_key
169                        .clone()
170                        .unwrap_or_else(|| data_dir.p2p_secret());
171                    let p2p_secret_key = get_secret_key(&network_secret_path)?;
172
173                    let default_peers_path = data_dir.known_peers();
174
175                    let network = self
176                        .network
177                        .network_config::<P>(
178                            &config,
179                            provider_factory.chain_spec(),
180                            p2p_secret_key,
181                            default_peers_path,
182                        )
183                        .build(provider_factory.clone())
184                        .start_network()
185                        .await?;
186                    let fetch_client = Arc::new(network.fetch_client().await?);
187
188                    // Use `to` as the tip for the stage
189                    let tip: P::BlockHeader = loop {
190                        match fetch_client.get_header(BlockHashOrNumber::Number(self.to)).await {
191                            Ok(header) => {
192                                if let Some(header) = header.into_data() {
193                                    break header
194                                }
195                            }
196                            Err(error) if error.is_retryable() => {
197                                warn!(target: "reth::cli", "Error requesting header: {error}. Retrying...")
198                            }
199                            Err(error) => return Err(error.into()),
200                        }
201                    };
202                    let (_, rx) = watch::channel(tip.hash_slow());
203                    (
204                        Box::new(HeaderStage::new(
205                            provider_factory.clone(),
206                            ReverseHeadersDownloaderBuilder::new(config.stages.headers)
207                                .build(fetch_client, consensus.clone()),
208                            rx,
209                            etl_config,
210                        )),
211                        None,
212                    )
213                }
214                StageEnum::Bodies => {
215                    let consensus = Arc::new(components.consensus().clone());
216
217                    let mut config = config;
218                    config.peers.trusted_nodes_only = self.network.trusted_only;
219                    config.peers.trusted_nodes.extend(self.network.trusted_peers.clone());
220
221                    let network_secret_path = self
222                        .network
223                        .p2p_secret_key
224                        .clone()
225                        .unwrap_or_else(|| data_dir.p2p_secret());
226                    let p2p_secret_key = get_secret_key(&network_secret_path)?;
227
228                    let default_peers_path = data_dir.known_peers();
229
230                    let network = self
231                        .network
232                        .network_config::<P>(
233                            &config,
234                            provider_factory.chain_spec(),
235                            p2p_secret_key,
236                            default_peers_path,
237                        )
238                        .build(provider_factory.clone())
239                        .start_network()
240                        .await?;
241                    let fetch_client = Arc::new(network.fetch_client().await?);
242
243                    let stage = BodyStage::new(
244                        BodiesDownloaderBuilder::default()
245                            .with_stream_batch_size(batch_size as usize)
246                            .with_request_limit(config.stages.bodies.downloader_request_limit)
247                            .with_max_buffered_blocks_size_bytes(
248                                config.stages.bodies.downloader_max_buffered_blocks_size_bytes,
249                            )
250                            .with_concurrent_requests_range(
251                                config.stages.bodies.downloader_min_concurrent_requests..=
252                                    config.stages.bodies.downloader_max_concurrent_requests,
253                            )
254                            .build(fetch_client, consensus.clone(), provider_factory.clone()),
255                    );
256                    (Box::new(stage), None)
257                }
258                StageEnum::Senders => (
259                    Box::new(SenderRecoveryStage::new(SenderRecoveryConfig {
260                        commit_threshold: batch_size,
261                    })),
262                    None,
263                ),
264                StageEnum::Execution => (
265                    Box::new(ExecutionStage::new(
266                        components.evm_config().clone(),
267                        Arc::new(components.consensus().clone()),
268                        ExecutionStageThresholds {
269                            max_blocks: Some(batch_size),
270                            max_changes: None,
271                            max_cumulative_gas: None,
272                            max_duration: None,
273                        },
274                        config.stages.merkle.clean_threshold,
275                        ExExManagerHandle::empty(),
276                    )),
277                    None,
278                ),
279                StageEnum::TxLookup => (
280                    Box::new(TransactionLookupStage::new(
281                        TransactionLookupConfig { chunk_size: batch_size },
282                        etl_config,
283                        prune_modes.transaction_lookup,
284                    )),
285                    None,
286                ),
287                StageEnum::AccountHashing => (
288                    Box::new(AccountHashingStage::new(
289                        HashingConfig { clean_threshold: 1, commit_threshold: batch_size },
290                        etl_config,
291                    )),
292                    None,
293                ),
294                StageEnum::StorageHashing => (
295                    Box::new(StorageHashingStage::new(
296                        HashingConfig { clean_threshold: 1, commit_threshold: batch_size },
297                        etl_config,
298                    )),
299                    None,
300                ),
301                StageEnum::Merkle => (
302                    Box::new(MerkleStage::new_execution(config.stages.merkle.clean_threshold)),
303                    Some(Box::new(MerkleStage::default_unwind())),
304                ),
305                StageEnum::AccountHistory => (
306                    Box::new(IndexAccountHistoryStage::new(
307                        config.stages.index_account_history,
308                        etl_config,
309                        prune_modes.account_history,
310                    )),
311                    None,
312                ),
313                StageEnum::StorageHistory => (
314                    Box::new(IndexStorageHistoryStage::new(
315                        config.stages.index_storage_history,
316                        etl_config,
317                        prune_modes.storage_history,
318                    )),
319                    None,
320                ),
321                _ => return Ok(()),
322            };
323        if let Some(unwind_stage) = &unwind_stage {
324            assert_eq!((*exec_stage).type_id(), (**unwind_stage).type_id());
325        }
326
327        let checkpoint = provider_rw.get_stage_checkpoint(exec_stage.id())?.unwrap_or_default();
328
329        let unwind_stage = unwind_stage.as_mut().unwrap_or(&mut exec_stage);
330
331        let mut unwind = UnwindInput {
332            checkpoint: checkpoint.with_block_number(self.to),
333            unwind_to: self.from,
334            bad_block: None,
335        };
336
337        if !self.skip_unwind {
338            while unwind.checkpoint.block_number > self.from {
339                let UnwindOutput { checkpoint } = unwind_stage.unwind(&provider_rw, unwind)?;
340                unwind.checkpoint = checkpoint;
341
342                if self.checkpoints {
343                    provider_rw.save_stage_checkpoint(unwind_stage.id(), checkpoint)?;
344                }
345
346                if self.commit {
347                    UnifiedStorageWriter::commit_unwind(provider_rw)?;
348                    provider_rw = provider_factory.database_provider_rw()?;
349                }
350            }
351        }
352
353        let mut input = ExecInput {
354            target: Some(self.to),
355            checkpoint: Some(checkpoint.with_block_number(self.from)),
356        };
357
358        let start = Instant::now();
359        info!(target: "reth::cli", stage = %self.stage, "Executing stage");
360        loop {
361            exec_stage.execute_ready(input).await?;
362            let ExecOutput { checkpoint, done } = exec_stage.execute(&provider_rw, input)?;
363
364            input.checkpoint = Some(checkpoint);
365
366            if self.checkpoints {
367                provider_rw.save_stage_checkpoint(exec_stage.id(), checkpoint)?;
368            }
369            if self.commit {
370                UnifiedStorageWriter::commit(provider_rw)?;
371                provider_rw = provider_factory.database_provider_rw()?;
372            }
373
374            if done {
375                break
376            }
377        }
378        info!(target: "reth::cli", stage = %self.stage, time = ?start.elapsed(), "Finished stage");
379
380        Ok(())
381    }
382}
383
384impl<C: ChainSpecParser> Command<C> {
385    /// Returns the underlying chain being used to run this command
386    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
387        Some(&self.env.chain)
388    }
389}