reth_prune/segments/static_file/
headers.rs1use std::num::NonZeroUsize;
2
3use crate::{
4 db_ext::DbTxPruneExt,
5 segments::{PruneInput, Segment},
6 PruneLimiter, PrunerError,
7};
8use alloy_primitives::BlockNumber;
9use itertools::Itertools;
10use reth_db::{
11 cursor::{DbCursorRO, RangeWalker},
12 tables,
13 transaction::DbTxMut,
14};
15use reth_provider::{providers::StaticFileProvider, DBProvider, StaticFileProviderFactory};
16use reth_prune_types::{
17 PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
18};
19use reth_static_file_types::StaticFileSegment;
20use tracing::trace;
21
/// Number of database tables that lose one entry for every pruned header:
/// `Headers`, `HeaderTerminalDifficulties` and `CanonicalHeaders`.
const HEADER_TABLES_TO_PRUNE: usize = 3;
24
25#[derive(Debug)]
26pub struct Headers<N> {
27 static_file_provider: StaticFileProvider<N>,
28}
29
30impl<N> Headers<N> {
31 pub const fn new(static_file_provider: StaticFileProvider<N>) -> Self {
32 Self { static_file_provider }
33 }
34}
35
36impl<Provider: StaticFileProviderFactory + DBProvider<Tx: DbTxMut>> Segment<Provider>
37 for Headers<Provider::Primitives>
38{
39 fn segment(&self) -> PruneSegment {
40 PruneSegment::Headers
41 }
42
43 fn mode(&self) -> Option<PruneMode> {
44 self.static_file_provider
45 .get_highest_static_file_block(StaticFileSegment::Headers)
46 .map(PruneMode::before_inclusive)
47 }
48
49 fn purpose(&self) -> PrunePurpose {
50 PrunePurpose::StaticFile
51 }
52
53 fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
54 let (block_range_start, block_range_end) = match input.get_next_block_range() {
55 Some(range) => (*range.start(), *range.end()),
56 None => {
57 trace!(target: "pruner", "No headers to prune");
58 return Ok(SegmentOutput::done())
59 }
60 };
61
62 let last_pruned_block =
63 if block_range_start == 0 { None } else { Some(block_range_start - 1) };
64
65 let range = last_pruned_block.map_or(0, |block| block + 1)..=block_range_end;
66
67 let mut headers_cursor = provider.tx_ref().cursor_write::<tables::Headers>()?;
68 let mut header_tds_cursor =
69 provider.tx_ref().cursor_write::<tables::HeaderTerminalDifficulties>()?;
70 let mut canonical_headers_cursor =
71 provider.tx_ref().cursor_write::<tables::CanonicalHeaders>()?;
72
73 let mut limiter = input.limiter.floor_deleted_entries_limit_to_multiple_of(
74 NonZeroUsize::new(HEADER_TABLES_TO_PRUNE).unwrap(),
75 );
76
77 let tables_iter = HeaderTablesIter::new(
78 provider,
79 &mut limiter,
80 headers_cursor.walk_range(range.clone())?,
81 header_tds_cursor.walk_range(range.clone())?,
82 canonical_headers_cursor.walk_range(range)?,
83 );
84
85 let mut last_pruned_block: Option<u64> = None;
86 let mut pruned = 0;
87 for res in tables_iter {
88 let HeaderTablesIterItem { pruned_block, entries_pruned } = res?;
89 last_pruned_block = Some(pruned_block);
90 pruned += entries_pruned;
91 }
92
93 let done = last_pruned_block == Some(block_range_end);
94 let progress = limiter.progress(done);
95
96 Ok(SegmentOutput {
97 progress,
98 pruned,
99 checkpoint: Some(SegmentOutputCheckpoint {
100 block_number: last_pruned_block,
101 tx_number: None,
102 }),
103 })
104 }
105}
106type Walker<'a, Provider, T> =
107 RangeWalker<'a, T, <<Provider as DBProvider>::Tx as DbTxMut>::CursorMut<T>>;
108
109#[allow(missing_debug_implementations)]
110struct HeaderTablesIter<'a, Provider>
111where
112 Provider: DBProvider<Tx: DbTxMut>,
113{
114 provider: &'a Provider,
115 limiter: &'a mut PruneLimiter,
116 headers_walker: Walker<'a, Provider, tables::Headers>,
117 header_tds_walker: Walker<'a, Provider, tables::HeaderTerminalDifficulties>,
118 canonical_headers_walker: Walker<'a, Provider, tables::CanonicalHeaders>,
119}
120
121struct HeaderTablesIterItem {
122 pruned_block: BlockNumber,
123 entries_pruned: usize,
124}
125
126impl<'a, Provider> HeaderTablesIter<'a, Provider>
127where
128 Provider: DBProvider<Tx: DbTxMut>,
129{
130 fn new(
131 provider: &'a Provider,
132 limiter: &'a mut PruneLimiter,
133 headers_walker: Walker<'a, Provider, tables::Headers>,
134 header_tds_walker: Walker<'a, Provider, tables::HeaderTerminalDifficulties>,
135 canonical_headers_walker: Walker<'a, Provider, tables::CanonicalHeaders>,
136 ) -> Self {
137 Self { provider, limiter, headers_walker, header_tds_walker, canonical_headers_walker }
138 }
139}
140
141impl<Provider> Iterator for HeaderTablesIter<'_, Provider>
142where
143 Provider: DBProvider<Tx: DbTxMut>,
144{
145 type Item = Result<HeaderTablesIterItem, PrunerError>;
146 fn next(&mut self) -> Option<Self::Item> {
147 if self.limiter.is_limit_reached() {
148 return None
149 }
150
151 let mut pruned_block_headers = None;
152 let mut pruned_block_td = None;
153 let mut pruned_block_canonical = None;
154
155 if let Err(err) = self.provider.tx_ref().prune_table_with_range_step(
156 &mut self.headers_walker,
157 self.limiter,
158 &mut |_| false,
159 &mut |row| pruned_block_headers = Some(row.0),
160 ) {
161 return Some(Err(err.into()))
162 }
163
164 if let Err(err) = self.provider.tx_ref().prune_table_with_range_step(
165 &mut self.header_tds_walker,
166 self.limiter,
167 &mut |_| false,
168 &mut |row| pruned_block_td = Some(row.0),
169 ) {
170 return Some(Err(err.into()))
171 }
172
173 if let Err(err) = self.provider.tx_ref().prune_table_with_range_step(
174 &mut self.canonical_headers_walker,
175 self.limiter,
176 &mut |_| false,
177 &mut |row| pruned_block_canonical = Some(row.0),
178 ) {
179 return Some(Err(err.into()))
180 }
181
182 if ![pruned_block_headers, pruned_block_td, pruned_block_canonical].iter().all_equal() {
183 return Some(Err(PrunerError::InconsistentData(
184 "All headers-related tables should be pruned up to the same height",
185 )))
186 }
187
188 pruned_block_headers.map(move |block| {
189 Ok(HeaderTablesIterItem { pruned_block: block, entries_pruned: HEADER_TABLES_TO_PRUNE })
190 })
191 }
192}
193
#[cfg(test)]
mod tests {
    use crate::segments::{
        static_file::headers::HEADER_TABLES_TO_PRUNE, PruneInput, PruneLimiter, Segment,
        SegmentOutput,
    };
    use alloy_primitives::{BlockNumber, B256, U256};
    use assert_matches::assert_matches;
    use reth_db::tables;
    use reth_db_api::transaction::DbTx;
    use reth_provider::{
        DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
        StaticFileProviderFactory,
    };
    use reth_prune_types::{
        PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
        SegmentOutputCheckpoint,
    };
    use reth_stages::test_utils::TestStageDB;
    use reth_testing_utils::{generators, generators::random_header_range};
    use tracing::trace;

