reth_prune/segments/user/
account_history.rs

1use crate::{
2    db_ext::DbTxPruneExt,
3    segments::{user::history::prune_history_indices, PruneInput, Segment},
4    PrunerError,
5};
6use itertools::Itertools;
7use reth_db::{tables, transaction::DbTxMut};
8use reth_db_api::models::ShardedKey;
9use reth_provider::DBProvider;
10use reth_prune_types::{
11    PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
12};
13use rustc_hash::FxHashMap;
14use tracing::{instrument, trace};
15
/// How many tables a single account-history pruning step touches.
///
/// Account history is stored across [`tables::AccountChangeSets`] and
/// [`tables::AccountsHistory`]. Both must be pruned up to the same block number, so the
/// per-run deletion limit is divided by this value before pruning starts.
const ACCOUNT_HISTORY_TABLES_TO_PRUNE: usize = 2;
21
22#[derive(Debug)]
23pub struct AccountHistory {
24    mode: PruneMode,
25}
26
27impl AccountHistory {
28    pub const fn new(mode: PruneMode) -> Self {
29        Self { mode }
30    }
31}
32
33impl<Provider> Segment<Provider> for AccountHistory
34where
35    Provider: DBProvider<Tx: DbTxMut>,
36{
37    fn segment(&self) -> PruneSegment {
38        PruneSegment::AccountHistory
39    }
40
41    fn mode(&self) -> Option<PruneMode> {
42        Some(self.mode)
43    }
44
45    fn purpose(&self) -> PrunePurpose {
46        PrunePurpose::User
47    }
48
49    #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
50    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
51        let range = match input.get_next_block_range() {
52            Some(range) => range,
53            None => {
54                trace!(target: "pruner", "No account history to prune");
55                return Ok(SegmentOutput::done())
56            }
57        };
58        let range_end = *range.end();
59
60        let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
61            input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE)
62        } else {
63            input.limiter
64        };
65        if limiter.is_limit_reached() {
66            return Ok(SegmentOutput::not_done(
67                limiter.interrupt_reason(),
68                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
69            ))
70        }
71
72        let mut last_changeset_pruned_block = None;
73        // Deleted account changeset keys (account addresses) with the highest block number deleted
74        // for that key.
75        //
76        // The size of this map it's limited by `prune_delete_limit * blocks_since_last_run /
77        // ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with current default it's usually `3500 * 5
78        // / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
79        // size should be up to 0.5MB + some hashmap overhead. `blocks_since_last_run` is
80        // additionally limited by the `max_reorg_depth`, so no OOM is expected here.
81        let mut highest_deleted_accounts = FxHashMap::default();
82        let (pruned_changesets, done) =
83            provider.tx_ref().prune_table_with_range::<tables::AccountChangeSets>(
84                range,
85                &mut limiter,
86                |_| false,
87                |(block_number, account)| {
88                    highest_deleted_accounts.insert(account.address, block_number);
89                    last_changeset_pruned_block = Some(block_number);
90                },
91            )?;
92        trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets)");
93
94        let last_changeset_pruned_block = last_changeset_pruned_block
95            // If there's more account changesets to prune, set the checkpoint block number to
96            // previous, so we could finish pruning its account changesets on the next run.
97            .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
98            .unwrap_or(range_end);
99
100        // Sort highest deleted block numbers by account address and turn them into sharded keys.
101        // We did not use `BTreeMap` from the beginning, because it's inefficient for hashes.
102        let highest_sharded_keys = highest_deleted_accounts
103            .into_iter()
104            .sorted_unstable() // Unstable is fine because no equal keys exist in the map
105            .map(|(address, block_number)| {
106                ShardedKey::new(address, block_number.min(last_changeset_pruned_block))
107            });
108        let outcomes = prune_history_indices::<Provider, tables::AccountsHistory, _>(
109            provider,
110            highest_sharded_keys,
111            |a, b| a.key == b.key,
112        )?;
113        trace!(target: "pruner", ?outcomes, %done, "Pruned account history (indices)");
114
115        let progress = limiter.progress(done);
116
117        Ok(SegmentOutput {
118            progress,
119            pruned: pruned_changesets + outcomes.deleted,
120            checkpoint: Some(SegmentOutputCheckpoint {
121                block_number: Some(last_changeset_pruned_block),
122                tx_number: None,
123            }),
124        })
125    }
126}
127
#[cfg(test)]
mod tests {
    use crate::segments::{
        user::account_history::ACCOUNT_HISTORY_TABLES_TO_PRUNE, AccountHistory, PruneInput,
        PruneLimiter, Segment, SegmentOutput,
    };
    use alloy_primitives::{BlockNumber, B256};
    use assert_matches::assert_matches;
    use reth_db::{tables, BlockNumberList};
    use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{
        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
    };
    use std::collections::BTreeMap;

