// reth_prune/segments/user/storage_history.rs

1use crate::{
2    db_ext::DbTxPruneExt,
3    segments::{user::history::prune_history_indices, PruneInput, Segment, SegmentOutput},
4    PrunerError,
5};
6use itertools::Itertools;
7use reth_db::{tables, transaction::DbTxMut};
8use reth_db_api::models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress};
9use reth_provider::DBProvider;
10use reth_prune_types::{PruneMode, PrunePurpose, PruneSegment, SegmentOutputCheckpoint};
11use rustc_hash::FxHashMap;
12use tracing::{instrument, trace};
13
/// Number of storage history tables to prune in one step.
///
/// Storage History consists of two tables: [`tables::StorageChangeSets`] and
/// [`tables::StoragesHistory`]. We want to prune them to the same block number,
/// so the per-run deleted-entries budget is split evenly between them.
const STORAGE_HISTORY_TABLES_TO_PRUNE: usize = 2;
19
/// Prune segment responsible for the storage history tables
/// ([`tables::StorageChangeSets`] and [`tables::StoragesHistory`]).
#[derive(Debug)]
pub struct StorageHistory {
    /// User-configured prune mode that selects the block range eligible for pruning.
    mode: PruneMode,
}
24
25impl StorageHistory {
26    pub const fn new(mode: PruneMode) -> Self {
27        Self { mode }
28    }
29}
30
impl<Provider> Segment<Provider> for StorageHistory
where
    Provider: DBProvider<Tx: DbTxMut>,
{
    fn segment(&self) -> PruneSegment {
        PruneSegment::StorageHistory
    }

    fn mode(&self) -> Option<PruneMode> {
        Some(self.mode)
    }

    fn purpose(&self) -> PrunePurpose {
        PrunePurpose::User
    }

    /// Prunes storage history for the next block range selected by `input`.
    ///
    /// First deletes entries from [`tables::StorageChangeSets`], recording the highest
    /// deleted block number per `(address, storage key)` pair, then prunes the matching
    /// shards in [`tables::StoragesHistory`] so both tables advance to the same block.
    /// Returns the combined number of deleted entries and the checkpoint block to resume
    /// from on the next run.
    #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
        let range = match input.get_next_block_range() {
            Some(range) => range,
            None => {
                trace!(target: "pruner", "No storage history to prune");
                return Ok(SegmentOutput::done())
            }
        };
        let range_end = *range.end();

        // Split the deleted-entries budget across both storage history tables so that
        // changesets and indices can be pruned up to the same block number in one step.
        let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
            input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE)
        } else {
            input.limiter
        };
        if limiter.is_limit_reached() {
            // Nothing can be deleted this run; report "not done" and carry the previous
            // checkpoint forward unchanged.
            return Ok(SegmentOutput::not_done(
                limiter.interrupt_reason(),
                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
            ))
        }

        let mut last_changeset_pruned_block = None;
        // Deleted storage changeset keys (account addresses and storage slots) with the highest
        // block number deleted for that key.
        //
        // The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
        // STORAGE_HISTORY_TABLES_TO_PRUNE`, and with current default it's usually `3500 * 5
        // / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
        // size should be up to 0.5MB + some hashmap overhead. `blocks_since_last_run` is
        // additionally limited by the `max_reorg_depth`, so no OOM is expected here.
        let mut highest_deleted_storages = FxHashMap::default();
        let (pruned_changesets, done) =
            provider.tx_ref().prune_table_with_range::<tables::StorageChangeSets>(
                BlockNumberAddress::range(range),
                &mut limiter,
                // No skip predicate: every entry in the range is eligible for deletion.
                |_| false,
                |(BlockNumberAddress((block_number, address)), entry)| {
                    highest_deleted_storages.insert((address, entry.key), block_number);
                    last_changeset_pruned_block = Some(block_number);
                },
            )?;
        trace!(target: "pruner", deleted = %pruned_changesets, %done, "Pruned storage history (changesets)");

        let last_changeset_pruned_block = last_changeset_pruned_block
            // If there's more storage changesets to prune, set the checkpoint block number to
            // previous, so we could finish pruning its storage changesets on the next run.
            .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
            .unwrap_or(range_end);

        // Sort highest deleted block numbers by account address and storage key and turn them into
        // sharded keys.
        // We did not use `BTreeMap` from the beginning, because it's inefficient for hashes.
        let highest_sharded_keys = highest_deleted_storages
            .into_iter()
            .sorted_unstable() // Unstable is fine because no equal keys exist in the map
            .map(|((address, storage_key), block_number)| {
                StorageShardedKey::new(
                    address,
                    storage_key,
                    // Cap at the checkpoint block so a partially pruned block is redone next run.
                    block_number.min(last_changeset_pruned_block),
                )
            });
        let outcomes = prune_history_indices::<Provider, tables::StoragesHistory, _>(
            provider,
            highest_sharded_keys,
            // Two sharded keys belong to the same shard series when they share both the
            // account address and the storage slot key.
            |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
        )?;
        trace!(target: "pruner", ?outcomes, %done, "Pruned storage history (indices)");

        let progress = limiter.progress(done);

        Ok(SegmentOutput {
            progress,
            // Total deletions across both tables.
            pruned: pruned_changesets + outcomes.deleted,
            checkpoint: Some(SegmentOutputCheckpoint {
                block_number: Some(last_changeset_pruned_block),
                tx_number: None,
            }),
        })
    }
}
130
131#[cfg(test)]
132mod tests {
133    use crate::segments::{
134        user::storage_history::STORAGE_HISTORY_TABLES_TO_PRUNE, PruneInput, PruneLimiter, Segment,
135        SegmentOutput, StorageHistory,
136    };
137    use alloy_primitives::{BlockNumber, B256};
138    use assert_matches::assert_matches;
139    use reth_db::{tables, BlockNumberList};
140    use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
141    use reth_prune_types::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment};
142    use reth_stages::test_utils::{StorageKind, TestStageDB};
143    use reth_testing_utils::generators::{
144        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
145    };
146    use std::{collections::BTreeMap, ops::AddAssign};
147
    /// End-to-end pruning test.
    ///
    /// Seeds the test database with random blocks, storage changesets and history shards,
    /// then runs the segment three times under a deleted-entries limit, verifying the
    /// reported progress, the pruned count, both tables' remaining contents, and the
    /// saved checkpoint after every run.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        // 5001 blocks with no transactions, just to anchor the changesets.
        let blocks = random_block_range(
            &mut rng,
            0..=5000,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        // Random storage changesets for the two accounts across the seeded blocks.
        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            1..2,
            1..2,
        );
        db.insert_changesets(changesets.clone(), None).expect("insert changesets");
        db.insert_history(changesets.clone(), None).expect("insert history");

        // Sanity check: at least one (address, slot) pair must be split across multiple
        // history shards so that shard handling is actually exercised.
        let storage_occurrences = db.table::<tables::StoragesHistory>().unwrap().into_iter().fold(
            BTreeMap::<_, usize>::new(),
            |mut map, (key, _)| {
                map.entry((key.address, key.sharded_key.key)).or_default().add_assign(1);
                map
            },
        );
        assert!(storage_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));

        // Every generated changeset entry must have landed in the table.
        assert_eq!(
            db.table::<tables::StorageChangeSets>().unwrap().len(),
            changesets.iter().flatten().flat_map(|(_, _, entries)| entries).count()
        );

        let original_shards = db.table::<tables::StoragesHistory>().unwrap();

        // Runs one prune pass up to `to_block` (`run` is the 1-based pass counter) and
        // verifies the segment output, the remaining table contents and the checkpoint.
        let test_prune = |to_block: BlockNumber,
                          run: usize,
                          expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let deleted_entries_limit = 1000;
            let mut limiter =
                PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
            let input = PruneInput {
                previous_checkpoint: db
                    .factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::StorageHistory)
                    .unwrap(),
                to_block,
                limiter: limiter.clone(),
            };
            let segment = StorageHistory::new(prune_mode);

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            // Persist the checkpoint the way the pruner would between runs.
            segment
                .save_checkpoint(
                    &provider,
                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                )
                .unwrap();
            provider.commit().expect("commit");

            // Flatten the generated changesets into (block, address, entry) tuples.
            let changesets = changesets
                .iter()
                .enumerate()
                .flat_map(|(block_number, changeset)| {
                    changeset.iter().flat_map(move |(address, _, entries)| {
                        entries.iter().map(move |entry| (block_number, address, entry))
                    })
                })
                .collect::<Vec<_>>();

            // Index of the first entry NOT yet pruned, given the per-table budget
            // (half the limit per run) and the target block.
            #[allow(clippy::skip_while_next)]
            let pruned = changesets
                .iter()
                .enumerate()
                .skip_while(|(i, (block_number, _, _))| {
                    *i < deleted_entries_limit / STORAGE_HISTORY_TABLES_TO_PRUNE * run &&
                        *block_number <= to_block as usize
                })
                .next()
                .map(|(i, _)| i)
                .unwrap_or_default();

            let mut pruned_changesets = changesets
                .iter()
                // Skip what we've pruned so far, subtracting one to get last pruned block number
                // further down
                .skip(pruned.saturating_sub(1));

            // Mirror the segment's checkpoint logic: on a partial run the checkpoint points
            // at the previous block so the interrupted block is re-pruned next time.
            let last_pruned_block_number = pruned_changesets
                .next()
                .map(|(block_number, _, _)| if result.progress.is_finished() {
                    *block_number
                } else {
                    block_number.saturating_sub(1)
                } as BlockNumber)
                .unwrap_or(to_block);

            // Group the entries that should remain, keyed by (block, address), to compare
            // against the table contents.
            let pruned_changesets = pruned_changesets.fold(
                BTreeMap::<_, Vec<_>>::new(),
                |mut acc, (block_number, address, entry)| {
                    acc.entry((block_number, address)).or_default().push(entry);
                    acc
                },
            );

            assert_eq!(
                db.table::<tables::StorageChangeSets>().unwrap().len(),
                pruned_changesets.values().flatten().count()
            );

            let actual_shards = db.table::<tables::StoragesHistory>().unwrap();

            // Expected shards: drop shards that were fully pruned and strip the pruned
            // block numbers from the surviving ones.
            let expected_shards = original_shards
                .iter()
                .filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number)
                .map(|(key, blocks)| {
                    let new_blocks =
                        blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
                    (key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
                })
                .collect::<Vec<_>>();

            assert_eq!(actual_shards, expected_shards);

            // The checkpoint stored in the database must match the pruned range.
            assert_eq!(
                db.factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::StorageHistory)
                    .unwrap(),
                Some(PruneCheckpoint {
                    block_number: Some(last_pruned_block_number),
                    tx_number: None,
                    prune_mode
                })
            );
        };

        // Run 1: limit of 1000 split across 2 tables -> 500 deletions, more data left.
        test_prune(
            998,
            1,
            (
                PruneProgress::HasMoreData(
                    reth_prune_types::PruneInterruptReason::DeletedEntriesLimitReached,
                ),
                500,
            ),
        );
        // Run 2: finishes pruning up to block 998.
        test_prune(998, 2, (PruneProgress::Finished, 499));
        // Run 3: extends the pruned range up to block 1200.
        test_prune(1200, 3, (PruneProgress::Finished, 202));
    }
316}