    /// Prunes headers up to block 3 in two runs with a deleted-entries limit of 10 and checks the
    /// table sizes and the stored checkpoint after each run.
    #[test]
    fn prune() {
        reth_tracing::init_test_tracing();

        let db = TestStageDB::default();
        let mut rng = generators::rng();

        // Populate all three header tables with 100 random headers.
        let headers = random_header_range(&mut rng, 0..100, B256::ZERO);
        let tx = db.factory.provider_rw().unwrap().into_tx();
        for header in &headers {
            TestStageDB::insert_header(None, &tx, header, U256::ZERO).unwrap();
        }
        tx.commit().unwrap();

        let inserted = headers.len();
        assert_eq!(db.table::<tables::CanonicalHeaders>().unwrap().len(), inserted);
        assert_eq!(db.table::<tables::Headers>().unwrap().len(), inserted);
        assert_eq!(db.table::<tables::HeaderTerminalDifficulties>().unwrap().len(), inserted);

        // Runs one prune pass up to `to_block` and verifies progress, pruned entry count,
        // remaining table rows and the persisted checkpoint.
        let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
            let segment = super::Headers::new(db.factory.static_file_provider());
            let prune_mode = PruneMode::Before(to_block);
            let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);

            let previous_checkpoint = db
                .factory
                .provider()
                .unwrap()
                .get_prune_checkpoint(PruneSegment::Headers)
                .unwrap();

