1use alloy_consensus::BlockHeader;
2use alloy_primitives::{BlockNumber, B256};
3use reth_codecs::Compact;
4use reth_consensus::ConsensusError;
5use reth_db::tables;
6use reth_db_api::transaction::{DbTx, DbTxMut};
7use reth_primitives::{GotExpected, SealedHeader};
8use reth_provider::{
9 DBProvider, HeaderProvider, ProviderError, StageCheckpointReader, StageCheckpointWriter,
10 StatsReader, TrieWriter,
11};
12use reth_stages_api::{
13 BlockErrorKind, EntitiesCheckpoint, ExecInput, ExecOutput, MerkleCheckpoint, Stage,
14 StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
15};
16use reth_trie::{IntermediateStateRootState, StateRoot, StateRootProgress, StoredSubNode};
17use reth_trie_db::DatabaseStateRoot;
18use std::fmt::Debug;
19use tracing::*;
20
/// Bug-report template that is interpolated into the error logs emitted when state-root
/// computation or verification fails (see `execute`, `unwind` and `validate_state_root`).
/// A mismatching state root here usually indicates database corruption or an
/// implementation bug, so the user is asked to file a detailed report.
pub const INVALID_STATE_ROOT_ERROR_MESSAGE: &str = r#"
Invalid state root error on stage verification!
This is an error that likely requires a report to the reth team with additional information.
Please include the following information in your report:
 * This error message
 * The state root of the block that was rejected
 * The output of `reth db stats --checksum` from the database that was being used. This will take a long time to run!
 * 50-100 lines of logs before and after the first occurrence of the log message with the state root of the block that was rejected.
 * The debug logs from __the same time period__. To find the default location for these logs, run:
   `reth --help | grep -A 4 'log.file.directory'`

