reth_prune/segments/user/
history.rs1use alloy_primitives::BlockNumber;
2use reth_db_api::{
3 cursor::{DbCursorRO, DbCursorRW},
4 models::ShardedKey,
5 table::Table,
6 transaction::DbTxMut,
7 BlockNumberList, DatabaseError, RawKey, RawTable, RawValue,
8};
9use reth_provider::DBProvider;
10
/// What happened to a single history shard row when it was pruned.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PruneShardOutcome {
    /// The shard row was removed from the table.
    Deleted,
    /// The shard row was written back with different contents.
    Updated,
    /// The shard row was left exactly as it was.
    Unchanged,
}
16
/// Running totals of shard outcomes produced by one history-index pruning pass.
///
/// All counters start at zero via [`Default`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub(crate) struct PrunedIndices {
    /// Number of shards that were deleted.
    pub(crate) deleted: usize,
    /// Number of shards that were rewritten.
    pub(crate) updated: usize,
    /// Number of shards that were visited but not modified.
    pub(crate) unchanged: usize,
}
23
24pub(crate) fn prune_history_indices<Provider, T, SK>(
28 provider: &Provider,
29 highest_sharded_keys: impl IntoIterator<Item = T::Key>,
30 key_matches: impl Fn(&T::Key, &T::Key) -> bool,
31) -> Result<PrunedIndices, DatabaseError>
32where
33 Provider: DBProvider<Tx: DbTxMut>,
34 T: Table<Value = BlockNumberList>,
35 T::Key: AsRef<ShardedKey<SK>>,
36{
37 let mut outcomes = PrunedIndices::default();
38 let mut cursor = provider.tx_ref().cursor_write::<RawTable<T>>()?;
39
40 for sharded_key in highest_sharded_keys {
41 let mut shard = cursor.seek(RawKey::new(sharded_key.clone()))?;
44
45 let to_block = sharded_key.as_ref().highest_block_number;
47
48 'shard: loop {
49 let Some((key, block_nums)) =
50 shard.map(|(k, v)| Result::<_, DatabaseError>::Ok((k.key()?, v))).transpose()?
51 else {
52 break
53 };
54
55 if key_matches(&key, &sharded_key) {
56 match prune_shard(&mut cursor, key, block_nums, to_block, &key_matches)? {
57 PruneShardOutcome::Deleted => outcomes.deleted += 1,
58 PruneShardOutcome::Updated => outcomes.updated += 1,
59 PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
60 }
61 } else {
62 break 'shard
64 }
65
66 shard = cursor.next()?;
67 }
68 }
69
70 Ok(outcomes)
71}
72
73fn prune_shard<C, T, SK>(
81 cursor: &mut C,
82 key: T::Key,
83 raw_blocks: RawValue<T::Value>,
84 to_block: BlockNumber,
85 key_matches: impl Fn(&T::Key, &T::Key) -> bool,
86) -> Result<PruneShardOutcome, DatabaseError>
87where
88 C: DbCursorRO<RawTable<T>> + DbCursorRW<RawTable<T>>,
89 T: Table<Value = BlockNumberList>,
90 T::Key: AsRef<ShardedKey<SK>>,
91{
92 if key.as_ref().highest_block_number <= to_block {
95 cursor.delete_current()?;
96 Ok(PruneShardOutcome::Deleted)
97 }
98 else {
102 let blocks = raw_blocks.value()?;
103 let higher_blocks =
104 blocks.iter().skip_while(|block| *block <= to_block).collect::<Vec<_>>();
105
106 if blocks.len() as usize == higher_blocks.len() {
109 return Ok(PruneShardOutcome::Unchanged);
110 }
111
112 if higher_blocks.is_empty() {
115 if key.as_ref().highest_block_number == u64::MAX {
116 let prev_row = cursor
117 .prev()?
118 .map(|(k, v)| Result::<_, DatabaseError>::Ok((k.key()?, v)))
119 .transpose()?;
120 match prev_row {
121 Some((prev_key, prev_value)) if key_matches(&prev_key, &key) => {
124 cursor.delete_current()?;
125 cursor.upsert(RawKey::new(key), &prev_value)?;
128 Ok(PruneShardOutcome::Updated)
129 }
130 _ => {
133 if prev_row.is_some() {
136 cursor.next()?;
137 }
138 cursor.delete_current()?;
140 Ok(PruneShardOutcome::Deleted)
141 }
142 }
143 }
144 else {
147 cursor.delete_current()?;
148 Ok(PruneShardOutcome::Deleted)
149 }
150 } else {
151 cursor.upsert(
152 RawKey::new(key),
153 &RawValue::new(BlockNumberList::new_pre_sorted(higher_blocks)),
154 )?;
155 Ok(PruneShardOutcome::Updated)
156 }
157 }
158}