reth_cli_commands/stage/
unwind.rs

1//! Unwinding a certain block range
2
3use crate::{
4    common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs},
5    stage::CliNodeComponents,
6};
7use alloy_eips::BlockHashOrNumber;
8use alloy_primitives::B256;
9use clap::{Parser, Subcommand};
10use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks};
11use reth_cli::chainspec::ChainSpecParser;
12use reth_config::Config;
13use reth_consensus::noop::NoopConsensus;
14use reth_db::DatabaseEnv;
15use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
16use reth_evm::ConfigureEvm;
17use reth_exex::ExExManagerHandle;
18use reth_provider::{
19    providers::ProviderNodeTypes, BlockExecutionWriter, BlockNumReader, ChainStateBlockReader,
20    ChainStateBlockWriter, ProviderFactory, StaticFileProviderFactory, StorageLocation,
21};
22use reth_stages::{
23    sets::{DefaultStages, OfflineStages},
24    stages::ExecutionStage,
25    ExecutionStageThresholds, Pipeline, StageSet,
26};
27use reth_static_file::StaticFileProducer;
28use std::sync::Arc;
29use tokio::sync::watch;
30use tracing::info;
31
32/// `reth stage unwind` command
33#[derive(Debug, Parser)]
34pub struct Command<C: ChainSpecParser> {
35    #[command(flatten)]
36    env: EnvironmentArgs<C>,
37
38    #[command(subcommand)]
39    command: Subcommands,
40
41    /// If this is enabled, then all stages except headers, bodies, and sender recovery will be
42    /// unwound.
43    #[arg(long)]
44    offline: bool,
45}
46
47impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C> {
48    /// Execute `db stage unwind` command
49    pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>, F, Comp>(
50        self,
51        components: F,
52    ) -> eyre::Result<()>
53    where
54        Comp: CliNodeComponents<N>,
55        F: FnOnce(Arc<C::ChainSpec>) -> Comp,
56    {
57        let Environment { provider_factory, config, .. } = self.env.init::<N>(AccessRights::RW)?;
58
59        let target = self.command.unwind_target(provider_factory.clone())?;
60
61        let components = components(provider_factory.chain_spec());
62
63        let highest_static_file_block = provider_factory
64            .static_file_provider()
65            .get_highest_static_files()
66            .max_block_num()
67            .filter(|highest_static_file_block| *highest_static_file_block > target);
68
69        // Execute a pipeline unwind if the start of the range overlaps the existing static
70        // files. If that's the case, then copy all available data from MDBX to static files, and
71        // only then, proceed with the unwind.
72        //
73        // We also execute a pipeline unwind if `offline` is specified, because we need to only
74        // unwind the data associated with offline stages.
75        if highest_static_file_block.is_some() || self.offline {
76            if self.offline {
77                info!(target: "reth::cli", "Performing an unwind for offline-only data!");
78            }
79
80            if let Some(highest_static_file_block) = highest_static_file_block {
81                info!(target: "reth::cli", ?target, ?highest_static_file_block, "Executing a pipeline unwind.");
82            } else {
83                info!(target: "reth::cli", ?target, "Executing a pipeline unwind.");
84            }
85
86            // This will build an offline-only pipeline if the `offline` flag is enabled
87            let mut pipeline =
88                self.build_pipeline(config, provider_factory, components.evm_config().clone())?;
89
90            // Move all applicable data from database to static files.
91            pipeline.move_to_static_files()?;
92
93            pipeline.unwind(target, None)?;
94        } else {
95            info!(target: "reth::cli", ?target, "Executing a database unwind.");
96            let provider = provider_factory.provider_rw()?;
97
98            provider
99                .remove_block_and_execution_above(target, StorageLocation::Both)
100                .map_err(|err| eyre::eyre!("Transaction error on unwind: {err}"))?;
101
102            // update finalized block if needed
103            let last_saved_finalized_block_number = provider.last_finalized_block_number()?;
104            if last_saved_finalized_block_number.is_none_or(|f| f > target) {
105                provider.save_finalized_block_number(target)?;
106            }
107
108            provider.commit()?;
109        }
110
111        info!(target: "reth::cli", ?target, "Unwound blocks");
112
113        Ok(())
114    }
115
116    fn build_pipeline<N: ProviderNodeTypes<ChainSpec = C::ChainSpec>>(
117        self,
118        config: Config,
119        provider_factory: ProviderFactory<N>,
120        evm_config: impl ConfigureEvm<Primitives = N::Primitives> + 'static,
121    ) -> Result<Pipeline<N>, eyre::Error> {
122        let stage_conf = &config.stages;
123        let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default();
124
125        let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
126
127        let builder = if self.offline {
128            Pipeline::<N>::builder().add_stages(
129                OfflineStages::new(
130                    evm_config,
131                    NoopConsensus::arc(),
132                    config.stages,
133                    prune_modes.clone(),
134                )
135                .builder()
136                .disable(reth_stages::StageId::SenderRecovery),
137            )
138        } else {
139            Pipeline::<N>::builder().with_tip_sender(tip_tx).add_stages(
140                DefaultStages::new(
141                    provider_factory.clone(),
142                    tip_rx,
143                    Arc::new(NoopConsensus::default()),
144                    NoopHeaderDownloader::default(),
145                    NoopBodiesDownloader::default(),
146                    evm_config.clone(),
147                    stage_conf.clone(),
148                    prune_modes.clone(),
149                )
150                .set(ExecutionStage::new(
151                    evm_config,
152                    Arc::new(NoopConsensus::default()),
153                    ExecutionStageThresholds {
154                        max_blocks: None,
155                        max_changes: None,
156                        max_cumulative_gas: None,
157                        max_duration: None,
158                    },
159                    stage_conf.execution_external_clean_threshold(),
160                    ExExManagerHandle::empty(),
161                )),
162            )
163        };
164
165        let pipeline = builder.build(
166            provider_factory.clone(),
167            StaticFileProducer::new(provider_factory, prune_modes),
168        );
169        Ok(pipeline)
170    }
171}
172
173impl<C: ChainSpecParser> Command<C> {
174    /// Return the underlying chain being used to run this command
175    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
176        Some(&self.env.chain)
177    }
178}
179
180/// `reth stage unwind` subcommand
181#[derive(Subcommand, Debug, Eq, PartialEq)]
182enum Subcommands {
183    /// Unwinds the database from the latest block, until the given block number or hash has been
184    /// reached, that block is not included.
185    #[command(name = "to-block")]
186    ToBlock { target: BlockHashOrNumber },
187    /// Unwinds the database from the latest block, until the given number of blocks have been
188    /// reached.
189    #[command(name = "num-blocks")]
190    NumBlocks { amount: u64 },
191}
192
193impl Subcommands {
194    /// Returns the block to unwind to. The returned block will stay in database.
195    fn unwind_target<N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>>(
196        &self,
197        factory: ProviderFactory<N>,
198    ) -> eyre::Result<u64> {
199        let provider = factory.provider()?;
200        let last = provider.last_block_number()?;
201        let target = match self {
202            Self::ToBlock { target } => match target {
203                BlockHashOrNumber::Hash(hash) => provider
204                    .block_number(*hash)?
205                    .ok_or_else(|| eyre::eyre!("Block hash not found in database: {hash:?}"))?,
206                BlockHashOrNumber::Number(num) => *num,
207            },
208            Self::NumBlocks { amount } => last.saturating_sub(*amount),
209        };
210        if target > last {
211            eyre::bail!("Target block number is higher than the latest block number")
212        }
213        Ok(target)
214    }
215}
216
217#[cfg(test)]
218mod tests {
219    use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
220
221    use super::*;
222
223    #[test]
224    fn parse_unwind() {
225        let cmd = Command::<EthereumChainSpecParser>::parse_from([
226            "reth",
227            "--datadir",
228            "dir",
229            "to-block",
230            "100",
231        ]);
232        assert_eq!(cmd.command, Subcommands::ToBlock { target: BlockHashOrNumber::Number(100) });
233
234        let cmd = Command::<EthereumChainSpecParser>::parse_from([
235            "reth",
236            "--datadir",
237            "dir",
238            "num-blocks",
239            "100",
240        ]);
241        assert_eq!(cmd.command, Subcommands::NumBlocks { amount: 100 });
242    }
243}