// reth_stages/stages/merkle.rs

1use alloy_consensus::BlockHeader;
2use alloy_primitives::{BlockNumber, B256};
3use reth_codecs::Compact;
4use reth_consensus::ConsensusError;
5use reth_db::tables;
6use reth_db_api::transaction::{DbTx, DbTxMut};
7use reth_primitives::{GotExpected, SealedHeader};
8use reth_provider::{
9    DBProvider, HeaderProvider, ProviderError, StageCheckpointReader, StageCheckpointWriter,
10    StatsReader, TrieWriter,
11};
12use reth_stages_api::{
13    BlockErrorKind, EntitiesCheckpoint, ExecInput, ExecOutput, MerkleCheckpoint, Stage,
14    StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
15};
16use reth_trie::{IntermediateStateRootState, StateRoot, StateRootProgress, StoredSubNode};
17use reth_trie_db::DatabaseStateRoot;
18use std::fmt::Debug;
19use tracing::*;
20
// TODO: automate the process outlined below so the user can just send in a debugging package
/// The error message that we include in invalid state root errors to tell users what information
/// they should include in a bug report, since true state root errors can be impossible to debug
/// with just basic logs.
///
/// This constant is interpolated into the error/log output of both the full-rebuild and
/// incremental state root paths, as well as the final root validation.
pub const INVALID_STATE_ROOT_ERROR_MESSAGE: &str = r#"
Invalid state root error on stage verification!
This is an error that likely requires a report to the reth team with additional information.
Please include the following information in your report:
 * This error message
 * The state root of the block that was rejected
 * The output of `reth db stats --checksum` from the database that was being used. This will take a long time to run!
 * 50-100 lines of logs before and after the first occurrence of the log message with the state root of the block that was rejected.
 * The debug logs from __the same time period__. To find the default location for these logs, run:
   `reth --help | grep -A 4 'log.file.directory'`

