reth_prune/segments/user/
account_history.rs1use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{user::history::prune_history_indices, PruneInput, Segment},
4 PrunerError,
5};
6use itertools::Itertools;
7use reth_db::{tables, transaction::DbTxMut};
8use reth_db_api::models::ShardedKey;
9use reth_provider::DBProvider;
10use reth_prune_types::{
11 PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
12};
13use rustc_hash::FxHashMap;
14use tracing::{instrument, trace};
15
/// Number of tables pruned by the account history segment in one run.
///
/// The segment prunes both [`tables::AccountChangeSets`] and [`tables::AccountsHistory`]; the
/// deleted-entries limit passed to the segment is divided by this value before pruning starts.
const ACCOUNT_HISTORY_TABLES_TO_PRUNE: usize = 2;
21
/// Prune segment responsible for account history
/// ([`PruneSegment::AccountHistory`]).
#[derive(Debug)]
pub struct AccountHistory {
    /// Prune mode supplied at construction and reported back through `Segment::mode`.
    mode: PruneMode,
}
26
impl AccountHistory {
    /// Creates a new account history segment that uses the given [`PruneMode`].
    pub const fn new(mode: PruneMode) -> Self {
        Self { mode }
    }
}
32
33impl<Provider> Segment<Provider> for AccountHistory
34where
35 Provider: DBProvider<Tx: DbTxMut>,
36{
37 fn segment(&self) -> PruneSegment {
38 PruneSegment::AccountHistory
39 }
40
41 fn mode(&self) -> Option<PruneMode> {
42 Some(self.mode)
43 }
44
45 fn purpose(&self) -> PrunePurpose {
46 PrunePurpose::User
47 }
48
49 #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
50 fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
51 let range = match input.get_next_block_range() {
52 Some(range) => range,
53 None => {
54 trace!(target: "pruner", "No account history to prune");
55 return Ok(SegmentOutput::done())
56 }
57 };
58 let range_end = *range.end();
59
60 let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
61 input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE)
62 } else {
63 input.limiter
64 };
65 if limiter.is_limit_reached() {
66 return Ok(SegmentOutput::not_done(
67 limiter.interrupt_reason(),
68 input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
69 ))
70 }
71
72 let mut last_changeset_pruned_block = None;
73 let mut highest_deleted_accounts = FxHashMap::default();
82 let (pruned_changesets, done) =
83 provider.tx_ref().prune_table_with_range::<tables::AccountChangeSets>(
84 range,
85 &mut limiter,
86 |_| false,
87 |(block_number, account)| {
88 highest_deleted_accounts.insert(account.address, block_number);
89 last_changeset_pruned_block = Some(block_number);
90 },
91 )?;
92 trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets)");
93
94 let last_changeset_pruned_block = last_changeset_pruned_block
95 .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
98 .unwrap_or(range_end);
99
100 let highest_sharded_keys = highest_deleted_accounts
103 .into_iter()
104 .sorted_unstable() .map(|(address, block_number)| {
106 ShardedKey::new(address, block_number.min(last_changeset_pruned_block))
107 });
108 let outcomes = prune_history_indices::<Provider, tables::AccountsHistory, _>(
109 provider,
110 highest_sharded_keys,
111 |a, b| a.key == b.key,
112 )?;
113 trace!(target: "pruner", ?outcomes, %done, "Pruned account history (indices)");
114
115 let progress = limiter.progress(done);
116
117 Ok(SegmentOutput {
118 progress,
119 pruned: pruned_changesets + outcomes.deleted,
120 checkpoint: Some(SegmentOutputCheckpoint {
121 block_number: Some(last_changeset_pruned_block),
122 tx_number: None,
123 }),
124 })
125 }
126}
127
#[cfg(test)]
mod tests {
    use crate::segments::{
        user::account_history::ACCOUNT_HISTORY_TABLES_TO_PRUNE, AccountHistory, PruneInput,
        PruneLimiter, Segment, SegmentOutput,
    };
    use alloy_primitives::{BlockNumber, B256};
    use assert_matches::assert_matches;
    use reth_db::{tables, BlockNumberList};
    use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
    };
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{
        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
    };
    use std::{collections::BTreeMap, ops::AddAssign};

    /// Runs the account history segment three times against a test database and checks, after
    /// every run, the reported progress and pruned count, the remaining changesets, the
    /// remaining history shards and the stored prune checkpoint.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        // Fixture: blocks for the range `1..=5000`, generated with a `tx_count` range of `0..1`.
        let blocks = random_block_range(
            &mut rng,
            1..=5000,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        // Two generated accounts, each paired with an empty storage vector.
        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            0..0,
            0..0,
        );
        db.insert_changesets(changesets.clone(), None).expect("insert changesets");
        db.insert_history(changesets.clone(), None).expect("insert history");

        // Count the `AccountsHistory` rows per account key and make sure at least one account
        // is spread over more than one row.
        let account_occurrences = db.table::<tables::AccountsHistory>().unwrap().into_iter().fold(
            BTreeMap::<_, usize>::new(),
            |mut map, (key, _)| {
                map.entry(key.key).or_default().add_assign(1);
                map
            },
        );
        assert!(account_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));

        // Every generated change must have ended up in the changesets table.
        assert_eq!(
            db.table::<tables::AccountChangeSets>().unwrap().len(),
            changesets.iter().flatten().count()
        );

        // Snapshot of the history shards before any pruning; expected shards are derived from it.
        let original_shards = db.table::<tables::AccountsHistory>().unwrap();

        // Performs one prune run up to `to_block` and verifies the resulting database state.
        // `run` is the 1-based number of the run and scales the expected number of changeset
        // entries deleted so far; `expected_result` is the `(progress, pruned)` pair the
        // segment must report.
        let test_prune =
            |to_block: BlockNumber, run: usize, expected_result: (PruneProgress, usize)| {
                let prune_mode = PruneMode::Before(to_block);
                let deleted_entries_limit = 2000;
                let mut limiter =
                    PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
                let input = PruneInput {
                    previous_checkpoint: db
                        .factory
                        .provider()
                        .unwrap()
                        .get_prune_checkpoint(PruneSegment::AccountHistory)
                        .unwrap(),
                    to_block,
                    limiter: limiter.clone(),
                };
                let segment = AccountHistory::new(prune_mode);

                let provider = db.factory.database_provider_rw().unwrap();
                let result = segment.prune(&provider, input).unwrap();
                // NOTE(review): this local limiter is not read again within the closure.
                limiter.increment_deleted_entries_count_by(result.pruned);

                assert_matches!(
                    result,
                    SegmentOutput {progress, pruned, checkpoint: Some(_)}
                        if (progress, pruned) == expected_result
                );

                // Persist the checkpoint so the next run picks it up as `previous_checkpoint`.
                segment
                    .save_checkpoint(
                        &provider,
                        result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                    )
                    .unwrap();
                provider.commit().expect("commit");

                // Flatten the generated changesets into `(position, change)` pairs, where
                // `position` is the index of the changeset within `changesets`.
                let changesets = changesets
                    .iter()
                    .enumerate()
                    .flat_map(|(block_number, changeset)| {
                        changeset.iter().map(move |change| (block_number, change))
                    })
                    .collect::<Vec<_>>();

                // Index of the first flattened entry that is either past the cumulative
                // per-table budget (`limit / tables * run`) or belongs to a block above
                // `to_block`; zero when no such entry exists.
                // NOTE(review): `skip_while(..).next()` is what the silenced lint flags; the
                // reason for keeping this form is not visible here — confirm.
                #[allow(clippy::skip_while_next)]
                let pruned = changesets
                    .iter()
                    .enumerate()
                    .skip_while(|(i, (block_number, _))| {
                        *i < deleted_entries_limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE * run &&
                            *block_number <= to_block as usize
                    })
                    .next()
                    .map(|(i, _)| i)
                    .unwrap_or_default();

                let mut pruned_changesets = changesets
                    .iter()
                    // Start one entry early: that entry is consumed by `next()` below to
                    // derive the last pruned block number, the rest are the expected leftovers.
                    .skip(pruned.saturating_sub(1));

                // Mirrors the segment's checkpoint rule: the block itself when the run
                // finished, the block before it otherwise, and `to_block` if nothing is left.
                let last_pruned_block_number = pruned_changesets
                    .next()
                    .map(|(block_number, _)| if result.progress.is_finished() {
                        *block_number
                    } else {
                        block_number.saturating_sub(1)
                    } as BlockNumber)
                    .unwrap_or(to_block);

                // Group the remaining expected changes by their position.
                let pruned_changesets = pruned_changesets.fold(
                    BTreeMap::<_, Vec<_>>::new(),
                    |mut acc, (block_number, change)| {
                        acc.entry(block_number).or_default().push(change);
                        acc
                    },
                );

                // The changesets table must hold exactly the entries that were not pruned.
                assert_eq!(
                    db.table::<tables::AccountChangeSets>().unwrap().len(),
                    pruned_changesets.values().flatten().count()
                );

                let actual_shards = db.table::<tables::AccountsHistory>().unwrap();

                // Expected shards: drop every shard whose highest block number is not above the
                // last pruned block, and strip block numbers up to and including it from the
                // shards that remain.
                let expected_shards = original_shards
                    .iter()
                    .filter(|(key, _)| key.highest_block_number > last_pruned_block_number)
                    .map(|(key, blocks)| {
                        let new_blocks =
                            blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
                        (key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
                    })
                    .collect::<Vec<_>>();

                assert_eq!(actual_shards, expected_shards);

                // The stored checkpoint must point at the last pruned block.
                assert_eq!(
                    db.factory
                        .provider()
                        .unwrap()
                        .get_prune_checkpoint(PruneSegment::AccountHistory)
                        .unwrap(),
                    Some(PruneCheckpoint {
                        block_number: Some(last_pruned_block_number),
                        tx_number: None,
                        prune_mode
                    })
                );
            };

        // Run 1: interrupted by the deleted-entries limit.
        test_prune(
            998,
            1,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 1000),
        );
        // Run 2: same target block, finishes the remainder.
        test_prune(998, 2, (PruneProgress::Finished, 998));
        // Run 3: higher target block, finishes within one run.
        test_prune(1400, 3, (PruneProgress::Finished, 804));
    }
}