1use alloy_eips::eip2718::Encodable2718;
2use alloy_primitives::{TxHash, TxNumber};
3use num_traits::Zero;
4use reth_config::config::{EtlConfig, TransactionLookupConfig};
5use reth_db::{table::Value, tables, RawKey, RawValue};
6use reth_db_api::{
7 cursor::{DbCursorRO, DbCursorRW},
8 transaction::{DbTx, DbTxMut},
9};
10use reth_etl::Collector;
11use reth_primitives::NodePrimitives;
12use reth_primitives_traits::SignedTransaction;
13use reth_provider::{
14 BlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter,
15 StaticFileProviderFactory, StatsReader, TransactionsProvider, TransactionsProviderExt,
16};
17use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
18use reth_stages_api::{
19 EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
20 UnwindInput, UnwindOutput,
21};
22use reth_storage_errors::provider::ProviderError;
23use tracing::*;
24
25#[derive(Debug, Clone)]
34pub struct TransactionLookupStage {
35 chunk_size: u64,
38 etl_config: EtlConfig,
39 prune_mode: Option<PruneMode>,
40}
41
42impl Default for TransactionLookupStage {
43 fn default() -> Self {
44 Self { chunk_size: 5_000_000, etl_config: EtlConfig::default(), prune_mode: None }
45 }
46}
47
48impl TransactionLookupStage {
49 pub const fn new(
51 config: TransactionLookupConfig,
52 etl_config: EtlConfig,
53 prune_mode: Option<PruneMode>,
54 ) -> Self {
55 Self { chunk_size: config.chunk_size, etl_config, prune_mode }
56 }
57}
58
59impl<Provider> Stage<Provider> for TransactionLookupStage
60where
61 Provider: DBProvider<Tx: DbTxMut>
62 + PruneCheckpointWriter
63 + BlockReader
64 + PruneCheckpointReader
65 + StatsReader
66 + StaticFileProviderFactory<Primitives: NodePrimitives<SignedTx: Value + SignedTransaction>>
67 + TransactionsProviderExt,
68{
69 fn id(&self) -> StageId {
71 StageId::TransactionLookup
72 }
73
74 fn execute(
76 &mut self,
77 provider: &Provider,
78 mut input: ExecInput,
79 ) -> Result<ExecOutput, StageError> {
80 if let Some((target_prunable_block, prune_mode)) = self
81 .prune_mode
82 .map(|mode| {
83 mode.prune_target_block(
84 input.target(),
85 PruneSegment::TransactionLookup,
86 PrunePurpose::User,
87 )
88 })
89 .transpose()?
90 .flatten()
91 {
92 if target_prunable_block > input.checkpoint().block_number {
93 input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
94
95 if provider.get_prune_checkpoint(PruneSegment::TransactionLookup)?.is_none() {
98 let target_prunable_tx_number = provider
99 .block_body_indices(target_prunable_block)?
100 .ok_or(ProviderError::BlockBodyIndicesNotFound(target_prunable_block))?
101 .last_tx_num();
102
103 provider.save_prune_checkpoint(
104 PruneSegment::TransactionLookup,
105 PruneCheckpoint {
106 block_number: Some(target_prunable_block),
107 tx_number: Some(target_prunable_tx_number),
108 prune_mode,
109 },
110 )?;
111 }
112 }
113 }
114 if input.target_reached() {
115 return Ok(ExecOutput::done(input.checkpoint()));
116 }
117
118 let mut hash_collector: Collector<TxHash, TxNumber> =
120 Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
121
122 info!(
123 target: "sync::stages::transaction_lookup",
124 tx_range = ?input.checkpoint().block_number..=input.target(),
125 "Updating transaction lookup"
126 );
127
128 loop {
129 let (tx_range, block_range, is_final_range) =
130 input.next_block_range_with_transaction_threshold(provider, self.chunk_size)?;
131
132 let end_block = *block_range.end();
133
134 info!(target: "sync::stages::transaction_lookup", ?tx_range, "Calculating transaction hashes");
135
136 for (key, value) in provider.transaction_hashes_by_range(tx_range)? {
137 hash_collector.insert(key, value)?;
138 }
139
140 input.checkpoint = Some(
141 StageCheckpoint::new(end_block)
142 .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
143 );
144
145 if is_final_range {
146 let append_only =
147 provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
148 let mut txhash_cursor = provider
149 .tx_ref()
150 .cursor_write::<tables::RawTable<tables::TransactionHashNumbers>>()?;
151
152 let total_hashes = hash_collector.len();
153 let interval = (total_hashes / 10).max(1);
154 for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
155 let (hash, number) = hash_to_number?;
156 if index > 0 && index % interval == 0 {
157 info!(
158 target: "sync::stages::transaction_lookup",
159 ?append_only,
160 progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
161 "Inserting hashes"
162 );
163 }
164
165 let key = RawKey::<TxHash>::from_vec(hash);
166 if append_only {
167 txhash_cursor.append(key, RawValue::<TxNumber>::from_vec(number))?
168 } else {
169 txhash_cursor.insert(key, RawValue::<TxNumber>::from_vec(number))?
170 }
171 }
172
173 trace!(target: "sync::stages::transaction_lookup",
174 total_hashes,
175 "Transaction hashes inserted"
176 );
177
178 break;
179 }
180 }
181
182 Ok(ExecOutput {
183 checkpoint: StageCheckpoint::new(input.target())
184 .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
185 done: true,
186 })
187 }
188
189 fn unwind(
191 &mut self,
192 provider: &Provider,
193 input: UnwindInput,
194 ) -> Result<UnwindOutput, StageError> {
195 let tx = provider.tx_ref();
196 let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);
197
198 let mut body_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
200 let mut tx_hash_number_cursor = tx.cursor_write::<tables::TransactionHashNumbers>()?;
201 let static_file_provider = provider.static_file_provider();
202 let mut rev_walker = body_cursor.walk_back(Some(*range.end()))?;
203 while let Some((number, body)) = rev_walker.next().transpose()? {
204 if number <= unwind_to {
205 break;
206 }
207
208 for tx_id in body.tx_num_range() {
210 if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? {
212 if tx_hash_number_cursor.seek_exact(transaction.trie_hash())?.is_some() {
213 tx_hash_number_cursor.delete_current()?;
214 }
215 }
216 }
217 }
218
219 Ok(UnwindOutput {
220 checkpoint: StageCheckpoint::new(unwind_to)
221 .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
222 })
223 }
224}
225
226fn stage_checkpoint<Provider>(provider: &Provider) -> Result<EntitiesCheckpoint, StageError>
227where
228 Provider: PruneCheckpointReader + StaticFileProviderFactory + StatsReader,
229{
230 let pruned_entries = provider
231 .get_prune_checkpoint(PruneSegment::TransactionLookup)?
232 .and_then(|checkpoint| checkpoint.tx_number)
233 .map(|tx_number| tx_number + 1)
235 .unwrap_or_default();
236 Ok(EntitiesCheckpoint {
237 processed: provider.count_entries::<tables::TransactionHashNumbers>()? as u64 +
241 pruned_entries,
242 total: provider.static_file_provider().count_entries::<tables::Transactions>()? as u64,
246 })
247}
248
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
        TestRunnerError, TestStageDB, UnwindStageTestRunner,
    };
    use alloy_primitives::{BlockNumber, B256};
    use assert_matches::assert_matches;
    use reth_primitives::SealedBlock;
    use reth_provider::{
        providers::StaticFileWriter, BlockBodyIndicesProvider, DatabaseProviderFactory,
        StaticFileProviderFactory,
    };
    use reth_stages_api::StageUnitCheckpoint;
    use reth_testing_utils::generators::{
        self, random_block, random_block_range, BlockParams, BlockRangeParams,
    };
    use std::ops::Sub;

    // Instantiate the shared stage test suite for the transaction lookup runner.
    stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup);

    /// Executes the stage over a range in which exactly one block carries a transaction and
    /// checks the reported checkpoint as well as the resulting lookup entries.
    #[tokio::test]
    async fn execute_single_transaction_lookup() {
        let (previous_stage, stage_progress) = (500, 100);
        let mut rng = generators::rng();

        // Set up the runner
        let runner = TransactionLookupTestRunner::default();
        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        // Insert blocks with a single transaction at block `stage_progress + 10`
        let non_empty_block_number = stage_progress + 10;
        let blocks = (stage_progress..=input.target())
            .map(|number| {
                random_block(
                    &mut rng,
                    number,
                    BlockParams {
                        tx_count: Some((number == non_empty_block_number) as u8),
                        ..Default::default()
                    },
                )
            })
            .collect::<Vec<_>>();
        runner
            .db
            .insert_blocks(blocks.iter(), StorageKind::Static)
            .expect("failed to insert blocks");

        let rx = runner.execute(input);

        // Assert the successful result
        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                        processed,
                        total
                    }))
                }, done: true }) if block_number == previous_stage && processed == total &&
                total == runner.db.factory.static_file_provider().count_entries::<tables::Transactions>().unwrap() as u64
        );

        // Validate the stage execution
        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    /// Executes the stage with `PruneMode::Before(prune_target)` configured and checks that the
    /// reported checkpoint still covers every transaction.
    #[tokio::test]
    async fn execute_pruned_transaction_lookup() {
        let (previous_stage, prune_target, stage_progress) = (500, 400, 100);
        let mut rng = generators::rng();

        // Set up the runner
        let mut runner = TransactionLookupTestRunner::default();
        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        // Seed only once with full input range
        let seed = random_block_range(
            &mut rng,
            stage_progress + 1..=previous_stage,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
        );
        runner
            .db
            .insert_blocks(seed.iter(), StorageKind::Static)
            .expect("failed to seed execution");

        runner.set_prune_mode(PruneMode::Before(prune_target));

        let rx = runner.execute(input);

        // Assert the successful result
        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                        processed,
                        total
                    }))
                }, done: true }) if block_number == previous_stage && processed == total &&
                total == runner.db.factory.static_file_provider().count_entries::<tables::Transactions>().unwrap() as u64
        );

        // Validate the stage execution
        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    /// Checks that `stage_checkpoint` counts pruned entries as processed: lookup entries exist
    /// only for blocks above `max_pruned_block`, while the prune checkpoint covers the rest.
    #[test]
    fn stage_checkpoint_pruned() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            0..=100,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..10, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks");

        let max_pruned_block = 30;
        let max_processed_block = 70;

        // Number every transaction up to `max_processed_block`, but only keep the lookup entries
        // of blocks that are above the pruned range.
        let mut tx_hash_numbers = Vec::new();
        let mut tx_hash_number = 0;
        for block in &blocks[..=max_processed_block] {
            for transaction in &block.body.transactions {
                if block.number > max_pruned_block {
                    tx_hash_numbers.push((transaction.hash(), tx_hash_number));
                }
                tx_hash_number += 1;
            }
        }
        db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

        let provider = db.factory.provider_rw().unwrap();
        provider
            .save_prune_checkpoint(
                PruneSegment::TransactionLookup,
                PruneCheckpoint {
                    block_number: Some(max_pruned_block),
                    tx_number: Some(
                        blocks[..=max_pruned_block as usize]
                            .iter()
                            .map(|block| block.body.transactions.len() as u64)
                            .sum::<u64>()
                            .sub(1), // transaction numbers start at zero
                    ),
                    prune_mode: PruneMode::Full,
                },
            )
            .expect("save stage checkpoint");
        provider.commit().expect("commit");

        let provider = db.factory.database_provider_rw().unwrap();
        assert_eq!(
            stage_checkpoint(&provider).expect("stage checkpoint"),
            EntitiesCheckpoint {
                processed: blocks[..=max_processed_block]
                    .iter()
                    .map(|block| block.body.transactions.len() as u64)
                    .sum(),
                total: blocks.iter().map(|block| block.body.transactions.len() as u64).sum()
            }
        );
    }

    /// Test runner holding the database and the settings used to build the stage under test.
    struct TransactionLookupTestRunner {
        db: TestStageDB,
        chunk_size: u64,
        etl_config: EtlConfig,
        prune_mode: Option<PruneMode>,
    }

    impl Default for TransactionLookupTestRunner {
        fn default() -> Self {
            Self {
                db: TestStageDB::default(),
                chunk_size: 1000,
                etl_config: EtlConfig::default(),
                prune_mode: None,
            }
        }
    }

    impl TransactionLookupTestRunner {
        /// Sets the prune mode that the stages built by this runner will use.
        fn set_prune_mode(&mut self, prune_mode: PruneMode) {
            self.prune_mode = Some(prune_mode);
        }

        /// Checks the [`tables::TransactionHashNumbers`] table against the given block.
        ///
        /// If body indices exist for `number`, the check is delegated to
        /// `ensure_no_entry_above_by_value` with the last transaction number of that block.
        ///
        /// # Panics
        ///
        /// If there are no body indices for `number` but [`tables::TransactionHashNumbers`] is
        /// not empty.
        fn ensure_no_hash_by_block(&self, number: BlockNumber) -> Result<(), TestRunnerError> {
            let body_result = self
                .db
                .factory
                .provider_rw()?
                .block_body_indices(number)?
                .ok_or(ProviderError::BlockBodyIndicesNotFound(number));
            match body_result {
                Ok(body) => {
                    self.db.ensure_no_entry_above_by_value::<tables::TransactionHashNumbers, _>(
                        body.last_tx_num(),
                        |key| key,
                    )?
                }
                Err(_) => {
                    assert!(self.db.table_is_empty::<tables::TransactionHashNumbers>()?);
                }
            };

            Ok(())
        }
    }

    impl StageTestRunner for TransactionLookupTestRunner {
        type S = TransactionLookupStage;

        fn db(&self) -> &TestStageDB {
            &self.db
        }

        fn stage(&self) -> Self::S {
            TransactionLookupStage {
                chunk_size: self.chunk_size,
                etl_config: self.etl_config.clone(),
                prune_mode: self.prune_mode,
            }
        }
    }

    impl ExecuteStageTestRunner for TransactionLookupTestRunner {
        type Seed = Vec<SealedBlock>;

        /// Inserts random blocks for the range right after the checkpoint up to the target.
        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_progress = input.checkpoint().block_number;
            let end = input.target();
            let mut rng = generators::rng();

            let blocks = random_block_range(
                &mut rng,
                stage_progress + 1..=end,
                BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..2, ..Default::default() },
            );
            self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;
            Ok(blocks)
        }

        /// Validates the outcome of an execution.
        ///
        /// With an output, every transaction of the blocks starting at the first executed block
        /// must resolve from its hash back to its transaction number. Without an output, the
        /// lookup table is checked against the checkpoint block via `ensure_no_hash_by_block`.
        fn validate_execution(
            &self,
            mut input: ExecInput,
            output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            match output {
                Some(output) => {
                    let provider = self.db.factory.provider()?;

                    // Mirror the stage: a prune target ahead of the checkpoint moves the start.
                    if let Some((target_prunable_block, _)) = self
                        .prune_mode
                        .map(|mode| {
                            mode.prune_target_block(
                                input.target(),
                                PruneSegment::TransactionLookup,
                                PrunePurpose::User,
                            )
                        })
                        .transpose()
                        .expect("prune target block for transaction lookup")
                        .flatten()
                    {
                        if target_prunable_block > input.checkpoint().block_number {
                            input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
                        }
                    }
                    let start_block = input.next_block();
                    let end_block = output.checkpoint.block_number;

                    if start_block > end_block {
                        return Ok(())
                    }

                    let mut body_cursor =
                        provider.tx_ref().cursor_read::<tables::BlockBodyIndices>()?;

                    // Walk from `start_block` inclusive. Positioning the cursor with
                    // `seek_exact` and then calling `next` would move past `start_block` and
                    // leave the first executed block unchecked.
                    for entry in body_cursor.walk(Some(start_block))? {
                        let (_, body) = entry?;
                        for tx_id in body.tx_num_range() {
                            let transaction =
                                provider.transaction_by_id(tx_id)?.expect("no transaction entry");
                            assert_eq!(Some(tx_id), provider.transaction_id(transaction.hash())?);
                        }
                    }
                }
                None => self.ensure_no_hash_by_block(input.checkpoint().block_number)?,
            };
            Ok(())
        }
    }

    impl UnwindStageTestRunner for TransactionLookupTestRunner {
        /// Validates an unwind by checking the lookup table against the unwind target block.
        fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            self.ensure_no_hash_by_block(input.unwind_to)
        }
    }
}