reth_provider/providers/static_file/mod.rs

1mod manager;
2pub use manager::{StaticFileAccess, StaticFileProvider, StaticFileWriter};
3
4mod jar;
5pub use jar::StaticFileJarProvider;
6
7mod writer;
8pub use writer::{StaticFileProviderRW, StaticFileProviderRWRefMut};
9
10mod metrics;
11
12use reth_nippy_jar::NippyJar;
13use reth_primitives::{static_file::SegmentHeader, StaticFileSegment};
14use reth_storage_errors::provider::{ProviderError, ProviderResult};
15use std::{ops::Deref, sync::Arc};
16
17/// Alias type for each specific `NippyJar`.
18type LoadedJarRef<'a> = dashmap::mapref::one::Ref<'a, (u64, StaticFileSegment), LoadedJar>;
19
/// Helper type to reuse an associated static file mmap handle on created cursors.
///
/// Pairs a loaded [`NippyJar`] with its data reader. The reader is opened once, when the
/// `LoadedJar` is built, and is stored behind an [`Arc`]. Handing it to a new cursor therefore
/// only clones the `Arc` and does not reopen the data.
#[derive(Debug)]
pub struct LoadedJar {
    /// The jar itself. Its user header is the [`SegmentHeader`] that records the segment.
    jar: NippyJar<SegmentHeader>,
    /// Reader over the jar's data, opened once and shared with the cursors built from it.
    mmap_handle: Arc<reth_nippy_jar::DataReader>,
}
26
27impl LoadedJar {
28    fn new(jar: NippyJar<SegmentHeader>) -> ProviderResult<Self> {
29        match jar.open_data_reader() {
30            Ok(data_reader) => {
31                let mmap_handle = Arc::new(data_reader);
32                Ok(Self { jar, mmap_handle })
33            }
34            Err(e) => Err(ProviderError::NippyJar(e.to_string())),
35        }
36    }
37
38    /// Returns a clone of the mmap handle that can be used to instantiate a cursor.
39    fn mmap_handle(&self) -> Arc<reth_nippy_jar::DataReader> {
40        self.mmap_handle.clone()
41    }
42
43    const fn segment(&self) -> StaticFileSegment {
44        self.jar.user_header().segment()
45    }
46}
47
48impl Deref for LoadedJar {
49    type Target = NippyJar<SegmentHeader>;
50    fn deref(&self) -> &Self::Target {
51        &self.jar
52    }
53}
54
55#[cfg(test)]
56mod tests {
57    use super::*;
58    use crate::{
59        test_utils::create_test_provider_factory, HeaderProvider, StaticFileProviderFactory,
60    };
61    use alloy_consensus::{Header, Transaction};
62    use alloy_primitives::{BlockHash, TxNumber, B256, U256};
63    use rand::seq::SliceRandom;
64    use reth_db::{
65        test_utils::create_test_static_files_dir, CanonicalHeaders, HeaderNumbers,
66        HeaderTerminalDifficulties, Headers,
67    };
68    use reth_db_api::transaction::DbTxMut;
69    use reth_primitives::{
70        static_file::{find_fixed_range, SegmentRangeInclusive, DEFAULT_BLOCKS_PER_STATIC_FILE},
71        EthPrimitives, Receipt, TransactionSigned,
72    };
73    use reth_storage_api::{ReceiptProvider, TransactionsProvider};
74    use reth_testing_utils::generators::{self, random_header_range};
75    use std::{fmt::Debug, fs, ops::Range, path::Path};
76
77    fn assert_eyre<T: PartialEq + Debug>(got: T, expected: T, msg: &str) -> eyre::Result<()> {
78        if got != expected {
79            eyre::bail!("{msg} | got: {got:?} expected: {expected:?})");
80        }
81        Ok(())
82    }
83
    /// Writes a random range of headers both to the database and to a `Headers` static file.
    /// It then checks that, for every block, the database provider and the static-file jar
    /// provider return the same header and the same total difficulty.
    #[test]
    fn test_snap() {
        // Ranges
        let row_count = 100u64;
        let range = 0..=(row_count - 1);

        // Data sources
        let factory = create_test_provider_factory();
        // NOTE(review): nothing is written into this temp dir. The joined path seems to be used
        // only so `get_segment_provider_from_block` can derive the block range from its file
        // name. Confirm against `StaticFileProvider`.
        let static_files_path = tempfile::tempdir().unwrap();
        let static_file = static_files_path.path().join(
            StaticFileSegment::Headers
                .filename(&find_fixed_range(*range.end(), DEFAULT_BLOCKS_PER_STATIC_FILE)),
        );

        // Setup data
        let mut headers = random_header_range(
            &mut generators::rng(),
            *range.start()..(*range.end() + 1),
            B256::random(),
        );

        // Populate the header tables in the database, accumulating total difficulty per block.
        let mut provider_rw = factory.provider_rw().unwrap();
        let tx = provider_rw.tx_mut();
        let mut td = U256::ZERO;
        for header in headers.clone() {
            td += header.header().difficulty;
            let hash = header.hash();

            tx.put::<CanonicalHeaders>(header.number, hash).unwrap();
            tx.put::<Headers>(header.number, header.clone().unseal()).unwrap();
            tx.put::<HeaderTerminalDifficulties>(header.number, td.into()).unwrap();
            tx.put::<HeaderNumbers>(hash, header.number).unwrap();
        }
        provider_rw.commit().unwrap();

        // Create StaticFile: append the same headers, with the same running total difficulty,
        // through the static file writer.
        {
            let manager = factory.static_file_provider();
            let mut writer = manager.latest_writer(StaticFileSegment::Headers).unwrap();
            let mut td = U256::ZERO;

            for header in headers.clone() {
                td += header.header().difficulty;
                let hash = header.hash();
                writer.append_header(&header.unseal(), td, &hash).unwrap();
            }
            writer.commit().unwrap();
        }

        // Use providers to query Header data and compare if it matches
        {
            let db_provider = factory.provider().unwrap();
            let manager = db_provider.static_file_provider();
            let jar_provider = manager
                .get_segment_provider_from_block(StaticFileSegment::Headers, 0, Some(&static_file))
                .unwrap();

            assert!(!headers.is_empty());

            // Shuffle so that lookups do not follow insertion order.
            headers.shuffle(&mut generators::rng());

            for header in headers {
                let header_hash = header.hash();
                let header = header.unseal();

                // Compare Header
                assert_eq!(header, db_provider.header(&header_hash).unwrap().unwrap());
                assert_eq!(header, jar_provider.header_by_number(header.number).unwrap().unwrap());

                // Compare HeaderTerminalDifficulties
                assert_eq!(
                    db_provider.header_td(&header_hash).unwrap().unwrap(),
                    jar_provider.header_td_by_number(header.number).unwrap().unwrap()
                );
            }
        }
    }
162
    /// Writes `file_set_count * blocks_per_file` headers into static files, then prunes them in
    /// stages. After each stage it checks the highest block, the header at the new tip, and the
    /// number of files left on disk.
    #[test]
    fn test_header_truncation() {
        let (static_dir, _) = create_test_static_files_dir();

        let blocks_per_file = 10; // Number of headers per file
        let files_per_range = 3; // Number of files per range (data/conf/offset files)
        let file_set_count = 3; // Number of sets of files to create
        let initial_file_count = files_per_range * file_set_count + 1; // Includes lockfile
        let tip = blocks_per_file * file_set_count - 1; // Initial highest block (29 in this case)

        // [ Headers Creation and Commit ]
        {
            let sf_rw = StaticFileProvider::<EthPrimitives>::read_write(&static_dir)
                .expect("Failed to create static file provider")
                .with_custom_blocks_per_file(blocks_per_file);

            let mut header_writer = sf_rw.latest_writer(StaticFileSegment::Headers).unwrap();

            // Append headers from 0 to the tip (29) and commit
            let mut header = Header::default();
            for num in 0..=tip {
                header.number = num;
                header_writer
                    .append_header(&header, U256::default(), &BlockHash::default())
                    .unwrap();
            }
            header_writer.commit().unwrap();
        }

        /// Helper that prunes `prune_count` headers, commits, and validates the result.
        ///
        /// Checks that the highest header block equals `expected_tip`, that the header at that
        /// tip is readable and carries that number, and that `static_dir` holds exactly
        /// `expected_file_count` entries.
        fn prune_and_validate(
            writer: &mut StaticFileProviderRWRefMut<'_, EthPrimitives>,
            sf_rw: &StaticFileProvider<EthPrimitives>,
            static_dir: impl AsRef<Path>,
            prune_count: u64,
            expected_tip: Option<u64>,
            expected_file_count: u64,
        ) -> eyre::Result<()> {
            writer.prune_headers(prune_count)?;
            writer.commit()?;

            // Validate the highest block after pruning
            assert_eyre(
                sf_rw.get_highest_static_file_block(StaticFileSegment::Headers),
                expected_tip,
                "block mismatch",
            )?;

            // The header at the new tip must still be readable and carry that block number.
            if let Some(id) = expected_tip {
                assert_eyre(
                    sf_rw.header_by_number(id)?.map(|h| h.number),
                    expected_tip,
                    "header mismatch",
                )?;
            }

            // Validate the number of files remaining in the directory
            assert_eyre(
                fs::read_dir(static_dir)?.count(),
                expected_file_count as usize,
                "file count mismatch",
            )?;

            Ok(())
        }

        // [ Test Cases ]
        // Cases are cumulative. `vec!` evaluates its elements in order, so `tmp_tip` holds the
        // tip left by all preceding prunes when each case's expected tip is computed.
        type PruneCount = u64;
        type ExpectedTip = u64;
        type ExpectedFileCount = u64;
        let mut tmp_tip = tip;
        let test_cases: Vec<(PruneCount, Option<ExpectedTip>, ExpectedFileCount)> = vec![
            // Case 0: Pruning 1 header
            {
                tmp_tip -= 1;
                (1, Some(tmp_tip), initial_file_count)
            },
            // Case 1: Pruning the remaining rows of the last file should result in its deletion
            {
                tmp_tip -= blocks_per_file - 1;
                (blocks_per_file - 1, Some(tmp_tip), initial_file_count - files_per_range)
            },
            // Case 2: Pruning more headers than a single file has (tip reduced by
            // blocks_per_file + 1) should result in a file set deletion
            {
                tmp_tip -= blocks_per_file + 1;
                (blocks_per_file + 1, Some(tmp_tip), initial_file_count - files_per_range * 2)
            },
            // Case 3: Pruning all remaining headers from the file except the genesis header
            {
                (
                    tmp_tip,
                    Some(0),             // Only genesis block remains
                    files_per_range + 1, // The file set with block 0 should remain
                )
            },
            // Case 4: Pruning the genesis header (should not delete the file set with block 0)
            {
                (
                    1,
                    None,                // No blocks left
                    files_per_range + 1, // The file set with block 0 remains
                )
            },
        ];

        // Test cases execution
        {
            // Open a fresh provider over the directory populated above.
            let sf_rw = StaticFileProvider::read_write(&static_dir)
                .expect("Failed to create static file provider")
                .with_custom_blocks_per_file(blocks_per_file);

            assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), Some(tip));
            assert_eq!(
                fs::read_dir(static_dir.as_ref()).unwrap().count(),
                initial_file_count as usize
            );

            // One writer is reused for every case, so each prune builds on the previous one.
            let mut header_writer = sf_rw.latest_writer(StaticFileSegment::Headers).unwrap();

            for (case, (prune_count, expected_tip, expected_file_count)) in
                test_cases.into_iter().enumerate()
            {
                prune_and_validate(
                    &mut header_writer,
                    &sf_rw,
                    &static_dir,
                    prune_count,
                    expected_tip,
                    expected_file_count,
                )
                .map_err(|err| eyre::eyre!("Test case {case}: {err}"))
                .unwrap();
            }
        }
    }
