reth_prune/segments/
mod.rs

1mod receipts;
2mod set;
3mod static_file;
4mod user;
5
6use crate::{PruneLimiter, PrunerError};
7use alloy_primitives::{BlockNumber, TxNumber};
8use reth_provider::{errors::provider::ProviderResult, BlockReader, PruneCheckpointWriter};
9use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, SegmentOutput};
10pub use set::SegmentSet;
11pub use static_file::{
12    Headers as StaticFileHeaders, Receipts as StaticFileReceipts,
13    Transactions as StaticFileTransactions,
14};
15use std::{fmt::Debug, ops::RangeInclusive};
16use tracing::error;
17pub use user::{
18    AccountHistory, Receipts as UserReceipts, ReceiptsByLogs, SenderRecovery, StorageHistory,
19    TransactionLookup,
20};
21
22/// A segment represents a pruning of some portion of the data.
23///
24/// Segments are called from [`Pruner`](crate::Pruner) with the following lifecycle:
25/// 1. Call [`Segment::prune`] with `delete_limit` of [`PruneInput`].
26/// 2. If [`Segment::prune`] returned a [`Some`] in `checkpoint` of [`SegmentOutput`], call
27///    [`Segment::save_checkpoint`].
28/// 3. Subtract `pruned` of [`SegmentOutput`] from `delete_limit` of next [`PruneInput`].
29pub trait Segment<Provider>: Debug + Send + Sync {
30    /// Segment of data that's pruned.
31    fn segment(&self) -> PruneSegment;
32
33    /// Prune mode with which the segment was initialized.
34    fn mode(&self) -> Option<PruneMode>;
35
36    /// Purpose of the segment.
37    fn purpose(&self) -> PrunePurpose;
38
39    /// Prune data for [`Self::segment`] using the provided input.
40    fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError>;
41
42    /// Save checkpoint for [`Self::segment`] to the database.
43    fn save_checkpoint(
44        &self,
45        provider: &Provider,
46        checkpoint: PruneCheckpoint,
47    ) -> ProviderResult<()>
48    where
49        Provider: PruneCheckpointWriter,
50    {
51        provider.save_prune_checkpoint(self.segment(), checkpoint)
52    }
53}
54
55/// Segment pruning input, see [`Segment::prune`].
56#[derive(Debug)]
57#[cfg_attr(test, derive(Clone))]
58pub struct PruneInput {
59    pub(crate) previous_checkpoint: Option<PruneCheckpoint>,
60    /// Target block up to which the pruning needs to be done, inclusive.
61    pub(crate) to_block: BlockNumber,
62    /// Limits pruning of a segment.
63    pub(crate) limiter: PruneLimiter,
64}
65
66impl PruneInput {
67    /// Get next inclusive tx number range to prune according to the checkpoint and `to_block` block
68    /// number.
69    ///
70    /// To get the range start:
71    /// 1. If checkpoint exists, get next block body and return its first tx number.
72    /// 2. If checkpoint doesn't exist, return 0.
73    ///
74    /// To get the range end: get last tx number for `to_block`.
75    pub(crate) fn get_next_tx_num_range<Provider: BlockReader>(
76        &self,
77        provider: &Provider,
78    ) -> ProviderResult<Option<RangeInclusive<TxNumber>>> {
79        let from_tx_number = self.previous_checkpoint
80            // Checkpoint exists, prune from the next transaction after the highest pruned one
81            .and_then(|checkpoint| match checkpoint.tx_number {
82                Some(tx_number) => Some(tx_number + 1),
83                _ => {
84                    error!(target: "pruner", ?checkpoint, "Expected transaction number in prune checkpoint, found None");
85                    None
86                },
87            })
88            // No checkpoint exists, prune from genesis
89            .unwrap_or_default();
90
91        let to_tx_number = match provider.block_body_indices(self.to_block)? {
92            Some(body) => {
93                let last_tx = body.last_tx_num();
94                if last_tx + body.tx_count() == 0 {
95                    // Prevents a scenario where the pruner correctly starts at a finalized block,
96                    // but the first transaction (tx_num = 0) only appears on an non-finalized one.
97                    // Should only happen on a test/hive scenario.
98                    return Ok(None)
99                }
100                last_tx
101            }
102            None => return Ok(None),
103        };
104
105        let range = from_tx_number..=to_tx_number;
106        if range.is_empty() {
107            return Ok(None)
108        }
109
110        Ok(Some(range))
111    }
112
113    /// Get next inclusive block range to prune according to the checkpoint, `to_block` block
114    /// number and `limit`.
115    ///
116    /// To get the range start (`from_block`):
117    /// 1. If checkpoint exists, use next block.
118    /// 2. If checkpoint doesn't exist, use block 0.
119    ///
120    /// To get the range end: use block `to_block`.
121    pub(crate) fn get_next_block_range(&self) -> Option<RangeInclusive<BlockNumber>> {
122        let from_block = self.get_start_next_block_range();
123        let range = from_block..=self.to_block;
124        if range.is_empty() {
125            return None
126        }
127
128        Some(range)
129    }
130
131    /// Returns the start of the next block range.
132    ///
133    /// 1. If checkpoint exists, use next block.
134    /// 2. If checkpoint doesn't exist, use block 0.
135    pub(crate) fn get_start_next_block_range(&self) -> u64 {
136        self.previous_checkpoint
137            .and_then(|checkpoint| checkpoint.block_number)
138            // Checkpoint exists, prune from the next block after the highest pruned one
139            .map(|block_number| block_number + 1)
140            // No checkpoint exists, prune from genesis
141            .unwrap_or(0)
142    }
143}
144
145#[cfg(test)]
146mod tests {
147    use super::*;
148    use alloy_primitives::B256;
149    use reth_primitives_traits::BlockBody;
150    use reth_provider::{
151        providers::BlockchainProvider2,
152        test_utils::{create_test_provider_factory, MockEthProvider},
153    };
154    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
155
156    #[test]
157    fn test_prune_input_get_next_tx_num_range_no_to_block() {
158        let input = PruneInput {
159            previous_checkpoint: None,
160            to_block: 10,
161            limiter: PruneLimiter::default(),
162        };
163
164        // Default provider with no block corresponding to block 10
165        let provider = MockEthProvider::default();
166
167        // No block body for block 10, expected None
168        let range = input.get_next_tx_num_range(&provider).expect("Expected range");
169        assert!(range.is_none());
170    }
171
172    #[test]
173    fn test_prune_input_get_next_tx_num_range_no_tx() {
174        let input = PruneInput {
175            previous_checkpoint: None,
176            to_block: 10,
177            limiter: PruneLimiter::default(),
178        };
179
180        let mut rng = generators::rng();
181        let factory = create_test_provider_factory();
182
183        // Generate 10 random blocks with no transactions
184        let blocks = random_block_range(
185            &mut rng,
186            0..=10,
187            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
188        );
189
190        // Insert the blocks into the database
191        let provider_rw = factory.provider_rw().expect("failed to get provider_rw");
192        for block in &blocks {
193            provider_rw
194                .insert_historical_block(
195                    block.clone().seal_with_senders().expect("failed to seal block with senders"),
196                )
197                .expect("failed to insert block");
198        }
199        provider_rw.commit().expect("failed to commit");
200
201        // Create a new provider
202        let provider = BlockchainProvider2::new(factory).unwrap();
203
204        // Since there are no transactions, expected None
205        let range = input.get_next_tx_num_range(&provider).expect("Expected range");
206        assert!(range.is_none());
207    }
208
209    #[test]
210    fn test_prune_input_get_next_tx_num_range_valid() {
211        // Create a new prune input
212        let input = PruneInput {
213            previous_checkpoint: None,
214            to_block: 10,
215            limiter: PruneLimiter::default(),
216        };
217
218        let mut rng = generators::rng();
219        let factory = create_test_provider_factory();
220
221        // Generate 10 random blocks with some transactions
222        let blocks = random_block_range(
223            &mut rng,
224            0..=10,
225            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..5, ..Default::default() },
226        );
227
228        // Insert the blocks into the database
229        let provider_rw = factory.provider_rw().expect("failed to get provider_rw");
230        for block in &blocks {
231            provider_rw
232                .insert_historical_block(
233                    block.clone().seal_with_senders().expect("failed to seal block with senders"),
234                )
235                .expect("failed to insert block");
236        }
237        provider_rw.commit().expect("failed to commit");
238
239        // Create a new provider
240        let provider = BlockchainProvider2::new(factory).unwrap();
241
242        // Get the next tx number range
243        let range = input.get_next_tx_num_range(&provider).expect("Expected range").unwrap();
244
245        // Calculate the total number of transactions
246        let num_txs =
247            blocks.iter().map(|block| block.body.transactions().len() as u64).sum::<u64>();
248
249        assert_eq!(range, 0..=num_txs - 1);
250    }
251
252    #[test]
253    fn test_prune_input_get_next_tx_checkpoint_without_tx_number() {
254        // Create a prune input with a previous checkpoint without a tx number (unexpected)
255        let input = PruneInput {
256            previous_checkpoint: Some(PruneCheckpoint {
257                block_number: Some(5),
258                tx_number: None,
259                prune_mode: PruneMode::Full,
260            }),
261            to_block: 10,
262            limiter: PruneLimiter::default(),
263        };
264
265        let mut rng = generators::rng();
266        let factory = create_test_provider_factory();
267
268        // Generate 10 random blocks
269        let blocks = random_block_range(
270            &mut rng,
271            0..=10,
272            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..5, ..Default::default() },
273        );
274
275        // Insert the blocks into the database
276        let provider_rw = factory.provider_rw().expect("failed to get provider_rw");
277        for block in &blocks {
278            provider_rw
279                .insert_historical_block(
280                    block.clone().seal_with_senders().expect("failed to seal block with senders"),
281                )
282                .expect("failed to insert block");
283        }
284        provider_rw.commit().expect("failed to commit");
285
286        // Create a new provider
287        let provider = BlockchainProvider2::new(factory).unwrap();
288
289        // Fetch the range and check if it is correct
290        let range = input.get_next_tx_num_range(&provider).expect("Expected range").unwrap();
291
292        // Calculate the total number of transactions
293        let num_txs =
294            blocks.iter().map(|block| block.body.transactions().len() as u64).sum::<u64>();
295
296        assert_eq!(range, 0..=num_txs - 1,);
297    }
298
299    #[test]
300    fn test_prune_input_get_next_tx_empty_range() {
301        // Create a new provider via factory
302        let mut rng = generators::rng();
303        let factory = create_test_provider_factory();
304
305        // Generate 10 random blocks
306        let blocks = random_block_range(
307            &mut rng,
308            0..=10,
309            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..5, ..Default::default() },
310        );
311
312        // Insert the blocks into the database
313        let provider_rw = factory.provider_rw().expect("failed to get provider_rw");
314        for block in &blocks {
315            provider_rw
316                .insert_historical_block(
317                    block.clone().seal_with_senders().expect("failed to seal block with senders"),
318                )
319                .expect("failed to insert block");
320        }
321        provider_rw.commit().expect("failed to commit");
322
323        // Create a new provider
324        let provider = BlockchainProvider2::new(factory).unwrap();
325
326        // Get the last tx number
327        // Calculate the total number of transactions
328        let num_txs =
329            blocks.iter().map(|block| block.body.transactions().len() as u64).sum::<u64>();
330        let max_range = num_txs - 1;
331
332        // Create a prune input with a previous checkpoint that is the last tx number
333        let input = PruneInput {
334            previous_checkpoint: Some(PruneCheckpoint {
335                block_number: Some(5),
336                tx_number: Some(max_range),
337                prune_mode: PruneMode::Full,
338            }),
339            to_block: 10,
340            limiter: PruneLimiter::default(),
341        };
342
343        // We expect an empty range since the previous checkpoint is the last tx number
344        let range = input.get_next_tx_num_range(&provider).expect("Expected range");
345        assert!(range.is_none());
346    }
347}