reth_prune/segments/user/
history.rs1use alloy_primitives::BlockNumber;
2use reth_db::{BlockNumberList, RawKey, RawTable, RawValue};
3use reth_db_api::{
4 cursor::{DbCursorRO, DbCursorRW},
5 models::ShardedKey,
6 table::Table,
7 transaction::DbTxMut,
8 DatabaseError,
9};
10use reth_provider::DBProvider;
11
12enum PruneShardOutcome {
13 Deleted,
14 Updated,
15 Unchanged,
16}
17
18#[derive(Debug, Default)]
19pub(crate) struct PrunedIndices {
20 pub(crate) deleted: usize,
21 pub(crate) updated: usize,
22 pub(crate) unchanged: usize,
23}
24
25pub(crate) fn prune_history_indices<Provider, T, SK>(
29 provider: &Provider,
30 highest_sharded_keys: impl IntoIterator<Item = T::Key>,
31 key_matches: impl Fn(&T::Key, &T::Key) -> bool,
32) -> Result<PrunedIndices, DatabaseError>
33where
34 Provider: DBProvider<Tx: DbTxMut>,
35 T: Table<Value = BlockNumberList>,
36 T::Key: AsRef<ShardedKey<SK>>,
37{
38 let mut outcomes = PrunedIndices::default();
39 let mut cursor = provider.tx_ref().cursor_write::<RawTable<T>>()?;
40
41 for sharded_key in highest_sharded_keys {
42 let mut shard = cursor.seek(RawKey::new(sharded_key.clone()))?;
45
46 let to_block = sharded_key.as_ref().highest_block_number;
48
49 'shard: loop {
50 let Some((key, block_nums)) =
51 shard.map(|(k, v)| Result::<_, DatabaseError>::Ok((k.key()?, v))).transpose()?
52 else {
53 break
54 };
55
56 if key_matches(&key, &sharded_key) {
57 match prune_shard(&mut cursor, key, block_nums, to_block, &key_matches)? {
58 PruneShardOutcome::Deleted => outcomes.deleted += 1,
59 PruneShardOutcome::Updated => outcomes.updated += 1,
60 PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
61 }
62 } else {
63 break 'shard
65 }
66
67 shard = cursor.next()?;
68 }
69 }
70
71 Ok(outcomes)
72}
73
74fn prune_shard<C, T, SK>(
82 cursor: &mut C,
83 key: T::Key,
84 raw_blocks: RawValue<T::Value>,
85 to_block: BlockNumber,
86 key_matches: impl Fn(&T::Key, &T::Key) -> bool,
87) -> Result<PruneShardOutcome, DatabaseError>
88where
89 C: DbCursorRO<RawTable<T>> + DbCursorRW<RawTable<T>>,
90 T: Table<Value = BlockNumberList>,
91 T::Key: AsRef<ShardedKey<SK>>,
92{
93 if key.as_ref().highest_block_number <= to_block {
96 cursor.delete_current()?;
97 Ok(PruneShardOutcome::Deleted)
98 }
99 else {
103 let blocks = raw_blocks.value()?;
104 let higher_blocks =
105 blocks.iter().skip_while(|block| *block <= to_block).collect::<Vec<_>>();
106
107 if blocks.len() as usize == higher_blocks.len() {
110 return Ok(PruneShardOutcome::Unchanged);
111 }
112
113 if higher_blocks.is_empty() {
116 if key.as_ref().highest_block_number == u64::MAX {
117 let prev_row = cursor
118 .prev()?
119 .map(|(k, v)| Result::<_, DatabaseError>::Ok((k.key()?, v)))
120 .transpose()?;
121 match prev_row {
122 Some((prev_key, prev_value)) if key_matches(&prev_key, &key) => {
125 cursor.delete_current()?;
126 cursor.upsert(RawKey::new(key), prev_value)?;
129 Ok(PruneShardOutcome::Updated)
130 }
131 _ => {
134 if prev_row.is_some() {
137 cursor.next()?;
138 }
139 cursor.delete_current()?;
141 Ok(PruneShardOutcome::Deleted)
142 }
143 }
144 }
145 else {
148 cursor.delete_current()?;
149 Ok(PruneShardOutcome::Deleted)
150 }
151 } else {
152 cursor.upsert(
153 RawKey::new(key),
154 RawValue::new(BlockNumberList::new_pre_sorted(higher_blocks)),
155 )?;
156 Ok(PruneShardOutcome::Updated)
157 }
158 }
159}