reth_prune/segments/static_file/
headers.rs

1use std::num::NonZeroUsize;
2
3use crate::{
4    db_ext::DbTxPruneExt,
5    segments::{PruneInput, Segment},
6    PruneLimiter, PrunerError,
7};
8use alloy_primitives::BlockNumber;
9use itertools::Itertools;
10use reth_db::{
11    cursor::{DbCursorRO, RangeWalker},
12    tables,
13    transaction::DbTxMut,
14};
15use reth_provider::{providers::StaticFileProvider, DBProvider, StaticFileProviderFactory};
16use reth_prune_types::{
17    PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
18};
19use reth_static_file_types::StaticFileSegment;
20use tracing::trace;
21
22/// Number of header tables to prune in one step
23const HEADER_TABLES_TO_PRUNE: usize = 3;
24
25#[derive(Debug)]
26pub struct Headers<N> {
27    static_file_provider: StaticFileProvider<N>,
28}
29
30impl<N> Headers<N> {
31    pub const fn new(static_file_provider: StaticFileProvider<N>) -> Self {
32        Self { static_file_provider }
33    }
34}
35
36impl<Provider: StaticFileProviderFactory + DBProvider<Tx: DbTxMut>> Segment<Provider>
37    for Headers<Provider::Primitives>
38{
39    fn segment(&self) -> PruneSegment {
40        PruneSegment::Headers
41    }
42
43    fn mode(&self) -> Option<PruneMode> {
44        self.static_file_provider
45            .get_highest_static_file_block(StaticFileSegment::Headers)
46            .map(PruneMode::before_inclusive)
47    }
48
49    fn purpose(&self) -> PrunePurpose {
50        PrunePurpose::StaticFile
51    }
52
53    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
54        let (block_range_start, block_range_end) = match input.get_next_block_range() {
55            Some(range) => (*range.start(), *range.end()),
56            None => {
57                trace!(target: "pruner", "No headers to prune");
58                return Ok(SegmentOutput::done())
59            }
60        };
61
62        let last_pruned_block =
63            if block_range_start == 0 { None } else { Some(block_range_start - 1) };
64
65        let range = last_pruned_block.map_or(0, |block| block + 1)..=block_range_end;
66
67        let mut headers_cursor = provider.tx_ref().cursor_write::<tables::Headers>()?;
68        let mut header_tds_cursor =
69            provider.tx_ref().cursor_write::<tables::HeaderTerminalDifficulties>()?;
70        let mut canonical_headers_cursor =
71            provider.tx_ref().cursor_write::<tables::CanonicalHeaders>()?;
72
73        let mut limiter = input.limiter.floor_deleted_entries_limit_to_multiple_of(
74            NonZeroUsize::new(HEADER_TABLES_TO_PRUNE).unwrap(),
75        );
76
77        let tables_iter = HeaderTablesIter::new(
78            provider,
79            &mut limiter,
80            headers_cursor.walk_range(range.clone())?,
81            header_tds_cursor.walk_range(range.clone())?,
82            canonical_headers_cursor.walk_range(range)?,
83        );
84
85        let mut last_pruned_block: Option<u64> = None;
86        let mut pruned = 0;
87        for res in tables_iter {
88            let HeaderTablesIterItem { pruned_block, entries_pruned } = res?;
89            last_pruned_block = Some(pruned_block);
90            pruned += entries_pruned;
91        }
92
93        let done = last_pruned_block == Some(block_range_end);
94        let progress = limiter.progress(done);
95
96        Ok(SegmentOutput {
97            progress,
98            pruned,
99            checkpoint: Some(SegmentOutputCheckpoint {
100                block_number: last_pruned_block,
101                tx_number: None,
102            }),
103        })
104    }
105}
106type Walker<'a, Provider, T> =
107    RangeWalker<'a, T, <<Provider as DBProvider>::Tx as DbTxMut>::CursorMut<T>>;
108
109#[allow(missing_debug_implementations)]
110struct HeaderTablesIter<'a, Provider>
111where
112    Provider: DBProvider<Tx: DbTxMut>,
113{
114    provider: &'a Provider,
115    limiter: &'a mut PruneLimiter,
116    headers_walker: Walker<'a, Provider, tables::Headers>,
117    header_tds_walker: Walker<'a, Provider, tables::HeaderTerminalDifficulties>,
118    canonical_headers_walker: Walker<'a, Provider, tables::CanonicalHeaders>,
119}
120
121struct HeaderTablesIterItem {
122    pruned_block: BlockNumber,
123    entries_pruned: usize,
124}
125
126impl<'a, Provider> HeaderTablesIter<'a, Provider>
127where
128    Provider: DBProvider<Tx: DbTxMut>,
129{
130    fn new(
131        provider: &'a Provider,
132        limiter: &'a mut PruneLimiter,
133        headers_walker: Walker<'a, Provider, tables::Headers>,
134        header_tds_walker: Walker<'a, Provider, tables::HeaderTerminalDifficulties>,
135        canonical_headers_walker: Walker<'a, Provider, tables::CanonicalHeaders>,
136    ) -> Self {
137        Self { provider, limiter, headers_walker, header_tds_walker, canonical_headers_walker }
138    }
139}
140
141impl<Provider> Iterator for HeaderTablesIter<'_, Provider>
142where
143    Provider: DBProvider<Tx: DbTxMut>,
144{
145    type Item = Result<HeaderTablesIterItem, PrunerError>;
146    fn next(&mut self) -> Option<Self::Item> {
147        if self.limiter.is_limit_reached() {
148            return None
149        }
150
151        let mut pruned_block_headers = None;
152        let mut pruned_block_td = None;
153        let mut pruned_block_canonical = None;
154
155        if let Err(err) = self.provider.tx_ref().prune_table_with_range_step(
156            &mut self.headers_walker,
157            self.limiter,
158            &mut |_| false,
159            &mut |row| pruned_block_headers = Some(row.0),
160        ) {
161            return Some(Err(err.into()))
162        }
163
164        if let Err(err) = self.provider.tx_ref().prune_table_with_range_step(
165            &mut self.header_tds_walker,
166            self.limiter,
167            &mut |_| false,
168            &mut |row| pruned_block_td = Some(row.0),
169        ) {
170            return Some(Err(err.into()))
171        }
172
173        if let Err(err) = self.provider.tx_ref().prune_table_with_range_step(
174            &mut self.canonical_headers_walker,
175            self.limiter,
176            &mut |_| false,
177            &mut |row| pruned_block_canonical = Some(row.0),
178        ) {
179            return Some(Err(err.into()))
180        }
181
182        if ![pruned_block_headers, pruned_block_td, pruned_block_canonical].iter().all_equal() {
183            return Some(Err(PrunerError::InconsistentData(
184                "All headers-related tables should be pruned up to the same height",
185            )))
186        }
187
188        pruned_block_headers.map(move |block| {
189            Ok(HeaderTablesIterItem { pruned_block: block, entries_pruned: HEADER_TABLES_TO_PRUNE })
190        })
191    }
192}
193
194#[cfg(test)]
195mod tests {
196    use crate::segments::{
197        static_file::headers::HEADER_TABLES_TO_PRUNE, PruneInput, PruneLimiter, Segment,
198        SegmentOutput,
199    };
200    use alloy_primitives::{BlockNumber, B256, U256};
201    use assert_matches::assert_matches;
202    use reth_db::tables;
203    use reth_db_api::transaction::DbTx;
204    use reth_provider::{
205        DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
206        StaticFileProviderFactory,
207    };
208    use reth_prune_types::{
209        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
210        SegmentOutputCheckpoint,
211    };
212    use reth_stages::test_utils::TestStageDB;
213    use reth_testing_utils::{generators, generators::random_header_range};
214    use tracing::trace;
215
216    #[test]
217    fn prune() {
218        reth_tracing::init_test_tracing();
219
220        let db = TestStageDB::default();
221        let mut rng = generators::rng();
222
223        let headers = random_header_range(&mut rng, 0..100, B256::ZERO);
224        let tx = db.factory.provider_rw().unwrap().into_tx();
225        for header in &headers {
226            TestStageDB::insert_header(None, &tx, header, U256::ZERO).unwrap();
227        }
228        tx.commit().unwrap();
229
230        assert_eq!(db.table::<tables::CanonicalHeaders>().unwrap().len(), headers.len());
231        assert_eq!(db.table::<tables::Headers>().unwrap().len(), headers.len());
232        assert_eq!(db.table::<tables::HeaderTerminalDifficulties>().unwrap().len(), headers.len());
233
234        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
235            let segment = super::Headers::new(db.factory.static_file_provider());
236            let prune_mode = PruneMode::Before(to_block);
237            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
238            let input = PruneInput {
239                previous_checkpoint: db
240                    .factory
241                    .provider()
242                    .unwrap()
243                    .get_prune_checkpoint(PruneSegment::Headers)
244                    .unwrap(),
245                to_block,
246                limiter: limiter.clone(),
247            };
248
249            let next_block_number_to_prune = db
250                .factory
251                .provider()
252                .unwrap()
253                .get_prune_checkpoint(PruneSegment::Headers)
254                .unwrap()
255                .and_then(|checkpoint| checkpoint.block_number)
256                .map(|block_number| block_number + 1)
257                .unwrap_or_default();
258
259            let provider = db.factory.database_provider_rw().unwrap();
260            let result = segment.prune(&provider, input.clone()).unwrap();
261            limiter.increment_deleted_entries_count_by(result.pruned);
262            trace!(target: "pruner::test",
263                expected_prune_progress=?expected_result.0,
264                expected_pruned=?expected_result.1,
265                result=?result,
266                "SegmentOutput"
267            );
268
269            assert_matches!(
270                result,
271                SegmentOutput {progress, pruned, checkpoint: Some(_)}
272                    if (progress, pruned) == expected_result
273            );
274            provider
275                .save_prune_checkpoint(
276                    PruneSegment::Headers,
277                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
278                )
279                .unwrap();
280            provider.commit().expect("commit");
281
282            let last_pruned_block_number = to_block.min(
283                next_block_number_to_prune +
284                    (input.limiter.deleted_entries_limit().unwrap() / HEADER_TABLES_TO_PRUNE - 1)
285                        as u64,
286            );
287
288            assert_eq!(
289                db.table::<tables::CanonicalHeaders>().unwrap().len(),
290                headers.len() - (last_pruned_block_number + 1) as usize
291            );
292            assert_eq!(
293                db.table::<tables::Headers>().unwrap().len(),
294                headers.len() - (last_pruned_block_number + 1) as usize
295            );
296            assert_eq!(
297                db.table::<tables::HeaderTerminalDifficulties>().unwrap().len(),
298                headers.len() - (last_pruned_block_number + 1) as usize
299            );
300            assert_eq!(
301                db.factory.provider().unwrap().get_prune_checkpoint(PruneSegment::Headers).unwrap(),
302                Some(PruneCheckpoint {
303                    block_number: Some(last_pruned_block_number),
304                    tx_number: None,
305                    prune_mode
306                })
307            );
308        };
309
310        test_prune(
311            3,
312            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 9),
313        );
314        test_prune(3, (PruneProgress::Finished, 3));
315    }
316
317    #[test]
318    fn prune_cannot_be_done() {
319        let db = TestStageDB::default();
320
321        let limiter = PruneLimiter::default().set_deleted_entries_limit(0);
322
323        let input = PruneInput {
324            previous_checkpoint: None,
325            to_block: 1,
326            // Less than total number of tables for `Headers` segment
327            limiter,
328        };
329
330        let provider = db.factory.database_provider_rw().unwrap();
331        let segment = super::Headers::new(db.factory.static_file_provider());
332        let result = segment.prune(&provider, input).unwrap();
333        assert_eq!(
334            result,
335            SegmentOutput::not_done(
336                PruneInterruptReason::DeletedEntriesLimitReached,
337                Some(SegmentOutputCheckpoint::default())
338            )
339        );
340    }
341}