1mod manager;
2pub use manager::{StaticFileAccess, StaticFileProvider, StaticFileWriter};
3
4mod jar;
5pub use jar::StaticFileJarProvider;
6
7mod writer;
8pub use writer::{StaticFileProviderRW, StaticFileProviderRWRefMut};
9
10mod metrics;
11
12use reth_nippy_jar::NippyJar;
13use reth_primitives::{static_file::SegmentHeader, StaticFileSegment};
14use reth_storage_errors::provider::{ProviderError, ProviderResult};
15use std::{ops::Deref, sync::Arc};
16
17type LoadedJarRef<'a> = dashmap::mapref::one::Ref<'a, (u64, StaticFileSegment), LoadedJar>;
19
20#[derive(Debug)]
22pub struct LoadedJar {
23 jar: NippyJar<SegmentHeader>,
24 mmap_handle: Arc<reth_nippy_jar::DataReader>,
25}
26
27impl LoadedJar {
28 fn new(jar: NippyJar<SegmentHeader>) -> ProviderResult<Self> {
29 match jar.open_data_reader() {
30 Ok(data_reader) => {
31 let mmap_handle = Arc::new(data_reader);
32 Ok(Self { jar, mmap_handle })
33 }
34 Err(e) => Err(ProviderError::NippyJar(e.to_string())),
35 }
36 }
37
38 fn mmap_handle(&self) -> Arc<reth_nippy_jar::DataReader> {
40 self.mmap_handle.clone()
41 }
42
43 const fn segment(&self) -> StaticFileSegment {
44 self.jar.user_header().segment()
45 }
46}
47
48impl Deref for LoadedJar {
49 type Target = NippyJar<SegmentHeader>;
50 fn deref(&self) -> &Self::Target {
51 &self.jar
52 }
53}
54
55#[cfg(test)]
56mod tests {
57 use super::*;
58 use crate::{
59 test_utils::create_test_provider_factory, HeaderProvider, StaticFileProviderFactory,
60 };
61 use alloy_consensus::{Header, Transaction};
62 use alloy_primitives::{BlockHash, TxNumber, B256, U256};
63 use rand::seq::SliceRandom;
64 use reth_db::{
65 test_utils::create_test_static_files_dir, CanonicalHeaders, HeaderNumbers,
66 HeaderTerminalDifficulties, Headers,
67 };
68 use reth_db_api::transaction::DbTxMut;
69 use reth_primitives::{
70 static_file::{find_fixed_range, SegmentRangeInclusive, DEFAULT_BLOCKS_PER_STATIC_FILE},
71 EthPrimitives, Receipt, TransactionSigned,
72 };
73 use reth_storage_api::{ReceiptProvider, TransactionsProvider};
74 use reth_testing_utils::generators::{self, random_header_range};
75 use std::{fmt::Debug, fs, ops::Range, path::Path};
76
77 fn assert_eyre<T: PartialEq + Debug>(got: T, expected: T, msg: &str) -> eyre::Result<()> {
78 if got != expected {
79 eyre::bail!("{msg} | got: {got:?} expected: {expected:?})");
80 }
81 Ok(())
82 }
83
84 #[test]
85 fn test_snap() {
86 let row_count = 100u64;
88 let range = 0..=(row_count - 1);
89
90 let factory = create_test_provider_factory();
92 let static_files_path = tempfile::tempdir().unwrap();
93 let static_file = static_files_path.path().join(
94 StaticFileSegment::Headers
95 .filename(&find_fixed_range(*range.end(), DEFAULT_BLOCKS_PER_STATIC_FILE)),
96 );
97
98 let mut headers = random_header_range(
100 &mut generators::rng(),
101 *range.start()..(*range.end() + 1),
102 B256::random(),
103 );
104
105 let mut provider_rw = factory.provider_rw().unwrap();
106 let tx = provider_rw.tx_mut();
107 let mut td = U256::ZERO;
108 for header in headers.clone() {
109 td += header.header().difficulty;
110 let hash = header.hash();
111
112 tx.put::<CanonicalHeaders>(header.number, hash).unwrap();
113 tx.put::<Headers>(header.number, header.clone().unseal()).unwrap();
114 tx.put::<HeaderTerminalDifficulties>(header.number, td.into()).unwrap();
115 tx.put::<HeaderNumbers>(hash, header.number).unwrap();
116 }
117 provider_rw.commit().unwrap();
118
119 {
121 let manager = factory.static_file_provider();
122 let mut writer = manager.latest_writer(StaticFileSegment::Headers).unwrap();
123 let mut td = U256::ZERO;
124
125 for header in headers.clone() {
126 td += header.header().difficulty;
127 let hash = header.hash();
128 writer.append_header(&header.unseal(), td, &hash).unwrap();
129 }
130 writer.commit().unwrap();
131 }
132
133 {
135 let db_provider = factory.provider().unwrap();
136 let manager = db_provider.static_file_provider();
137 let jar_provider = manager
138 .get_segment_provider_from_block(StaticFileSegment::Headers, 0, Some(&static_file))
139 .unwrap();
140
141 assert!(!headers.is_empty());
142
143 headers.shuffle(&mut generators::rng());
145
146 for header in headers {
147 let header_hash = header.hash();
148 let header = header.unseal();
149
150 assert_eq!(header, db_provider.header(&header_hash).unwrap().unwrap());
152 assert_eq!(header, jar_provider.header_by_number(header.number).unwrap().unwrap());
153
154 assert_eq!(
156 db_provider.header_td(&header_hash).unwrap().unwrap(),
157 jar_provider.header_td_by_number(header.number).unwrap().unwrap()
158 );
159 }
160 }
161 }
162
163 #[test]
164 fn test_header_truncation() {
165 let (static_dir, _) = create_test_static_files_dir();
166
167 let blocks_per_file = 10; let files_per_range = 3; let file_set_count = 3; let initial_file_count = files_per_range * file_set_count + 1; let tip = blocks_per_file * file_set_count - 1; {
175 let sf_rw = StaticFileProvider::<EthPrimitives>::read_write(&static_dir)
176 .expect("Failed to create static file provider")
177 .with_custom_blocks_per_file(blocks_per_file);
178
179 let mut header_writer = sf_rw.latest_writer(StaticFileSegment::Headers).unwrap();
180
181 let mut header = Header::default();
183 for num in 0..=tip {
184 header.number = num;
185 header_writer
186 .append_header(&header, U256::default(), &BlockHash::default())
187 .unwrap();
188 }
189 header_writer.commit().unwrap();
190 }
191
192 fn prune_and_validate(
194 writer: &mut StaticFileProviderRWRefMut<'_, EthPrimitives>,
195 sf_rw: &StaticFileProvider<EthPrimitives>,
196 static_dir: impl AsRef<Path>,
197 prune_count: u64,
198 expected_tip: Option<u64>,
199 expected_file_count: u64,
200 ) -> eyre::Result<()> {
201 writer.prune_headers(prune_count)?;
202 writer.commit()?;
203
204 assert_eyre(
206 sf_rw.get_highest_static_file_block(StaticFileSegment::Headers),
207 expected_tip,
208 "block mismatch",
209 )?;
210
211 if let Some(id) = expected_tip {
212 assert_eyre(
213 sf_rw.header_by_number(id)?.map(|h| h.number),
214 expected_tip,
215 "header mismatch",
216 )?;
217 }
218
219 assert_eyre(
221 fs::read_dir(static_dir)?.count(),
222 expected_file_count as usize,
223 "file count mismatch",
224 )?;
225
226 Ok(())
227 }
228
229 type PruneCount = u64;
231 type ExpectedTip = u64;
232 type ExpectedFileCount = u64;
233 let mut tmp_tip = tip;
234 let test_cases: Vec<(PruneCount, Option<ExpectedTip>, ExpectedFileCount)> = vec![
235 {
237 tmp_tip -= 1;
238 (1, Some(tmp_tip), initial_file_count)
239 },
240 {
242 tmp_tip -= blocks_per_file - 1;
243 (blocks_per_file - 1, Some(tmp_tip), initial_file_count - files_per_range)
244 },
245 {
248 tmp_tip -= blocks_per_file + 1;
249 (blocks_per_file + 1, Some(tmp_tip), initial_file_count - files_per_range * 2)
250 },
251 {
253 (
254 tmp_tip,
255 Some(0), files_per_range + 1, )
258 },
259 {
261 (
262 1,
263 None, files_per_range + 1, )
266 },
267 ];
268
269 {
271 let sf_rw = StaticFileProvider::read_write(&static_dir)
272 .expect("Failed to create static file provider")
273 .with_custom_blocks_per_file(blocks_per_file);
274
275 assert_eq!(sf_rw.get_highest_static_file_block(StaticFileSegment::Headers), Some(tip));
276 assert_eq!(
277 fs::read_dir(static_dir.as_ref()).unwrap().count(),
278 initial_file_count as usize
279 );
280
281 let mut header_writer = sf_rw.latest_writer(StaticFileSegment::Headers).unwrap();
282
283 for (case, (prune_count, expected_tip, expected_file_count)) in
284 test_cases.into_iter().enumerate()
285 {
286 prune_and_validate(
287 &mut header_writer,
288 &sf_rw,
289 &static_dir,
290 prune_count,
291 expected_tip,
292 expected_file_count,
293 )
294 .map_err(|err| eyre::eyre!("Test case {case}: {err}"))
295 .unwrap();
296 }
297 }
298 }
299
300 fn setup_tx_based_scenario(
307 sf_rw: &StaticFileProvider<EthPrimitives>,
308 segment: StaticFileSegment,
309 blocks_per_file: u64,
310 ) {
311 fn setup_block_ranges(
312 writer: &mut StaticFileProviderRWRefMut<'_, EthPrimitives>,
313 sf_rw: &StaticFileProvider<EthPrimitives>,
314 segment: StaticFileSegment,
315 block_range: &Range<u64>,
316 mut tx_count: u64,
317 next_tx_num: &mut u64,
318 ) {
319 let mut receipt = Receipt::default();
320 let mut tx = TransactionSigned::default();
321
322 for block in block_range.clone() {
323 writer.increment_block(block).unwrap();
324
325 if tx_count > 0 {
327 if segment.is_receipts() {
328 receipt.cumulative_gas_used = *next_tx_num;
330 writer.append_receipt(*next_tx_num, &receipt).unwrap();
331 } else {
332 tx.transaction.set_nonce(*next_tx_num);
334 writer.append_transaction(*next_tx_num, &tx).unwrap();
335 }
336 *next_tx_num += 1;
337 tx_count -= 1;
338 }
339 }
340 writer.commit().unwrap();
341
342 let expected_block = block_range.end - 1;
344 let expected_tx = if tx_count == 0 { *next_tx_num - 1 } else { *next_tx_num };
345
346 assert_eq!(sf_rw.get_highest_static_file_block(segment), Some(expected_block),);
348 assert_eq!(sf_rw.get_highest_static_file_tx(segment), Some(expected_tx),);
349 }
350
351 let block_ranges = [
353 0..blocks_per_file,
354 blocks_per_file..blocks_per_file * 2,
355 blocks_per_file * 2..blocks_per_file * 3,
356 ];
357
358 let tx_counts = [
359 blocks_per_file - 1, 0, 1, ];
363
364 let mut writer = sf_rw.latest_writer(segment).unwrap();
365 let mut next_tx_num = 0;
366
367 for (block_range, tx_count) in block_ranges.iter().zip(tx_counts.iter()) {
369 setup_block_ranges(
370 &mut writer,
371 sf_rw,
372 segment,
373 block_range,
374 *tx_count,
375 &mut next_tx_num,
376 );
377 }
378
379 let expected_tx_ranges = vec![
381 Some(SegmentRangeInclusive::new(0, 8)),
382 None,
383 Some(SegmentRangeInclusive::new(9, 9)),
384 ];
385
386 block_ranges.iter().zip(expected_tx_ranges).for_each(|(block_range, expected_tx_range)| {
387 assert_eq!(
388 sf_rw
389 .get_segment_provider_from_block(segment, block_range.start, None)
390 .unwrap()
391 .user_header()
392 .tx_range(),
393 expected_tx_range.as_ref()
394 );
395 });
396
397 let tx_index = sf_rw.tx_index().read();
399 let expected_tx_index =
400 vec![(8, SegmentRangeInclusive::new(0, 9)), (9, SegmentRangeInclusive::new(20, 29))];
401 assert_eq!(
402 tx_index.get(&segment).map(|index| index.iter().map(|(k, v)| (*k, *v)).collect()),
403 (!expected_tx_index.is_empty()).then_some(expected_tx_index),
404 "tx index mismatch",
405 );
406 }
407
408 #[test]
409 fn test_tx_based_truncation() {
410 let segments = [StaticFileSegment::Transactions, StaticFileSegment::Receipts];
411 let blocks_per_file = 10; let files_per_range = 3; let file_set_count = 3; let initial_file_count = files_per_range * file_set_count + 1; #[allow(clippy::too_many_arguments)]
417 fn prune_and_validate(
418 sf_rw: &StaticFileProvider<EthPrimitives>,
419 static_dir: impl AsRef<Path>,
420 segment: StaticFileSegment,
421 prune_count: u64,
422 last_block: u64,
423 expected_tx_tip: Option<u64>,
424 expected_file_count: i32,
425 expected_tx_index: Vec<(TxNumber, SegmentRangeInclusive)>,
426 ) -> eyre::Result<()> {
427 let mut writer = sf_rw.latest_writer(segment)?;
428
429 if segment.is_receipts() {
431 writer.prune_receipts(prune_count, last_block)?;
432 } else {
433 writer.prune_transactions(prune_count, last_block)?;
434 }
435 writer.commit()?;
436
437 assert_eyre(
439 sf_rw.get_highest_static_file_block(segment),
440 Some(last_block),
441 "block mismatch",
442 )?;
443 assert_eyre(sf_rw.get_highest_static_file_tx(segment), expected_tx_tip, "tx mismatch")?;
444
445 if let Some(id) = expected_tx_tip {
448 if segment.is_receipts() {
449 assert_eyre(
450 expected_tx_tip,
451 sf_rw.receipt(id)?.map(|r| r.cumulative_gas_used),
452 "tx mismatch",
453 )?;
454 } else {
455 assert_eyre(
456 expected_tx_tip,
457 sf_rw.transaction_by_id(id)?.map(|t| t.nonce()),
458 "tx mismatch",
459 )?;
460 }
461 }
462
463 assert_eyre(
465 fs::read_dir(static_dir)?.count(),
466 expected_file_count as usize,
467 "file count mismatch",
468 )?;
469
470 let tx_index = sf_rw.tx_index().read();
472 assert_eyre(
473 tx_index.get(&segment).map(|index| index.iter().map(|(k, v)| (*k, *v)).collect()),
474 (!expected_tx_index.is_empty()).then_some(expected_tx_index),
475 "tx index mismatch",
476 )?;
477
478 Ok(())
479 }
480
481 for segment in segments {
482 let (static_dir, _) = create_test_static_files_dir();
483
484 let sf_rw = StaticFileProvider::read_write(&static_dir)
485 .expect("Failed to create static file provider")
486 .with_custom_blocks_per_file(blocks_per_file);
487
488 setup_tx_based_scenario(&sf_rw, segment, blocks_per_file);
489
490 let sf_rw = StaticFileProvider::read_write(&static_dir)
491 .expect("Failed to create static file provider")
492 .with_custom_blocks_per_file(blocks_per_file);
493 let highest_tx = sf_rw.get_highest_static_file_tx(segment).unwrap();
494
495 let test_cases = vec![
498 (
503 1,
504 blocks_per_file * 2,
505 Some(highest_tx - 1),
506 initial_file_count,
507 vec![(highest_tx - 1, SegmentRangeInclusive::new(0, 9))],
508 ),
509 (
513 0,
514 blocks_per_file - 1,
515 Some(highest_tx - 1),
516 files_per_range + 1, vec![(highest_tx - 1, SegmentRangeInclusive::new(0, 9))],
518 ),
519 (
521 highest_tx - 1,
522 1,
523 Some(0),
524 files_per_range + 1,
525 vec![(0, SegmentRangeInclusive::new(0, 1))],
526 ),
527 (1, 0, None, files_per_range + 1, vec![]),
529 ];
530
531 for (
533 case,
534 (prune_count, last_block, expected_tx_tip, expected_file_count, expected_tx_index),
535 ) in test_cases.into_iter().enumerate()
536 {
537 prune_and_validate(
538 &sf_rw,
539 &static_dir,
540 segment,
541 prune_count,
542 last_block,
543 expected_tx_tip,
544 expected_file_count,
545 expected_tx_index,
546 )
547 .map_err(|err| eyre::eyre!("Test case {case}: {err}"))
548 .unwrap();
549 }
550 }
551 }
552}