reth_prune/segments/user/
sender_recovery.rs1use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{PruneInput, Segment},
4 PrunerError,
5};
6use reth_db::{tables, transaction::DbTxMut};
7use reth_provider::{BlockReader, DBProvider, TransactionsProvider};
8use reth_prune_types::{
9 PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
10};
11use tracing::{instrument, trace};
12
13#[derive(Debug)]
14pub struct SenderRecovery {
15 mode: PruneMode,
16}
17
18impl SenderRecovery {
19 pub const fn new(mode: PruneMode) -> Self {
20 Self { mode }
21 }
22}
23
24impl<Provider> Segment<Provider> for SenderRecovery
25where
26 Provider: DBProvider<Tx: DbTxMut> + TransactionsProvider + BlockReader,
27{
28 fn segment(&self) -> PruneSegment {
29 PruneSegment::SenderRecovery
30 }
31
32 fn mode(&self) -> Option<PruneMode> {
33 Some(self.mode)
34 }
35
36 fn purpose(&self) -> PrunePurpose {
37 PrunePurpose::User
38 }
39
40 #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
41 fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
42 let tx_range = match input.get_next_tx_num_range(provider)? {
43 Some(range) => range,
44 None => {
45 trace!(target: "pruner", "No transaction senders to prune");
46 return Ok(SegmentOutput::done())
47 }
48 };
49 let tx_range_end = *tx_range.end();
50
51 let mut limiter = input.limiter;
52
53 let mut last_pruned_transaction = tx_range_end;
54 let (pruned, done) =
55 provider.tx_ref().prune_table_with_range::<tables::TransactionSenders>(
56 tx_range,
57 &mut limiter,
58 |_| false,
59 |row| last_pruned_transaction = row.0,
60 )?;
61 trace!(target: "pruner", %pruned, %done, "Pruned transaction senders");
62
63 let last_pruned_block = provider
64 .transaction_block(last_pruned_transaction)?
65 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
66 .checked_sub(if done { 0 } else { 1 });
69
70 let progress = limiter.progress(done);
71
72 Ok(SegmentOutput {
73 progress,
74 pruned,
75 checkpoint: Some(SegmentOutputCheckpoint {
76 block_number: last_pruned_block,
77 tx_number: Some(last_pruned_transaction),
78 }),
79 })
80 }
81}
82
83#[cfg(test)]
84mod tests {
85 use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, SenderRecovery};
86 use alloy_primitives::{BlockNumber, TxNumber, B256};
87 use assert_matches::assert_matches;
88 use itertools::{
89 FoldWhile::{Continue, Done},
90 Itertools,
91 };
92 use reth_db::tables;
93 use reth_primitives_traits::SignedTransaction;
94 use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
95 use reth_prune_types::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment};
96 use reth_stages::test_utils::{StorageKind, TestStageDB};
97 use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
98 use std::ops::Sub;
99
100 #[test]
101 fn prune() {
102 let db = TestStageDB::default();
103 let mut rng = generators::rng();
104
105 let blocks = random_block_range(
106 &mut rng,
107 1..=10,
108 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
109 );
110 db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
111
112 let mut transaction_senders = Vec::new();
113 for block in &blocks {
114 transaction_senders.reserve_exact(block.body.transactions.len());
115 for transaction in &block.body.transactions {
116 transaction_senders.push((
117 transaction_senders.len() as u64,
118 transaction.recover_signer().expect("recover signer"),
119 ));
120 }
121 }
122 let transaction_senders_len = transaction_senders.len();
123 db.insert_transaction_senders(transaction_senders).expect("insert transaction senders");
124
125 assert_eq!(
126 db.table::<tables::Transactions>().unwrap().len(),
127 blocks.iter().map(|block| block.body.transactions.len()).sum::<usize>()
128 );
129 assert_eq!(
130 db.table::<tables::Transactions>().unwrap().len(),
131 db.table::<tables::TransactionSenders>().unwrap().len()
132 );
133
134 let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
135 let prune_mode = PruneMode::Before(to_block);
136 let segment = SenderRecovery::new(prune_mode);
137 let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
138 let input = PruneInput {
139 previous_checkpoint: db
140 .factory
141 .provider()
142 .unwrap()
143 .get_prune_checkpoint(PruneSegment::SenderRecovery)
144 .unwrap(),
145 to_block,
146 limiter: limiter.clone(),
147 };
148
149 let next_tx_number_to_prune = db
150 .factory
151 .provider()
152 .unwrap()
153 .get_prune_checkpoint(PruneSegment::SenderRecovery)
154 .unwrap()
155 .and_then(|checkpoint| checkpoint.tx_number)
156 .map(|tx_number| tx_number + 1)
157 .unwrap_or_default();
158
159 let last_pruned_tx_number = blocks
160 .iter()
161 .take(to_block as usize)
162 .map(|block| block.body.transactions.len())
163 .sum::<usize>()
164 .min(
165 next_tx_number_to_prune as usize +
166 input.limiter.deleted_entries_limit().unwrap(),
167 )
168 .sub(1);
169
170 let last_pruned_block_number = blocks
171 .iter()
172 .fold_while((0, 0), |(_, mut tx_count), block| {
173 tx_count += block.body.transactions.len();
174
175 if tx_count > last_pruned_tx_number {
176 Done((block.number, tx_count))
177 } else {
178 Continue((block.number, tx_count))
179 }
180 })
181 .into_inner()
182 .0;
183
184 let provider = db.factory.database_provider_rw().unwrap();
185 let result = segment.prune(&provider, input).unwrap();
186 limiter.increment_deleted_entries_count_by(result.pruned);
187
188 assert_matches!(
189 result,
190 SegmentOutput {progress, pruned, checkpoint: Some(_)}
191 if (progress, pruned) == expected_result
192 );
193
194 segment
195 .save_checkpoint(
196 &provider,
197 result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
198 )
199 .unwrap();
200 provider.commit().expect("commit");
201
202 let last_pruned_block_number = last_pruned_block_number
203 .checked_sub(if result.progress.is_finished() { 0 } else { 1 });
204
205 assert_eq!(
206 db.table::<tables::TransactionSenders>().unwrap().len(),
207 transaction_senders_len - (last_pruned_tx_number + 1)
208 );
209 assert_eq!(
210 db.factory
211 .provider()
212 .unwrap()
213 .get_prune_checkpoint(PruneSegment::SenderRecovery)
214 .unwrap(),
215 Some(PruneCheckpoint {
216 block_number: last_pruned_block_number,
217 tx_number: Some(last_pruned_tx_number as TxNumber),
218 prune_mode
219 })
220 );
221 };
222
223 test_prune(
224 6,
225 (
226 PruneProgress::HasMoreData(
227 reth_prune_types::PruneInterruptReason::DeletedEntriesLimitReached,
228 ),
229 10,
230 ),
231 );
232 test_prune(6, (PruneProgress::Finished, 2));
233 test_prune(10, (PruneProgress::Finished, 8));
234 }
235}