299
    /// Builds 3 consecutive block ranges in `segment` (transactions or receipts). It then checks
    /// the tx range recorded in each file and the provider's tx index.
    ///
    /// Within each range, one tx/receipt is appended to each of the first `tx_count` blocks.
    /// For `blocks_per_file = 10`:
    /// * `0..=9`: every block except the last one (9) has a tx/receipt (tx numbers `0..=8`)
    /// * `10..=19`: no txs/receipts
    /// * `20..=29`: only one tx/receipt (tx number 9), in the first block of the range (20)
    ///
    /// NOTE(review): the expected tx ranges and tx index asserted at the end are hard-coded for
    /// `blocks_per_file = 10`.
    fn setup_tx_based_scenario(
        sf_rw: &StaticFileProvider<EthPrimitives>,
        segment: StaticFileSegment,
        blocks_per_file: u64,
    ) {
        /// Advances `writer` through every block of `block_range`, appending one tx/receipt to
        /// each of the first `tx_count` blocks. IDs are taken from `next_tx_num`, which is
        /// advanced.
        ///
        /// After appending, it commits and asserts the provider's highest block and tx.
        fn setup_block_ranges(
            writer: &mut StaticFileProviderRWRefMut<'_, EthPrimitives>,
            sf_rw: &StaticFileProvider<EthPrimitives>,
            segment: StaticFileSegment,
            block_range: &Range<u64>,
            mut tx_count: u64,
            next_tx_num: &mut u64,
        ) {
            let mut receipt = Receipt::default();
            let mut tx = TransactionSigned::default();

            for block in block_range.clone() {
                writer.increment_block(block).unwrap();

                // Append transaction/receipt if there's still a transaction count to append
                if tx_count > 0 {
                    if segment.is_receipts() {
                        // Tx number stored in `cumulative_gas_used` as an ID for later validation
                        receipt.cumulative_gas_used = *next_tx_num;
                        writer.append_receipt(*next_tx_num, &receipt).unwrap();
                    } else {
                        // Tx number stored in the nonce as an ID for later validation
                        tx.transaction.set_nonce(*next_tx_num);
                        writer.append_transaction(*next_tx_num, &tx).unwrap();
                    }
                    *next_tx_num += 1;
                    tx_count -= 1;
                }
            }
            writer.commit().unwrap();

            // Calculate expected values based on the range and transactions.
            // When all requested txs were appended (or none were requested), the highest tx is
            // the last one written so far.
            // NOTE(review): the `else` branch (more txs requested than blocks in the range) is
            // not exercised below, and `*next_tx_num - 1` would underflow if no tx had been
            // written yet.
            let expected_block = block_range.end - 1;
            let expected_tx = if tx_count == 0 { *next_tx_num - 1 } else { *next_tx_num };

            // Perform assertions after processing the blocks
            assert_eq!(sf_rw.get_highest_static_file_block(segment), Some(expected_block),);
            assert_eq!(sf_rw.get_highest_static_file_tx(segment), Some(expected_tx),);
        }

        // Define the block ranges and transaction counts as vectors
        let block_ranges = [
            0..blocks_per_file,
            blocks_per_file..blocks_per_file * 2,
            blocks_per_file * 2..blocks_per_file * 3,
        ];

        let tx_counts = [
            blocks_per_file - 1, // First range: one tx per block except the last one
            0,                   // Second range: no transactions
            1,                   // Third range: 1 transaction, in the first block of the range
        ];

        let mut writer = sf_rw.latest_writer(segment).unwrap();
        let mut next_tx_num = 0;

        // Loop through setup scenarios
        for (block_range, tx_count) in block_ranges.iter().zip(tx_counts.iter()) {
            setup_block_ranges(
                &mut writer,
                sf_rw,
                segment,
                block_range,
                *tx_count,
                &mut next_tx_num,
            );
        }

        // Ensure that scenario was properly setup: each file's header records its tx range,
        // and the file without txs records none.
        let expected_tx_ranges = vec![
            Some(SegmentRangeInclusive::new(0, 8)),
            None,
            Some(SegmentRangeInclusive::new(9, 9)),
        ];

        block_ranges.iter().zip(expected_tx_ranges).for_each(|(block_range, expected_tx_range)| {
            assert_eq!(
                sf_rw
                    .get_segment_provider_from_block(segment, block_range.start, None)
                    .unwrap()
                    .user_header()
                    .tx_range(),
                expected_tx_range.as_ref()
            );
        });

        // Ensure transaction index: (highest tx of a file, that file's block range). The
        // 10..=19 file has no txs and therefore no entry.
        let tx_index = sf_rw.tx_index().read();
        let expected_tx_index =
            vec![(8, SegmentRangeInclusive::new(0, 9)), (9, SegmentRangeInclusive::new(20, 29))];
        assert_eq!(
            tx_index.get(&segment).map(|index| index.iter().map(|(k, v)| (*k, *v)).collect()),
            (!expected_tx_index.is_empty()).then_some(expected_tx_index),
            "tx index mismatch",
        );
    }