Once you have this information, please submit a github issue at https://github.com/paradigmxyz/reth/issues/new
"#;
38
/// The default threshold (in number of blocks) for switching from incremental trie building
/// of changes to whole rebuild.
///
/// When the number of blocks to process exceeds this value, [`MerkleStage::Execution`] discards
/// the existing trie tables and recomputes the state root from the hashed state instead of
/// applying per-block changesets.
pub const MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD: u64 = 5_000;
42
/// The merkle hashing stage uses input from
/// [`AccountHashingStage`][crate::stages::AccountHashingStage] and
/// [`StorageHashingStage`][crate::stages::StorageHashingStage] to calculate intermediate hashes
/// and state roots.
///
/// This stage should be run with the above two stages, otherwise it is a no-op.
///
/// This stage is split in two: one for calculating hashes and one for unwinding.
///
/// When run in execution, it's going to be executed AFTER the hashing stages, to generate
/// the state root. When run in unwind mode, it's going to be executed BEFORE the hashing stages,
/// so that it unwinds the intermediate hashes based on the unwound hashed state from the hashing
/// stages. The order of these two variants is important. The unwind variant should be added to the
/// pipeline before the execution variant.
///
/// An example pipeline to only hash state would be:
///
/// - [`MerkleStage::Unwind`]
/// - [`AccountHashingStage`][crate::stages::AccountHashingStage]
/// - [`StorageHashingStage`][crate::stages::StorageHashingStage]
/// - [`MerkleStage::Execution`]
#[derive(Debug, Clone)]
pub enum MerkleStage {
    /// The execution portion of the merkle stage.
    ///
    /// Skips `unwind` entirely; unwinding is handled by the [`MerkleStage::Unwind`] variant
    /// placed earlier in the pipeline.
    Execution {
        /// The threshold (in number of blocks) for switching from incremental trie building
        /// of changes to whole rebuild.
        clean_threshold: u64,
    },
    /// The unwind portion of the merkle stage.
    ///
    /// Skips `execute` entirely; execution is handled by the [`MerkleStage::Execution`] variant.
    Unwind,
    /// Able to execute and unwind. Used for tests
    #[cfg(any(test, feature = "test-utils"))]
    Both {
        /// The threshold (in number of blocks) for switching from incremental trie building
        /// of changes to whole rebuild.
        clean_threshold: u64,
    },
}
82
83impl MerkleStage {
84    /// Stage default for the [`MerkleStage::Execution`].
85    pub const fn default_execution() -> Self {
86        Self::Execution { clean_threshold: MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD }
87    }
88
89    /// Stage default for the [`MerkleStage::Unwind`].
90    pub const fn default_unwind() -> Self {
91        Self::Unwind
92    }
93
94    /// Create new instance of [`MerkleStage::Execution`].
95    pub const fn new_execution(clean_threshold: u64) -> Self {
96        Self::Execution { clean_threshold }
97    }
98
99    /// Gets the hashing progress
100    pub fn get_execution_checkpoint(
101        &self,
102        provider: &impl StageCheckpointReader,
103    ) -> Result<Option<MerkleCheckpoint>, StageError> {
104        let buf =
105            provider.get_stage_checkpoint_progress(StageId::MerkleExecute)?.unwrap_or_default();
106
107        if buf.is_empty() {
108            return Ok(None)
109        }
110
111        let (checkpoint, _) = MerkleCheckpoint::from_compact(&buf, buf.len());
112        Ok(Some(checkpoint))
113    }
114
115    /// Saves the hashing progress
116    pub fn save_execution_checkpoint(
117        &self,
118        provider: &impl StageCheckpointWriter,
119        checkpoint: Option<MerkleCheckpoint>,
120    ) -> Result<(), StageError> {
121        let mut buf = vec![];
122        if let Some(checkpoint) = checkpoint {
123            debug!(
124                target: "sync::stages::merkle::exec",
125                last_account_key = ?checkpoint.last_account_key,
126                "Saving inner merkle checkpoint"
127            );
128            checkpoint.to_compact(&mut buf);
129        }
130        Ok(provider.save_stage_checkpoint_progress(StageId::MerkleExecute, buf)?)
131    }
132}
133
impl<Provider> Stage<Provider> for MerkleStage
where
    Provider: DBProvider<Tx: DbTxMut>
        + TrieWriter
        + StatsReader
        + HeaderProvider
        + StageCheckpointReader
        + StageCheckpointWriter,
{
    /// Return the id of the stage
    fn id(&self) -> StageId {
        match self {
            Self::Execution { .. } => StageId::MerkleExecute,
            Self::Unwind => StageId::MerkleUnwind,
            #[cfg(any(test, feature = "test-utils"))]
            Self::Both { .. } => StageId::Other("MerkleBoth"),
        }
    }

    /// Execute the stage.
    ///
    /// Computes the state root for the target block and validates it against the root stored in
    /// the target block's header. Small ranges are applied incrementally from changesets; large
    /// ranges (beyond `clean_threshold`, or starting from block 1) trigger a full trie rebuild
    /// that can be interrupted and resumed via a persisted [`MerkleCheckpoint`].
    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
        let threshold = match self {
            Self::Unwind => {
                // The unwind-only variant does nothing on execute.
                info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
                return Ok(ExecOutput::done(StageCheckpoint::new(input.target())))
            }
            Self::Execution { clean_threshold } => *clean_threshold,
            #[cfg(any(test, feature = "test-utils"))]
            Self::Both { clean_threshold } => *clean_threshold,
        };

        let range = input.next_block_range();
        let (from_block, to_block) = range.clone().into_inner();
        let current_block_number = input.checkpoint().block_number;

        // The target block header holds the state root we must arrive at.
        let target_block = provider
            .header_by_number(to_block)?
            .ok_or_else(|| ProviderError::HeaderNotFound(to_block.into()))?;
        let target_block_root = target_block.state_root();

        let mut checkpoint = self.get_execution_checkpoint(provider)?;
        let (trie_root, entities_checkpoint) = if range.is_empty() {
            // Nothing to process: reuse the target root and any existing entities checkpoint.
            (target_block_root, input.checkpoint().entities_stage_checkpoint().unwrap_or_default())
        } else if to_block - from_block > threshold || from_block == 1 {
            // if there are more blocks than threshold it is faster to rebuild the trie
            let mut entities_checkpoint = if let Some(checkpoint) =
                checkpoint.as_ref().filter(|c| c.target_block == to_block)
            {
                // A previous run left an intermediate checkpoint for the same target block;
                // resume the rebuild from where it stopped.
                debug!(
                    target: "sync::stages::merkle::exec",
                    current = ?current_block_number,
                    target = ?to_block,
                    last_account_key = ?checkpoint.last_account_key,
                    "Continuing inner merkle checkpoint"
                );

                input.checkpoint().entities_stage_checkpoint()
            } else {
                // Either no checkpoint exists or it targets a different block — start over.
                debug!(
                    target: "sync::stages::merkle::exec",
                    current = ?current_block_number,
                    target = ?to_block,
                    previous_checkpoint = ?checkpoint,
                    "Rebuilding trie"
                );
                // Reset the checkpoint and clear trie tables
                checkpoint = None;
                self.save_execution_checkpoint(provider, None)?;
                provider.tx_ref().clear::<tables::AccountsTrie>()?;
                provider.tx_ref().clear::<tables::StoragesTrie>()?;

                None
            }
            // Total entity count = all hashed accounts + all hashed storage slots.
            .unwrap_or(EntitiesCheckpoint {
                processed: 0,
                total: (provider.count_entries::<tables::HashedAccounts>()? +
                    provider.count_entries::<tables::HashedStorages>()?)
                    as u64,
            });

            let tx = provider.tx_ref();
            let progress = StateRoot::from_tx(tx)
                .with_intermediate_state(checkpoint.map(IntermediateStateRootState::from))
                .root_with_progress()
                .map_err(|e| {
                    error!(target: "sync::stages::merkle", %e, ?current_block_number, ?to_block, "State root with progress failed! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
                    StageError::Fatal(Box::new(e))
                })?;
            match progress {
                StateRootProgress::Progress(state, hashed_entries_walked, updates) => {
                    // Rebuild was interrupted: persist the partial trie updates and the
                    // intermediate walker/hash-builder state so the next run can resume,
                    // then report `done: false` without advancing the block checkpoint.
                    provider.write_trie_updates(&updates)?;

                    let checkpoint = MerkleCheckpoint::new(
                        to_block,
                        state.last_account_key,
                        state.walker_stack.into_iter().map(StoredSubNode::from).collect(),
                        state.hash_builder.into(),
                    );
                    self.save_execution_checkpoint(provider, Some(checkpoint))?;

                    entities_checkpoint.processed += hashed_entries_walked as u64;

                    return Ok(ExecOutput {
                        checkpoint: input
                            .checkpoint()
                            .with_entities_stage_checkpoint(entities_checkpoint),
                        done: false,
                    })
                }
                StateRootProgress::Complete(root, hashed_entries_walked, updates) => {
                    // Rebuild finished: persist the trie updates and fall through to validation.
                    provider.write_trie_updates(&updates)?;

                    entities_checkpoint.processed += hashed_entries_walked as u64;

                    (root, entities_checkpoint)
                }
            }
        } else {
            // Small range: apply the changesets incrementally on top of the existing trie.
            debug!(target: "sync::stages::merkle::exec", current = ?current_block_number, target = ?to_block, "Updating trie");
            let (root, updates) =
                StateRoot::incremental_root_with_updates(provider.tx_ref(), range)
                    .map_err(|e| {
                        error!(target: "sync::stages::merkle", %e, ?current_block_number, ?to_block, "Incremental state root failed! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
                        StageError::Fatal(Box::new(e))
                    })?;

            provider.write_trie_updates(&updates)?;

            let total_hashed_entries = (provider.count_entries::<tables::HashedAccounts>()? +
                provider.count_entries::<tables::HashedStorages>()?)
                as u64;

            let entities_checkpoint = EntitiesCheckpoint {
                // This is fine because `range` doesn't have an upper bound, so in this `else`
                // branch we're just hashing all remaining accounts and storage slots we have in the
                // database.
                processed: total_hashed_entries,
                total: total_hashed_entries,
            };

            (root, entities_checkpoint)
        };

        // Reset the checkpoint
        self.save_execution_checkpoint(provider, None)?;

        // Compare the computed root against the header's root before declaring success.
        validate_state_root(trie_root, SealedHeader::seal(target_block), to_block)?;

        Ok(ExecOutput {
            checkpoint: StageCheckpoint::new(to_block)
                .with_entities_stage_checkpoint(entities_checkpoint),
            done: true,
        })
    }

    /// Unwind the stage.
    ///
    /// Recomputes the incremental state root for the unwound range, validates it against the
    /// header at `unwind_to`, and only then writes the reverting trie updates. Unwinding to
    /// genesis simply clears the trie tables.
    fn unwind(
        &mut self,
        provider: &Provider,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError> {
        let tx = provider.tx_ref();
        let range = input.unwind_block_range();
        // The execution-only variant does nothing on unwind; the paired
        // `MerkleStage::Unwind` instance earlier in the pipeline handles it.
        if matches!(self, Self::Execution { .. }) {
            info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
            return Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
        }

        let mut entities_checkpoint =
            input.checkpoint.entities_stage_checkpoint().unwrap_or(EntitiesCheckpoint {
                processed: 0,
                total: (tx.entries::<tables::HashedAccounts>()? +
                    tx.entries::<tables::HashedStorages>()?) as u64,
            });

        if input.unwind_to == 0 {
            // Unwinding all the way to genesis: drop the whole trie instead of reverting.
            tx.clear::<tables::AccountsTrie>()?;
            tx.clear::<tables::StoragesTrie>()?;

            entities_checkpoint.processed = 0;

            return Ok(UnwindOutput {
                checkpoint: StageCheckpoint::new(input.unwind_to)
                    .with_entities_stage_checkpoint(entities_checkpoint),
            })
        }

        // Unwind trie only if there are transitions
        if range.is_empty() {
            info!(target: "sync::stages::merkle::unwind", "Nothing to unwind");
        } else {
            let (block_root, updates) = StateRoot::incremental_root_with_updates(tx, range)
                .map_err(|e| StageError::Fatal(Box::new(e)))?;

            // Validate the calculated state root
            let target = provider
                .header_by_number(input.unwind_to)?
                .ok_or_else(|| ProviderError::HeaderNotFound(input.unwind_to.into()))?;

            validate_state_root(block_root, SealedHeader::seal(target), input.unwind_to)?;

            // Validation passed, apply unwind changes to the database.
            provider.write_trie_updates(&updates)?;

            // TODO(alexey): update entities checkpoint
        }

        Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
    }
}
344
345/// Check that the computed state root matches the root in the expected header.
346#[inline]
347fn validate_state_root<H: BlockHeader + Debug>(
348    got: B256,
349    expected: SealedHeader<H>,
350    target_block: BlockNumber,
351) -> Result<(), StageError> {
352    if got == expected.state_root() {
353        Ok(())
354    } else {
355        error!(target: "sync::stages::merkle", ?target_block, ?got, ?expected, "Failed to verify block state root! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
356        Err(StageError::Block {
357            error: BlockErrorKind::Validation(ConsensusError::BodyStateRootDiff(
358                GotExpected { got, expected: expected.state_root() }.into(),
359            )),
360            block: Box::new(expected.block_with_parent()),
361        })
362    }
363}
364
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
        TestRunnerError, TestStageDB, UnwindStageTestRunner,
    };
    use alloy_primitives::{keccak256, U256};
    use assert_matches::assert_matches;
    use reth_db_api::cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO};
    use reth_primitives::{SealedBlock, StaticFileSegment, StorageEntry};
    use reth_provider::{providers::StaticFileWriter, StaticFileProviderFactory};
    use reth_stages_api::StageUnitCheckpoint;
    use reth_testing_utils::generators::{
        self, random_block, random_block_range, random_changeset_range,
        random_contract_account_range, BlockParams, BlockRangeParams,
    };
    use reth_trie::test_utils::{state_root, state_root_prehashed};
    use std::collections::BTreeMap;

    // Generates the standard execute/unwind test suite for this stage.
    stage_test_suite_ext!(MerkleTestRunner, merkle);

    /// Execute from genesis so as to merkelize whole state
    #[tokio::test]
    async fn execute_clean_merkle() {
        // 500 blocks from progress 0: from_block == 1 forces the full-rebuild path.
        let (previous_stage, stage_progress) = (500, 0);

        // Set up the runner
        let mut runner = MerkleTestRunner::default();
        // set low threshold so we hash the whole storage
        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        let rx = runner.execute(input);

        // Assert the successful result
        let result = rx.await.unwrap();
        // `processed == total` and `total` equals the number of hashed accounts + storages,
        // confirming the entities checkpoint covers the entire hashed state.
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                        processed,
                        total
                    }))
                },
                done: true
            }) if block_number == previous_stage && processed == total &&
                total == (
                    runner.db.table::<tables::HashedAccounts>().unwrap().len() +
                    runner.db.table::<tables::HashedStorages>().unwrap().len()
                ) as u64
        );

        // Validate the stage execution
        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    /// Update small trie
    #[tokio::test]
    async fn execute_small_merkle() {
        // Single-block range starting past genesis: exercises the incremental-update path.
        let (previous_stage, stage_progress) = (2, 1);

        // Set up the runner
        let mut runner = MerkleTestRunner::default();
        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        let rx = runner.execute(input);

        // Assert the successful result
        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                        processed,
                        total
                    }))
                },
                done: true
            }) if block_number == previous_stage && processed == total &&
                total == (
                    runner.db.table::<tables::HashedAccounts>().unwrap().len() +
                    runner.db.table::<tables::HashedStorages>().unwrap().len()
                ) as u64
        );

        // Validate the stage execution
        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    // Test harness: wraps a `TestStageDB` plus the clean threshold used to build the stage.
    struct MerkleTestRunner {
        db: TestStageDB,
        clean_threshold: u64,
    }

    impl Default for MerkleTestRunner {
        fn default() -> Self {
            Self { db: TestStageDB::default(), clean_threshold: 10000 }
        }
    }

    impl StageTestRunner for MerkleTestRunner {
        type S = MerkleStage;

        fn db(&self) -> &TestStageDB {
            &self.db
        }

        fn stage(&self) -> Self::S {
            // `Both` so the generated suite can exercise execute and unwind on one instance.
            Self::S::Both { clean_threshold: self.clean_threshold }
        }
    }

    impl ExecuteStageTestRunner for MerkleTestRunner {
        type Seed = Vec<SealedBlock>;

        /// Seeds the database with random blocks, accounts, storages and changesets, then
        /// computes the expected state root from the hashed tables and patches it into the
        /// last block's header so the stage's root validation can succeed.
        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_progress = input.checkpoint().block_number;
            let start = stage_progress + 1;
            let end = input.target();
            let mut rng = generators::rng();

            // Blocks before the current progress point, if any.
            let mut preblocks = vec![];
            if stage_progress > 0 {
                preblocks.append(&mut random_block_range(
                    &mut rng,
                    0..=stage_progress - 1,
                    BlockRangeParams {
                        parent: Some(B256::ZERO),
                        tx_count: 0..1,
                        ..Default::default()
                    },
                ));
                self.db.insert_blocks(preblocks.iter(), StorageKind::Static)?;
            }

            let num_of_accounts = 31;
            let accounts = random_contract_account_range(&mut rng, &mut (0..num_of_accounts))
                .into_iter()
                .collect::<BTreeMap<_, _>>();

            self.db.insert_accounts_and_storages(
                accounts.iter().map(|(addr, acc)| (*addr, (*acc, std::iter::empty()))),
            )?;

            // Block at the current progress point; its header gets the root of the seeded
            // accounts so the pre-state is consistent.
            let SealedBlock { header, body } = random_block(
                &mut rng,
                stage_progress,
                BlockParams { parent: preblocks.last().map(|b| b.hash()), ..Default::default() },
            );
            let mut header = header.unseal();

            header.state_root = state_root(
                accounts
                    .clone()
                    .into_iter()
                    .map(|(address, account)| (address, (account, std::iter::empty()))),
            );
            let sealed_head = SealedBlock { header: SealedHeader::seal(header), body };

            let head_hash = sealed_head.hash();
            let mut blocks = vec![sealed_head];
            blocks.extend(random_block_range(
                &mut rng,
                start..=end,
                BlockRangeParams { parent: Some(head_hash), tx_count: 0..3, ..Default::default() },
            ));
            let last_block = blocks.last().cloned().unwrap();
            self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;

            let (transitions, final_state) = random_changeset_range(
                &mut rng,
                blocks.iter(),
                accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
                0..3,
                0..256,
            );
            // add block changeset from block 1.
            self.db.insert_changesets(transitions, Some(start))?;
            self.db.insert_accounts_and_storages(final_state)?;

            // Calculate state root
            // Walk the hashed tables directly and compute the reference root with the
            // pre-hashed test helper (zero-valued slots are excluded, matching trie semantics).
            let root = self.db.query(|tx| {
                let mut accounts = BTreeMap::default();
                let mut accounts_cursor = tx.cursor_read::<tables::HashedAccounts>()?;
                let mut storage_cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
                for entry in accounts_cursor.walk_range(..)? {
                    let (key, account) = entry?;
                    let mut storage_entries = Vec::new();
                    let mut entry = storage_cursor.seek_exact(key)?;
                    while let Some((_, storage)) = entry {
                        storage_entries.push(storage);
                        entry = storage_cursor.next_dup()?;
                    }
                    let storage = storage_entries
                        .into_iter()
                        .filter(|v| !v.value.is_zero())
                        .map(|v| (v.key, v.value))
                        .collect::<Vec<_>>();
                    accounts.insert(key, (account, storage));
                }

                Ok(state_root_prehashed(accounts.into_iter()))
            })?;

            // Replace the last static-file header with one carrying the computed root.
            let static_file_provider = self.db.factory.static_file_provider();
            let mut writer =
                static_file_provider.latest_writer(StaticFileSegment::Headers).unwrap();
            let mut last_header = last_block.header().clone();
            last_header.state_root = root;

            let hash = last_header.hash_slow();
            writer.prune_headers(1).unwrap();
            writer.commit().unwrap();
            writer.append_header(&last_header, U256::ZERO, &hash).unwrap();
            writer.commit().unwrap();

            Ok(blocks)
        }

        fn validate_execution(
            &self,
            _input: ExecInput,
            _output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            // The execution is validated within the stage
            Ok(())
        }
    }

    impl UnwindStageTestRunner for MerkleTestRunner {
        fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
            // The unwind is validated within the stage
            Ok(())
        }

        /// Manually reverts the hashed state tables to `unwind_to` by replaying the account and
        /// storage changesets in reverse, so the stage's unwind has consistent input to
        /// recompute the root from.
        fn before_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            let target_block = input.unwind_to + 1;

            self.db
                .commit(|tx| {
                    let mut storage_changesets_cursor =
                        tx.cursor_dup_read::<tables::StorageChangeSets>().unwrap();
                    let mut storage_cursor =
                        tx.cursor_dup_write::<tables::HashedStorages>().unwrap();

                    // address hash -> (slot hash -> (old value, is_private)); walking the
                    // changesets backwards means the oldest (pre-state) value wins per slot.
                    let mut tree: BTreeMap<B256, BTreeMap<B256, (U256, bool)>> = BTreeMap::new();

                    let mut rev_changeset_walker =
                        storage_changesets_cursor.walk_back(None).unwrap();
                    while let Some((bn_address, entry)) =
                        rev_changeset_walker.next().transpose().unwrap()
                    {
                        if bn_address.block_number() < target_block {
                            break
                        }

                        tree.entry(keccak256(bn_address.address()))
                            .or_default()
                            .insert(keccak256(entry.key), (entry.value, entry.is_private));
                    }
                    for (hashed_address, storage) in tree {
                        for (hashed_slot, (value, is_private)) in storage {
                            // Delete the current slot entry (if present), then restore the
                            // pre-state value unless it was zero (i.e. the slot didn't exist).
                            let storage_entry = storage_cursor
                                .seek_by_key_subkey(hashed_address, hashed_slot)
                                .unwrap();
                            if storage_entry.is_some_and(|v| v.key == hashed_slot) {
                                storage_cursor.delete_current().unwrap();
                            }

                            if !value.is_zero() {
                                let storage_entry =
                                    StorageEntry { key: hashed_slot, value, is_private };
                                storage_cursor.upsert(hashed_address, storage_entry).unwrap();
                            }
                        }
                    }

                    // Same reverse replay for account changesets: restore the pre-state account
                    // info, or delete the account if it didn't exist before.
                    let mut changeset_cursor =
                        tx.cursor_dup_write::<tables::AccountChangeSets>().unwrap();
                    let mut rev_changeset_walker = changeset_cursor.walk_back(None).unwrap();

                    while let Some((block_number, account_before_tx)) =
                        rev_changeset_walker.next().transpose().unwrap()
                    {
                        if block_number < target_block {
                            break
                        }

                        if let Some(acc) = account_before_tx.info {
                            tx.put::<tables::HashedAccounts>(
                                keccak256(account_before_tx.address),
                                acc,
                            )
                            .unwrap();
                        } else {
                            tx.delete::<tables::HashedAccounts>(
                                keccak256(account_before_tx.address),
                                None,
                            )
                            .unwrap();
                        }
                    }
                    Ok(())
                })
                .unwrap();
            Ok(())
        }
    }
}