reth_prune/segments/user/
history.rs

1use alloy_primitives::BlockNumber;
2use reth_db::{BlockNumberList, RawKey, RawTable, RawValue};
3use reth_db_api::{
4    cursor::{DbCursorRO, DbCursorRW},
5    models::ShardedKey,
6    table::Table,
7    transaction::DbTxMut,
8    DatabaseError,
9};
10use reth_provider::DBProvider;
11
/// Result of pruning a single shard of a history table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PruneShardOutcome {
    /// The shard was removed from the table.
    Deleted,
    /// The shard was rewritten with a different list of block numbers.
    Updated,
    /// The shard was left untouched.
    Unchanged,
}
17
/// Counters describing what happened to the shards visited during history index pruning.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub(crate) struct PrunedIndices {
    /// Number of shards that were deleted.
    pub(crate) deleted: usize,
    /// Number of shards that were rewritten.
    pub(crate) updated: usize,
    /// Number of shards that were visited but left as they were.
    pub(crate) unchanged: usize,
}
24
25/// Prune history indices according to the provided list of highest sharded keys.
26///
27/// Returns total number of deleted, updated and unchanged entities.
28pub(crate) fn prune_history_indices<Provider, T, SK>(
29    provider: &Provider,
30    highest_sharded_keys: impl IntoIterator<Item = T::Key>,
31    key_matches: impl Fn(&T::Key, &T::Key) -> bool,
32) -> Result<PrunedIndices, DatabaseError>
33where
34    Provider: DBProvider<Tx: DbTxMut>,
35    T: Table<Value = BlockNumberList>,
36    T::Key: AsRef<ShardedKey<SK>>,
37{
38    let mut outcomes = PrunedIndices::default();
39    let mut cursor = provider.tx_ref().cursor_write::<RawTable<T>>()?;
40
41    for sharded_key in highest_sharded_keys {
42        // Seek to the shard that has the key >= the given sharded key
43        // TODO: optimize
44        let mut shard = cursor.seek(RawKey::new(sharded_key.clone()))?;
45
46        // Get the highest block number that needs to be deleted for this sharded key
47        let to_block = sharded_key.as_ref().highest_block_number;
48
49        'shard: loop {
50            let Some((key, block_nums)) =
51                shard.map(|(k, v)| Result::<_, DatabaseError>::Ok((k.key()?, v))).transpose()?
52            else {
53                break
54            };
55
56            if key_matches(&key, &sharded_key) {
57                match prune_shard(&mut cursor, key, block_nums, to_block, &key_matches)? {
58                    PruneShardOutcome::Deleted => outcomes.deleted += 1,
59                    PruneShardOutcome::Updated => outcomes.updated += 1,
60                    PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
61                }
62            } else {
63                // If such shard doesn't exist, skip to the next sharded key
64                break 'shard
65            }
66
67            shard = cursor.next()?;
68        }
69    }
70
71    Ok(outcomes)
72}
73
74/// Prunes one shard of a history table.
75///
76/// 1. If the shard has `highest_block_number` less than or equal to the target block number for
77///    pruning, delete the shard completely.
78/// 2. If the shard has `highest_block_number` greater than the target block number for pruning,
79///    filter block numbers inside the shard which are less than the target block number for
80///    pruning.
81fn prune_shard<C, T, SK>(
82    cursor: &mut C,
83    key: T::Key,
84    raw_blocks: RawValue<T::Value>,
85    to_block: BlockNumber,
86    key_matches: impl Fn(&T::Key, &T::Key) -> bool,
87) -> Result<PruneShardOutcome, DatabaseError>
88where
89    C: DbCursorRO<RawTable<T>> + DbCursorRW<RawTable<T>>,
90    T: Table<Value = BlockNumberList>,
91    T::Key: AsRef<ShardedKey<SK>>,
92{
93    // If shard consists only of block numbers less than the target one, delete shard
94    // completely.
95    if key.as_ref().highest_block_number <= to_block {
96        cursor.delete_current()?;
97        Ok(PruneShardOutcome::Deleted)
98    }
99    // Shard contains block numbers that are higher than the target one, so we need to
100    // filter it. It is guaranteed that further shards for this sharded key will not
101    // contain the target block number, as it's in this shard.
102    else {
103        let blocks = raw_blocks.value()?;
104        let higher_blocks =
105            blocks.iter().skip_while(|block| *block <= to_block).collect::<Vec<_>>();
106
107        // If there were blocks less than or equal to the target one
108        // (so the shard has changed), update the shard.
109        if blocks.len() as usize == higher_blocks.len() {
110            return Ok(PruneShardOutcome::Unchanged);
111        }
112
113        // If there will be no more blocks in the shard after pruning blocks below target
114        // block, we need to remove it, as empty shards are not allowed.
115        if higher_blocks.is_empty() {
116            if key.as_ref().highest_block_number == u64::MAX {
117                let prev_row = cursor
118                    .prev()?
119                    .map(|(k, v)| Result::<_, DatabaseError>::Ok((k.key()?, v)))
120                    .transpose()?;
121                match prev_row {
122                    // If current shard is the last shard for the sharded key that
123                    // has previous shards, replace it with the previous shard.
124                    Some((prev_key, prev_value)) if key_matches(&prev_key, &key) => {
125                        cursor.delete_current()?;
126                        // Upsert will replace the last shard for this sharded key with
127                        // the previous value.
128                        cursor.upsert(RawKey::new(key), prev_value)?;
129                        Ok(PruneShardOutcome::Updated)
130                    }
131                    // If there's no previous shard for this sharded key,
132                    // just delete last shard completely.
133                    _ => {
134                        // If we successfully moved the cursor to a previous row,
135                        // jump to the original last shard.
136                        if prev_row.is_some() {
137                            cursor.next()?;
138                        }
139                        // Delete shard.
140                        cursor.delete_current()?;
141                        Ok(PruneShardOutcome::Deleted)
142                    }
143                }
144            }
145            // If current shard is not the last shard for this sharded key,
146            // just delete it.
147            else {
148                cursor.delete_current()?;
149                Ok(PruneShardOutcome::Deleted)
150            }
151        } else {
152            cursor.upsert(
153                RawKey::new(key),
154                RawValue::new(BlockNumberList::new_pre_sorted(higher_blocks)),
155            )?;
156            Ok(PruneShardOutcome::Updated)
157        }
158    }
159}