Once you have this information, please submit a github issue at https://github.com/paradigmxyz/reth/issues/new
"#;
38
39pub const MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD: u64 = 5_000;
42
/// The merkle stage, responsible for maintaining the state trie and verifying state roots.
///
/// The stage is split into variants because trie maintenance happens at a different point of
/// the pipeline during forward sync than during unwind: each variant is a no-op for the
/// opposite direction (see `execute`/`unwind` in the `Stage` impl).
#[derive(Debug, Clone)]
pub enum MerkleStage {
    /// Variant that runs during forward execution; its `unwind` is skipped.
    Execution {
        /// If the processed block range spans more than this many blocks, the trie is rebuilt
        /// from scratch instead of updated incrementally.
        clean_threshold: u64,
    },
    /// Variant that runs during unwind; its `execute` is skipped.
    Unwind,
    /// Test-only variant that performs both execution and unwind.
    #[cfg(any(test, feature = "test-utils"))]
    Both {
        /// Same meaning as `Execution::clean_threshold`.
        clean_threshold: u64,
    },
}
82
83impl MerkleStage {
84 pub const fn default_execution() -> Self {
86 Self::Execution { clean_threshold: MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD }
87 }
88
89 pub const fn default_unwind() -> Self {
91 Self::Unwind
92 }
93
94 pub const fn new_execution(clean_threshold: u64) -> Self {
96 Self::Execution { clean_threshold }
97 }
98
99 pub fn get_execution_checkpoint(
101 &self,
102 provider: &impl StageCheckpointReader,
103 ) -> Result<Option<MerkleCheckpoint>, StageError> {
104 let buf =
105 provider.get_stage_checkpoint_progress(StageId::MerkleExecute)?.unwrap_or_default();
106
107 if buf.is_empty() {
108 return Ok(None)
109 }
110
111 let (checkpoint, _) = MerkleCheckpoint::from_compact(&buf, buf.len());
112 Ok(Some(checkpoint))
113 }
114
115 pub fn save_execution_checkpoint(
117 &self,
118 provider: &impl StageCheckpointWriter,
119 checkpoint: Option<MerkleCheckpoint>,
120 ) -> Result<(), StageError> {
121 let mut buf = vec![];
122 if let Some(checkpoint) = checkpoint {
123 debug!(
124 target: "sync::stages::merkle::exec",
125 last_account_key = ?checkpoint.last_account_key,
126 "Saving inner merkle checkpoint"
127 );
128 checkpoint.to_compact(&mut buf);
129 }
130 Ok(provider.save_stage_checkpoint_progress(StageId::MerkleExecute, buf)?)
131 }
132}
133
impl<Provider> Stage<Provider> for MerkleStage
where
    Provider: DBProvider<Tx: DbTxMut>
        + TrieWriter
        + StatsReader
        + HeaderProvider
        + StageCheckpointReader
        + StageCheckpointWriter,
{
    /// Returns the stage id that corresponds to the active variant.
    fn id(&self) -> StageId {
        match self {
            Self::Execution { .. } => StageId::MerkleExecute,
            Self::Unwind => StageId::MerkleUnwind,
            #[cfg(any(test, feature = "test-utils"))]
            Self::Both { .. } => StageId::Other("MerkleBoth"),
        }
    }

    /// Computes the state root for the input block range and verifies it against the target
    /// block's header.
    ///
    /// Two strategies are used:
    /// * a full trie rebuild (resumable via an inner [`MerkleCheckpoint`]) when the range is
    ///   larger than `clean_threshold` or syncing starts from block 1;
    /// * an incremental root update for smaller ranges.
    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
        // The `Unwind` variant does no work on the forward path.
        let threshold = match self {
            Self::Unwind => {
                info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
                return Ok(ExecOutput::done(StageCheckpoint::new(input.target())))
            }
            Self::Execution { clean_threshold } => *clean_threshold,
            #[cfg(any(test, feature = "test-utils"))]
            Self::Both { clean_threshold } => *clean_threshold,
        };

        let range = input.next_block_range();
        let (from_block, to_block) = range.clone().into_inner();
        let current_block_number = input.checkpoint().block_number;

        // The header of the target block provides the expected state root.
        let target_block = provider
            .header_by_number(to_block)?
            .ok_or_else(|| ProviderError::HeaderNotFound(to_block.into()))?;
        let target_block_root = target_block.state_root();

        let mut checkpoint = self.get_execution_checkpoint(provider)?;
        let (trie_root, entities_checkpoint) = if range.is_empty() {
            // Nothing to process: trust the target header's root and keep prior progress.
            (target_block_root, input.checkpoint().entities_stage_checkpoint().unwrap_or_default())
        } else if to_block - from_block > threshold || from_block == 1 {
            // Full rebuild path. Resume from a saved inner checkpoint only if it targets the
            // same block; otherwise the trie tables are wiped and rebuilt from scratch.
            let mut entities_checkpoint = if let Some(checkpoint) =
                checkpoint.as_ref().filter(|c| c.target_block == to_block)
            {
                debug!(
                    target: "sync::stages::merkle::exec",
                    current = ?current_block_number,
                    target = ?to_block,
                    last_account_key = ?checkpoint.last_account_key,
                    "Continuing inner merkle checkpoint"
                );

                input.checkpoint().entities_stage_checkpoint()
            } else {
                debug!(
                    target: "sync::stages::merkle::exec",
                    current = ?current_block_number,
                    target = ?to_block,
                    previous_checkpoint = ?checkpoint,
                    "Rebuilding trie"
                );
                // Stale or missing checkpoint: drop it and clear both trie tables before the
                // rebuild starts.
                checkpoint = None;
                self.save_execution_checkpoint(provider, None)?;
                provider.tx_ref().clear::<tables::AccountsTrie>()?;
                provider.tx_ref().clear::<tables::StoragesTrie>()?;

                None
            }
            // Progress is tracked against the total number of hashed account + storage entries.
            .unwrap_or(EntitiesCheckpoint {
                processed: 0,
                total: (provider.count_entries::<tables::HashedAccounts>()? +
                    provider.count_entries::<tables::HashedStorages>()?)
                    as u64,
            });

            let tx = provider.tx_ref();
            let progress = StateRoot::from_tx(tx)
                .with_intermediate_state(checkpoint.map(IntermediateStateRootState::from))
                .root_with_progress()
                .map_err(|e| {
                    error!(target: "sync::stages::merkle", %e, ?current_block_number, ?to_block, "State root with progress failed! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
                    StageError::Fatal(Box::new(e))
                })?;
            match progress {
                // The rebuild hit its internal limit: persist partial trie updates plus an
                // inner checkpoint and report `done: false` so the stage is re-invoked.
                StateRootProgress::Progress(state, hashed_entries_walked, updates) => {
                    provider.write_trie_updates(&updates)?;

                    let checkpoint = MerkleCheckpoint::new(
                        to_block,
                        state.last_account_key,
                        state.walker_stack.into_iter().map(StoredSubNode::from).collect(),
                        state.hash_builder.into(),
                    );
                    self.save_execution_checkpoint(provider, Some(checkpoint))?;

                    entities_checkpoint.processed += hashed_entries_walked as u64;

                    return Ok(ExecOutput {
                        checkpoint: input
                            .checkpoint()
                            .with_entities_stage_checkpoint(entities_checkpoint),
                        done: false,
                    })
                }
                // Rebuild finished: persist the final updates and fall through to validation.
                StateRootProgress::Complete(root, hashed_entries_walked, updates) => {
                    provider.write_trie_updates(&updates)?;

                    entities_checkpoint.processed += hashed_entries_walked as u64;

                    (root, entities_checkpoint)
                }
            }
        } else {
            // Incremental path for small ranges: update the existing trie in place.
            debug!(target: "sync::stages::merkle::exec", current = ?current_block_number, target = ?to_block, "Updating trie");
            let (root, updates) =
                StateRoot::incremental_root_with_updates(provider.tx_ref(), range)
                    .map_err(|e| {
                        error!(target: "sync::stages::merkle", %e, ?current_block_number, ?to_block, "Incremental state root failed! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
                        StageError::Fatal(Box::new(e))
                    })?;

            provider.write_trie_updates(&updates)?;

            let total_hashed_entries = (provider.count_entries::<tables::HashedAccounts>()? +
                provider.count_entries::<tables::HashedStorages>()?)
                as u64;

            // Incremental updates always walk the whole state implicitly, so the checkpoint
            // reports full progress.
            let entities_checkpoint = EntitiesCheckpoint {
                processed: total_hashed_entries,
                total: total_hashed_entries,
            };

            (root, entities_checkpoint)
        };

        // The run completed, so the inner (resume) checkpoint is no longer needed.
        self.save_execution_checkpoint(provider, None)?;

        // Reject the whole run if the computed root does not match the target header.
        validate_state_root(trie_root, SealedHeader::seal(target_block), to_block)?;

        Ok(ExecOutput {
            checkpoint: StageCheckpoint::new(to_block)
                .with_entities_stage_checkpoint(entities_checkpoint),
            done: true,
        })
    }

    /// Unwinds the trie to `input.unwind_to` by applying changesets in reverse and verifies
    /// the resulting root against the unwind-target header.
    fn unwind(
        &mut self,
        provider: &Provider,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError> {
        let tx = provider.tx_ref();
        let range = input.unwind_block_range();
        // The `Execution` variant does no work on the unwind path.
        if matches!(self, Self::Execution { .. }) {
            info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
            return Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
        }

        let mut entities_checkpoint =
            input.checkpoint.entities_stage_checkpoint().unwrap_or(EntitiesCheckpoint {
                processed: 0,
                total: (tx.entries::<tables::HashedAccounts>()? +
                    tx.entries::<tables::HashedStorages>()?) as u64,
            });

        // Unwinding to genesis: simply drop the trie tables, no root computation needed.
        if input.unwind_to == 0 {
            tx.clear::<tables::AccountsTrie>()?;
            tx.clear::<tables::StoragesTrie>()?;

            entities_checkpoint.processed = 0;

            return Ok(UnwindOutput {
                checkpoint: StageCheckpoint::new(input.unwind_to)
                    .with_entities_stage_checkpoint(entities_checkpoint),
            })
        }

        if range.is_empty() {
            info!(target: "sync::stages::merkle::unwind", "Nothing to unwind");
        } else {
            // Recompute the root for the unwound range; validation happens BEFORE the trie
            // updates are written so a mismatch leaves the database untouched.
            let (block_root, updates) = StateRoot::incremental_root_with_updates(tx, range)
                .map_err(|e| StageError::Fatal(Box::new(e)))?;

            let target = provider
                .header_by_number(input.unwind_to)?
                .ok_or_else(|| ProviderError::HeaderNotFound(input.unwind_to.into()))?;

            validate_state_root(block_root, SealedHeader::seal(target), input.unwind_to)?;

            provider.write_trie_updates(&updates)?;

        }

        Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
    }
}
344
345#[inline]
347fn validate_state_root<H: BlockHeader + Debug>(
348 got: B256,
349 expected: SealedHeader<H>,
350 target_block: BlockNumber,
351) -> Result<(), StageError> {
352 if got == expected.state_root() {
353 Ok(())
354 } else {
355 error!(target: "sync::stages::merkle", ?target_block, ?got, ?expected, "Failed to verify block state root! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
356 Err(StageError::Block {
357 error: BlockErrorKind::Validation(ConsensusError::BodyStateRootDiff(
358 GotExpected { got, expected: expected.state_root() }.into(),
359 )),
360 block: Box::new(expected.block_with_parent()),
361 })
362 }
363}
364
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
        TestRunnerError, TestStageDB, UnwindStageTestRunner,
    };
    use alloy_primitives::{keccak256, U256};
    use assert_matches::assert_matches;
    use reth_db_api::cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO};
    use reth_primitives::{SealedBlock, StaticFileSegment, StorageEntry};
    use reth_provider::{providers::StaticFileWriter, StaticFileProviderFactory};
    use reth_stages_api::StageUnitCheckpoint;
    use reth_testing_utils::generators::{
        self, random_block, random_block_range, random_changeset_range,
        random_contract_account_range, BlockParams, BlockRangeParams,
    };
    use reth_trie::test_utils::{state_root, state_root_prehashed};
    use std::collections::BTreeMap;

    // Generates the shared execute/unwind test suite for this stage.
    stage_test_suite_ext!(MerkleTestRunner, merkle);

    // Range of 500 blocks with clean_threshold = 10000 below: still exercises the clean
    // (full rebuild) path because sync starts from block 1.
    #[tokio::test]
    async fn execute_clean_merkle() {
        let (previous_stage, stage_progress) = (500, 0);

        let mut runner = MerkleTestRunner::default();
        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        let rx = runner.execute(input);

        // The stage must finish in one pass, with processed == total hashed entries.
        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                        processed,
                        total
                    }))
                },
                done: true
            }) if block_number == previous_stage && processed == total &&
                total == (
                    runner.db.table::<tables::HashedAccounts>().unwrap().len() +
                    runner.db.table::<tables::HashedStorages>().unwrap().len()
                ) as u64
        );

        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    // Small range (one block) exercises the incremental root-update path.
    #[tokio::test]
    async fn execute_small_merkle() {
        let (previous_stage, stage_progress) = (2, 1);

        let mut runner = MerkleTestRunner::default();
        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        let rx = runner.execute(input);

        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                        processed,
                        total
                    }))
                },
                done: true
            }) if block_number == previous_stage && processed == total &&
                total == (
                    runner.db.table::<tables::HashedAccounts>().unwrap().len() +
                    runner.db.table::<tables::HashedStorages>().unwrap().len()
                ) as u64
        );

        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    // Test harness wrapping a test database plus the clean threshold handed to the stage.
    struct MerkleTestRunner {
        db: TestStageDB,
        clean_threshold: u64,
    }

    impl Default for MerkleTestRunner {
        fn default() -> Self {
            Self { db: TestStageDB::default(), clean_threshold: 10000 }
        }
    }

    impl StageTestRunner for MerkleTestRunner {
        type S = MerkleStage;

        fn db(&self) -> &TestStageDB {
            &self.db
        }

        // Use the `Both` variant so the generated suite can test execute AND unwind.
        fn stage(&self) -> Self::S {
            Self::S::Both { clean_threshold: self.clean_threshold }
        }
    }

    impl ExecuteStageTestRunner for MerkleTestRunner {
        type Seed = Vec<SealedBlock>;

        // Seeds the database with random blocks, accounts, storages and changesets, then
        // overwrites the last header's state root with the root recomputed from the final
        // hashed state so the stage's verification can succeed.
        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_progress = input.checkpoint().block_number;
            let start = stage_progress + 1;
            let end = input.target();
            let mut rng = generators::rng();

            // Blocks before the current stage progress (empty bodies).
            let mut preblocks = vec![];
            if stage_progress > 0 {
                preblocks.append(&mut random_block_range(
                    &mut rng,
                    0..=stage_progress - 1,
                    BlockRangeParams {
                        parent: Some(B256::ZERO),
                        tx_count: 0..1,
                        ..Default::default()
                    },
                ));
                self.db.insert_blocks(preblocks.iter(), StorageKind::Static)?;
            }

            let num_of_accounts = 31;
            let accounts = random_contract_account_range(&mut rng, &mut (0..num_of_accounts))
                .into_iter()
                .collect::<BTreeMap<_, _>>();

            self.db.insert_accounts_and_storages(
                accounts.iter().map(|(addr, acc)| (*addr, (*acc, std::iter::empty()))),
            )?;

            // Head block at `stage_progress`, re-sealed with the root of the seeded accounts.
            let SealedBlock { header, body } = random_block(
                &mut rng,
                stage_progress,
                BlockParams { parent: preblocks.last().map(|b| b.hash()), ..Default::default() },
            );
            let mut header = header.unseal();

            header.state_root = state_root(
                accounts
                    .clone()
                    .into_iter()
                    .map(|(address, account)| (address, (account, std::iter::empty()))),
            );
            let sealed_head = SealedBlock { header: SealedHeader::seal(header), body };

            let head_hash = sealed_head.hash();
            let mut blocks = vec![sealed_head];
            blocks.extend(random_block_range(
                &mut rng,
                start..=end,
                BlockRangeParams { parent: Some(head_hash), tx_count: 0..3, ..Default::default() },
            ));
            let last_block = blocks.last().cloned().unwrap();
            self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;

            // Random state transitions for the block range, plus the resulting final state.
            let (transitions, final_state) = random_changeset_range(
                &mut rng,
                blocks.iter(),
                accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
                0..3,
                0..256,
            );
            self.db.insert_changesets(transitions, Some(start))?;
            self.db.insert_accounts_and_storages(final_state)?;

            // Recompute the expected root directly from the hashed tables (zero-valued
            // storage slots are excluded, mirroring trie semantics).
            let root = self.db.query(|tx| {
                let mut accounts = BTreeMap::default();
                let mut accounts_cursor = tx.cursor_read::<tables::HashedAccounts>()?;
                let mut storage_cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
                for entry in accounts_cursor.walk_range(..)? {
                    let (key, account) = entry?;
                    let mut storage_entries = Vec::new();
                    let mut entry = storage_cursor.seek_exact(key)?;
                    while let Some((_, storage)) = entry {
                        storage_entries.push(storage);
                        entry = storage_cursor.next_dup()?;
                    }
                    let storage = storage_entries
                        .into_iter()
                        .filter(|v| !v.value.is_zero())
                        .map(|v| (v.key, v.value))
                        .collect::<Vec<_>>();
                    accounts.insert(key, (account, storage));
                }

                Ok(state_root_prehashed(accounts.into_iter()))
            })?;

            // Replace the last static-file header with one carrying the recomputed root.
            let static_file_provider = self.db.factory.static_file_provider();
            let mut writer =
                static_file_provider.latest_writer(StaticFileSegment::Headers).unwrap();
            let mut last_header = last_block.header().clone();
            last_header.state_root = root;

            let hash = last_header.hash_slow();
            writer.prune_headers(1).unwrap();
            writer.commit().unwrap();
            writer.append_header(&last_header, U256::ZERO, &hash).unwrap();
            writer.commit().unwrap();

            Ok(blocks)
        }

        // The stage itself validates the state root, so no extra checks are needed here.
        fn validate_execution(
            &self,
            _input: ExecInput,
            _output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            Ok(())
        }
    }

    impl UnwindStageTestRunner for MerkleTestRunner {
        // Root verification during unwind is done by the stage; nothing extra to validate.
        fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
            Ok(())
        }

        // Manually reverts the hashed state tables to the unwind target by walking the
        // storage and account changesets backwards, so the stage's trie unwind can be
        // verified against a consistent hashed state.
        fn before_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            let target_block = input.unwind_to + 1;

            self.db
                .commit(|tx| {
                    let mut storage_changesets_cursor =
                        tx.cursor_dup_read::<tables::StorageChangeSets>().unwrap();
                    let mut storage_cursor =
                        tx.cursor_dup_write::<tables::HashedStorages>().unwrap();

                    // address hash -> slot hash -> (pre-state value, is_private flag)
                    let mut tree: BTreeMap<B256, BTreeMap<B256, (U256, bool)>> = BTreeMap::new();

                    // Walk storage changesets newest-first; the first (i.e. latest-seen)
                    // entry per slot wins, which is the oldest pre-state in the range.
                    let mut rev_changeset_walker =
                        storage_changesets_cursor.walk_back(None).unwrap();
                    while let Some((bn_address, entry)) =
                        rev_changeset_walker.next().transpose().unwrap()
                    {
                        if bn_address.block_number() < target_block {
                            break
                        }

                        tree.entry(keccak256(bn_address.address()))
                            .or_default()
                            .insert(keccak256(entry.key), (entry.value, entry.is_private));
                    }
                    for (hashed_address, storage) in tree {
                        for (hashed_slot, (value, is_private)) in storage {
                            // Delete any current entry for the slot, then re-insert the
                            // pre-state value unless it is zero (zero == absent).
                            let storage_entry = storage_cursor
                                .seek_by_key_subkey(hashed_address, hashed_slot)
                                .unwrap();
                            if storage_entry.is_some_and(|v| v.key == hashed_slot) {
                                storage_cursor.delete_current().unwrap();
                            }

                            if !value.is_zero() {
                                let storage_entry =
                                    StorageEntry { key: hashed_slot, value, is_private };
                                storage_cursor.upsert(hashed_address, storage_entry).unwrap();
                            }
                        }
                    }

                    // Same backwards walk for account changesets: restore the pre-state
                    // account info, or delete the account if it did not exist.
                    let mut changeset_cursor =
                        tx.cursor_dup_write::<tables::AccountChangeSets>().unwrap();
                    let mut rev_changeset_walker = changeset_cursor.walk_back(None).unwrap();

                    while let Some((block_number, account_before_tx)) =
                        rev_changeset_walker.next().transpose().unwrap()
                    {
                        if block_number < target_block {
                            break
                        }

                        if let Some(acc) = account_before_tx.info {
                            tx.put::<tables::HashedAccounts>(
                                keccak256(account_before_tx.address),
                                acc,
                            )
                            .unwrap();
                        } else {
                            tx.delete::<tables::HashedAccounts>(
                                keccak256(account_before_tx.address),
                                None,
                            )
                            .unwrap();
                        }
                    }
                    Ok(())
                })
                .unwrap();
            Ok(())
        }
    }
}