reth_stages/stages/
mod.rs

1/// The bodies stage.
2mod bodies;
3/// The execution stage that generates state diff.
4mod execution;
5/// The finish stage
6mod finish;
7/// Account hashing stage.
8mod hashing_account;
9/// Storage hashing stage.
10mod hashing_storage;
11/// The headers stage.
12mod headers;
13/// Index history of account changes
14mod index_account_history;
15/// Index history of storage changes
16mod index_storage_history;
17/// Stage for computing state root.
18mod merkle;
19mod prune;
20/// The sender recovery stage.
21mod sender_recovery;
22/// The transaction lookup stage
23mod tx_lookup;
24
25pub use bodies::*;
26pub use execution::*;
27pub use finish::*;
28pub use hashing_account::*;
29pub use hashing_storage::*;
30pub use headers::*;
31pub use index_account_history::*;
32pub use index_storage_history::*;
33pub use merkle::*;
34pub use prune::*;
35pub use sender_recovery::*;
36pub use tx_lookup::*;
37
38mod utils;
39use utils::*;
40
41#[cfg(test)]
42mod tests {
43    use super::*;
44    use crate::test_utils::{StorageKind, TestStageDB};
45    use alloy_primitives::{address, hex_literal::hex, keccak256, BlockNumber, B256, U256};
46    use alloy_rlp::Decodable;
47    use reth_chainspec::ChainSpecBuilder;
48    use reth_db::{
49        mdbx::{cursor::Cursor, RW},
50        tables, AccountsHistory,
51    };
52    use reth_db_api::{
53        cursor::{DbCursorRO, DbCursorRW},
54        table::Table,
55        transaction::{DbTx, DbTxMut},
56    };
57    use reth_evm_ethereum::execute::EthExecutorProvider;
58    use reth_exex::ExExManagerHandle;
59    use reth_primitives::{Account, Bytecode, SealedBlock, StaticFileSegment};
60    use reth_provider::{
61        providers::{StaticFileProvider, StaticFileWriter},
62        test_utils::MockNodeTypesWithDB,
63        AccountExtReader, BlockBodyIndicesProvider, DatabaseProviderFactory, ProviderFactory,
64        ProviderResult, ReceiptProvider, StageCheckpointWriter, StaticFileProviderFactory,
65        StorageReader,
66    };
67    use reth_prune_types::{PruneMode, PruneModes};
68    use reth_stages_api::{
69        ExecInput, ExecutionStageThresholds, PipelineTarget, Stage, StageCheckpoint, StageId,
70    };
71    use reth_testing_utils::generators::{
72        self, random_block, random_block_range, random_receipt, BlockRangeParams,
73    };
74    use std::{io::Write, sync::Arc};
75
76    #[tokio::test]
77    #[ignore]
78    async fn test_prune() {
79        let test_db = TestStageDB::default();
80
81        let provider_rw = test_db.factory.provider_rw().unwrap();
82        let tip = 66;
83        let input = ExecInput { target: Some(tip), checkpoint: None };
84        let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice();
85        let genesis = SealedBlock::decode(&mut genesis_rlp).unwrap();
86        let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice();
87        let block = SealedBlock::decode(&mut block_rlp).unwrap();
88        provider_rw.insert_historical_block(genesis.try_seal_with_senders().unwrap()).unwrap();
89        provider_rw
90            .insert_historical_block(block.clone().try_seal_with_senders().unwrap())
91            .unwrap();
92
93        // Fill with bogus blocks to respect PruneMode distance.
94        let mut head = block.hash();
95        let mut rng = generators::rng();
96        for block_number in 2..=tip {
97            let nblock = random_block(
98                &mut rng,
99                block_number,
100                generators::BlockParams { parent: Some(head), ..Default::default() },
101            );
102            head = nblock.hash();
103            provider_rw.insert_historical_block(nblock.try_seal_with_senders().unwrap()).unwrap();
104        }
105        provider_rw
106            .static_file_provider()
107            .latest_writer(StaticFileSegment::Headers)
108            .unwrap()
109            .commit()
110            .unwrap();
111        provider_rw.commit().unwrap();
112
113        // insert pre state
114        let provider_rw = test_db.factory.provider_rw().unwrap();
115        let code = hex!("5a465a905090036002900360015500");
116        let code_hash = keccak256(hex!("5a465a905090036002900360015500"));
117        provider_rw
118            .tx_ref()
119            .put::<tables::PlainAccountState>(
120                address!("1000000000000000000000000000000000000000"),
121                Account { nonce: 0, balance: U256::ZERO, bytecode_hash: Some(code_hash) },
122            )
123            .unwrap();
124        provider_rw
125            .tx_ref()
126            .put::<tables::PlainAccountState>(
127                address!("a94f5374fce5edbc8e2a8697c15331677e6ebf0b"),
128                Account {
129                    nonce: 0,
130                    balance: U256::from(0x3635c9adc5dea00000u128),
131                    bytecode_hash: None,
132                },
133            )
134            .unwrap();
135        provider_rw
136            .tx_ref()
137            .put::<tables::Bytecodes>(code_hash, Bytecode::new_raw(code.to_vec().into()))
138            .unwrap();
139        provider_rw.commit().unwrap();
140
141        let check_pruning = |factory: ProviderFactory<MockNodeTypesWithDB>,
142                             prune_modes: PruneModes,
143                             expect_num_receipts: usize,
144                             expect_num_acc_changesets: usize,
145                             expect_num_storage_changesets: usize| async move {
146            let provider = factory.database_provider_rw().unwrap();
147
148            // Check execution and create receipts and changesets according to the pruning
149            // configuration
150            let mut execution_stage = ExecutionStage::new(
151                EthExecutorProvider::ethereum(Arc::new(
152                    ChainSpecBuilder::mainnet().berlin_activated().build(),
153                )),
154                ExecutionStageThresholds {
155                    max_blocks: Some(100),
156                    max_changes: None,
157                    max_cumulative_gas: None,
158                    max_duration: None,
159                },
160                MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD,
161                prune_modes.clone(),
162                ExExManagerHandle::empty(),
163            );
164
165            execution_stage.execute(&provider, input).unwrap();
166            assert_eq!(
167                provider.receipts_by_block(1.into()).unwrap().unwrap().len(),
168                expect_num_receipts
169            );
170
171            assert_eq!(
172                provider.changed_storages_and_blocks_with_range(0..=1000).unwrap().len(),
173                expect_num_storage_changesets
174            );
175
176            assert_eq!(
177                provider.changed_accounts_and_blocks_with_range(0..=1000).unwrap().len(),
178                expect_num_acc_changesets
179            );
180
181            // Check AccountHistory
182            let mut acc_indexing_stage = IndexAccountHistoryStage {
183                prune_mode: prune_modes.account_history,
184                ..Default::default()
185            };
186
187            if prune_modes.account_history == Some(PruneMode::Full) {
188                // Full is not supported
189                assert!(acc_indexing_stage.execute(&provider, input).is_err());
190            } else {
191                acc_indexing_stage.execute(&provider, input).unwrap();
192                let mut account_history: Cursor<RW, AccountsHistory> =
193                    provider.tx_ref().cursor_read::<tables::AccountsHistory>().unwrap();
194                assert_eq!(account_history.walk(None).unwrap().count(), expect_num_acc_changesets);
195            }
196
197            // Check StorageHistory
198            let mut storage_indexing_stage = IndexStorageHistoryStage {
199                prune_mode: prune_modes.storage_history,
200                ..Default::default()
201            };
202
203            if prune_modes.storage_history == Some(PruneMode::Full) {
204                // Full is not supported
205                assert!(acc_indexing_stage.execute(&provider, input).is_err());
206            } else {
207                storage_indexing_stage.execute(&provider, input).unwrap();
208
209                let mut storage_history =
210                    provider.tx_ref().cursor_read::<tables::StoragesHistory>().unwrap();
211                assert_eq!(
212                    storage_history.walk(None).unwrap().count(),
213                    expect_num_storage_changesets
214                );
215            }
216        };
217
218        // In an unpruned configuration there is 1 receipt, 3 changed accounts and 1 changed
219        // storage.
220        let mut prune = PruneModes::none();
221        check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
222
223        prune.receipts = Some(PruneMode::Full);
224        prune.account_history = Some(PruneMode::Full);
225        prune.storage_history = Some(PruneMode::Full);
226        // This will result in error for account_history and storage_history, which is caught.
227        check_pruning(test_db.factory.clone(), prune.clone(), 0, 0, 0).await;
228
229        prune.receipts = Some(PruneMode::Before(1));
230        prune.account_history = Some(PruneMode::Before(1));
231        prune.storage_history = Some(PruneMode::Before(1));
232        check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
233
234        prune.receipts = Some(PruneMode::Before(2));
235        prune.account_history = Some(PruneMode::Before(2));
236        prune.storage_history = Some(PruneMode::Before(2));
237        // The one account is the miner
238        check_pruning(test_db.factory.clone(), prune.clone(), 0, 1, 0).await;
239
240        prune.receipts = Some(PruneMode::Distance(66));
241        prune.account_history = Some(PruneMode::Distance(66));
242        prune.storage_history = Some(PruneMode::Distance(66));
243        check_pruning(test_db.factory.clone(), prune.clone(), 1, 3, 1).await;
244
245        prune.receipts = Some(PruneMode::Distance(64));
246        prune.account_history = Some(PruneMode::Distance(64));
247        prune.storage_history = Some(PruneMode::Distance(64));
248        // The one account is the miner
249        check_pruning(test_db.factory.clone(), prune.clone(), 0, 1, 0).await;
250    }
251
252    /// It will generate `num_blocks`, push them to static files and set all stage checkpoints to
253    /// `num_blocks - 1`.
254    fn seed_data(num_blocks: usize) -> ProviderResult<TestStageDB> {
255        let db = TestStageDB::default();
256        let mut rng = generators::rng();
257        let genesis_hash = B256::ZERO;
258        let tip = (num_blocks - 1) as u64;
259
260        let blocks = random_block_range(
261            &mut rng,
262            0..=tip,
263            BlockRangeParams { parent: Some(genesis_hash), tx_count: 2..3, ..Default::default() },
264        );
265        db.insert_blocks(blocks.iter(), StorageKind::Static)?;
266
267        let mut receipts = Vec::with_capacity(blocks.len());
268        let mut tx_num = 0u64;
269        for block in &blocks {
270            let mut block_receipts = Vec::with_capacity(block.body.transactions.len());
271            for transaction in &block.body.transactions {
272                block_receipts.push((tx_num, random_receipt(&mut rng, transaction, Some(0))));
273                tx_num += 1;
274            }
275            receipts.push((block.number, block_receipts));
276        }
277        db.insert_receipts_by_block(receipts, StorageKind::Static)?;
278
279        // simulate pipeline by setting all checkpoints to inserted height.
280        let provider_rw = db.factory.provider_rw()?;
281        for stage in StageId::ALL {
282            provider_rw.save_stage_checkpoint(stage, StageCheckpoint::new(tip))?;
283        }
284        provider_rw.commit()?;
285
286        Ok(db)
287    }
288
289    /// Simulates losing data to corruption and compare the check consistency result
290    /// against the expected one.
291    fn simulate_behind_checkpoint_corruption(
292        db: &TestStageDB,
293        prune_count: usize,
294        segment: StaticFileSegment,
295        is_full_node: bool,
296        expected: Option<PipelineTarget>,
297    ) {
298        // We recreate the static file provider, since consistency heals are done on fetching the
299        // writer for the first time.
300        let mut static_file_provider = db.factory.static_file_provider();
301        static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap();
302
303        // Simulate corruption by removing `prune_count` rows from the data file without updating
304        // its offset list and configuration.
305        {
306            let mut headers_writer = static_file_provider.latest_writer(segment).unwrap();
307            let reader = headers_writer.inner().jar().open_data_reader().unwrap();
308            let columns = headers_writer.inner().jar().columns();
309            let data_file = headers_writer.inner().data_file();
310            let last_offset = reader.reverse_offset(prune_count * columns).unwrap();
311            data_file.get_mut().set_len(last_offset).unwrap();
312            data_file.flush().unwrap();
313            data_file.get_ref().sync_all().unwrap();
314        }
315
316        // We recreate the static file provider, since consistency heals are done on fetching the
317        // writer for the first time.
318        let mut static_file_provider = db.factory.static_file_provider();
319        static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap();
320        assert_eq!(
321            static_file_provider
322                .check_consistency(&db.factory.database_provider_ro().unwrap(), is_full_node,),
323            Ok(expected)
324        );
325    }
326
327    /// Saves a checkpoint with `checkpoint_block_number` and compare the check consistency result
328    /// against the expected one.
329    fn save_checkpoint_and_check(
330        db: &TestStageDB,
331        stage_id: StageId,
332        checkpoint_block_number: BlockNumber,
333        expected: Option<PipelineTarget>,
334    ) {
335        let provider_rw = db.factory.provider_rw().unwrap();
336        provider_rw
337            .save_stage_checkpoint(stage_id, StageCheckpoint::new(checkpoint_block_number))
338            .unwrap();
339        provider_rw.commit().unwrap();
340
341        assert_eq!(
342            db.factory
343                .static_file_provider()
344                .check_consistency(&db.factory.database_provider_ro().unwrap(), false,),
345            Ok(expected)
346        );
347    }
348
349    /// Inserts a dummy value at key and compare the check consistency result against the expected
350    /// one.
351    fn update_db_and_check<T: Table<Key = u64>>(
352        db: &TestStageDB,
353        key: u64,
354        expected: Option<PipelineTarget>,
355    ) where
356        <T as Table>::Value: Default,
357    {
358        let provider_rw = db.factory.provider_rw().unwrap();
359        let mut cursor = provider_rw.tx_ref().cursor_write::<T>().unwrap();
360        cursor.insert(key, Default::default()).unwrap();
361        provider_rw.commit().unwrap();
362
363        assert_eq!(
364            db.factory
365                .static_file_provider()
366                .check_consistency(&db.factory.database_provider_ro().unwrap(), false),
367            Ok(expected)
368        );
369    }
370
371    #[test]
372    fn test_consistency() {
373        let db = seed_data(90).unwrap();
374        let db_provider = db.factory.database_provider_ro().unwrap();
375
376        assert_eq!(
377            db.factory.static_file_provider().check_consistency(&db_provider, false),
378            Ok(None)
379        );
380    }
381
382    #[test]
383    fn test_consistency_no_commit_prune() {
384        let db = seed_data(90).unwrap();
385        let full_node = true;
386        let archive_node = !full_node;
387
388        // Full node does not use receipts, therefore doesn't check for consistency on receipts
389        // segment
390        simulate_behind_checkpoint_corruption(&db, 1, StaticFileSegment::Receipts, full_node, None);
391
392        // there are 2 to 3 transactions per block. however, if we lose one tx, we need to unwind to
393        // the previous block.
394        simulate_behind_checkpoint_corruption(
395            &db,
396            1,
397            StaticFileSegment::Receipts,
398            archive_node,
399            Some(PipelineTarget::Unwind(88)),
400        );
401
402        simulate_behind_checkpoint_corruption(
403            &db,
404            3,
405            StaticFileSegment::Headers,
406            archive_node,
407            Some(PipelineTarget::Unwind(86)),
408        );
409    }
410
411    #[test]
412    fn test_consistency_checkpoints() {
413        let db = seed_data(90).unwrap();
414
415        // When a checkpoint is behind, we delete data from static files.
416        let block = 87;
417        save_checkpoint_and_check(&db, StageId::Bodies, block, None);
418        assert_eq!(
419            db.factory
420                .static_file_provider()
421                .get_highest_static_file_block(StaticFileSegment::Transactions),
422            Some(block)
423        );
424        assert_eq!(
425            db.factory
426                .static_file_provider()
427                .get_highest_static_file_tx(StaticFileSegment::Transactions),
428            db.factory.block_body_indices(block).unwrap().map(|b| b.last_tx_num())
429        );
430
431        let block = 86;
432        save_checkpoint_and_check(&db, StageId::Execution, block, None);
433        assert_eq!(
434            db.factory
435                .static_file_provider()
436                .get_highest_static_file_block(StaticFileSegment::Receipts),
437            Some(block)
438        );
439        assert_eq!(
440            db.factory
441                .static_file_provider()
442                .get_highest_static_file_tx(StaticFileSegment::Receipts),
443            db.factory.block_body_indices(block).unwrap().map(|b| b.last_tx_num())
444        );
445
446        let block = 80;
447        save_checkpoint_and_check(&db, StageId::Headers, block, None);
448        assert_eq!(
449            db.factory
450                .static_file_provider()
451                .get_highest_static_file_block(StaticFileSegment::Headers),
452            Some(block)
453        );
454
455        // When a checkpoint is ahead, we request a pipeline unwind.
456        save_checkpoint_and_check(&db, StageId::Headers, 91, Some(PipelineTarget::Unwind(block)));
457    }
458
459    #[test]
460    fn test_consistency_headers_gap() {
461        let db = seed_data(90).unwrap();
462        let current = db
463            .factory
464            .static_file_provider()
465            .get_highest_static_file_block(StaticFileSegment::Headers)
466            .unwrap();
467
468        // Creates a gap of one header: static_file <missing> db
469        update_db_and_check::<tables::Headers>(&db, current + 2, Some(PipelineTarget::Unwind(89)));
470
471        // Fill the gap, and ensure no unwind is necessary.
472        update_db_and_check::<tables::Headers>(&db, current + 1, None);
473    }
474
475    #[test]
476    fn test_consistency_tx_gap() {
477        let db = seed_data(90).unwrap();
478        let current = db
479            .factory
480            .static_file_provider()
481            .get_highest_static_file_tx(StaticFileSegment::Transactions)
482            .unwrap();
483
484        // Creates a gap of one transaction: static_file <missing> db
485        update_db_and_check::<tables::Transactions>(
486            &db,
487            current + 2,
488            Some(PipelineTarget::Unwind(89)),
489        );
490
491        // Fill the gap, and ensure no unwind is necessary.
492        update_db_and_check::<tables::Transactions>(&db, current + 1, None);
493    }
494
495    #[test]
496    fn test_consistency_receipt_gap() {
497        let db = seed_data(90).unwrap();
498        let current = db
499            .factory
500            .static_file_provider()
501            .get_highest_static_file_tx(StaticFileSegment::Receipts)
502            .unwrap();
503
504        // Creates a gap of one receipt: static_file <missing> db
505        update_db_and_check::<tables::Receipts>(&db, current + 2, Some(PipelineTarget::Unwind(89)));
506
507        // Fill the gap, and ensure no unwind is necessary.
508        update_db_and_check::<tables::Receipts>(&db, current + 1, None);
509    }
510}