reth/commands/debug_cmd/
execution.rs

1//! Command for debugging execution.
2
3use crate::{args::NetworkArgs, utils::get_single_header};
4use alloy_eips::BlockHashOrNumber;
5use alloy_primitives::{BlockNumber, B256};
6use clap::Parser;
7use futures::StreamExt;
8use reth_beacon_consensus::EthBeaconConsensus;
9use reth_chainspec::ChainSpec;
10use reth_cli::chainspec::ChainSpecParser;
11use reth_cli_commands::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
12use reth_cli_runner::CliContext;
13use reth_cli_util::get_secret_key;
14use reth_config::Config;
15use reth_consensus::Consensus;
16use reth_db::DatabaseEnv;
17use reth_downloaders::{
18    bodies::bodies::BodiesDownloaderBuilder,
19    headers::reverse_headers::ReverseHeadersDownloaderBuilder,
20};
21use reth_exex::ExExManagerHandle;
22use reth_network::{BlockDownloaderProvider, NetworkHandle};
23use reth_network_api::NetworkInfo;
24use reth_network_p2p::{headers::client::HeadersClient, EthBlockClient};
25use reth_node_api::NodeTypesWithDBAdapter;
26use reth_node_ethereum::EthExecutorProvider;
27use reth_provider::{
28    providers::ProviderNodeTypes, ChainSpecProvider, ProviderFactory, StageCheckpointReader,
29};
30use reth_prune::PruneModes;
31use reth_stages::{
32    sets::DefaultStages, stages::ExecutionStage, ExecutionStageThresholds, Pipeline, StageId,
33    StageSet,
34};
35use reth_static_file::StaticFileProducer;
36use reth_tasks::TaskExecutor;
37use std::{path::PathBuf, sync::Arc};
38use tokio::sync::watch;
39use tracing::*;
40
41/// `reth debug execution` command
42#[derive(Debug, Parser)]
43pub struct Command<C: ChainSpecParser> {
44    #[command(flatten)]
45    env: EnvironmentArgs<C>,
46
47    #[command(flatten)]
48    network: NetworkArgs,
49
50    /// The maximum block height.
51    #[arg(long)]
52    pub to: u64,
53
54    /// The block interval for sync and unwind.
55    /// Defaults to `1000`.
56    #[arg(long, default_value = "1000")]
57    pub interval: u64,
58}
59
60impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
61    fn build_pipeline<N: ProviderNodeTypes<ChainSpec = C::ChainSpec> + CliNodeTypes, Client>(
62        &self,
63        config: &Config,
64        client: Client,
65        consensus: Arc<dyn Consensus>,
66        provider_factory: ProviderFactory<N>,
67        task_executor: &TaskExecutor,
68        static_file_producer: StaticFileProducer<ProviderFactory<N>>,
69    ) -> eyre::Result<Pipeline<N>>
70    where
71        Client: EthBlockClient + 'static,
72    {
73        // building network downloaders using the fetch client
74        let header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers)
75            .build(client.clone(), consensus.clone().as_header_validator())
76            .into_task_with(task_executor);
77
78        let body_downloader = BodiesDownloaderBuilder::new(config.stages.bodies)
79            .build(client, Arc::clone(&consensus), provider_factory.clone())
80            .into_task_with(task_executor);
81
82        let stage_conf = &config.stages;
83        let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default();
84
85        let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
86        let executor = EthExecutorProvider::ethereum(provider_factory.chain_spec());
87
88        let pipeline = Pipeline::<N>::builder()
89            .with_tip_sender(tip_tx)
90            .add_stages(
91                DefaultStages::new(
92                    provider_factory.clone(),
93                    tip_rx,
94                    Arc::clone(&consensus),
95                    header_downloader,
96                    body_downloader,
97                    executor.clone(),
98                    stage_conf.clone(),
99                    prune_modes.clone(),
100                )
101                .set(ExecutionStage::new(
102                    executor,
103                    ExecutionStageThresholds {
104                        max_blocks: None,
105                        max_changes: None,
106                        max_cumulative_gas: None,
107                        max_duration: None,
108                    },
109                    stage_conf.execution_external_clean_threshold(),
110                    prune_modes,
111                    ExExManagerHandle::empty(),
112                )),
113            )
114            .build(provider_factory, static_file_producer);
115
116        Ok(pipeline)
117    }
118
119    async fn build_network<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(
120        &self,
121        config: &Config,
122        task_executor: TaskExecutor,
123        provider_factory: ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
124        network_secret_path: PathBuf,
125        default_peers_path: PathBuf,
126    ) -> eyre::Result<NetworkHandle> {
127        let secret_key = get_secret_key(&network_secret_path)?;
128        let network = self
129            .network
130            .network_config(config, provider_factory.chain_spec(), secret_key, default_peers_path)
131            .with_task_executor(Box::new(task_executor))
132            .build(provider_factory)
133            .start_network()
134            .await?;
135        info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
136        debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID");
137        Ok(network)
138    }
139
140    async fn fetch_block_hash<Client>(
141        &self,
142        client: Client,
143        block: BlockNumber,
144    ) -> eyre::Result<B256>
145    where
146        Client: HeadersClient<Header: reth_primitives_traits::BlockHeader>,
147    {
148        info!(target: "reth::cli", ?block, "Fetching block from the network.");
149        loop {
150            match get_single_header(&client, BlockHashOrNumber::Number(block)).await {
151                Ok(tip_header) => {
152                    info!(target: "reth::cli", ?block, "Successfully fetched block");
153                    return Ok(tip_header.hash())
154                }
155                Err(error) => {
156                    error!(target: "reth::cli", ?block, %error, "Failed to fetch the block. Retrying...");
157                }
158            }
159        }
160    }
161
162    /// Execute `execution-debug` command
163    pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(
164        self,
165        ctx: CliContext,
166    ) -> eyre::Result<()> {
167        let Environment { provider_factory, config, data_dir } =
168            self.env.init::<N>(AccessRights::RW)?;
169
170        let consensus: Arc<dyn Consensus> =
171            Arc::new(EthBeaconConsensus::new(provider_factory.chain_spec()));
172
173        // Configure and build network
174        let network_secret_path =
175            self.network.p2p_secret_key.clone().unwrap_or_else(|| data_dir.p2p_secret());
176        let network = self
177            .build_network(
178                &config,
179                ctx.task_executor.clone(),
180                provider_factory.clone(),
181                network_secret_path,
182                data_dir.known_peers(),
183            )
184            .await?;
185
186        let static_file_producer =
187            StaticFileProducer::new(provider_factory.clone(), PruneModes::default());
188
189        // Configure the pipeline
190        let fetch_client = network.fetch_client().await?;
191        let mut pipeline = self.build_pipeline(
192            &config,
193            fetch_client.clone(),
194            Arc::clone(&consensus),
195            provider_factory.clone(),
196            &ctx.task_executor,
197            static_file_producer,
198        )?;
199
200        let provider = provider_factory.provider()?;
201
202        let latest_block_number =
203            provider.get_stage_checkpoint(StageId::Finish)?.map(|ch| ch.block_number);
204        if latest_block_number.unwrap_or_default() >= self.to {
205            info!(target: "reth::cli", latest = latest_block_number, "Nothing to run");
206            return Ok(())
207        }
208
209        ctx.task_executor.spawn_critical(
210            "events task",
211            reth_node_events::node::handle_events(
212                Some(Box::new(network)),
213                latest_block_number,
214                pipeline.events().map(Into::into),
215            ),
216        );
217
218        let mut current_max_block = latest_block_number.unwrap_or_default();
219        while current_max_block < self.to {
220            let next_block = current_max_block + 1;
221            let target_block = self.to.min(current_max_block + self.interval);
222            let target_block_hash =
223                self.fetch_block_hash(fetch_client.clone(), target_block).await?;
224
225            // Run the pipeline
226            info!(target: "reth::cli", from = next_block, to = target_block, tip = ?target_block_hash, "Starting pipeline");
227            pipeline.set_tip(target_block_hash);
228            let result = pipeline.run_loop().await?;
229            trace!(target: "reth::cli", from = next_block, to = target_block, tip = ?target_block_hash, ?result, "Pipeline finished");
230
231            // Unwind the pipeline without committing.
232            provider_factory.provider_rw()?.unwind_trie_state_range(next_block..=target_block)?;
233
234            // Update latest block
235            current_max_block = target_block;
236        }
237
238        Ok(())
239    }
240}