reth_cli_commands/stage/
run.rs

1//! Main `stage` command
2//!
3//! Stage debugging tool
4
5use crate::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
6use alloy_eips::BlockHashOrNumber;
7use clap::Parser;
8use reth_beacon_consensus::EthBeaconConsensus;
9use reth_chainspec::{EthChainSpec, EthereumHardforks};
10use reth_cli::chainspec::ChainSpecParser;
11use reth_cli_runner::CliContext;
12use reth_cli_util::get_secret_key;
13use reth_config::config::{HashingConfig, SenderRecoveryConfig, TransactionLookupConfig};
14use reth_db_api::database_metrics::DatabaseMetrics;
15use reth_downloaders::{
16    bodies::bodies::BodiesDownloaderBuilder,
17    headers::reverse_headers::ReverseHeadersDownloaderBuilder,
18};
19use reth_evm::execute::BlockExecutorProvider;
20use reth_exex::ExExManagerHandle;
21use reth_network::BlockDownloaderProvider;
22use reth_network_p2p::HeadersClient;
23use reth_node_core::{
24    args::{NetworkArgs, StageEnum},
25    version::{
26        BUILD_PROFILE_NAME, CARGO_PKG_VERSION, VERGEN_BUILD_TIMESTAMP, VERGEN_CARGO_FEATURES,
27        VERGEN_CARGO_TARGET_TRIPLE, VERGEN_GIT_SHA,
28    },
29};
30use reth_node_metrics::{
31    chain::ChainSpecInfo,
32    hooks::Hooks,
33    server::{MetricServer, MetricServerConfig},
34    version::VersionInfo,
35};
36use reth_provider::{
37    writer::UnifiedStorageWriter, ChainSpecProvider, DatabaseProviderFactory,
38    StageCheckpointReader, StageCheckpointWriter, StaticFileProviderFactory,
39};
40use reth_stages::{
41    stages::{
42        AccountHashingStage, BodyStage, ExecutionStage, HeaderStage, IndexAccountHistoryStage,
43        IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage,
44        TransactionLookupStage,
45    },
46    ExecInput, ExecOutput, ExecutionStageThresholds, Stage, StageError, StageExt, UnwindInput,
47    UnwindOutput,
48};
49use std::{any::Any, net::SocketAddr, sync::Arc, time::Instant};
50use tokio::sync::watch;
51use tracing::*;
52
53/// `reth stage` command
54#[derive(Debug, Parser)]
55pub struct Command<C: ChainSpecParser> {
56    #[command(flatten)]
57    env: EnvironmentArgs<C>,
58
59    /// Enable Prometheus metrics.
60    ///
61    /// The metrics will be served at the given interface and port.
62    #[arg(long, value_name = "SOCKET")]
63    metrics: Option<SocketAddr>,
64
65    /// The name of the stage to run
66    #[arg(value_enum)]
67    stage: StageEnum,
68
69    /// The height to start at
70    #[arg(long)]
71    from: u64,
72
73    /// The end of the stage
74    #[arg(long, short)]
75    to: u64,
76
77    /// Batch size for stage execution and unwind
78    #[arg(long)]
79    batch_size: Option<u64>,
80
81    /// Normally, running the stage requires unwinding for stages that already
82    /// have been run, in order to not rewrite to the same database slots.
83    ///
84    /// You can optionally skip the unwinding phase if you're syncing a block
85    /// range that has not been synced before.
86    #[arg(long, short)]
87    skip_unwind: bool,
88
89    /// Commits the changes in the database. WARNING: potentially destructive.
90    ///
91    /// Useful when you want to run diagnostics on the database.
92    // TODO: We should consider allowing to run hooks at the end of the stage run,
93    // e.g. query the DB size, or any table data.
94    #[arg(long, short)]
95    commit: bool,
96
97    /// Save stage checkpoints
98    #[arg(long)]
99    checkpoints: bool,
100
101    #[command(flatten)]
102    network: NetworkArgs,
103}
104
105impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C> {
106    /// Execute `stage` command
107    pub async fn execute<N, E, F>(self, ctx: CliContext, executor: F) -> eyre::Result<()>
108    where
109        N: CliNodeTypes<ChainSpec = C::ChainSpec>,
110        E: BlockExecutorProvider<Primitives = N::Primitives>,
111        F: FnOnce(Arc<C::ChainSpec>) -> E,
112    {
113        // Raise the fd limit of the process.
114        // Does not do anything on windows.
115        let _ = fdlimit::raise_fd_limit();
116
117        let Environment { provider_factory, config, data_dir } =
118            self.env.init::<N>(AccessRights::RW)?;
119
120        let mut provider_rw = provider_factory.database_provider_rw()?;
121
122        if let Some(listen_addr) = self.metrics {
123            info!(target: "reth::cli", "Starting metrics endpoint at {}", listen_addr);
124            let config = MetricServerConfig::new(
125                listen_addr,
126                VersionInfo {
127                    version: CARGO_PKG_VERSION,
128                    build_timestamp: VERGEN_BUILD_TIMESTAMP,
129                    cargo_features: VERGEN_CARGO_FEATURES,
130                    git_sha: VERGEN_GIT_SHA,
131                    target_triple: VERGEN_CARGO_TARGET_TRIPLE,
132                    build_profile: BUILD_PROFILE_NAME,
133                },
134                ChainSpecInfo { name: provider_factory.chain_spec().chain().to_string() },
135                ctx.task_executor,
136                Hooks::builder()
137                    .with_hook({
138                        let db = provider_factory.db_ref().clone();
139                        move || db.report_metrics()
140                    })
141                    .with_hook({
142                        let sfp = provider_factory.static_file_provider();
143                        move || {
144                            if let Err(error) = sfp.report_metrics() {
145                                error!(%error, "Failed to report metrics from static file provider");
146                            }
147                        }
148                    })
149                    .build(),
150            );
151
152            MetricServer::new(config).serve().await?;
153        }
154
155        let batch_size = self.batch_size.unwrap_or(self.to.saturating_sub(self.from) + 1);
156
157        let etl_config = config.stages.etl.clone();
158        let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default();
159
160        let (mut exec_stage, mut unwind_stage): (Box<dyn Stage<_>>, Option<Box<dyn Stage<_>>>) =
161            match self.stage {
162                StageEnum::Headers => {
163                    let consensus =
164                        Arc::new(EthBeaconConsensus::new(provider_factory.chain_spec()));
165
166                    let network_secret_path = self
167                        .network
168                        .p2p_secret_key
169                        .clone()
170                        .unwrap_or_else(|| data_dir.p2p_secret());
171                    let p2p_secret_key = get_secret_key(&network_secret_path)?;
172
173                    let default_peers_path = data_dir.known_peers();
174
175                    let network = self
176                        .network
177                        .network_config(
178                            &config,
179                            provider_factory.chain_spec(),
180                            p2p_secret_key,
181                            default_peers_path,
182                        )
183                        .build(provider_factory.clone())
184                        .start_network()
185                        .await?;
186                    let fetch_client = Arc::new(network.fetch_client().await?);
187
188                    // Use `to` as the tip for the stage
189                    let tip = fetch_client
190                        .get_header(BlockHashOrNumber::Number(self.to))
191                        .await?
192                        .into_data()
193                        .ok_or(StageError::MissingSyncGap)?;
194                    let (_, rx) = watch::channel(tip.hash_slow());
195
196                    (
197                        Box::new(HeaderStage::new(
198                            provider_factory.clone(),
199                            ReverseHeadersDownloaderBuilder::new(config.stages.headers)
200                                .build(fetch_client, consensus.clone()),
201                            rx,
202                            consensus,
203                            etl_config,
204                        )),
205                        None,
206                    )
207                }
208                StageEnum::Bodies => {
209                    let consensus =
210                        Arc::new(EthBeaconConsensus::new(provider_factory.chain_spec()));
211
212                    let mut config = config;
213                    config.peers.trusted_nodes_only = self.network.trusted_only;
214                    config.peers.trusted_nodes.extend(self.network.trusted_peers.clone());
215
216                    let network_secret_path = self
217                        .network
218                        .p2p_secret_key
219                        .clone()
220                        .unwrap_or_else(|| data_dir.p2p_secret());
221                    let p2p_secret_key = get_secret_key(&network_secret_path)?;
222
223                    let default_peers_path = data_dir.known_peers();
224
225                    let network = self
226                        .network
227                        .network_config(
228                            &config,
229                            provider_factory.chain_spec(),
230                            p2p_secret_key,
231                            default_peers_path,
232                        )
233                        .build(provider_factory.clone())
234                        .start_network()
235                        .await?;
236                    let fetch_client = Arc::new(network.fetch_client().await?);
237
238                    let stage = BodyStage::new(
239                        BodiesDownloaderBuilder::default()
240                            .with_stream_batch_size(batch_size as usize)
241                            .with_request_limit(config.stages.bodies.downloader_request_limit)
242                            .with_max_buffered_blocks_size_bytes(
243                                config.stages.bodies.downloader_max_buffered_blocks_size_bytes,
244                            )
245                            .with_concurrent_requests_range(
246                                config.stages.bodies.downloader_min_concurrent_requests..=
247                                    config.stages.bodies.downloader_max_concurrent_requests,
248                            )
249                            .build(fetch_client, consensus.clone(), provider_factory.clone()),
250                    );
251                    (Box::new(stage), None)
252                }
253                StageEnum::Senders => (
254                    Box::new(SenderRecoveryStage::new(SenderRecoveryConfig {
255                        commit_threshold: batch_size,
256                    })),
257                    None,
258                ),
259                StageEnum::Execution => (
260                    Box::new(ExecutionStage::new(
261                        executor(provider_factory.chain_spec()),
262                        ExecutionStageThresholds {
263                            max_blocks: Some(batch_size),
264                            max_changes: None,
265                            max_cumulative_gas: None,
266                            max_duration: None,
267                        },
268                        config.stages.merkle.clean_threshold,
269                        prune_modes,
270                        ExExManagerHandle::empty(),
271                    )),
272                    None,
273                ),
274                StageEnum::TxLookup => (
275                    Box::new(TransactionLookupStage::new(
276                        TransactionLookupConfig { chunk_size: batch_size },
277                        etl_config,
278                        prune_modes.transaction_lookup,
279                    )),
280                    None,
281                ),
282                StageEnum::AccountHashing => (
283                    Box::new(AccountHashingStage::new(
284                        HashingConfig { clean_threshold: 1, commit_threshold: batch_size },
285                        etl_config,
286                    )),
287                    None,
288                ),
289                StageEnum::StorageHashing => (
290                    Box::new(StorageHashingStage::new(
291                        HashingConfig { clean_threshold: 1, commit_threshold: batch_size },
292                        etl_config,
293                    )),
294                    None,
295                ),
296                StageEnum::Merkle => (
297                    Box::new(MerkleStage::new_execution(config.stages.merkle.clean_threshold)),
298                    Some(Box::new(MerkleStage::default_unwind())),
299                ),
300                StageEnum::AccountHistory => (
301                    Box::new(IndexAccountHistoryStage::new(
302                        config.stages.index_account_history,
303                        etl_config,
304                        prune_modes.account_history,
305                    )),
306                    None,
307                ),
308                StageEnum::StorageHistory => (
309                    Box::new(IndexStorageHistoryStage::new(
310                        config.stages.index_storage_history,
311                        etl_config,
312                        prune_modes.storage_history,
313                    )),
314                    None,
315                ),
316                _ => return Ok(()),
317            };
318        if let Some(unwind_stage) = &unwind_stage {
319            assert_eq!((*exec_stage).type_id(), (**unwind_stage).type_id());
320        }
321
322        let checkpoint = provider_rw.get_stage_checkpoint(exec_stage.id())?.unwrap_or_default();
323
324        let unwind_stage = unwind_stage.as_mut().unwrap_or(&mut exec_stage);
325
326        let mut unwind = UnwindInput {
327            checkpoint: checkpoint.with_block_number(self.to),
328            unwind_to: self.from,
329            bad_block: None,
330        };
331
332        if !self.skip_unwind {
333            while unwind.checkpoint.block_number > self.from {
334                let UnwindOutput { checkpoint } = unwind_stage.unwind(&provider_rw, unwind)?;
335                unwind.checkpoint = checkpoint;
336
337                if self.checkpoints {
338                    provider_rw.save_stage_checkpoint(unwind_stage.id(), checkpoint)?;
339                }
340
341                if self.commit {
342                    UnifiedStorageWriter::commit_unwind(provider_rw)?;
343                    provider_rw = provider_factory.database_provider_rw()?;
344                }
345            }
346        }
347
348        let mut input = ExecInput {
349            target: Some(self.to),
350            checkpoint: Some(checkpoint.with_block_number(self.from)),
351        };
352
353        let start = Instant::now();
354        info!(target: "reth::cli", stage = %self.stage, "Executing stage");
355        loop {
356            exec_stage.execute_ready(input).await?;
357            let ExecOutput { checkpoint, done } = exec_stage.execute(&provider_rw, input)?;
358
359            input.checkpoint = Some(checkpoint);
360
361            if self.checkpoints {
362                provider_rw.save_stage_checkpoint(exec_stage.id(), checkpoint)?;
363            }
364            if self.commit {
365                UnifiedStorageWriter::commit(provider_rw)?;
366                provider_rw = provider_factory.database_provider_rw()?;
367            }
368
369            if done {
370                break
371            }
372        }
373        info!(target: "reth::cli", stage = %self.stage, time = ?start.elapsed(), "Finished stage");
374
375        Ok(())
376    }
377}