reth_stages/stages/tx_lookup.rs

1use alloy_eips::eip2718::Encodable2718;
2use alloy_primitives::{TxHash, TxNumber};
3use num_traits::Zero;
4use reth_config::config::{EtlConfig, TransactionLookupConfig};
5use reth_db::{table::Value, tables, RawKey, RawValue};
6use reth_db_api::{
7    cursor::{DbCursorRO, DbCursorRW},
8    transaction::{DbTx, DbTxMut},
9};
10use reth_etl::Collector;
11use reth_primitives::NodePrimitives;
12use reth_primitives_traits::SignedTransaction;
13use reth_provider::{
14    BlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter,
15    StaticFileProviderFactory, StatsReader, TransactionsProvider, TransactionsProviderExt,
16};
17use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
18use reth_stages_api::{
19    EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
20    UnwindInput, UnwindOutput,
21};
22use reth_storage_errors::provider::ProviderError;
23use tracing::*;
24
25/// The transaction lookup stage.
26///
27/// This stage walks over existing transactions, and sets the transaction hash of each transaction
28/// in a block to the corresponding `BlockNumber` at each block. This is written to the
29/// [`tables::TransactionHashNumbers`] This is used for looking up changesets via the transaction
30/// hash.
31///
32/// It uses [`reth_etl::Collector`] to collect all entries before finally writing them to disk.
33#[derive(Debug, Clone)]
34pub struct TransactionLookupStage {
35    /// The maximum number of lookup entries to hold in memory before pushing them to
36    /// [`reth_etl::Collector`].
37    chunk_size: u64,
38    etl_config: EtlConfig,
39    prune_mode: Option<PruneMode>,
40}
41
42impl Default for TransactionLookupStage {
43    fn default() -> Self {
44        Self { chunk_size: 5_000_000, etl_config: EtlConfig::default(), prune_mode: None }
45    }
46}
47
48impl TransactionLookupStage {
49    /// Create new instance of [`TransactionLookupStage`].
50    pub const fn new(
51        config: TransactionLookupConfig,
52        etl_config: EtlConfig,
53        prune_mode: Option<PruneMode>,
54    ) -> Self {
55        Self { chunk_size: config.chunk_size, etl_config, prune_mode }
56    }
57}
58
impl<Provider> Stage<Provider> for TransactionLookupStage
where
    Provider: DBProvider<Tx: DbTxMut>
        + PruneCheckpointWriter
        + BlockReader
        + PruneCheckpointReader
        + StatsReader
        + StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value + SignedTransaction>>
        + TransactionsProviderExt,
{
    /// Returns the identifier of this stage, [`StageId::TransactionLookup`].
    fn id(&self) -> StageId {
        StageId::TransactionLookup
    }

    /// Writes `transaction hash -> transaction number` entries into
    /// [`tables::TransactionHashNumbers`] for the transactions of every block after the
    /// checkpoint, up to the target block.
    ///
    /// If a prune mode is configured and its target block lies past the current checkpoint, the
    /// checkpoint is first fast-forwarded to that block. If no `TransactionLookup` prune
    /// checkpoint exists yet, one is saved for that block.
    ///
    /// Hashes are gathered batch by batch into an ETL collector. Each batch is a block range
    /// chosen with `chunk_size` as the transaction threshold. Everything is written to the table
    /// once the final batch has been collected.
    ///
    /// # Errors
    ///
    /// Returns an error if computing the prune target fails, if provider/database reads or
    /// writes fail, or if the body indices of the prune target block are missing.
    fn execute(
        &mut self,
        provider: &Provider,
        mut input: ExecInput,
    ) -> Result<ExecOutput, StageError> {
        // Resolve the block up to which the configured prune mode would prune this segment
        // (if any); hashes for those blocks are not produced.
        if let Some((target_prunable_block, prune_mode)) = self
            .prune_mode
            .map(|mode| {
                mode.prune_target_block(
                    input.target(),
                    PruneSegment::TransactionLookup,
                    PrunePurpose::User,
                )
            })
            .transpose()?
            .flatten()
        {
            if target_prunable_block > input.checkpoint().block_number {
                // Skip straight past the prunable blocks.
                input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));

                // Save prune checkpoint only if we don't have one already.
                // Otherwise, pruner may skip the unpruned range of blocks.
                if provider.get_prune_checkpoint(PruneSegment::TransactionLookup)?.is_none() {
                    let target_prunable_tx_number = provider
                        .block_body_indices(target_prunable_block)?
                        .ok_or(ProviderError::BlockBodyIndicesNotFound(target_prunable_block))?
                        .last_tx_num();

                    provider.save_prune_checkpoint(
                        PruneSegment::TransactionLookup,
                        PruneCheckpoint {
                            block_number: Some(target_prunable_block),
                            tx_number: Some(target_prunable_tx_number),
                            prune_mode,
                        },
                    )?;
                }
            }
        }
        if input.target_reached() {
            return Ok(ExecOutput::done(input.checkpoint()));
        }

        // ETL collector for `TxHash -> TxNumber` pairs. Its temporary-file size and directory
        // come from `self.etl_config`; the old "500MB" note presumably describes the default
        // config only (verify against `EtlConfig::default`).
        let mut hash_collector: Collector<TxHash, TxNumber> =
            Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());

        // NOTE(review): despite its name, the `tx_range` field below logs a block-number range
        // (checkpoint block ..= target block).
        info!(
            target: "sync::stages::transaction_lookup",
            tx_range = ?input.checkpoint().block_number..=input.target(),
            "Updating transaction lookup"
        );

        loop {
            // Next batch: a block range (and its matching tx-number range) chosen using
            // `chunk_size` as the transaction threshold.
            let (tx_range, block_range, is_final_range) =
                input.next_block_range_with_transaction_threshold(provider, self.chunk_size)?;

            let end_block = *block_range.end();

            info!(target: "sync::stages::transaction_lookup", ?tx_range, "Calculating transaction hashes");

            for (key, value) in provider.transaction_hashes_by_range(tx_range)? {
                hash_collector.insert(key, value)?;
            }

            // Advance the checkpoint; the next call to
            // `next_block_range_with_transaction_threshold` reads it to pick the following range.
            input.checkpoint = Some(
                StageCheckpoint::new(end_block)
                    .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
            );

            if is_final_range {
                // An empty table is filled with `append`, otherwise entries go through `insert`.
                // `append` presumably relies on the collector yielding keys in sorted order
                // (verify against `reth_etl::Collector`).
                let append_only =
                    provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
                // Raw table cursor: the collector yields encoded key/value bytes, which are
                // wrapped with `RawKey::from_vec` / `RawValue::from_vec` below.
                let mut txhash_cursor = provider
                    .tx_ref()
                    .cursor_write::<tables::RawTable<tables::TransactionHashNumbers>>()?;

                // Log insertion progress roughly every 10% of the collected hashes.
                let total_hashes = hash_collector.len();
                let interval = (total_hashes / 10).max(1);
                for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
                    let (hash, number) = hash_to_number?;
                    if index > 0 && index % interval == 0 {
                        info!(
                            target: "sync::stages::transaction_lookup",
                            ?append_only,
                            progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
                            "Inserting hashes"
                        );
                    }

                    let key = RawKey::<TxHash>::from_vec(hash);
                    if append_only {
                        txhash_cursor.append(key, RawValue::<TxNumber>::from_vec(number))?
                    } else {
                        txhash_cursor.insert(key, RawValue::<TxNumber>::from_vec(number))?
                    }
                }

                trace!(target: "sync::stages::transaction_lookup",
                    total_hashes,
                    "Transaction hashes inserted"
                );

                break;
            }
        }

        // All batches up to the target have been written.
        Ok(ExecOutput {
            checkpoint: StageCheckpoint::new(input.target())
                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
            done: true,
        })
    }

    /// Unwinds the stage by deleting the [`tables::TransactionHashNumbers`] entries of the
    /// transactions in the blocks above the unwind target of this call.
    ///
    /// The block range handled per call is bounded via
    /// `unwind_block_range_with_threshold(self.chunk_size)`. The returned checkpoint is the
    /// (possibly intermediate) block this call unwound to.
    fn unwind(
        &mut self,
        provider: &Provider,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError> {
        let tx = provider.tx_ref();
        let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);

        // Cursors to unwind tx hash to number
        let mut body_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
        let mut tx_hash_number_cursor = tx.cursor_write::<tables::TransactionHashNumbers>()?;
        let static_file_provider = provider.static_file_provider();
        // Walk block bodies backwards from the end of the range until `unwind_to` is reached.
        let mut rev_walker = body_cursor.walk_back(Some(*range.end()))?;
        while let Some((number, body)) = rev_walker.next().transpose()? {
            if number <= unwind_to {
                break;
            }

            // Delete all transactions that belong to this block
            for tx_id in body.tx_num_range() {
                // Look the transaction up in static files to recover its hash, then delete the
                // hash -> number mapping if present. Transactions missing from static files
                // are skipped.
                if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? {
                    if tx_hash_number_cursor.seek_exact(transaction.trie_hash())?.is_some() {
                        tx_hash_number_cursor.delete_current()?;
                    }
                }
            }
        }

        Ok(UnwindOutput {
            checkpoint: StageCheckpoint::new(unwind_to)
                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
        })
    }
}
225
226fn stage_checkpoint<Provider>(provider: &Provider) -> Result<EntitiesCheckpoint, StageError>
227where
228    Provider: PruneCheckpointReader + StaticFileProviderFactory + StatsReader,
229{
230    let pruned_entries = provider
231        .get_prune_checkpoint(PruneSegment::TransactionLookup)?
232        .and_then(|checkpoint| checkpoint.tx_number)
233        // `+1` is needed because `TxNumber` is 0-indexed
234        .map(|tx_number| tx_number + 1)
235        .unwrap_or_default();
236    Ok(EntitiesCheckpoint {
237        // If `TransactionHashNumbers` table was pruned, we will have a number of entries in it not
238        // matching the actual number of processed transactions. To fix that, we add the
239        // number of pruned `TransactionHashNumbers` entries.
240        processed: provider.count_entries::<tables::TransactionHashNumbers>()? as u64 +
241            pruned_entries,
242        // Count only static files entries. If we count the database entries too, we may have
243        // duplicates. We're sure that the static files have all entries that database has,
244        // because we run the `StaticFileProducer` before starting the pipeline.
245        total: provider.static_file_provider().count_entries::<tables::Transactions>()? as u64,
246    })
247}
248
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
        TestRunnerError, TestStageDB, UnwindStageTestRunner,
    };
    use alloy_primitives::{BlockNumber, B256};
    use assert_matches::assert_matches;
    use reth_primitives::SealedBlock;
    use reth_provider::{
        providers::StaticFileWriter, BlockBodyIndicesProvider, DatabaseProviderFactory,
        StaticFileProviderFactory,
    };
    use reth_stages_api::StageUnitCheckpoint;
    use reth_testing_utils::generators::{
        self, random_block, random_block_range, BlockParams, BlockRangeParams,
    };
    use std::ops::Sub;

    // Implement stage test suite.
    stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup);

    #[tokio::test]
    async fn execute_single_transaction_lookup() {
        let (previous_stage, stage_progress) = (500, 100);
        let mut rng = generators::rng();

        // Set up the runner
        let runner = TransactionLookupTestRunner::default();
        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        // Insert blocks with a single transaction at block `stage_progress + 10`
        let non_empty_block_number = stage_progress + 10;
        let blocks = (stage_progress..=input.target())
            .map(|number| {
                random_block(
                    &mut rng,
                    number,
                    BlockParams {
                        tx_count: Some((number == non_empty_block_number) as u8),
                        ..Default::default()
                    },
                )
            })
            .collect::<Vec<_>>();
        runner
            .db
            .insert_blocks(blocks.iter(), StorageKind::Static)
            .expect("failed to insert blocks");

        let rx = runner.execute(input);

        // Assert the successful result
        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                block_number,
                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                    processed,
                    total
                }))
            }, done: true }) if block_number == previous_stage && processed == total &&
                total == runner.db.factory.static_file_provider().count_entries::<tables::Transactions>().unwrap() as u64
        );

        // Validate the stage execution
        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    #[tokio::test]
    async fn execute_pruned_transaction_lookup() {
        let (previous_stage, prune_target, stage_progress) = (500, 400, 100);
        let mut rng = generators::rng();

        // Set up the runner
        let mut runner = TransactionLookupTestRunner::default();
        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        // Seed only once with full input range
        let seed = random_block_range(
            &mut rng,
            stage_progress + 1..=previous_stage,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
        );
        runner
            .db
            .insert_blocks(seed.iter(), StorageKind::Static)
            .expect("failed to seed execution");

        runner.set_prune_mode(PruneMode::Before(prune_target));

        let rx = runner.execute(input);

        // Assert the successful result
        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                block_number,
                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                    processed,
                    total
                }))
            }, done: true }) if block_number == previous_stage && processed == total &&
                total == runner.db.factory.static_file_provider().count_entries::<tables::Transactions>().unwrap() as u64
        );

        // Validate the stage execution
        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    #[test]
    fn stage_checkpoint_pruned() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=100,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..10, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        let max_pruned_block = 30;
        let max_processed_block = 70;

        let mut tx_hash_numbers = Vec::new();
        let mut tx_hash_number = 0;
        for block in &blocks[..=max_processed_block] {
            for transaction in &block.body.transactions {
                if block.number > max_pruned_block {
                    tx_hash_numbers.push((transaction.hash(), tx_hash_number));
                }
                tx_hash_number += 1;
            }
        }
        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

        let provider = db.factory.provider_rw().unwrap();
        provider
            .save_prune_checkpoint(
                PruneSegment::TransactionLookup,
                PruneCheckpoint {
                    block_number: Some(max_pruned_block),
                    tx_number: Some(
                        blocks[..=max_pruned_block as usize]
                            .iter()
                            .map(|block| block.body.transactions.len() as u64)
                            .sum::<u64>()
                            .sub(1), // `TxNumber` is 0-indexed
                    ),
                    prune_mode: PruneMode::Full,
                },
            )
            .expect("save stage checkpoint");
        provider.commit().expect("commit");

        let provider = db.factory.database_provider_rw().unwrap();
        assert_eq!(
            stage_checkpoint(&provider).expect("stage checkpoint"),
            EntitiesCheckpoint {
                processed: blocks[..=max_processed_block]
                    .iter()
                    .map(|block| block.body.transactions.len() as u64)
                    .sum(),
                total: blocks.iter().map(|block| block.body.transactions.len() as u64).sum()
            }
        );
    }

    struct TransactionLookupTestRunner {
        db: TestStageDB,
        chunk_size: u64,
        etl_config: EtlConfig,
        prune_mode: Option<PruneMode>,
    }

    impl Default for TransactionLookupTestRunner {
        fn default() -> Self {
            Self {
                db: TestStageDB::default(),
                chunk_size: 1000,
                etl_config: EtlConfig::default(),
                prune_mode: None,
            }
        }
    }

    impl TransactionLookupTestRunner {
        fn set_prune_mode(&mut self, prune_mode: PruneMode) {
            self.prune_mode = Some(prune_mode);
        }

        /// Checks that no transaction hash entries exist for blocks above `number`.
        ///
        /// # Panics
        ///
        /// 1. If there are any entries in the [`tables::TransactionHashNumbers`] table above a
        ///    given block number.
        /// 2. If there is no requested block entry in the bodies table, but
        ///    [`tables::TransactionHashNumbers`] is not empty.
        fn ensure_no_hash_by_block(&self, number: BlockNumber) -> Result<(), TestRunnerError> {
            let body_result = self
                .db
                .factory
                .provider_rw()?
                .block_body_indices(number)?
                .ok_or(ProviderError::BlockBodyIndicesNotFound(number));
            match body_result {
                Ok(body) => {
                    self.db.ensure_no_entry_above_by_value::<tables::TransactionHashNumbers, _>(
                        body.last_tx_num(),
                        |key| key,
                    )?
                }
                Err(_) => {
                    assert!(self.db.table_is_empty::<tables::TransactionHashNumbers>()?);
                }
            };

            Ok(())
        }
    }

    impl StageTestRunner for TransactionLookupTestRunner {
        type S = TransactionLookupStage;

        fn db(&self) -> &TestStageDB {
            &self.db
        }

        fn stage(&self) -> Self::S {
            TransactionLookupStage {
                chunk_size: self.chunk_size,
                etl_config: self.etl_config.clone(),
                prune_mode: self.prune_mode,
            }
        }
    }

    impl ExecuteStageTestRunner for TransactionLookupTestRunner {
        type Seed = Vec<SealedBlock>;

        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_progress = input.checkpoint().block_number;
            let end = input.target();
            let mut rng = generators::rng();

            let blocks = random_block_range(
                &mut rng,
                stage_progress + 1..=end,
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
            );
            self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;
            Ok(blocks)
        }

        /// Validates that every transaction in the executed block range (adjusted for pruning
        /// the same way the stage does) can be looked up by hash, or, when execution produced
        /// no output, that no hash entries exist above the checkpoint.
        fn validate_execution(
            &self,
            mut input: ExecInput,
            output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            match output {
                Some(output) => {
                    let provider = self.db.factory.provider()?;

                    if let Some((target_prunable_block, _)) = self
                        .prune_mode
                        .map(|mode| {
                            mode.prune_target_block(
                                input.target(),
                                PruneSegment::TransactionLookup,
                                PrunePurpose::User,
                            )
                        })
                        .transpose()
                        .expect("prune target block for transaction lookup")
                        .flatten()
                    {
                        if target_prunable_block > input.checkpoint().block_number {
                            input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
                        }
                    }
                    let start_block = input.next_block();
                    let end_block = output.checkpoint.block_number;

                    if start_block > end_block {
                        return Ok(())
                    }

                    // Walk exactly the executed block range. `seek_exact(start_block)` followed
                    // by `next()` would skip `start_block` itself and continue past `end_block`.
                    let mut body_cursor =
                        provider.tx_ref().cursor_read::<tables::BlockBodyIndices>()?;
                    for entry in body_cursor.walk_range(start_block..=end_block)? {
                        let (_, body) = entry?;
                        for tx_id in body.tx_num_range() {
                            let transaction =
                                provider.transaction_by_id(tx_id)?.expect("no transaction entry");
                            assert_eq!(Some(tx_id), provider.transaction_id(transaction.hash())?);
                        }
                    }
                }
                None => self.ensure_no_hash_by_block(input.checkpoint().block_number)?,
            };
            Ok(())
        }
    }

    impl UnwindStageTestRunner for TransactionLookupTestRunner {
        fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            self.ensure_no_hash_by_block(input.unwind_to)
        }
    }
}