1mod bodies;
3mod execution;
5mod finish;
7mod hashing_account;
9mod hashing_storage;
11mod headers;
13mod index_account_history;
15mod index_storage_history;
17mod merkle;
19mod prune;
20mod sender_recovery;
22mod tx_lookup;
24
25pub use bodies::*;
26pub use execution::*;
27pub use finish::*;
28pub use hashing_account::*;
29pub use hashing_storage::*;
30pub use headers::*;
31pub use index_account_history::*;
32pub use index_storage_history::*;
33pub use merkle::*;
34pub use prune::*;
35pub use sender_recovery::*;
36pub use tx_lookup::*;
37
38mod utils;
39use utils::*;
40
41#[cfg(test)]
42mod tests {
43 use super::*;
44 use crate::test_utils::{StorageKind, TestStageDB};
45 use alloy_primitives::{address, hex_literal::hex, keccak256, BlockNumber, B256, U256};
46 use alloy_rlp::Decodable;
47 use reth_chainspec::ChainSpecBuilder;
48 use reth_db::{
49 mdbx::{cursor::Cursor, RW},
50 tables, AccountsHistory,
51 };
52 use reth_db_api::{
53 cursor::{DbCursorRO, DbCursorRW},
54 table::Table,
55 transaction::{DbTx, DbTxMut},
56 };
57 use reth_evm_ethereum::execute::EthExecutorProvider;
58 use reth_exex::ExExManagerHandle;
59 use reth_primitives::{Account, Bytecode, SealedBlock, StaticFileSegment};
60 use reth_provider::{
61 providers::{StaticFileProvider, StaticFileWriter},
62 test_utils::MockNodeTypesWithDB,
63 AccountExtReader, BlockBodyIndicesProvider, DatabaseProviderFactory, ProviderFactory,
64 ProviderResult, ReceiptProvider, StageCheckpointWriter, StaticFileProviderFactory,
65 StorageReader,
66 };
67 use reth_prune_types::{PruneMode, PruneModes};
68 use reth_stages_api::{
69 ExecInput, ExecutionStageThresholds, PipelineTarget, Stage, StageCheckpoint, StageId,
70 };
71 use reth_testing_utils::generators::{
72 self, random_block, random_block_range, random_receipt, BlockRangeParams,
73 };
74 use std::{io::Write, sync::Arc};
75
76 #[tokio::test]
77 #[ignore]
78 async fn test_prune() {
79 let test_db = TestStageDB::default();
80
81 let provider_rw = test_db.factory.provider_rw().unwrap();
82 let tip = 66;
83 let input = ExecInput { target: Some(tip), checkpoint: None };
84 let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice();
85 let genesis = SealedBlock::decode(&mut genesis_rlp).unwrap();
86 let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice();
87 let block = SealedBlock::decode(&mut block_rlp).unwrap();
88 provider_rw.insert_historical_block(genesis.try_seal_with_senders().unwrap()).unwrap();
89 provider_rw
90 .insert_historical_block(block.clone().try_seal_with_senders().unwrap())
91 .unwrap();
92
93 let mut head = block.hash();
95 let mut rng = generators::rng();
96 for block_number in 2..=tip {
97 let nblock = random_block(
98 &mut rng,
99 block_number,
100 generators::BlockParams { parent: Some(head), ..Default::default() },
101 );
102 head = nblock.hash();
103 provider_rw.insert_historical_block(nblock.try_seal_with_senders().unwrap()).unwrap();
104 }
105 provider_rw
106 .static_file_provider()
107 .latest_writer(StaticFileSegment::Headers)
108 .unwrap()
109 .commit()
110 .unwrap();
111 provider_rw.commit().unwrap();
112
113 let provider_rw = test_db.factory.provider_rw().unwrap();
115 let code = hex!("5a465a905090036002900360015500");
116 let code_hash = keccak256(hex!("5a465a905090036002900360015500"));
117 provider_rw
118 .tx_ref()
119 .put::<tables::PlainAccountState>(
120 address!("1000000000000000000000000000000000000000"),
121 Account { nonce: 0, balance: U256::ZERO, bytecode_hash: Some(code_hash) },
122 )
123 .unwrap();
124 provider_rw
125 .tx_ref()
126 .put::<tables::PlainAccountState>(
127 address!("a94f5374fce5edbc8e2a8697c15331677e6ebf0b"),
128 Account {
129 nonce: 0,
130 balance: U256::from(0x3635c9adc5dea00000u128),
131 bytecode_hash: None,
132 },
133 )
134 .unwrap();
135 provider_rw
136 .tx_ref()
137 .put::<tables::Bytecodes>(code_hash, Bytecode::new_raw(code.to_vec().into()))
138 .unwrap();
139 provider_rw.commit().unwrap();
140
141 let check_pruning = |factory: ProviderFactory<MockNodeTypesWithDB>,
142 prune_modes: PruneModes,
143 expect_num_receipts: usize,
144 expect_num_acc_changesets: usize,
145 expect_num_storage_changesets: usize| async move {
146 let provider = factory.database_provider_rw().unwrap();
147
148 let mut execution_stage = ExecutionStage::new(
151 EthExecutorProvider::ethereum(Arc::new(
152 ChainSpecBuilder::mainnet().berlin_activated().build(),
153 )),
154 ExecutionStageThresholds {
155 max_blocks: Some(100),
156 max_changes: None,
157 max_cumulative_gas: None,
158 max_duration: None,
159 },
160 MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
161 prune_modes.clone(),
162 ExExManagerHandle::empty(),
163 );
164
165 execution_stage.execute(&provider, input).unwrap();
166 assert_eq!(
167 provider.receipts_by_block(1.into()).unwrap().unwrap().len(),
168 expect_num_receipts
169 );
170
171 assert_eq!(
172 provider.changed_storages_and_blocks_with_range(0..=1000).unwrap().len(),
173 expect_num_storage_changesets
174 );
175
176 assert_eq!(
177 provider.changed_accounts_and_blocks_with_range(0..=1000).unwrap().len(),
178 expect_num_acc_changesets
179 );
180
181 let mut acc_indexing_stage = IndexAccountHistoryStage {
183 prune_mode: prune_modes.account_history,
184 ..Default::default()
185 };
186
187 if prune_modes.account_history == Some(PruneMode::Full) {
188 assert!(acc_indexing_stage.execute(&provider, input).is_err());
190 } else {
191 acc_indexing_stage.execute(&provider, input).unwrap();
192 let mut account_history: Cursor<RW, AccountsHistory> =
193 provider.tx_ref().cursor_read::<tables::AccountsHistory>().unwrap();
194 assert_eq!(account_history.walk(None).unwrap().count(), expect_num_acc_changesets);
195 }
196
197 let mut storage_indexing_stage = IndexStorageHistoryStage {
199 prune_mode: prune_modes.storage_history,
200 ..Default::default()
201 };
202
203 if prune_modes.storage_history == Some(PruneMode::Full) {
204 assert!(acc_indexing_stage.execute(&provider, input).is_err());
206 } else {
207 storage_indexing_stage.execute(&provider, input).unwrap();
208
209 let mut storage_history =
210 provider.tx_ref().cursor_read::<tables::StoragesHistory>().unwrap();
211 assert_eq!(
212 storage_history.walk(None).unwrap().count(),
213 expect_num_storage_changesets
214 );
215 }
216 };
217
218 let mut prune = PruneModes::none();
221 check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
222
223 prune.receipts = Some(PruneMode::Full);
224 prune.account_history = Some(PruneMode::Full);
225 prune.storage_history = Some(PruneMode::Full);
226 check_pruning(test_db.factory.clone(), prune.clone(), 0, 0, 0).await;
228
229 prune.receipts = Some(PruneMode::Before(1));
230 prune.account_history = Some(PruneMode::Before(1));
231 prune.storage_history = Some(PruneMode::Before(1));
232 check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
233
234 prune.receipts = Some(PruneMode::Before(2));
235 prune.account_history = Some(PruneMode::Before(2));
236 prune.storage_history = Some(PruneMode::Before(2));
237 check_pruning(test_db.factory.clone(), prune.clone(), 0, 1, 0).await;
239
240 prune.receipts = Some(PruneMode::Distance(66));
241 prune.account_history = Some(PruneMode::Distance(66));
242 prune.storage_history = Some(PruneMode::Distance(66));
243 check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
244
245 prune.receipts = Some(PruneMode::Distance(64));
246 prune.account_history = Some(PruneMode::Distance(64));
247 prune.storage_history = Some(PruneMode::Distance(64));
248 check_pruning(test_db.factory.clone(), prune.clone(), 0, 1, 0).await;
250 }
251
252 fn seed_data(num_blocks: usize) -> ProviderResult<TestStageDB> {
255 let db = TestStageDB::default();
256 let mut rng = generators::rng();
257 let genesis_hash = B256::ZERO;
258 let tip = (num_blocks - 1) as u64;
259
260 let blocks = random_block_range(
261 &mut rng,
262 0..=tip,
263 BlockRangeParams { parent: Some(genesis_hash), tx_count: 2..3, ..Default::default() },
264 );
265 db.insert_blocks(blocks.iter(), StorageKind::Static)?;
266
267 let mut receipts = Vec::with_capacity(blocks.len());
268 let mut tx_num = 0u64;
269 for block in &blocks {
270 let mut block_receipts = Vec::with_capacity(block.body.transactions.len());
271 for transaction in &block.body.transactions {
272 block_receipts.push((tx_num, random_receipt(&mut rng, transaction, Some(0))));
273 tx_num += 1;
274 }
275 receipts.push((block.number, block_receipts));
276 }
277 db.insert_receipts_by_block(receipts, StorageKind::Static)?;
278
279 let provider_rw = db.factory.provider_rw()?;
281 for stage in StageId::ALL {
282 provider_rw.save_stage_checkpoint(stage, StageCheckpoint::new(tip))?;
283 }
284 provider_rw.commit()?;
285
286 Ok(db)
287 }
288
289 fn simulate_behind_checkpoint_corruption(
292 db: &TestStageDB,
293 prune_count: usize,
294 segment: StaticFileSegment,
295 is_full_node: bool,
296 expected: Option<PipelineTarget>,
297 ) {
298 let mut static_file_provider = db.factory.static_file_provider();
301 static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap();
302
303 {
306 let mut headers_writer = static_file_provider.latest_writer(segment).unwrap();
307 let reader = headers_writer.inner().jar().open_data_reader().unwrap();
308 let columns = headers_writer.inner().jar().columns();
309 let data_file = headers_writer.inner().data_file();
310 let last_offset = reader.reverse_offset(prune_count * columns).unwrap();
311 data_file.get_mut().set_len(last_offset).unwrap();
312 data_file.flush().unwrap();
313 data_file.get_ref().sync_all().unwrap();
314 }
315
316 let mut static_file_provider = db.factory.static_file_provider();
319 static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap();
320 assert_eq!(
321 static_file_provider
322 .check_consistency(&db.factory.database_provider_ro().unwrap(), is_full_node,),
323 Ok(expected)
324 );
325 }
326
327 fn save_checkpoint_and_check(
330 db: &TestStageDB,
331 stage_id: StageId,
332 checkpoint_block_number: BlockNumber,
333 expected: Option<PipelineTarget>,
334 ) {
335 let provider_rw = db.factory.provider_rw().unwrap();
336 provider_rw
337 .save_stage_checkpoint(stage_id, StageCheckpoint::new(checkpoint_block_number))
338 .unwrap();
339 provider_rw.commit().unwrap();
340
341 assert_eq!(
342 db.factory
343 .static_file_provider()
344 .check_consistency(&db.factory.database_provider_ro().unwrap(), false,),
345 Ok(expected)
346 );
347 }
348
349 fn update_db_and_check<T: Table<Key = u64>>(
352 db: &TestStageDB,
353 key: u64,
354 expected: Option<PipelineTarget>,
355 ) where
356 <T as Table>::Value: Default,
357 {
358 let provider_rw = db.factory.provider_rw().unwrap();
359 let mut cursor = provider_rw.tx_ref().cursor_write::<T>().unwrap();
360 cursor.insert(key, Default::default()).unwrap();
361 provider_rw.commit().unwrap();
362
363 assert_eq!(
364 db.factory
365 .static_file_provider()
366 .check_consistency(&db.factory.database_provider_ro().unwrap(), false),
367 Ok(expected)
368 );
369 }
370
371 #[test]
372 fn test_consistency() {
373 let db = seed_data(90).unwrap();
374 let db_provider = db.factory.database_provider_ro().unwrap();
375
376 assert_eq!(
377 db.factory.static_file_provider().check_consistency(&db_provider, false),
378 Ok(None)
379 );
380 }
381
382 #[test]
383 fn test_consistency_no_commit_prune() {
384 let db = seed_data(90).unwrap();
385 let full_node = true;
386 let archive_node = !full_node;
387
388 simulate_behind_checkpoint_corruption(&db, 1, StaticFileSegment::Receipts, full_node, None);
391
392 simulate_behind_checkpoint_corruption(
395 &db,
396 1,
397 StaticFileSegment::Receipts,
398 archive_node,
399 Some(PipelineTarget::Unwind(88)),
400 );
401
402 simulate_behind_checkpoint_corruption(
403 &db,
404 3,
405 StaticFileSegment::Headers,
406 archive_node,
407 Some(PipelineTarget::Unwind(86)),
408 );
409 }
410
411 #[test]
412 fn test_consistency_checkpoints() {
413 let db = seed_data(90).unwrap();
414
415 let block = 87;
417 save_checkpoint_and_check(&db, StageId::Bodies, block, None);
418 assert_eq!(
419 db.factory
420 .static_file_provider()
421 .get_highest_static_file_block(StaticFileSegment::Transactions),
422 Some(block)
423 );
424 assert_eq!(
425 db.factory
426 .static_file_provider()
427 .get_highest_static_file_tx(StaticFileSegment::Transactions),
428 db.factory.block_body_indices(block).unwrap().map(|b| b.last_tx_num())
429 );
430
431 let block = 86;
432 save_checkpoint_and_check(&db, StageId::Execution, block, None);
433 assert_eq!(
434 db.factory
435 .static_file_provider()
436 .get_highest_static_file_block(StaticFileSegment::Receipts),
437 Some(block)
438 );
439 assert_eq!(
440 db.factory
441 .static_file_provider()
442 .get_highest_static_file_tx(StaticFileSegment::Receipts),
443 db.factory.block_body_indices(block).unwrap().map(|b| b.last_tx_num())
444 );
445
446 let block = 80;
447 save_checkpoint_and_check(&db, StageId::Headers, block, None);
448 assert_eq!(
449 db.factory
450 .static_file_provider()
451 .get_highest_static_file_block(StaticFileSegment::Headers),
452 Some(block)
453 );
454
455 save_checkpoint_and_check(&db, StageId::Headers, 91, Some(PipelineTarget::Unwind(block)));
457 }
458
459 #[test]
460 fn test_consistency_headers_gap() {
461 let db = seed_data(90).unwrap();
462 let current = db
463 .factory
464 .static_file_provider()
465 .get_highest_static_file_block(StaticFileSegment::Headers)
466 .unwrap();
467
468 update_db_and_check::<tables::Headers>(&db, current + 2, Some(PipelineTarget::Unwind(89)));
470
471 update_db_and_check::<tables::Headers>(&db, current + 1, None);
473 }
474
475 #[test]
476 fn test_consistency_tx_gap() {
477 let db = seed_data(90).unwrap();
478 let current = db
479 .factory
480 .static_file_provider()
481 .get_highest_static_file_tx(StaticFileSegment::Transactions)
482 .unwrap();
483
484 update_db_and_check::<tables::Transactions>(
486 &db,
487 current + 2,
488 Some(PipelineTarget::Unwind(89)),
489 );
490
491 update_db_and_check::<tables::Transactions>(&db, current + 1, None);
493 }
494
495 #[test]
496 fn test_consistency_receipt_gap() {
497 let db = seed_data(90).unwrap();
498 let current = db
499 .factory
500 .static_file_provider()
501 .get_highest_static_file_tx(StaticFileSegment::Receipts)
502 .unwrap();
503
504 update_db_and_check::<tables::Receipts>(&db, current + 2, Some(PipelineTarget::Unwind(89)));
506
507 update_db_and_check::<tables::Receipts>(&db, current + 1, None);
509 }
510}