reth_prune/segments/user/
receipts_by_logs.rs1use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{PruneInput, Segment},
4 PrunerError,
5};
6use alloy_consensus::TxReceipt;
7use reth_db::{table::Value, tables, transaction::DbTxMut};
8use reth_primitives_traits::NodePrimitives;
9use reth_provider::{
10 BlockReader, DBProvider, NodePrimitivesProvider, PruneCheckpointWriter, TransactionsProvider,
11};
12use reth_prune_types::{
13 PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, ReceiptsLogPruneConfig, SegmentOutput,
14 MINIMUM_PRUNING_DISTANCE,
15};
16use tracing::{instrument, trace};
17#[derive(Debug)]
18pub struct ReceiptsByLogs {
19 config: ReceiptsLogPruneConfig,
20}
21
22impl ReceiptsByLogs {
23 pub const fn new(config: ReceiptsLogPruneConfig) -> Self {
24 Self { config }
25 }
26}
27
28impl<Provider> Segment<Provider> for ReceiptsByLogs
29where
30 Provider: DBProvider<Tx: DbTxMut>
31 + PruneCheckpointWriter
32 + TransactionsProvider
33 + BlockReader
34 + NodePrimitivesProvider<Primitives: NodePrimitives<Receipt: Value>>,
35{
36 fn segment(&self) -> PruneSegment {
37 PruneSegment::ContractLogs
38 }
39
40 fn mode(&self) -> Option<PruneMode> {
41 None
42 }
43
44 fn purpose(&self) -> PrunePurpose {
45 PrunePurpose::User
46 }
47
48 #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
49 fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
50 let to_block = PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)
54 .prune_target_block(input.to_block, PruneSegment::ContractLogs, PrunePurpose::User)?
55 .map(|(bn, _)| bn)
56 .unwrap_or_default();
57
58 let mut last_pruned_block =
60 input.previous_checkpoint.and_then(|checkpoint| checkpoint.block_number);
61
62 let initial_last_pruned_block = last_pruned_block;
63
64 let mut from_tx_number = match initial_last_pruned_block {
65 Some(block) => provider
66 .block_body_indices(block)?
67 .map(|block| block.last_tx_num() + 1)
68 .unwrap_or(0),
69 None => 0,
70 };
71
72 let address_filter = self.config.group_by_block(input.to_block, last_pruned_block)?;
75
76 let mut block_ranges = vec![];
97 let mut blocks_iter = address_filter.iter().peekable();
98 let mut filtered_addresses = vec![];
99
100 while let Some((start_block, addresses)) = blocks_iter.next() {
101 filtered_addresses.extend_from_slice(addresses);
102
103 if block_ranges.is_empty() {
106 let init = last_pruned_block.map(|b| b + 1).unwrap_or_default();
107 if init < *start_block {
108 block_ranges.push((init, *start_block - 1, 0));
109 }
110 }
111
112 let end_block =
113 blocks_iter.peek().map(|(next_block, _)| *next_block - 1).unwrap_or(to_block);
114
115 block_ranges.push((*start_block, end_block, filtered_addresses.len()));
118 }
119
120 trace!(
121 target: "pruner",
122 ?block_ranges,
123 ?filtered_addresses,
124 "Calculated block ranges and filtered addresses",
125 );
126
127 let mut limiter = input.limiter;
128
129 let mut done = true;
130 let mut pruned = 0;
131 let mut last_pruned_transaction = None;
132 for (start_block, end_block, num_addresses) in block_ranges {
133 let block_range = start_block..=end_block;
134
135 let tx_range_end = match provider.block_body_indices(end_block)? {
137 Some(body) => body.last_tx_num(),
138 None => {
139 trace!(
140 target: "pruner",
141 ?block_range,
142 "No receipts to prune."
143 );
144 continue
145 }
146 };
147 let tx_range = from_tx_number..=tx_range_end;
148
149 let mut last_skipped_transaction = 0;
151 let deleted;
152 (deleted, done) = provider.tx_ref().prune_table_with_range::<tables::Receipts<
153 <Provider::Primitives as NodePrimitives>::Receipt,
154 >>(
155 tx_range,
156 &mut limiter,
157 |(tx_num, receipt)| {
158 let skip = num_addresses > 0 &&
159 receipt.logs().iter().any(|log| {
160 filtered_addresses[..num_addresses].contains(&&log.address)
161 });
162
163 if skip {
164 last_skipped_transaction = *tx_num;
165 }
166 skip
167 },
168 |row| last_pruned_transaction = Some(row.0),
169 )?;
170
171 trace!(target: "pruner", %deleted, %done, ?block_range, "Pruned receipts");
172
173 pruned += deleted;
174
175 let last_pruned_transaction = *last_pruned_transaction
179 .insert(last_pruned_transaction.unwrap_or_default().max(last_skipped_transaction));
180
181 last_pruned_block = Some(
182 provider
183 .transaction_block(last_pruned_transaction)?
184 .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
185 .saturating_sub(if done { 0 } else { 1 }),
189 );
190
191 if limiter.is_limit_reached() {
192 done &= end_block == to_block;
193 break
194 }
195
196 from_tx_number = last_pruned_transaction + 1;
197 }
198
199 let prune_mode_block = self
209 .config
210 .lowest_block_with_distance(input.to_block, initial_last_pruned_block)?
211 .unwrap_or(to_block);
212
213 provider.save_prune_checkpoint(
214 PruneSegment::ContractLogs,
215 PruneCheckpoint {
216 block_number: Some(prune_mode_block.min(last_pruned_block.unwrap_or(u64::MAX))),
217 tx_number: last_pruned_transaction,
218 prune_mode: PruneMode::Before(prune_mode_block),
219 },
220 )?;
221
222 let progress = limiter.progress(done);
223
224 Ok(SegmentOutput { progress, pruned, checkpoint: None })
225 }
226}
227
228#[cfg(test)]
229mod tests {
230 use crate::segments::{PruneInput, PruneLimiter, ReceiptsByLogs, Segment};
231 use alloy_primitives::B256;
232 use assert_matches::assert_matches;
233 use reth_db::tables;
234 use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
235 use reth_primitives_traits::InMemorySize;
236 use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader, TransactionsProvider};
237 use reth_prune_types::{PruneMode, PruneSegment, ReceiptsLogPruneConfig};
238 use reth_stages::test_utils::{StorageKind, TestStageDB};
239 use reth_testing_utils::generators::{
240 self, random_block_range, random_eoa_account, random_log, random_receipt, BlockRangeParams,
241 };
242 use std::collections::BTreeMap;
243
244 #[test]
245 fn prune_receipts_by_logs() {
246 reth_tracing::init_test_tracing();
247
248 let db = TestStageDB::default();
249 let mut rng = generators::rng();
250
251 let tip = 20000;
252 let blocks = [
253 random_block_range(
254 &mut rng,
255 0..=100,
256 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 1..5, ..Default::default() },
257 ),
258 random_block_range(
259 &mut rng,
260 (100 + 1)..=(tip - 100),
261 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
262 ),
263 random_block_range(
264 &mut rng,
265 (tip - 100 + 1)..=tip,
266 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 1..5, ..Default::default() },
267 ),
268 ]
269 .concat();
270 db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
271
272 let mut receipts = Vec::new();
273
274 let (deposit_contract_addr, _) = random_eoa_account(&mut rng);
275 for block in &blocks {
276 receipts.reserve_exact(block.body.size());
277 for (txi, transaction) in block.body.transactions.iter().enumerate() {
278 let mut receipt = random_receipt(&mut rng, transaction, Some(1));
279 receipt.logs.push(random_log(
280 &mut rng,
281 (txi == (block.body.transactions.len() - 1)).then_some(deposit_contract_addr),
282 Some(1),
283 ));
284 receipts.push((receipts.len() as u64, receipt));
285 }
286 }
287 db.insert_receipts(receipts).expect("insert receipts");
288
289 assert_eq!(
290 db.table::<tables::Transactions>().unwrap().len(),
291 blocks.iter().map(|block| block.body.transactions.len()).sum::<usize>()
292 );
293 assert_eq!(
294 db.table::<tables::Transactions>().unwrap().len(),
295 db.table::<tables::Receipts>().unwrap().len()
296 );
297
298 let run_prune = || {
299 let provider = db.factory.database_provider_rw().unwrap();
300
301 let prune_before_block: usize = 20;
302 let prune_mode = PruneMode::Before(prune_before_block as u64);
303 let receipts_log_filter =
304 ReceiptsLogPruneConfig(BTreeMap::from([(deposit_contract_addr, prune_mode)]));
305
306 let limiter = PruneLimiter::default().set_deleted_entries_limit(10);
307
308 let result = ReceiptsByLogs::new(receipts_log_filter).prune(
309 &provider,
310 PruneInput {
311 previous_checkpoint: db
312 .factory
313 .provider()
314 .unwrap()
315 .get_prune_checkpoint(PruneSegment::ContractLogs)
316 .unwrap(),
317 to_block: tip,
318 limiter,
319 },
320 );
321 provider.commit().expect("commit");
322
323 assert_matches!(result, Ok(_));
324 let output = result.unwrap();
325
326 let (pruned_block, pruned_tx) = db
327 .factory
328 .provider()
329 .unwrap()
330 .get_prune_checkpoint(PruneSegment::ContractLogs)
331 .unwrap()
332 .map(|checkpoint| (checkpoint.block_number.unwrap(), checkpoint.tx_number.unwrap()))
333 .unwrap_or_default();
334
335 let unprunable = pruned_block.saturating_sub(prune_before_block as u64 - 1);
337
338 assert_eq!(
339 db.table::<tables::Receipts>().unwrap().len(),
340 blocks.iter().map(|block| block.body.transactions.len()).sum::<usize>() -
341 ((pruned_tx + 1) - unprunable) as usize
342 );
343
344 output.progress.is_finished()
345 };
346
347 while !run_prune() {}
348
349 let provider = db.factory.provider().unwrap();
350 let mut cursor = provider.tx_ref().cursor_read::<tables::Receipts>().unwrap();
351 let walker = cursor.walk(None).unwrap();
352 for receipt in walker {
353 let (tx_num, receipt) = receipt.unwrap();
354
355 assert!(
358 receipt.logs.iter().any(|l| l.address == deposit_contract_addr) ||
359 provider.transaction_block(tx_num).unwrap().unwrap() > tip - 128,
360 );
361 }
362 }
363}