reth_prune/segments/user/
sender_recovery.rs

1use crate::{
2    db_ext::DbTxPruneExt,
3    segments::{PruneInput, Segment},
4    PrunerError,
5};
6use reth_db::{tables, transaction::DbTxMut};
7use reth_provider::{BlockReader, DBProvider, TransactionsProvider};
8use reth_prune_types::{
9    PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
10};
11use tracing::{instrument, trace};
12
/// Prune segment that removes entries from the transaction senders table.
///
/// Reports itself as [`PruneSegment::SenderRecovery`] with a user [`PrunePurpose`] through its
/// `Segment` implementation.
#[derive(Debug)]
pub struct SenderRecovery {
    /// Prune mode supplied at construction and returned unchanged by `Segment::mode`.
    mode: PruneMode,
}
17
impl SenderRecovery {
    /// Creates a new [`SenderRecovery`] segment that stores the given prune `mode`.
    pub const fn new(mode: PruneMode) -> Self {
        Self { mode }
    }
}
23
24impl<Provider> Segment<Provider> for SenderRecovery
25where
26    Provider: DBProvider<Tx: DbTxMut> + TransactionsProvider + BlockReader,
27{
28    fn segment(&self) -> PruneSegment {
29        PruneSegment::SenderRecovery
30    }
31
32    fn mode(&self) -> Option<PruneMode> {
33        Some(self.mode)
34    }
35
36    fn purpose(&self) -> PrunePurpose {
37        PrunePurpose::User
38    }
39
40    #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
41    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
42        let tx_range = match input.get_next_tx_num_range(provider)? {
43            Some(range) => range,
44            None => {
45                trace!(target: "pruner", "No transaction senders to prune");
46                return Ok(SegmentOutput::done())
47            }
48        };
49        let tx_range_end = *tx_range.end();
50
51        let mut limiter = input.limiter;
52
53        let mut last_pruned_transaction = tx_range_end;
54        let (pruned, done) =
55            provider.tx_ref().prune_table_with_range::<tables::TransactionSenders>(
56                tx_range,
57                &mut limiter,
58                |_| false,
59                |row| last_pruned_transaction = row.0,
60            )?;
61        trace!(target: "pruner", %pruned, %done, "Pruned transaction senders");
62
63        let last_pruned_block = provider
64            .transaction_block(last_pruned_transaction)?
65            .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
66            // If there's more transaction senders to prune, set the checkpoint block number to
67            // previous, so we could finish pruning its transaction senders on the next run.
68            .checked_sub(if done { 0 } else { 1 });
69
70        let progress = limiter.progress(done);
71
72        Ok(SegmentOutput {
73            progress,
74            pruned,
75            checkpoint: Some(SegmentOutputCheckpoint {
76                block_number: last_pruned_block,
77                tx_number: Some(last_pruned_transaction),
78            }),
79        })
80    }
81}
82
#[cfg(test)]
mod tests {
    use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, SenderRecovery};
    use alloy_primitives::{BlockNumber, TxNumber, B256};
    use assert_matches::assert_matches;
    use reth_db::tables;
    use reth_primitives_traits::SignedTransaction;
    use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment};
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};

    /// Prunes transaction senders in three consecutive runs with a deleted-entries limit of 10
    /// and checks, after each run, the reported progress, the number of pruned entries, the
    /// remaining table size and the stored prune checkpoint.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let blocks = random_block_range(
            &mut rng,
            1..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        // Senders are keyed by consecutive transaction numbers starting at zero, in block order.
        let transaction_senders: Vec<_> = blocks
            .iter()
            .flat_map(|block| block.body.transactions.iter())
            .enumerate()
            .map(|(tx_number, transaction)| {
                (tx_number as u64, transaction.recover_signer().expect("recover signer"))
            })
            .collect();
        let transaction_senders_len = transaction_senders.len();
        db.insert_transaction_senders(transaction_senders).expect("insert transaction senders");

        let total_transactions: usize =
            blocks.iter().map(|block| block.body.transactions.len()).sum();
        assert_eq!(db.table::<tables::Transactions>().unwrap().len(), total_transactions);
        assert_eq!(
            db.table::<tables::Transactions>().unwrap().len(),
            db.table::<tables::TransactionSenders>().unwrap().len()
        );

        // Reads the currently stored checkpoint of the sender recovery segment.
        let read_checkpoint = || {
            db.factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::SenderRecovery)
                .unwrap()
        };

        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let segment = SenderRecovery::new(prune_mode);
            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
            let input = PruneInput {
                previous_checkpoint: read_checkpoint(),
                to_block,
                limiter: limiter.clone(),
            };

            // The run is expected to resume right after the previously checkpointed transaction.
            let next_tx_number_to_prune = read_checkpoint()
                .and_then(|checkpoint| checkpoint.tx_number)
                .map_or(0, |tx_number| tx_number + 1);

            // Expected last pruned transaction: capped both by the transactions contained in the
            // first `to_block` blocks and by the deleted-entries limit of this run.
            let entries_limit = input.limiter.deleted_entries_limit().unwrap();
            let prunable_transactions: usize = blocks
                .iter()
                .take(to_block as usize)
                .map(|block| block.body.transactions.len())
                .sum();
            let last_pruned_tx_number =
                prunable_transactions.min(next_tx_number_to_prune as usize + entries_limit) - 1;

            // Walk the blocks until the cumulative transaction count passes the expected last
            // pruned transaction; that block contains it.
            let mut tx_count = 0;
            let mut last_pruned_block_number: BlockNumber = 0;
            for block in &blocks {
                tx_count += block.body.transactions.len();
                last_pruned_block_number = block.number;
                if tx_count > last_pruned_tx_number {
                    break
                }
            }

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            let checkpoint = result.checkpoint.unwrap().as_prune_checkpoint(prune_mode);
            segment.save_checkpoint(&provider, checkpoint).unwrap();
            provider.commit().expect("commit");

            // An unfinished run is expected to checkpoint the block before the one it stopped in.
            let expected_block_number = if result.progress.is_finished() {
                Some(last_pruned_block_number)
            } else {
                last_pruned_block_number.checked_sub(1)
            };

            assert_eq!(
                db.table::<tables::TransactionSenders>().unwrap().len(),
                transaction_senders_len - (last_pruned_tx_number + 1)
            );
            assert_eq!(
                read_checkpoint(),
                Some(PruneCheckpoint {
                    block_number: expected_block_number,
                    tx_number: Some(last_pruned_tx_number as TxNumber),
                    prune_mode
                })
            );
        };

        let limit_reached = PruneProgress::HasMoreData(
            reth_prune_types::PruneInterruptReason::DeletedEntriesLimitReached,
        );
        test_prune(6, (limit_reached, 10));
        test_prune(6, (PruneProgress::Finished, 2));
        test_prune(10, (PruneProgress::Finished, 8));
    }
}