    /// Seeds a test database with 5000 blocks of changesets for two accounts, then runs the
    /// account-history segment three times and checks the reported progress, the remaining
    /// changesets, the remaining history shards and the stored checkpoint after each run.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        let block_params =
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() };
        let blocks = random_block_range(&mut rng, 1..=5000, block_params);
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            0..0,
            0..0,
        );
        db.insert_changesets(changesets.clone(), None).expect("insert changesets");
        db.insert_history(changesets.clone(), None).expect("insert history");

        // At least one account must span more than one history shard, otherwise the test
        // would not exercise pruning across shards.
        let mut shards_per_account = BTreeMap::<_, usize>::new();
        for (sharded_key, _) in db.table::<tables::AccountsHistory>().unwrap() {
            *shards_per_account.entry(sharded_key.key).or_default() += 1;
        }
        assert!(shards_per_account.values().any(|&shards| shards > 1));

        assert_eq!(
            db.table::<tables::AccountChangeSets>().unwrap().len(),
            changesets.iter().flatten().count()
        );

        let original_shards = db.table::<tables::AccountsHistory>().unwrap();

        // Runs one pruning pass up to `to_block`. `run` is the 1-based number of the pass and
        // `expected_result` is the expected `(progress, pruned)` pair of that pass.
        let test_prune =
            |to_block: BlockNumber, run: usize, expected_result: (PruneProgress, usize)| {
                let prune_mode = PruneMode::Before(to_block);
                let deleted_entries_limit = 2000;
                let mut limiter =
                    PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
                let previous_checkpoint = db
                    .factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::AccountHistory)
                    .unwrap();
                let input = PruneInput { previous_checkpoint, to_block, limiter: limiter.clone() };
                let segment = AccountHistory::new(prune_mode);

                let provider = db.factory.database_provider_rw().unwrap();
                let result = segment.prune(&provider, input).unwrap();
                limiter.increment_deleted_entries_count_by(result.pruned);

                assert_matches!(
                    result,
                    SegmentOutput {progress, pruned, checkpoint: Some(_)}
                        if (progress, pruned) == expected_result
                );

                segment
                    .save_checkpoint(
                        &provider,
                        result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                    )
                    .unwrap();
                provider.commit().expect("commit");

                // Flatten the per-block changesets into `(block_number, change)` pairs.
                let changesets = changesets
                    .iter()
                    .enumerate()
                    .flat_map(|(block_number, changeset)| {
                        changeset.iter().map(move |change| (block_number, change))
                    })
                    .collect::<Vec<_>>();

                // Index of the first changeset entry that should still be present: the first
                // one past the cumulative per-table budget or past the target block.
                let entries_budget = deleted_entries_limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE * run;
                let pruned = changesets
                    .iter()
                    .enumerate()
                    .find(|(i, (block_number, _))| {
                        *i >= entries_budget || *block_number > to_block as usize
                    })
                    .map_or(0, |(i, _)| i);

                // Skip what we've pruned so far, subtracting one so that the first item is the
                // last pruned entry, which yields the last pruned block number below.
                let mut remaining = changesets.iter().skip(pruned.saturating_sub(1));

                let last_pruned_block_number = remaining
                    .next()
                    .map(|&(block_number, _)| {
                        let block = if result.progress.is_finished() {
                            block_number
                        } else {
                            block_number.saturating_sub(1)
                        };
                        block as BlockNumber
                    })
                    .unwrap_or(to_block);

                // Group the entries that must have survived by block number.
                let mut remaining_by_block = BTreeMap::<_, Vec<_>>::new();
                for (block_number, change) in remaining {
                    remaining_by_block.entry(block_number).or_default().push(change);
                }

                assert_eq!(
                    db.table::<tables::AccountChangeSets>().unwrap().len(),
                    remaining_by_block.values().flatten().count()
                );

                let actual_shards = db.table::<tables::AccountsHistory>().unwrap();

                // Shards entirely at or below the last pruned block disappear; the others keep
                // only the block numbers above it.
                let expected_shards = original_shards
                    .iter()
                    .filter(|(key, _)| key.highest_block_number > last_pruned_block_number)
                    .map(|(key, blocks)| {
                        let kept_blocks =
                            blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
                        (key.clone(), BlockNumberList::new_pre_sorted(kept_blocks))
                    })
                    .collect::<Vec<_>>();

                assert_eq!(actual_shards, expected_shards);

                let stored_checkpoint = db
                    .factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::AccountHistory)
                    .unwrap();
                assert_eq!(
                    stored_checkpoint,
                    Some(PruneCheckpoint {
                        block_number: Some(last_pruned_block_number),
                        tx_number: None,
                        prune_mode
                    })
                );
            };

        test_prune(
            998,
            1,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 1000),
        );
        test_prune(998, 2, (PruneProgress::Finished, 998));
        test_prune(1400, 3, (PruneProgress::Finished, 804));
    }
}