reth_cli_commands/stage/
unwind.rs

//! Unwinding a certain block range
2
3use crate::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
4use alloy_eips::BlockHashOrNumber;
5use alloy_primitives::B256;
6use clap::{Parser, Subcommand};
7use reth_beacon_consensus::EthBeaconConsensus;
8use reth_chainspec::{EthChainSpec, EthereumHardforks};
9use reth_cli::chainspec::ChainSpecParser;
10use reth_config::Config;
11use reth_consensus::Consensus;
12use reth_db::DatabaseEnv;
13use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
14use reth_evm::noop::NoopBlockExecutorProvider;
15use reth_exex::ExExManagerHandle;
16use reth_node_core::args::NetworkArgs;
17use reth_provider::{
18    providers::ProviderNodeTypes, BlockExecutionWriter, BlockNumReader, ChainSpecProvider,
19    ChainStateBlockReader, ChainStateBlockWriter, ProviderFactory, StaticFileProviderFactory,
20    StorageLocation,
21};
22use reth_prune::PruneModes;
23use reth_stages::{
24    sets::{DefaultStages, OfflineStages},
25    stages::ExecutionStage,
26    ExecutionStageThresholds, Pipeline, StageSet,
27};
28use reth_static_file::StaticFileProducer;
29use std::sync::Arc;
30use tokio::sync::watch;
31use tracing::info;
32
33/// `reth stage unwind` command
34#[derive(Debug, Parser)]
35pub struct Command<C: ChainSpecParser> {
36    #[command(flatten)]
37    env: EnvironmentArgs<C>,
38
39    #[command(flatten)]
40    network: NetworkArgs,
41
42    #[command(subcommand)]
43    command: Subcommands,
44
45    /// If this is enabled, then all stages except headers, bodies, and sender recovery will be
46    /// unwound.
47    #[arg(long)]
48    offline: bool,
49}
50
51impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C> {
52    /// Execute `db stage unwind` command
53    pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(self) -> eyre::Result<()> {
54        let Environment { provider_factory, config, .. } = self.env.init::<N>(AccessRights::RW)?;
55
56        let target = self.command.unwind_target(provider_factory.clone())?;
57
58        let highest_static_file_block = provider_factory
59            .static_file_provider()
60            .get_highest_static_files()
61            .max_block_num()
62            .filter(|highest_static_file_block| *highest_static_file_block > target);
63
64        // Execute a pipeline unwind if the start of the range overlaps the existing static
65        // files. If that's the case, then copy all available data from MDBX to static files, and
66        // only then, proceed with the unwind.
67        //
68        // We also execute a pipeline unwind if `offline` is specified, because we need to only
69        // unwind the data associated with offline stages.
70        if highest_static_file_block.is_some() || self.offline {
71            if self.offline {
72                info!(target: "reth::cli", "Performing an unwind for offline-only data!");
73            }
74
75            if let Some(highest_static_file_block) = highest_static_file_block {
76                info!(target: "reth::cli", ?target, ?highest_static_file_block, "Executing a pipeline unwind.");
77            } else {
78                info!(target: "reth::cli", ?target, "Executing a pipeline unwind.");
79            }
80
81            // This will build an offline-only pipeline if the `offline` flag is enabled
82            let mut pipeline = self.build_pipeline(config, provider_factory)?;
83
84            // Move all applicable data from database to static files.
85            pipeline.move_to_static_files()?;
86
87            pipeline.unwind(target, None)?;
88        } else {
89            info!(target: "reth::cli", ?target, "Executing a database unwind.");
90            let provider = provider_factory.provider_rw()?;
91
92            provider
93                .remove_block_and_execution_above(target, StorageLocation::Both)
94                .map_err(|err| eyre::eyre!("Transaction error on unwind: {err}"))?;
95
96            // update finalized block if needed
97            let last_saved_finalized_block_number = provider.last_finalized_block_number()?;
98            if last_saved_finalized_block_number.is_none_or(|f| f > target) {
99                provider.save_finalized_block_number(target)?;
100            }
101
102            provider.commit()?;
103        }
104
105        info!(target: "reth::cli", ?target, "Unwound blocks");
106
107        Ok(())
108    }
109
110    fn build_pipeline<N: ProviderNodeTypes<ChainSpec = C::ChainSpec> + CliNodeTypes>(
111        self,
112        config: Config,
113        provider_factory: ProviderFactory<N>,
114    ) -> Result<Pipeline<N>, eyre::Error> {
115        let consensus: Arc<dyn Consensus> =
116            Arc::new(EthBeaconConsensus::new(provider_factory.chain_spec()));
117        let stage_conf = &config.stages;
118        let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default();
119
120        let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
121
122        // Unwinding does not require a valid executor
123        let executor = NoopBlockExecutorProvider::<N::Primitives>::default();
124
125        let builder = if self.offline {
126            Pipeline::<N>::builder().add_stages(
127                OfflineStages::new(executor, config.stages, PruneModes::default())
128                    .builder()
129                    .disable(reth_stages::StageId::SenderRecovery),
130            )
131        } else {
132            Pipeline::<N>::builder().with_tip_sender(tip_tx).add_stages(
133                DefaultStages::new(
134                    provider_factory.clone(),
135                    tip_rx,
136                    Arc::clone(&consensus),
137                    NoopHeaderDownloader::default(),
138                    NoopBodiesDownloader::default(),
139                    executor.clone(),
140                    stage_conf.clone(),
141                    prune_modes.clone(),
142                )
143                .set(ExecutionStage::new(
144                    executor,
145                    ExecutionStageThresholds {
146                        max_blocks: None,
147                        max_changes: None,
148                        max_cumulative_gas: None,
149                        max_duration: None,
150                    },
151                    stage_conf.execution_external_clean_threshold(),
152                    prune_modes,
153                    ExExManagerHandle::empty(),
154                )),
155            )
156        };
157
158        let pipeline = builder.build(
159            provider_factory.clone(),
160            StaticFileProducer::new(provider_factory, PruneModes::default()),
161        );
162        Ok(pipeline)
163    }
164}
165
166/// `reth stage unwind` subcommand
167#[derive(Subcommand, Debug, Eq, PartialEq)]
168enum Subcommands {
169    /// Unwinds the database from the latest block, until the given block number or hash has been
170    /// reached, that block is not included.
171    #[command(name = "to-block")]
172    ToBlock { target: BlockHashOrNumber },
173    /// Unwinds the database from the latest block, until the given number of blocks have been
174    /// reached.
175    #[command(name = "num-blocks")]
176    NumBlocks { amount: u64 },
177}
178
179impl Subcommands {
180    /// Returns the block to unwind to. The returned block will stay in database.
181    fn unwind_target<N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>>(
182        &self,
183        factory: ProviderFactory<N>,
184    ) -> eyre::Result<u64> {
185        let provider = factory.provider()?;
186        let last = provider.last_block_number()?;
187        let target = match self {
188            Self::ToBlock { target } => match target {
189                BlockHashOrNumber::Hash(hash) => provider
190                    .block_number(*hash)?
191                    .ok_or_else(|| eyre::eyre!("Block hash not found in database: {hash:?}"))?,
192                BlockHashOrNumber::Number(num) => *num,
193            },
194            Self::NumBlocks { amount } => last.saturating_sub(*amount),
195        };
196        if target > last {
197            eyre::bail!("Target block number is higher than the latest block number")
198        }
199        Ok(target)
200    }
201}
202
#[cfg(test)]
mod tests {
    use super::*;
    use reth_ethereum_cli::chainspec::EthereumChainSpecParser;

    /// Parses `reth --datadir dir <subcommand> 100` and returns the subcommand that was parsed.
    fn parse_subcommand(subcommand: &str) -> Subcommands {
        let args = ["reth", "--datadir", "dir", subcommand, "100"];
        Command::<EthereumChainSpecParser>::parse_from(args).command
    }

    #[test]
    fn parse_unwind() {
        // A bare number given to `to-block` is parsed as a block number.
        assert_eq!(
            parse_subcommand("to-block"),
            Subcommands::ToBlock { target: BlockHashOrNumber::Number(100) }
        );

        assert_eq!(parse_subcommand("num-blocks"), Subcommands::NumBlocks { amount: 100 });
    }
}