use crate::{
2 db_ext::DbTxPruneExt,
3 segments::{user::history::prune_history_indices, PruneInput, Segment, SegmentOutput},
4 PrunerError,
5};
6use itertools::Itertools;
7use reth_db::{tables, transaction::DbTxMut};
8use reth_db_api::models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress};
9use reth_provider::DBProvider;
10use reth_prune_types::{PruneMode, PrunePurpose, PruneSegment, SegmentOutputCheckpoint};
11use rustc_hash::FxHashMap;
12use tracing::{instrument, trace};
13
/// Number of tables this segment deletes from per run (`StorageChangeSets` and the
/// `StoragesHistory` indices). The caller-supplied deleted-entries limit is divided by this
/// value so the total deletions across both tables stay within budget.
const STORAGE_HISTORY_TABLES_TO_PRUNE: usize = 2;
19
/// Prune segment responsible for the storage-history tables
/// (`StorageChangeSets` and `StoragesHistory`).
#[derive(Debug)]
pub struct StorageHistory {
    // User-configured prune mode for this segment; returned from `Segment::mode`.
    mode: PruneMode,
}
24
25impl StorageHistory {
26 pub const fn new(mode: PruneMode) -> Self {
27 Self { mode }
28 }
29}
30
impl<Provider> Segment<Provider> for StorageHistory
where
    Provider: DBProvider<Tx: DbTxMut>,
{
    /// This segment prunes storage history.
    fn segment(&self) -> PruneSegment {
        PruneSegment::StorageHistory
    }

    /// The user-configured prune mode this segment was constructed with.
    fn mode(&self) -> Option<PruneMode> {
        Some(self.mode)
    }

    /// Storage-history pruning is user-requested, not protocol-mandated.
    fn purpose(&self) -> PrunePurpose {
        PrunePurpose::User
    }

    /// Prunes storage changesets and the corresponding history index shards for the next
    /// block range, respecting the limiter's deleted-entries budget.
    ///
    /// Returns a [`SegmentOutput`] carrying the progress, the total number of deleted
    /// entries across both tables, and a checkpoint at the last fully pruned block.
    #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
        // Nothing left between the previous checkpoint and the target block — done.
        let range = match input.get_next_block_range() {
            Some(range) => range,
            None => {
                trace!(target: "pruner", "No storage history to prune");
                return Ok(SegmentOutput::done())
            }
        };
        let range_end = *range.end();

        // Split the deleted-entries budget across the two tables we prune
        // (changesets + history indices) so the combined deletions stay within the limit.
        let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
            input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE)
        } else {
            input.limiter
        };
        // Budget already exhausted (e.g. by earlier segments): report "not done" and keep
        // the previous checkpoint so the next run resumes from the same spot.
        if limiter.is_limit_reached() {
            return Ok(SegmentOutput::not_done(
                limiter.interrupt_reason(),
                input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
            ))
        }

        // Track, per (address, storage slot), the highest block whose changeset entry was
        // deleted — these bounds drive the history-index pruning below.
        let mut last_changeset_pruned_block = None;
        let mut highest_deleted_storages = FxHashMap::default();
        let (pruned_changesets, done) =
            provider.tx_ref().prune_table_with_range::<tables::StorageChangeSets>(
                BlockNumberAddress::range(range),
                &mut limiter,
                // NOTE(review): presumably a "skip this entry" predicate — `false` means
                // every entry in range is eligible for deletion; confirm against
                // `prune_table_with_range`.
                |_| false,
                |(BlockNumberAddress((block_number, address)), entry)| {
                    highest_deleted_storages.insert((address, entry.key), block_number);
                    last_changeset_pruned_block = Some(block_number);
                },
            )?;
        trace!(target: "pruner", deleted = %pruned_changesets, %done, "Pruned storage history (changesets)");

        // If the pass was interrupted mid-range, the last block seen may be only partially
        // pruned, so the checkpoint steps back one block. If nothing was deleted at all,
        // fall back to the end of the requested range.
        let last_changeset_pruned_block = last_changeset_pruned_block
            .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
            .unwrap_or(range_end);

        // Build sharded keys for the index pruning, capping each key's block at the
        // checkpoint so no index entry above the pruned height is touched. Keys are
        // sorted ascending — presumably required by `prune_history_indices`' cursor walk;
        // confirm against that helper.
        let highest_sharded_keys = highest_deleted_storages
            .into_iter()
            .sorted_unstable()
            .map(|((address, storage_key), block_number)| {
                StorageShardedKey::new(
                    address,
                    storage_key,
                    block_number.min(last_changeset_pruned_block),
                )
            });
        // Prune `StoragesHistory` shards; the closure identifies shards belonging to the
        // same (address, storage slot) pair.
        let outcomes = prune_history_indices::<Provider, tables::StoragesHistory, _>(
            provider,
            highest_sharded_keys,
            |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
        )?;
        trace!(target: "pruner", ?outcomes, %done, "Pruned storage history (indices)");

        let progress = limiter.progress(done);

        Ok(SegmentOutput {
            progress,
            // Total deletions across both tables.
            pruned: pruned_changesets + outcomes.deleted,
            checkpoint: Some(SegmentOutputCheckpoint {
                block_number: Some(last_changeset_pruned_block),
                tx_number: None,
            }),
        })
    }
}
130
#[cfg(test)]
mod tests {
    use crate::segments::{
        user::storage_history::STORAGE_HISTORY_TABLES_TO_PRUNE, PruneInput, PruneLimiter, Segment,
        SegmentOutput, StorageHistory,
    };
    use alloy_primitives::{BlockNumber, B256};
    use assert_matches::assert_matches;
    use reth_db::{tables, BlockNumberList};
    use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
    use reth_prune_types::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment};
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_testing_utils::generators::{
        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
    };
    use std::{collections::BTreeMap, ops::AddAssign};

    /// Seeds a test DB with blocks, storage changesets, and history indices, then runs
    /// three prune passes, verifying output, remaining table contents, and checkpoints.
    #[test]
    fn prune() {
        let db = TestStageDB::default();
        let mut rng = generators::rng();

        // Blocks 0..=5000 with no transactions — only changesets/history matter here.
        let blocks = random_block_range(
            &mut rng,
            0..=5000,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        let accounts = random_eoa_accounts(&mut rng, 2).into_iter().collect::<BTreeMap<_, _>>();

        // One account change and one storage change per block (ranges are half-open 1..2).
        let (changesets, _) = random_changeset_range(
            &mut rng,
            blocks.iter(),
            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
            1..2,
            1..2,
        );
        db.insert_changesets(changesets.clone(), None).expect("insert changesets");
        db.insert_history(changesets.clone(), None).expect("insert history");

        // Sanity check: at least one (address, slot) pair spans multiple history shards,
        // so the shard-handling path is actually exercised.
        let storage_occurrences = db.table::<tables::StoragesHistory>().unwrap().into_iter().fold(
            BTreeMap::<_, usize>::new(),
            |mut map, (key, _)| {
                map.entry((key.address, key.sharded_key.key)).or_default().add_assign(1);
                map
            },
        );
        assert!(storage_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));

        // All generated changeset entries made it into the table.
        assert_eq!(
            db.table::<tables::StorageChangeSets>().unwrap().len(),
            changesets.iter().flatten().flat_map(|(_, _, entries)| entries).count()
        );

        // Snapshot of the history shards before any pruning, used to compute expectations.
        let original_shards = db.table::<tables::StoragesHistory>().unwrap();

        // One prune pass: `run` is the 1-based pass number, used to compute how much of
        // the shared deleted-entries budget previous passes already consumed.
        let test_prune = |to_block: BlockNumber,
                          run: usize,
                          expected_result: (PruneProgress, usize)| {
            let prune_mode = PruneMode::Before(to_block);
            let deleted_entries_limit = 1000;
            let mut limiter =
                PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
            let input = PruneInput {
                previous_checkpoint: db
                    .factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::StorageHistory)
                    .unwrap(),
                to_block,
                limiter: limiter.clone(),
            };
            let segment = StorageHistory::new(prune_mode);

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );

            // Persist the checkpoint so the next pass resumes where this one stopped.
            segment
                .save_checkpoint(
                    &provider,
                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                )
                .unwrap();
            provider.commit().expect("commit");

            // Flatten to one entry per (block, address, storage entry), in block order —
            // the same order the pruner consumes them.
            let changesets = changesets
                .iter()
                .enumerate()
                .flat_map(|(block_number, changeset)| {
                    changeset.iter().flat_map(move |(address, _, entries)| {
                        entries.iter().map(move |entry| (block_number, address, entry))
                    })
                })
                .collect::<Vec<_>>();

            // Index of the first entry NOT yet pruned: each pass consumes up to
            // limit / tables entries, cumulatively across `run` passes, and never past
            // `to_block`. Defaults to 0 if everything up to `to_block` was consumed.
            #[allow(clippy::skip_while_next)]
            let pruned = changesets
                .iter()
                .enumerate()
                .skip_while(|(i, (block_number, _, _))| {
                    *i < deleted_entries_limit / STORAGE_HISTORY_TABLES_TO_PRUNE * run &&
                        *block_number <= to_block as usize
                })
                .next()
                .map(|(i, _)| i)
                .unwrap_or_default();

            // Step back one entry so the last pruned block number can be read below.
            let mut pruned_changesets = changesets
                .iter()
                .skip(pruned.saturating_sub(1));

            // Mirrors the segment's checkpoint math: on an interrupted pass the last
            // touched block may be partially pruned, hence the `saturating_sub(1)`.
            let last_pruned_block_number = pruned_changesets
                .next()
                .map(|(block_number, _, _)| if result.progress.is_finished() {
                    *block_number
                } else {
                    block_number.saturating_sub(1)
                } as BlockNumber)
                .unwrap_or(to_block);

            // Group the surviving entries to compare against the changesets table.
            let pruned_changesets = pruned_changesets.fold(
                BTreeMap::<_, Vec<_>>::new(),
                |mut acc, (block_number, address, entry)| {
                    acc.entry((block_number, address)).or_default().push(entry);
                    acc
                },
            );

            assert_eq!(
                db.table::<tables::StorageChangeSets>().unwrap().len(),
                pruned_changesets.values().flatten().count()
            );

            let actual_shards = db.table::<tables::StoragesHistory>().unwrap();

            // Expected shards: drop shards entirely at/below the pruned height, and strip
            // pruned block numbers from the surviving ones.
            let expected_shards = original_shards
                .iter()
                .filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number)
                .map(|(key, blocks)| {
                    let new_blocks =
                        blocks.iter().skip_while(|block| *block <= last_pruned_block_number);
                    (key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
                })
                .collect::<Vec<_>>();

            assert_eq!(actual_shards, expected_shards);

            // The persisted checkpoint matches the last fully pruned block.
            assert_eq!(
                db.factory
                    .provider()
                    .unwrap()
                    .get_prune_checkpoint(PruneSegment::StorageHistory)
                    .unwrap(),
                Some(PruneCheckpoint {
                    block_number: Some(last_pruned_block_number),
                    tx_number: None,
                    prune_mode
                })
            );
        };

        // Pass 1: limit 1000 halved across 2 tables → 500 changeset entries, interrupted.
        test_prune(
            998,
            1,
            (
                PruneProgress::HasMoreData(
                    reth_prune_types::PruneInterruptReason::DeletedEntriesLimitReached,
                ),
                500,
            ),
        );
        // Pass 2: finishes the remainder up to block 998.
        test_prune(998, 2, (PruneProgress::Finished, 499));
        // Pass 3: a higher target extends pruning up to block 1200.
        test_prune(1200, 3, (PruneProgress::Finished, 202));
    }
}
316}