reth_prune/
db_ext.rs

1use std::{fmt::Debug, ops::RangeBounds};
2
3use crate::PruneLimiter;
4use reth_db::{
5    cursor::{DbCursorRO, DbCursorRW, RangeWalker},
6    table::{Table, TableRow},
7    transaction::DbTxMut,
8    DatabaseError,
9};
10use tracing::debug;
11
12pub(crate) trait DbTxPruneExt: DbTxMut {
13    /// Prune the table for the specified pre-sorted key iterator.
14    ///
15    /// Returns number of rows pruned.
16    fn prune_table_with_iterator<T: Table>(
17        &self,
18        keys: impl IntoIterator<Item = T::Key>,
19        limiter: &mut PruneLimiter,
20        mut delete_callback: impl FnMut(TableRow<T>),
21    ) -> Result<(usize, bool), DatabaseError> {
22        let mut cursor = self.cursor_write::<T>()?;
23        let mut keys = keys.into_iter();
24
25        let mut deleted_entries = 0;
26
27        for key in &mut keys {
28            if limiter.is_limit_reached() {
29                debug!(
30                    target: "providers::db",
31                    ?limiter,
32                    deleted_entries_limit = %limiter.is_deleted_entries_limit_reached(),
33                    time_limit = %limiter.is_time_limit_reached(),
34                    table = %T::NAME,
35                    "Pruning limit reached"
36                );
37                break
38            }
39
40            let row = cursor.seek_exact(key)?;
41            if let Some(row) = row {
42                cursor.delete_current()?;
43                limiter.increment_deleted_entries_count();
44                deleted_entries += 1;
45                delete_callback(row);
46            }
47        }
48
49        let done = keys.next().is_none();
50        Ok((deleted_entries, done))
51    }
52
53    /// Prune the table for the specified key range.
54    ///
55    /// Returns number of rows pruned.
56    fn prune_table_with_range<T: Table>(
57        &self,
58        keys: impl RangeBounds<T::Key> + Clone + Debug,
59        limiter: &mut PruneLimiter,
60        mut skip_filter: impl FnMut(&TableRow<T>) -> bool,
61        mut delete_callback: impl FnMut(TableRow<T>),
62    ) -> Result<(usize, bool), DatabaseError> {
63        let mut cursor = self.cursor_write::<T>()?;
64        let mut walker = cursor.walk_range(keys)?;
65
66        let mut deleted_entries = 0;
67
68        let done = loop {
69            // check for time out must be done in this scope since it's not done in
70            // `prune_table_with_range_step`
71            if limiter.is_limit_reached() {
72                debug!(
73                    target: "providers::db",
74                    ?limiter,
75                    deleted_entries_limit = %limiter.is_deleted_entries_limit_reached(),
76                    time_limit = %limiter.is_time_limit_reached(),
77                    table = %T::NAME,
78                    "Pruning limit reached"
79                );
80                break false
81            }
82
83            let done = self.prune_table_with_range_step(
84                &mut walker,
85                limiter,
86                &mut skip_filter,
87                &mut delete_callback,
88            )?;
89
90            if done {
91                break true
92            }
93            deleted_entries += 1;
94        };
95
96        Ok((deleted_entries, done))
97    }
98
99    /// Steps once with the given walker and prunes the entry in the table.
100    ///
101    /// Returns `true` if the walker is finished, `false` if it may have more data to prune.
102    ///
103    /// CAUTION: Pruner limits are not checked. This allows for a clean exit of a prune run that's
104    /// pruning different tables concurrently, by letting them step to the same height before
105    /// timing out.
106    fn prune_table_with_range_step<T: Table>(
107        &self,
108        walker: &mut RangeWalker<'_, T, Self::CursorMut<T>>,
109        limiter: &mut PruneLimiter,
110        skip_filter: &mut impl FnMut(&TableRow<T>) -> bool,
111        delete_callback: &mut impl FnMut(TableRow<T>),
112    ) -> Result<bool, DatabaseError> {
113        let Some(res) = walker.next() else { return Ok(true) };
114
115        let row = res?;
116
117        if !skip_filter(&row) {
118            walker.delete_current()?;
119            limiter.increment_deleted_entries_count();
120            delete_callback(row);
121        }
122
123        Ok(false)
124    }
125}
126
127impl<Tx> DbTxPruneExt for Tx where Tx: DbTxMut {}