407
    /// For both the `Transactions` and `Receipts` segments, builds the scenario from
    /// `setup_tx_based_scenario` and then prunes it in stages.
    ///
    /// After each stage it checks the highest block and tx, the row stored at the new tx tip,
    /// the number of files on disk, and the provider's tx index.
    #[test]
    fn test_tx_based_truncation() {
        let segments = [StaticFileSegment::Transactions, StaticFileSegment::Receipts];
        let blocks_per_file = 10; // Number of blocks per file
        let files_per_range = 3; // Number of files per range (data/conf/offset files)
        let file_set_count = 3; // Number of sets of files to create
        let initial_file_count = files_per_range * file_set_count + 1; // Includes lockfile

        /// Helper that prunes `prune_count` txs/receipts down to `last_block`, commits, and
        /// validates the provider state against the expected values.
        // The helper takes one parameter per test-case column, hence the lint allowance.
        #[allow(clippy::too_many_arguments)]
        fn prune_and_validate(
            sf_rw: &StaticFileProvider<EthPrimitives>,
            static_dir: impl AsRef<Path>,
            segment: StaticFileSegment,
            prune_count: u64,
            last_block: u64,
            expected_tx_tip: Option<u64>,
            expected_file_count: i32,
            expected_tx_index: Vec<(TxNumber, SegmentRangeInclusive)>,
        ) -> eyre::Result<()> {
            let mut writer = sf_rw.latest_writer(segment)?;

            // Prune transactions or receipts based on the segment type
            if segment.is_receipts() {
                writer.prune_receipts(prune_count, last_block)?;
            } else {
                writer.prune_transactions(prune_count, last_block)?;
            }
            writer.commit()?;

            // Verify the highest block and transaction tips
            assert_eyre(
                sf_rw.get_highest_static_file_block(segment),
                Some(last_block),
                "block mismatch",
            )?;
            assert_eyre(sf_rw.get_highest_static_file_tx(segment), expected_tx_tip, "tx mismatch")?;

            // Verify that transactions and receipts are returned correctly. Uses
            // cumulative_gas_used & nonce as ids.
            if let Some(id) = expected_tx_tip {
                if segment.is_receipts() {
                    assert_eyre(
                        expected_tx_tip,
                        sf_rw.receipt(id)?.map(|r| r.cumulative_gas_used),
                        "tx mismatch",
                    )?;
                } else {
                    assert_eyre(
                        expected_tx_tip,
                        sf_rw.transaction_by_id(id)?.map(|t| t.nonce()),
                        "tx mismatch",
                    )?;
                }
            }

            // Ensure the file count has reduced as expected
            assert_eyre(
                fs::read_dir(static_dir)?.count(),
                expected_file_count as usize,
                "file count mismatch",
            )?;

            // Ensure that the inner tx index (max_tx -> block range) is as expected
            let tx_index = sf_rw.tx_index().read();
            assert_eyre(
                tx_index.get(&segment).map(|index| index.iter().map(|(k, v)| (*k, *v)).collect()),
                (!expected_tx_index.is_empty()).then_some(expected_tx_index),
                "tx index mismatch",
            )?;

            Ok(())
        }

        for segment in segments {
            let (static_dir, _) = create_test_static_files_dir();

            let sf_rw = StaticFileProvider::read_write(&static_dir)
                .expect("Failed to create static file provider")
                .with_custom_blocks_per_file(blocks_per_file);

            setup_tx_based_scenario(&sf_rw, segment, blocks_per_file);

            // Open a fresh provider over the directory the scenario was written to.
            let sf_rw = StaticFileProvider::read_write(&static_dir)
                .expect("Failed to create static file provider")
                .with_custom_blocks_per_file(blocks_per_file);
            let highest_tx = sf_rw.get_highest_static_file_tx(segment).unwrap();

            // Test cases (cumulative: each case prunes from the state left by the previous one)
            // (prune_count, last_block, expected_tx_tip, expected_file_count, expected_tx_index)
            let test_cases = vec![
                // Case 0: 20..=29 has only one tx. Prune the only tx of the block range.
                // It ensures that the file is not deleted even though there are no rows, since the
                // `last_block` which is passed to the prune method is the first
                // block of the range.
                (
                    1,
                    blocks_per_file * 2,
                    Some(highest_tx - 1),
                    initial_file_count,
                    vec![(highest_tx - 1, SegmentRangeInclusive::new(0, 9))],
                ),
                // Case 1: 10..=19 has no txs. There are no txs in the whole block range, but want
                // to unwind to block 9. Ensures that the 20..=29 and 10..=19 files
                // are deleted.
                (
                    0,
                    blocks_per_file - 1,
                    Some(highest_tx - 1),
                    files_per_range + 1, // includes lockfile
                    vec![(highest_tx - 1, SegmentRangeInclusive::new(0, 9))],
                ),
                // Case 2: Prune most txs up to block 1.
                (
                    highest_tx - 1,
                    1,
                    Some(0),
                    files_per_range + 1,
                    vec![(0, SegmentRangeInclusive::new(0, 1))],
                ),
                // Case 3: Prune remaining tx and ensure that file is not deleted.
                (1, 0, None, files_per_range + 1, vec![]),
            ];

            // Loop through test cases
            for (
                case,
                (prune_count, last_block, expected_tx_tip, expected_file_count, expected_tx_index),
            ) in test_cases.into_iter().enumerate()
            {
                prune_and_validate(
                    &sf_rw,
                    &static_dir,
                    segment,
                    prune_count,
                    last_block,
                    expected_tx_tip,
                    expected_file_count,
                    expected_tx_index,
                )
                .map_err(|err| eyre::eyre!("Test case {case}: {err}"))
                .unwrap();
            }
        }
    }
552}