            // First block this run is expected to prune: one past the stored checkpoint, or 0.
            let next_block_number_to_prune = previous_checkpoint
                .as_ref()
                .and_then(|checkpoint| checkpoint.block_number)
                .map_or(0, |block_number| block_number + 1);

            let input = PruneInput { previous_checkpoint, to_block, limiter: limiter.clone() };

            let provider = db.factory.database_provider_rw().unwrap();
            let result = segment.prune(&provider, input.clone()).unwrap();
            limiter.increment_deleted_entries_count_by(result.pruned);
            trace!(target: "pruner::test",
                expected_prune_progress=?expected_result.0,
                expected_pruned=?expected_result.1,
                result=?result,
                "SegmentOutput"
            );

            assert_matches!(
                result,
                SegmentOutput {progress, pruned, checkpoint: Some(_)}
                    if (progress, pruned) == expected_result
            );
            provider
                .save_prune_checkpoint(
                    PruneSegment::Headers,
                    result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
                )
                .unwrap();
            provider.commit().expect("commit");

            // A single run can prune at most `limit / HEADER_TABLES_TO_PRUNE` blocks.
            let blocks_per_run =
                input.limiter.deleted_entries_limit().unwrap() / HEADER_TABLES_TO_PRUNE;
            let last_pruned_block_number =
                to_block.min(next_block_number_to_prune + (blocks_per_run - 1) as u64);
            let expected_remaining = headers.len() - (last_pruned_block_number + 1) as usize;

            assert_eq!(db.table::<tables::CanonicalHeaders>().unwrap().len(), expected_remaining);
            assert_eq!(db.table::<tables::Headers>().unwrap().len(), expected_remaining);
            assert_eq!(
                db.table::<tables::HeaderTerminalDifficulties>().unwrap().len(),
                expected_remaining
            );
            assert_eq!(
                db.factory.provider().unwrap().get_prune_checkpoint(PruneSegment::Headers).unwrap(),
                Some(PruneCheckpoint {
                    block_number: Some(last_pruned_block_number),
                    tx_number: None,
                    prune_mode
                })
            );
        };

        // First run stops at the limit after blocks 0..=2 (9 entries), second run finishes block 3.
        test_prune(
            3,
            (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 9),
        );
        test_prune(3, (PruneProgress::Finished, 3));
    }

    /// With a deleted-entries limit of zero nothing can be pruned and the segment reports that
    /// the limit was reached, together with a default checkpoint.
    #[test]
    fn prune_cannot_be_done() {
        let db = TestStageDB::default();

        // Zero is below `HEADER_TABLES_TO_PRUNE`, so not even a single block fits in the budget.
        let limiter = PruneLimiter::default().set_deleted_entries_limit(0);

        let input = PruneInput { previous_checkpoint: None, to_block: 1, limiter };

        let provider = db.factory.database_provider_rw().unwrap();
        let segment = super::Headers::new(db.factory.static_file_provider());
        let result = segment.prune(&provider, input).unwrap();
        assert_eq!(
            result,
            SegmentOutput::not_done(
                PruneInterruptReason::DeletedEntriesLimitReached,
                Some(SegmentOutputCheckpoint::default())
            )
        );
    }
}