reth_provider/providers/static_file/manager.rs

1use super::{
2    metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
3    StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
4};
5use crate::{
6    to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, HeaderProvider,
7    ReceiptProvider, StageCheckpointReader, StatsReader, TransactionVariant, TransactionsProvider,
8    TransactionsProviderExt, WithdrawalsProvider,
9};
10use alloy_consensus::{
11    transaction::{SignerRecoverable, TransactionMeta},
12    Header,
13};
14use alloy_eips::{eip2718::Encodable2718, eip4895::Withdrawals, BlockHashOrNumber};
15use alloy_primitives::{
16    b256, keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
17};
18use dashmap::DashMap;
19use notify::{RecommendedWatcher, RecursiveMode, Watcher};
20use parking_lot::RwLock;
21use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
22use reth_db::{
23    lockfile::StorageLock,
24    static_file::{
25        iter_static_files, BlockHashMask, BodyIndicesMask, HeaderMask, HeaderWithHashMask,
26        ReceiptMask, StaticFileCursor, TDWithHashMask, TransactionMask,
27    },
28};
29use reth_db_api::{
30    cursor::DbCursorRO,
31    models::StoredBlockBodyIndices,
32    table::{Decompress, Table, Value},
33    tables,
34    transaction::DbTx,
35};
36use reth_ethereum_primitives::{Receipt, TransactionSigned};
37use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION};
38use reth_node_types::{FullNodePrimitives, NodePrimitives};
39use reth_primitives_traits::{RecoveredBlock, SealedBlock, SealedHeader, SignedTransaction};
40use reth_stages_types::{PipelineTarget, StageId};
41use reth_static_file_types::{
42    find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileSegment,
43    DEFAULT_BLOCKS_PER_STATIC_FILE,
44};
45use reth_storage_api::{BlockBodyIndicesProvider, DBProvider, OmmersProvider};
46use reth_storage_errors::provider::{ProviderError, ProviderResult};
47use std::{
48    collections::{hash_map::Entry, BTreeMap, HashMap},
49    fmt::Debug,
50    marker::PhantomData,
51    ops::{Deref, Range, RangeBounds, RangeInclusive},
52    path::{Path, PathBuf},
53    sync::{mpsc, Arc},
54};
55use tracing::{info, trace, warn};
56
57/// Alias type for a map that can be queried for block ranges from a transaction
58/// segment respectively. It uses `TxNumber` to represent the transaction end of a static file
59/// range.
60type SegmentRanges = HashMap<StaticFileSegment, BTreeMap<TxNumber, SegmentRangeInclusive>>;
61
/// Access mode of a static file provider: read-only (RO) or read-write (RW).
///
/// The mode is a plain fieldless tag, so it is `Copy` and can be passed around by value.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum StaticFileAccess {
    /// Read-only access.
    #[default]
    RO,
    /// Read-write access.
    RW,
}
71
72impl StaticFileAccess {
73    /// Returns `true` if read-only access.
74    pub const fn is_read_only(&self) -> bool {
75        matches!(self, Self::RO)
76    }
77
78    /// Returns `true` if read-write access.
79    pub const fn is_read_write(&self) -> bool {
80        matches!(self, Self::RW)
81    }
82}
83
84/// [`StaticFileProvider`] manages all existing [`StaticFileJarProvider`].
85///
86/// "Static files" contain immutable chain history data, such as:
87///  - transactions
88///  - headers
89///  - receipts
90///
91/// This provider type is responsible for reading and writing to static files.
92#[derive(Debug)]
93pub struct StaticFileProvider<N>(pub(crate) Arc<StaticFileProviderInner<N>>);
94
95impl<N> Clone for StaticFileProvider<N> {
96    fn clone(&self) -> Self {
97        Self(self.0.clone())
98    }
99}
100
101impl<N: NodePrimitives> StaticFileProvider<N> {
102    /// Creates a new [`StaticFileProvider`] with the given [`StaticFileAccess`].
103    fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
104        let provider = Self(Arc::new(StaticFileProviderInner::new(path, access)?));
105        provider.initialize_index()?;
106        Ok(provider)
107    }
108
109    /// Creates a new [`StaticFileProvider`] with read-only access.
110    ///
111    /// Set `watch_directory` to `true` to track the most recent changes in static files. Otherwise,
112    /// new data won't be detected or queryable.
113    ///
114    /// Watching is recommended if the read-only provider is used on a directory that an active node
115    /// instance is modifying.
116    ///
117    /// See also [`StaticFileProvider::watch_directory`].
118    pub fn read_only(path: impl AsRef<Path>, watch_directory: bool) -> ProviderResult<Self> {
119        let provider = Self::new(path, StaticFileAccess::RO)?;
120
121        if watch_directory {
122            provider.watch_directory();
123        }
124
125        Ok(provider)
126    }
127
128    /// Creates a new [`StaticFileProvider`] with read-write access.
129    pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
130        Self::new(path, StaticFileAccess::RW)
131    }
132
133    /// Watches the directory for changes and updates the in-memory index when modifications
134    /// are detected.
135    ///
136    /// This may be necessary, since a non-node process that owns a [`StaticFileProvider`] does not
137    /// receive `update_index` notifications from a node that appends/truncates data.
138    pub fn watch_directory(&self) {
139        let provider = self.clone();
140        std::thread::spawn(move || {
141            let (tx, rx) = std::sync::mpsc::channel();
142            let mut watcher = RecommendedWatcher::new(
143                move |res| tx.send(res).unwrap(),
144                notify::Config::default(),
145            )
146            .expect("failed to create watcher");
147
148            watcher
149                .watch(&provider.path, RecursiveMode::NonRecursive)
150                .expect("failed to watch path");
151
152            // Some backends send repeated modified events
153            let mut last_event_timestamp = None;
154
155            while let Ok(res) = rx.recv() {
156                match res {
157                    Ok(event) => {
158                        // We only care about modified data events
159                        if !matches!(
160                            event.kind,
161                            notify::EventKind::Modify(_) |
162                                notify::EventKind::Create(_) |
163                                notify::EventKind::Remove(_)
164                        ) {
165                            continue
166                        }
167
168                        // We only trigger a re-initialization if a configuration file was
169                        // modified. This means that a
170                        // static_file_provider.commit() was called on the node after
171                        // appending/truncating rows
172                        for segment in event.paths {
173                            // Ensure it's a file with the .conf extension
174                            if segment
175                                .extension()
176                                .is_none_or(|s| s.to_str() != Some(CONFIG_FILE_EXTENSION))
177                            {
178                                continue
179                            }
180
181                            // Ensure it's well formatted static file name
182                            if StaticFileSegment::parse_filename(
183                                &segment.file_stem().expect("qed").to_string_lossy(),
184                            )
185                            .is_none()
186                            {
187                                continue
188                            }
189
190                            // If we can read the metadata and modified timestamp, ensure this is
191                            // not an old or repeated event.
192                            if let Ok(current_modified_timestamp) =
193                                std::fs::metadata(&segment).and_then(|m| m.modified())
194                            {
195                                if last_event_timestamp.is_some_and(|last_timestamp| {
196                                    last_timestamp >= current_modified_timestamp
197                                }) {
198                                    continue
199                                }
200                                last_event_timestamp = Some(current_modified_timestamp);
201                            }
202
203                            info!(target: "providers::static_file", updated_file = ?segment.file_stem(), "re-initializing static file provider index");
204                            if let Err(err) = provider.initialize_index() {
205                                warn!(target: "providers::static_file", "failed to re-initialize index: {err}");
206                            }
207                            break
208                        }
209                    }
210
211                    Err(err) => warn!(target: "providers::watcher", "watch error: {err:?}"),
212                }
213            }
214        });
215    }
216}
217
218impl<N: NodePrimitives> Deref for StaticFileProvider<N> {
219    type Target = StaticFileProviderInner<N>;
220
221    fn deref(&self) -> &Self::Target {
222        &self.0
223    }
224}
225
226/// [`StaticFileProviderInner`] manages all existing [`StaticFileJarProvider`].
227#[derive(Debug)]
228pub struct StaticFileProviderInner<N> {
229    /// Maintains a map which allows for concurrent access to different `NippyJars`, over different
230    /// segments and ranges.
231    map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
232    /// Max static file block for each segment
233    static_files_max_block: RwLock<HashMap<StaticFileSegment, u64>>,
234    /// Available static file block ranges on disk indexed by max transactions.
235    static_files_tx_index: RwLock<SegmentRanges>,
236    /// Directory where `static_files` are located
237    path: PathBuf,
238    /// Maintains a writer set of [`StaticFileSegment`].
239    writers: StaticFileWriters<N>,
240    /// Metrics for the static files.
241    metrics: Option<Arc<StaticFileProviderMetrics>>,
242    /// Access rights of the provider.
243    access: StaticFileAccess,
244    /// Number of blocks per file.
245    blocks_per_file: u64,
246    /// Write lock for when access is [`StaticFileAccess::RW`].
247    _lock_file: Option<StorageLock>,
248    /// Node primitives
249    _pd: PhantomData<N>,
250}
251
252impl<N: NodePrimitives> StaticFileProviderInner<N> {
253    /// Creates a new [`StaticFileProviderInner`].
254    fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
255        let _lock_file = if access.is_read_write() {
256            StorageLock::try_acquire(path.as_ref()).map_err(ProviderError::other)?.into()
257        } else {
258            None
259        };
260
261        let provider = Self {
262            map: Default::default(),
263            writers: Default::default(),
264            static_files_max_block: Default::default(),
265            static_files_tx_index: Default::default(),
266            path: path.as_ref().to_path_buf(),
267            metrics: None,
268            access,
269            blocks_per_file: DEFAULT_BLOCKS_PER_STATIC_FILE,
270            _lock_file,
271            _pd: Default::default(),
272        };
273
274        Ok(provider)
275    }
276
277    pub const fn is_read_only(&self) -> bool {
278        self.access.is_read_only()
279    }
280
281    /// Each static file has a fixed number of blocks. This gives out the range where the requested
282    /// block is positioned.
283    pub const fn find_fixed_range(&self, block: BlockNumber) -> SegmentRangeInclusive {
284        find_fixed_range(block, self.blocks_per_file)
285    }
286}
287
288impl<N: NodePrimitives> StaticFileProvider<N> {
289    /// Set a custom number of blocks per file.
290    #[cfg(any(test, feature = "test-utils"))]
291    pub fn with_custom_blocks_per_file(self, blocks_per_file: u64) -> Self {
292        let mut provider =
293            Arc::try_unwrap(self.0).expect("should be called when initializing only");
294        provider.blocks_per_file = blocks_per_file;
295        Self(Arc::new(provider))
296    }
297
298    /// Enables metrics on the [`StaticFileProvider`].
299    pub fn with_metrics(self) -> Self {
300        let mut provider =
301            Arc::try_unwrap(self.0).expect("should be called when initializing only");
302        provider.metrics = Some(Arc::new(StaticFileProviderMetrics::default()));
303        Self(Arc::new(provider))
304    }
305
306    /// Reports metrics for the static files.
307    pub fn report_metrics(&self) -> ProviderResult<()> {
308        let Some(metrics) = &self.metrics else { return Ok(()) };
309
310        let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
311        for (segment, ranges) in static_files {
312            let mut entries = 0;
313            let mut size = 0;
314
315            for (block_range, _) in &ranges {
316                let fixed_block_range = self.find_fixed_range(block_range.start());
317                let jar_provider = self
318                    .get_segment_provider(segment, || Some(fixed_block_range), None)?
319                    .ok_or_else(|| {
320                        ProviderError::MissingStaticFileBlock(segment, block_range.start())
321                    })?;
322
323                entries += jar_provider.rows();
324
325                let data_size = reth_fs_util::metadata(jar_provider.data_path())
326                    .map(|metadata| metadata.len())
327                    .unwrap_or_default();
328                let index_size = reth_fs_util::metadata(jar_provider.index_path())
329                    .map(|metadata| metadata.len())
330                    .unwrap_or_default();
331                let offsets_size = reth_fs_util::metadata(jar_provider.offsets_path())
332                    .map(|metadata| metadata.len())
333                    .unwrap_or_default();
334                let config_size = reth_fs_util::metadata(jar_provider.config_path())
335                    .map(|metadata| metadata.len())
336                    .unwrap_or_default();
337
338                size += data_size + index_size + offsets_size + config_size;
339            }
340
341            metrics.record_segment(segment, size, ranges.len(), entries);
342        }
343
344        Ok(())
345    }
346
347    /// Gets the [`StaticFileJarProvider`] of the requested segment and block.
348    pub fn get_segment_provider_from_block(
349        &self,
350        segment: StaticFileSegment,
351        block: BlockNumber,
352        path: Option<&Path>,
353    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
354        self.get_segment_provider(
355            segment,
356            || self.get_segment_ranges_from_block(segment, block),
357            path,
358        )?
359        .ok_or(ProviderError::MissingStaticFileBlock(segment, block))
360    }
361
362    /// Gets the [`StaticFileJarProvider`] of the requested segment and transaction.
363    pub fn get_segment_provider_from_transaction(
364        &self,
365        segment: StaticFileSegment,
366        tx: TxNumber,
367        path: Option<&Path>,
368    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
369        self.get_segment_provider(
370            segment,
371            || self.get_segment_ranges_from_transaction(segment, tx),
372            path,
373        )?
374        .ok_or(ProviderError::MissingStaticFileTx(segment, tx))
375    }
376
377    /// Gets the [`StaticFileJarProvider`] of the requested segment and block or transaction.
378    ///
379    /// `fn_range` should make sure the range goes through `find_fixed_range`.
380    pub fn get_segment_provider(
381        &self,
382        segment: StaticFileSegment,
383        fn_range: impl Fn() -> Option<SegmentRangeInclusive>,
384        path: Option<&Path>,
385    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
386        // If we have a path, then get the block range from its name.
387        // Otherwise, check `self.available_static_files`
388        let block_range = match path {
389            Some(path) => StaticFileSegment::parse_filename(
390                &path
391                    .file_name()
392                    .ok_or_else(|| {
393                        ProviderError::MissingStaticFilePath(segment, path.to_path_buf())
394                    })?
395                    .to_string_lossy(),
396            )
397            .and_then(|(parsed_segment, block_range)| {
398                if parsed_segment == segment {
399                    return Some(block_range)
400                }
401                None
402            }),
403            None => fn_range(),
404        };
405
406        // Return cached `LoadedJar` or insert it for the first time, and then, return it.
407        if let Some(block_range) = block_range {
408            return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?))
409        }
410
411        Ok(None)
412    }
413
414    /// Given a segment and block range it removes the cached provider from the map.
415    ///
416    /// CAUTION: cached provider should be dropped before calling this or IT WILL deadlock.
417    pub fn remove_cached_provider(
418        &self,
419        segment: StaticFileSegment,
420        fixed_block_range_end: BlockNumber,
421    ) {
422        self.map.remove(&(fixed_block_range_end, segment));
423    }
424
425    /// Given a segment and block, it deletes the jar and all files from the respective block range.
426    ///
427    /// CAUTION: destructive. Deletes files on disk.
428    pub fn delete_jar(&self, segment: StaticFileSegment, block: BlockNumber) -> ProviderResult<()> {
429        let fixed_block_range = self.find_fixed_range(block);
430        let key = (fixed_block_range.end(), segment);
431        let jar = if let Some((_, jar)) = self.map.remove(&key) {
432            jar.jar
433        } else {
434            NippyJar::<SegmentHeader>::load(&self.path.join(segment.filename(&fixed_block_range)))
435                .map_err(ProviderError::other)?
436        };
437
438        jar.delete().map_err(ProviderError::other)?;
439
440        let mut segment_max_block = None;
441        if fixed_block_range.start() > 0 {
442            segment_max_block = Some(fixed_block_range.start() - 1)
443        };
444        self.update_index(segment, segment_max_block)?;
445
446        Ok(())
447    }
448
449    /// Given a segment and block range it returns a cached
450    /// [`StaticFileJarProvider`]. TODO(joshie): we should check the size and pop N if there's too
451    /// many.
452    fn get_or_create_jar_provider(
453        &self,
454        segment: StaticFileSegment,
455        fixed_block_range: &SegmentRangeInclusive,
456    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
457        let key = (fixed_block_range.end(), segment);
458
459        // Avoid using `entry` directly to avoid a write lock in the common case.
460        trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Getting provider");
461        let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) {
462            trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Jar found in cache");
463            jar.into()
464        } else {
465            trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Creating jar from scratch");
466            let path = self.path.join(segment.filename(fixed_block_range));
467            let jar = NippyJar::load(&path).map_err(ProviderError::other)?;
468            self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
469        };
470
471        if let Some(metrics) = &self.metrics {
472            provider = provider.with_metrics(metrics.clone());
473        }
474        Ok(provider)
475    }
476
477    /// Gets a static file segment's block range from the provider inner block
478    /// index.
479    fn get_segment_ranges_from_block(
480        &self,
481        segment: StaticFileSegment,
482        block: u64,
483    ) -> Option<SegmentRangeInclusive> {
484        self.static_files_max_block
485            .read()
486            .get(&segment)
487            .filter(|max| **max >= block)
488            .map(|_| self.find_fixed_range(block))
489    }
490
491    /// Gets a static file segment's fixed block range from the provider inner
492    /// transaction index.
493    fn get_segment_ranges_from_transaction(
494        &self,
495        segment: StaticFileSegment,
496        tx: u64,
497    ) -> Option<SegmentRangeInclusive> {
498        let static_files = self.static_files_tx_index.read();
499        let segment_static_files = static_files.get(&segment)?;
500
501        // It's more probable that the request comes from a newer tx height, so we iterate
502        // the static_files in reverse.
503        let mut static_files_rev_iter = segment_static_files.iter().rev().peekable();
504
505        while let Some((tx_end, block_range)) = static_files_rev_iter.next() {
506            if tx > *tx_end {
507                // request tx is higher than highest static file tx
508                return None
509            }
510            let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
511            if tx_start <= tx {
512                return Some(self.find_fixed_range(block_range.end()))
513            }
514        }
515        None
516    }
517
518    /// Updates the inner transaction and block indexes alongside the internal cached providers in
519    /// `self.map`.
520    ///
521    /// Any entry higher than `segment_max_block` will be deleted from the previous structures.
522    ///
523    /// If `segment_max_block` is None it means there's no static file for this segment.
524    pub fn update_index(
525        &self,
526        segment: StaticFileSegment,
527        segment_max_block: Option<BlockNumber>,
528    ) -> ProviderResult<()> {
529        let mut max_block = self.static_files_max_block.write();
530        let mut tx_index = self.static_files_tx_index.write();
531
532        match segment_max_block {
533            Some(segment_max_block) => {
534                // Update the max block for the segment
535                max_block.insert(segment, segment_max_block);
536                let fixed_range = self.find_fixed_range(segment_max_block);
537
538                let jar = NippyJar::<SegmentHeader>::load(
539                    &self.path.join(segment.filename(&fixed_range)),
540                )
541                .map_err(ProviderError::other)?;
542
543                // Updates the tx index by first removing all entries which have a higher
544                // block_start than our current static file.
545                if let Some(tx_range) = jar.user_header().tx_range() {
546                    let tx_end = tx_range.end();
547
548                    // Current block range has the same block start as `fixed_range``, but block end
549                    // might be different if we are still filling this static file.
550                    if let Some(current_block_range) = jar.user_header().block_range().copied() {
551                        // Considering that `update_index` is called when we either append/truncate,
552                        // we are sure that we are handling the latest data
553                        // points.
554                        //
555                        // Here we remove every entry of the index that has a block start higher or
556                        // equal than our current one. This is important in the case
557                        // that we prune a lot of rows resulting in a file (and thus
558                        // a higher block range) deletion.
559                        tx_index
560                            .entry(segment)
561                            .and_modify(|index| {
562                                index.retain(|_, block_range| {
563                                    block_range.start() < fixed_range.start()
564                                });
565                                index.insert(tx_end, current_block_range);
566                            })
567                            .or_insert_with(|| BTreeMap::from([(tx_end, current_block_range)]));
568                    }
569                } else if segment.is_tx_based() {
570                    // The unwinded file has no more transactions/receipts. However, the highest
571                    // block is within this files' block range. We only retain
572                    // entries with block ranges before the current one.
573                    tx_index.entry(segment).and_modify(|index| {
574                        index.retain(|_, block_range| block_range.start() < fixed_range.start());
575                    });
576
577                    // If the index is empty, just remove it.
578                    if tx_index.get(&segment).is_some_and(|index| index.is_empty()) {
579                        tx_index.remove(&segment);
580                    }
581                }
582
583                // Update the cached provider.
584                self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);
585
586                // Delete any cached provider that no longer has an associated jar.
587                self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
588            }
589            None => {
590                tx_index.remove(&segment);
591                max_block.remove(&segment);
592            }
593        };
594
595        Ok(())
596    }
597
598    /// Initializes the inner transaction and block index
599    pub fn initialize_index(&self) -> ProviderResult<()> {
600        let mut max_block = self.static_files_max_block.write();
601        let mut tx_index = self.static_files_tx_index.write();
602
603        max_block.clear();
604        tx_index.clear();
605
606        for (segment, ranges) in iter_static_files(&self.path).map_err(ProviderError::other)? {
607            // Update last block for each segment
608            if let Some((block_range, _)) = ranges.last() {
609                max_block.insert(segment, block_range.end());
610            }
611
612            // Update tx -> block_range index
613            for (block_range, tx_range) in ranges {
614                if let Some(tx_range) = tx_range {
615                    let tx_end = tx_range.end();
616
617                    match tx_index.entry(segment) {
618                        Entry::Occupied(mut index) => {
619                            index.get_mut().insert(tx_end, block_range);
620                        }
621                        Entry::Vacant(index) => {
622                            index.insert(BTreeMap::from([(tx_end, block_range)]));
623                        }
624                    };
625                }
626            }
627        }
628
629        // If this is a re-initialization, we need to clear this as well
630        self.map.clear();
631
632        Ok(())
633    }
634
635    /// Ensures that any broken invariants which cannot be healed on the spot return a pipeline
636    /// target to unwind to.
637    ///
638    /// Two types of consistency checks are done for:
639    ///
640    /// 1) When a static file fails to commit but the underlying data was changed.
641    /// 2) When a static file was committed, but the required database transaction was not.
642    ///
643    /// For 1) it can self-heal if `self.access.is_read_only()` is set to `false`. Otherwise, it
644    /// will return an error.
645    /// For 2) the invariants below are checked, and if broken, might require a pipeline unwind
646    /// to heal.
647    ///
648    /// For each static file segment:
649    /// * the corresponding database table should overlap or have continuity in their keys
650    ///   ([`TxNumber`] or [`BlockNumber`]).
651    /// * its highest block should match the stage checkpoint block number if it's equal or higher
652    ///   than the corresponding database table last entry.
653    ///
654    /// Returns a [`Option`] of [`PipelineTarget::Unwind`] if any healing is further required.
655    ///
656    /// WARNING: No static file writer should be held before calling this function, otherwise it
657    /// will deadlock.
658    pub fn check_consistency<Provider>(
659        &self,
660        provider: &Provider,
661        has_receipt_pruning: bool,
662    ) -> ProviderResult<Option<PipelineTarget>>
663    where
664        Provider: DBProvider + BlockReader + StageCheckpointReader + ChainSpecProvider,
665        N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
666    {
667        // OVM historical import is broken and does not work with this check. It's importing
668        // duplicated receipts resulting in having more receipts than the expected transaction
669        // range.
670        //
671        // If we detect an OVM import was done (block #1 <https://optimistic.etherscan.io/block/1>), skip it.
672        // More on [#11099](https://github.com/paradigmxyz/reth/pull/11099).
673        if provider.chain_spec().is_optimism() &&
674            reth_chainspec::Chain::optimism_mainnet() == provider.chain_spec().chain_id()
675        {
676            // check whether we have the first OVM block: <https://optimistic.etherscan.io/block/0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453>
677            const OVM_HEADER_1_HASH: B256 =
678                b256!("0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453");
679            if provider.block_number(OVM_HEADER_1_HASH)?.is_some() {
680                info!(target: "reth::cli",
681                    "Skipping storage verification for OP mainnet, expected inconsistency in OVM chain"
682                );
683                return Ok(None)
684            }
685        }
686
687        info!(target: "reth::cli", "Verifying storage consistency.");
688
689        let mut unwind_target: Option<BlockNumber> = None;
690        let mut update_unwind_target = |new_target: BlockNumber| {
691            if let Some(target) = unwind_target.as_mut() {
692                *target = (*target).min(new_target);
693            } else {
694                unwind_target = Some(new_target);
695            }
696        };
697
698        for segment in StaticFileSegment::iter() {
699            // Not integrated yet
700            if segment.is_block_meta() {
701                continue
702            }
703
704            if has_receipt_pruning && segment.is_receipts() {
705                // Pruned nodes (including full node) do not store receipts as static files.
706                continue
707            }
708
709            let initial_highest_block = self.get_highest_static_file_block(segment);
710
711            //  File consistency is broken if:
712            //
713            // * appending data was interrupted before a config commit, then data file will be
714            //   truncated according to the config.
715            //
716            // * pruning data was interrupted before a config commit, then we have deleted data that
717            //   we are expected to still have. We need to check the Database and unwind everything
718            //   accordingly.
719            if self.access.is_read_only() {
720                self.check_segment_consistency(segment)?;
721            } else {
722                // Fetching the writer will attempt to heal any file level inconsistency.
723                self.latest_writer(segment)?;
724            }
725
726            // Only applies to block-based static files. (Headers)
727            //
728            // The updated `highest_block` may have decreased if we healed from a pruning
729            // interruption.
730            let mut highest_block = self.get_highest_static_file_block(segment);
731            if initial_highest_block != highest_block {
732                info!(
733                    target: "reth::providers::static_file",
734                    ?initial_highest_block,
735                    unwind_target = highest_block,
736                    ?segment,
737                    "Setting unwind target."
738                );
739                update_unwind_target(highest_block.unwrap_or_default());
740            }
741
742            // Only applies to transaction-based static files. (Receipts & Transactions)
743            //
744            // Make sure the last transaction matches the last block from its indices, since a heal
745            // from a pruning interruption might have decreased the number of transactions without
746            // being able to update the last block of the static file segment.
747            let highest_tx = self.get_highest_static_file_tx(segment);
748            if let Some(highest_tx) = highest_tx {
749                let mut last_block = highest_block.unwrap_or_default();
750                loop {
751                    if let Some(indices) = provider.block_body_indices(last_block)? {
752                        if indices.last_tx_num() <= highest_tx {
753                            break
754                        }
755                    } else {
756                        // If the block body indices can not be found, then it means that static
757                        // files is ahead of database, and the `ensure_invariants` check will fix
758                        // it by comparing with stage checkpoints.
759                        break
760                    }
761                    if last_block == 0 {
762                        break
763                    }
764                    last_block -= 1;
765
766                    info!(
767                        target: "reth::providers::static_file",
768                        highest_block = self.get_highest_static_file_block(segment),
769                        unwind_target = last_block,
770                        ?segment,
771                        "Setting unwind target."
772                    );
773                    highest_block = Some(last_block);
774                    update_unwind_target(last_block);
775                }
776            }
777
778            if let Some(unwind) = match segment {
779                StaticFileSegment::Headers => self
780                    .ensure_invariants::<_, tables::Headers<N::BlockHeader>>(
781                        provider,
782                        segment,
783                        highest_block,
784                        highest_block,
785                    )?,
786                StaticFileSegment::Transactions => self
787                    .ensure_invariants::<_, tables::Transactions<N::SignedTx>>(
788                        provider,
789                        segment,
790                        highest_tx,
791                        highest_block,
792                    )?,
793                StaticFileSegment::Receipts => self
794                    .ensure_invariants::<_, tables::Receipts<N::Receipt>>(
795                        provider,
796                        segment,
797                        highest_tx,
798                        highest_block,
799                    )?,
800                StaticFileSegment::BlockMeta => self
801                    .ensure_invariants::<_, tables::BlockBodyIndices>(
802                        provider,
803                        segment,
804                        highest_block,
805                        highest_block,
806                    )?,
807            } {
808                update_unwind_target(unwind);
809            }
810        }
811
812        Ok(unwind_target.map(PipelineTarget::Unwind))
813    }
814
815    /// Checks consistency of the latest static file segment and throws an error if at fault.
816    /// Read-only.
817    pub fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
818        if let Some(latest_block) = self.get_highest_static_file_block(segment) {
819            let file_path =
820                self.directory().join(segment.filename(&self.find_fixed_range(latest_block)));
821
822            let jar = NippyJar::<SegmentHeader>::load(&file_path).map_err(ProviderError::other)?;
823
824            NippyJarChecker::new(jar).check_consistency().map_err(ProviderError::other)?;
825        }
826        Ok(())
827    }
828
    /// Check invariants for each corresponding table and static file segment:
    ///
    /// * the corresponding database table should overlap or have continuity in their keys
    ///   ([`TxNumber`] or [`BlockNumber`]).
    /// * its highest block should match the stage checkpoint block number if it's equal or higher
    ///   than the corresponding database table last entry.
    ///   * If the checkpoint block is higher, then request a pipeline unwind to the static file
    ///     block. This is expressed by returning [`Some`] with the requested pipeline unwind
    ///     target.
    ///   * If the checkpoint block is lower, then heal by removing rows from the static file. In
    ///     this case, the rows will be removed and [`None`] will be returned.
    ///
    /// * If the database tables overlap with static files and have contiguous keys, or the
    ///   checkpoint block matches the highest static files block, then [`None`] will be returned.
    ///
    /// # Arguments
    ///
    /// * `provider` - Database provider used to read table `T`, the stage checkpoint and block body
    ///   indices.
    /// * `segment` - The static file segment being checked. It selects which stage checkpoint is
    ///   compared against and how extra static file rows are pruned.
    /// * `highest_static_file_entry` - Highest key stored in the segment's static files. The caller
    ///   passes a block number for headers and a transaction number for transactions/receipts.
    ///   [`None`] if the segment is empty.
    /// * `highest_static_file_block` - Highest block stored in the segment's static files.
    ///
    /// # Errors
    ///
    /// Propagates database cursor, stage checkpoint, block body indices and static file writer
    /// errors.
    fn ensure_invariants<Provider, T: Table<Key = u64>>(
        &self,
        provider: &Provider,
        segment: StaticFileSegment,
        highest_static_file_entry: Option<u64>,
        highest_static_file_block: Option<BlockNumber>,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider + BlockReader + StageCheckpointReader,
    {
        let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;

        // The table-vs-static-file checks below only apply when table `T` has at least one row.
        if let Some((db_first_entry, _)) = db_cursor.first()? {
            if let (Some(highest_entry), Some(highest_block)) =
                (highest_static_file_entry, highest_static_file_block)
            {
                // If there is a gap between the entry found in static file and
                // database, then we have most likely lost static file data and need to unwind so we
                // can load it again
                if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
                    info!(
                        target: "reth::providers::static_file",
                        ?db_first_entry,
                        ?highest_entry,
                        unwind_target = highest_block,
                        ?segment,
                        "Setting unwind target."
                    );
                    return Ok(Some(highest_block))
                }
            }

            // Two cases return here: the table's last key is beyond the static files' highest
            // entry, or the static files are empty. Either way the static files are not ahead of
            // the database, so the checkpoint comparison below is skipped.
            if let Some((db_last_entry, _)) = db_cursor.last()? {
                if highest_static_file_entry
                    .is_none_or(|highest_entry| db_last_entry > highest_entry)
                {
                    return Ok(None)
                }
            }
        }

        // From here on, missing static file values are treated as 0.
        let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
        let highest_static_file_block = highest_static_file_block.unwrap_or_default();

        // If static file entry is ahead of the database entries, then ensure the checkpoint block
        // number matches.
        // A missing stage checkpoint falls back to the default checkpoint.
        let checkpoint_block_number = provider
            .get_stage_checkpoint(match segment {
                StaticFileSegment::Headers => StageId::Headers,
                StaticFileSegment::Transactions | StaticFileSegment::BlockMeta => StageId::Bodies,
                StaticFileSegment::Receipts => StageId::Execution,
            })?
            .unwrap_or_default()
            .block_number;

        // If the checkpoint is ahead, then we lost static file data. May be data corruption.
        if checkpoint_block_number > highest_static_file_block {
            info!(
                target: "reth::providers::static_file",
                checkpoint_block_number,
                unwind_target = highest_static_file_block,
                ?segment,
                "Setting unwind target."
            );
            return Ok(Some(highest_static_file_block))
        }

        // If the checkpoint is behind, then we failed to do a database commit **but committed** to
        // static files on executing a stage, or the reverse on unwinding a stage.
        // All we need to do is to prune the extra static file rows.
        if checkpoint_block_number < highest_static_file_block {
            info!(
                target: "reth::providers",
                ?segment,
                from = highest_static_file_block,
                to = checkpoint_block_number,
                "Unwinding static file segment."
            );
            let mut writer = self.latest_writer(segment)?;
            if segment.is_headers() {
                // TODO(joshie): is_block_meta
                // Headers are keyed by block: prune by the number of blocks past the checkpoint.
                writer.prune_headers(highest_static_file_block - checkpoint_block_number)?;
            } else if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
                // todo joshie: is querying block_body_indices a potential issue once bbi is moved
                // to sf as well
                // Transaction-keyed segments: prune every row after the checkpoint block's last
                // transaction.
                let number = highest_static_file_entry - block.last_tx_num();
                if segment.is_receipts() {
                    writer.prune_receipts(number, checkpoint_block_number)?;
                } else {
                    writer.prune_transactions(number, checkpoint_block_number)?;
                }
            }
            // NOTE(review): if the checkpoint block's body indices are missing, nothing is pruned
            // for transaction-based segments, but the writer is still committed.
            writer.commit()?;
        }

        Ok(None)
    }
940
941    /// Gets the highest static file block if it exists for a static file segment.
942    ///
943    /// If there is nothing on disk for the given segment, this will return [`None`].
944    pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
945        self.static_files_max_block.read().get(&segment).copied()
946    }
947
948    /// Gets the highest static file transaction.
949    ///
950    /// If there is nothing on disk for the given segment, this will return [`None`].
951    pub fn get_highest_static_file_tx(&self, segment: StaticFileSegment) -> Option<TxNumber> {
952        self.static_files_tx_index
953            .read()
954            .get(&segment)
955            .and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
956    }
957
958    /// Gets the highest static file block for all segments.
959    pub fn get_highest_static_files(&self) -> HighestStaticFiles {
960        HighestStaticFiles {
961            headers: self.get_highest_static_file_block(StaticFileSegment::Headers),
962            receipts: self.get_highest_static_file_block(StaticFileSegment::Receipts),
963            transactions: self.get_highest_static_file_block(StaticFileSegment::Transactions),
964            block_meta: self.get_highest_static_file_block(StaticFileSegment::BlockMeta),
965        }
966    }
967
968    /// Iterates through segment `static_files` in reverse order, executing a function until it
969    /// returns some object. Useful for finding objects by [`TxHash`] or [`BlockHash`].
970    pub fn find_static_file<T>(
971        &self,
972        segment: StaticFileSegment,
973        func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
974    ) -> ProviderResult<Option<T>> {
975        if let Some(highest_block) = self.get_highest_static_file_block(segment) {
976            let mut range = self.find_fixed_range(highest_block);
977            while range.end() > 0 {
978                if let Some(res) = func(self.get_or_create_jar_provider(segment, &range)?)? {
979                    return Ok(Some(res))
980                }
981                range = SegmentRangeInclusive::new(
982                    range.start().saturating_sub(self.blocks_per_file),
983                    range.end().saturating_sub(self.blocks_per_file),
984                );
985            }
986        }
987
988        Ok(None)
989    }
990
991    /// Fetches data within a specified range across multiple static files.
992    ///
993    /// This function iteratively retrieves data using `get_fn` for each item in the given range.
994    /// It continues fetching until the end of the range is reached or the provided `predicate`
995    /// returns false.
996    pub fn fetch_range_with_predicate<T, F, P>(
997        &self,
998        segment: StaticFileSegment,
999        range: Range<u64>,
1000        mut get_fn: F,
1001        mut predicate: P,
1002    ) -> ProviderResult<Vec<T>>
1003    where
1004        F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
1005        P: FnMut(&T) -> bool,
1006    {
1007        let get_provider = |start: u64| {
1008            if segment.is_block_based() {
1009                self.get_segment_provider_from_block(segment, start, None)
1010            } else {
1011                self.get_segment_provider_from_transaction(segment, start, None)
1012            }
1013        };
1014
1015        let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);
1016        let mut provider = get_provider(range.start)?;
1017        let mut cursor = provider.cursor()?;
1018
1019        // advances number in range
1020        'outer: for number in range {
1021            // The `retrying` flag ensures a single retry attempt per `number`. If `get_fn` fails to
1022            // access data in two different static files, it halts further attempts by returning
1023            // an error, effectively preventing infinite retry loops.
1024            let mut retrying = false;
1025
1026            // advances static files if `get_fn` returns None
1027            'inner: loop {
1028                match get_fn(&mut cursor, number)? {
1029                    Some(res) => {
1030                        if !predicate(&res) {
1031                            break 'outer
1032                        }
1033                        result.push(res);
1034                        break 'inner
1035                    }
1036                    None => {
1037                        if retrying {
1038                            warn!(
1039                                target: "provider::static_file",
1040                                ?segment,
1041                                ?number,
1042                                "Could not find block or tx number on a range request"
1043                            );
1044
1045                            let err = if segment.is_block_based() {
1046                                ProviderError::MissingStaticFileBlock(segment, number)
1047                            } else {
1048                                ProviderError::MissingStaticFileTx(segment, number)
1049                            };
1050                            return Err(err)
1051                        }
1052                        // There is a very small chance of hitting a deadlock if two consecutive
1053                        // static files share the same bucket in the
1054                        // internal dashmap and we don't drop the current provider
1055                        // before requesting the next one.
1056                        drop(cursor);
1057                        drop(provider);
1058                        provider = get_provider(number)?;
1059                        cursor = provider.cursor()?;
1060                        retrying = true;
1061                    }
1062                }
1063            }
1064        }
1065
1066        Ok(result)
1067    }
1068
1069    /// Fetches data within a specified range across multiple static files.
1070    ///
1071    /// Returns an iterator over the data
1072    pub fn fetch_range_iter<'a, T, F>(
1073        &'a self,
1074        segment: StaticFileSegment,
1075        range: Range<u64>,
1076        get_fn: F,
1077    ) -> ProviderResult<impl Iterator<Item = ProviderResult<T>> + 'a>
1078    where
1079        F: Fn(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>> + 'a,
1080        T: std::fmt::Debug,
1081    {
1082        let get_provider = move |start: u64| {
1083            if segment.is_block_based() {
1084                self.get_segment_provider_from_block(segment, start, None)
1085            } else {
1086                self.get_segment_provider_from_transaction(segment, start, None)
1087            }
1088        };
1089
1090        let mut provider = Some(get_provider(range.start)?);
1091        Ok(range.filter_map(move |number| {
1092            match get_fn(&mut provider.as_ref().expect("qed").cursor().ok()?, number).transpose() {
1093                Some(result) => Some(result),
1094                None => {
1095                    // There is a very small chance of hitting a deadlock if two consecutive static
1096                    // files share the same bucket in the internal dashmap and
1097                    // we don't drop the current provider before requesting the
1098                    // next one.
1099                    provider.take();
1100                    provider = Some(get_provider(number).ok()?);
1101                    get_fn(&mut provider.as_ref().expect("qed").cursor().ok()?, number).transpose()
1102                }
1103            }
1104        }))
1105    }
1106
1107    /// Returns directory where `static_files` are located.
1108    pub fn directory(&self) -> &Path {
1109        &self.path
1110    }
1111
1112    /// Retrieves data from the database or static file, wherever it's available.
1113    ///
1114    /// # Arguments
1115    /// * `segment` - The segment of the static file to check against.
1116    /// * `index_key` - Requested index key, usually a block or transaction number.
1117    /// * `fetch_from_static_file` - A closure that defines how to fetch the data from the static
1118    ///   file provider.
1119    /// * `fetch_from_database` - A closure that defines how to fetch the data from the database
1120    ///   when the static file doesn't contain the required data or is not available.
1121    pub fn get_with_static_file_or_database<T, FS, FD>(
1122        &self,
1123        segment: StaticFileSegment,
1124        number: u64,
1125        fetch_from_static_file: FS,
1126        fetch_from_database: FD,
1127    ) -> ProviderResult<Option<T>>
1128    where
1129        FS: Fn(&Self) -> ProviderResult<Option<T>>,
1130        FD: Fn() -> ProviderResult<Option<T>>,
1131    {
1132        // If there is, check the maximum block or transaction number of the segment.
1133        let static_file_upper_bound = if segment.is_block_based() {
1134            self.get_highest_static_file_block(segment)
1135        } else {
1136            self.get_highest_static_file_tx(segment)
1137        };
1138
1139        if static_file_upper_bound
1140            .is_some_and(|static_file_upper_bound| static_file_upper_bound >= number)
1141        {
1142            return fetch_from_static_file(self)
1143        }
1144        fetch_from_database()
1145    }
1146
1147    /// Gets data within a specified range, potentially spanning different `static_files` and
1148    /// database.
1149    ///
1150    /// # Arguments
1151    /// * `segment` - The segment of the static file to query.
1152    /// * `block_range` - The range of data to fetch.
1153    /// * `fetch_from_static_file` - A function to fetch data from the `static_file`.
1154    /// * `fetch_from_database` - A function to fetch data from the database.
1155    /// * `predicate` - A function used to evaluate each item in the fetched data. Fetching is
1156    ///   terminated when this function returns false, thereby filtering the data based on the
1157    ///   provided condition.
1158    pub fn get_range_with_static_file_or_database<T, P, FS, FD>(
1159        &self,
1160        segment: StaticFileSegment,
1161        mut block_or_tx_range: Range<u64>,
1162        fetch_from_static_file: FS,
1163        mut fetch_from_database: FD,
1164        mut predicate: P,
1165    ) -> ProviderResult<Vec<T>>
1166    where
1167        FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
1168        FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
1169        P: FnMut(&T) -> bool,
1170    {
1171        let mut data = Vec::new();
1172
1173        // If there is, check the maximum block or transaction number of the segment.
1174        if let Some(static_file_upper_bound) = if segment.is_block_based() {
1175            self.get_highest_static_file_block(segment)
1176        } else {
1177            self.get_highest_static_file_tx(segment)
1178        } {
1179            if block_or_tx_range.start <= static_file_upper_bound {
1180                let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
1181                data.extend(fetch_from_static_file(
1182                    self,
1183                    block_or_tx_range.start..end,
1184                    &mut predicate,
1185                )?);
1186                block_or_tx_range.start = end;
1187            }
1188        }
1189
1190        if block_or_tx_range.end > block_or_tx_range.start {
1191            data.extend(fetch_from_database(block_or_tx_range, predicate)?)
1192        }
1193
1194        Ok(data)
1195    }
1196
1197    /// Returns `static_files` directory
1198    #[cfg(any(test, feature = "test-utils"))]
1199    pub fn path(&self) -> &Path {
1200        &self.path
1201    }
1202
1203    /// Returns `static_files` transaction index
1204    #[cfg(any(test, feature = "test-utils"))]
1205    pub fn tx_index(&self) -> &RwLock<SegmentRanges> {
1206        &self.static_files_tx_index
1207    }
1208}
1209
1210/// Helper trait to manage different [`StaticFileProviderRW`] of an `Arc<StaticFileProvider`
1211pub trait StaticFileWriter {
1212    /// The primitives type used by the static file provider.
1213    type Primitives: Send + Sync + 'static;
1214
1215    /// Returns a mutable reference to a [`StaticFileProviderRW`] of a [`StaticFileSegment`].
1216    fn get_writer(
1217        &self,
1218        block: BlockNumber,
1219        segment: StaticFileSegment,
1220    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
1221
1222    /// Returns a mutable reference to a [`StaticFileProviderRW`] of the latest
1223    /// [`StaticFileSegment`].
1224    fn latest_writer(
1225        &self,
1226        segment: StaticFileSegment,
1227    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
1228
1229    /// Commits all changes of all [`StaticFileProviderRW`] of all [`StaticFileSegment`].
1230    fn commit(&self) -> ProviderResult<()>;
1231}
1232
1233impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
1234    type Primitives = N;
1235
1236    fn get_writer(
1237        &self,
1238        block: BlockNumber,
1239        segment: StaticFileSegment,
1240    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1241        if self.access.is_read_only() {
1242            return Err(ProviderError::ReadOnlyStaticFileAccess)
1243        }
1244
1245        trace!(target: "provider::static_file", ?block, ?segment, "Getting static file writer.");
1246        self.writers.get_or_create(segment, || {
1247            StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
1248        })
1249    }
1250
1251    fn latest_writer(
1252        &self,
1253        segment: StaticFileSegment,
1254    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1255        self.get_writer(self.get_highest_static_file_block(segment).unwrap_or_default(), segment)
1256    }
1257
1258    fn commit(&self) -> ProviderResult<()> {
1259        self.writers.commit()
1260    }
1261}
1262
1263impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvider<N> {
1264    type Header = N::BlockHeader;
1265
1266    fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
1267        self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
1268            Ok(jar_provider
1269                .cursor()?
1270                .get_two::<HeaderWithHashMask<Self::Header>>(block_hash.into())?
1271                .and_then(|(header, hash)| {
1272                    if &hash == block_hash {
1273                        return Some(header)
1274                    }
1275                    None
1276                }))
1277        })
1278    }
1279
1280    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1281        self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1282            .and_then(|provider| provider.header_by_number(num))
1283            .or_else(|err| {
1284                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1285                    Ok(None)
1286                } else {
1287                    Err(err)
1288                }
1289            })
1290    }
1291
1292    fn header_td(&self, block_hash: &BlockHash) -> ProviderResult<Option<U256>> {
1293        self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
1294            Ok(jar_provider
1295                .cursor()?
1296                .get_two::<TDWithHashMask>(block_hash.into())?
1297                .and_then(|(td, hash)| (&hash == block_hash).then_some(td.0)))
1298        })
1299    }
1300
1301    fn header_td_by_number(&self, num: BlockNumber) -> ProviderResult<Option<U256>> {
1302        self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1303            .and_then(|provider| provider.header_td_by_number(num))
1304            .or_else(|err| {
1305                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1306                    Ok(None)
1307                } else {
1308                    Err(err)
1309                }
1310            })
1311    }
1312
1313    fn headers_range(
1314        &self,
1315        range: impl RangeBounds<BlockNumber>,
1316    ) -> ProviderResult<Vec<Self::Header>> {
1317        self.fetch_range_with_predicate(
1318            StaticFileSegment::Headers,
1319            to_range(range),
1320            |cursor, number| cursor.get_one::<HeaderMask<Self::Header>>(number.into()),
1321            |_| true,
1322        )
1323    }
1324
1325    fn sealed_header(
1326        &self,
1327        num: BlockNumber,
1328    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1329        self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1330            .and_then(|provider| provider.sealed_header(num))
1331            .or_else(|err| {
1332                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1333                    Ok(None)
1334                } else {
1335                    Err(err)
1336                }
1337            })
1338    }
1339
1340    fn sealed_headers_while(
1341        &self,
1342        range: impl RangeBounds<BlockNumber>,
1343        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1344    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1345        self.fetch_range_with_predicate(
1346            StaticFileSegment::Headers,
1347            to_range(range),
1348            |cursor, number| {
1349                Ok(cursor
1350                    .get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
1351                    .map(|(header, hash)| SealedHeader::new(header, hash)))
1352            },
1353            predicate,
1354        )
1355    }
1356}
1357
1358impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
1359    fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
1360        self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)?.block_hash(num)
1361    }
1362
1363    fn canonical_hashes_range(
1364        &self,
1365        start: BlockNumber,
1366        end: BlockNumber,
1367    ) -> ProviderResult<Vec<B256>> {
1368        self.fetch_range_with_predicate(
1369            StaticFileSegment::Headers,
1370            start..end,
1371            |cursor, number| cursor.get_one::<BlockHashMask>(number.into()),
1372            |_| true,
1373        )
1374    }
1375}
1376
1377impl<N: NodePrimitives<SignedTx: Value + SignedTransaction, Receipt: Value>> ReceiptProvider
1378    for StaticFileProvider<N>
1379{
1380    type Receipt = N::Receipt;
1381
1382    fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1383        self.get_segment_provider_from_transaction(StaticFileSegment::Receipts, num, None)
1384            .and_then(|provider| provider.receipt(num))
1385            .or_else(|err| {
1386                if let ProviderError::MissingStaticFileTx(_, _) = err {
1387                    Ok(None)
1388                } else {
1389                    Err(err)
1390                }
1391            })
1392    }
1393
1394    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1395        if let Some(num) = self.transaction_id(hash)? {
1396            return self.receipt(num)
1397        }
1398        Ok(None)
1399    }
1400
1401    fn receipts_by_block(
1402        &self,
1403        _block: BlockHashOrNumber,
1404    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1405        unreachable!()
1406    }
1407
1408    fn receipts_by_tx_range(
1409        &self,
1410        range: impl RangeBounds<TxNumber>,
1411    ) -> ProviderResult<Vec<Self::Receipt>> {
1412        self.fetch_range_with_predicate(
1413            StaticFileSegment::Receipts,
1414            to_range(range),
1415            |cursor, number| cursor.get_one::<ReceiptMask<Self::Receipt>>(number.into()),
1416            |_| true,
1417        )
1418    }
1419}
1420
1421impl<N: FullNodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>>
1422    TransactionsProviderExt for StaticFileProvider<N>
1423{
1424    fn transaction_hashes_by_range(
1425        &self,
1426        tx_range: Range<TxNumber>,
1427    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1428        let tx_range_size = (tx_range.end - tx_range.start) as usize;
1429
1430        // Transactions are different size, so chunks will not all take the same processing time. If
1431        // chunks are too big, there will be idle threads waiting for work. Choosing an
1432        // arbitrary smaller value to make sure it doesn't happen.
1433        let chunk_size = 100;
1434
1435        // iterator over the chunks
1436        let chunks = tx_range
1437            .clone()
1438            .step_by(chunk_size)
1439            .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end));
1440        let mut channels = Vec::with_capacity(tx_range_size.div_ceil(chunk_size));
1441
1442        for chunk_range in chunks {
1443            let (channel_tx, channel_rx) = mpsc::channel();
1444            channels.push(channel_rx);
1445
1446            let manager = self.clone();
1447
1448            // Spawn the task onto the global rayon pool
1449            // This task will send the results through the channel after it has calculated
1450            // the hash.
1451            rayon::spawn(move || {
1452                let mut rlp_buf = Vec::with_capacity(128);
1453                let _ = manager.fetch_range_with_predicate(
1454                    StaticFileSegment::Transactions,
1455                    chunk_range,
1456                    |cursor, number| {
1457                        Ok(cursor
1458                            .get_one::<TransactionMask<Self::Transaction>>(number.into())?
1459                            .map(|transaction| {
1460                                rlp_buf.clear();
1461                                let _ = channel_tx
1462                                    .send(calculate_hash((number, transaction), &mut rlp_buf));
1463                            }))
1464                    },
1465                    |_| true,
1466                );
1467            });
1468        }
1469
1470        let mut tx_list = Vec::with_capacity(tx_range_size);
1471
1472        // Iterate over channels and append the tx hashes unsorted
1473        for channel in channels {
1474            while let Ok(tx) = channel.recv() {
1475                let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
1476                tx_list.push((tx_hash, tx_id));
1477            }
1478        }
1479
1480        Ok(tx_list)
1481    }
1482}
1483
1484impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsProvider
1485    for StaticFileProvider<N>
1486{
1487    type Transaction = N::SignedTx;
1488
1489    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1490        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1491            let mut cursor = jar_provider.cursor()?;
1492            if cursor
1493                .get_one::<TransactionMask<Self::Transaction>>((&tx_hash).into())?
1494                .and_then(|tx| (tx.trie_hash() == tx_hash).then_some(tx))
1495                .is_some()
1496            {
1497                Ok(cursor.number())
1498            } else {
1499                Ok(None)
1500            }
1501        })
1502    }
1503
1504    fn transaction_by_id(&self, num: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1505        self.get_segment_provider_from_transaction(StaticFileSegment::Transactions, num, None)
1506            .and_then(|provider| provider.transaction_by_id(num))
1507            .or_else(|err| {
1508                if let ProviderError::MissingStaticFileTx(_, _) = err {
1509                    Ok(None)
1510                } else {
1511                    Err(err)
1512                }
1513            })
1514    }
1515
1516    fn transaction_by_id_unhashed(
1517        &self,
1518        num: TxNumber,
1519    ) -> ProviderResult<Option<Self::Transaction>> {
1520        self.get_segment_provider_from_transaction(StaticFileSegment::Transactions, num, None)
1521            .and_then(|provider| provider.transaction_by_id_unhashed(num))
1522            .or_else(|err| {
1523                if let ProviderError::MissingStaticFileTx(_, _) = err {
1524                    Ok(None)
1525                } else {
1526                    Err(err)
1527                }
1528            })
1529    }
1530
1531    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1532        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1533            Ok(jar_provider
1534                .cursor()?
1535                .get_one::<TransactionMask<Self::Transaction>>((&hash).into())?
1536                .and_then(|tx| (tx.trie_hash() == hash).then_some(tx)))
1537        })
1538    }
1539
1540    fn transaction_by_hash_with_meta(
1541        &self,
1542        _hash: TxHash,
1543    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1544        // Required data not present in static_files
1545        Err(ProviderError::UnsupportedProvider)
1546    }
1547
1548    fn transaction_block(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1549        // Required data not present in static_files
1550        Err(ProviderError::UnsupportedProvider)
1551    }
1552
1553    fn transactions_by_block(
1554        &self,
1555        _block_id: BlockHashOrNumber,
1556    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1557        // Required data not present in static_files
1558        Err(ProviderError::UnsupportedProvider)
1559    }
1560
1561    fn transactions_by_block_range(
1562        &self,
1563        _range: impl RangeBounds<BlockNumber>,
1564    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1565        // Required data not present in static_files
1566        Err(ProviderError::UnsupportedProvider)
1567    }
1568
1569    fn transactions_by_tx_range(
1570        &self,
1571        range: impl RangeBounds<TxNumber>,
1572    ) -> ProviderResult<Vec<Self::Transaction>> {
1573        self.fetch_range_with_predicate(
1574            StaticFileSegment::Transactions,
1575            to_range(range),
1576            |cursor, number| cursor.get_one::<TransactionMask<Self::Transaction>>(number.into()),
1577            |_| true,
1578        )
1579    }
1580
1581    fn senders_by_tx_range(
1582        &self,
1583        range: impl RangeBounds<TxNumber>,
1584    ) -> ProviderResult<Vec<Address>> {
1585        let txes = self.transactions_by_tx_range(range)?;
1586        Ok(reth_primitives_traits::transaction::recover::recover_signers(&txes)?)
1587    }
1588
1589    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1590        match self.transaction_by_id_unhashed(id)? {
1591            Some(tx) => Ok(tx.recover_signer().ok()),
1592            None => Ok(None),
1593        }
1594    }
1595}
1596
1597/* Cannot be successfully implemented but must exist for trait requirements */
1598
1599impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
1600    fn chain_info(&self) -> ProviderResult<ChainInfo> {
1601        // Required data not present in static_files
1602        Err(ProviderError::UnsupportedProvider)
1603    }
1604
1605    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1606        // Required data not present in static_files
1607        Err(ProviderError::UnsupportedProvider)
1608    }
1609
1610    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1611        // Required data not present in static_files
1612        Err(ProviderError::UnsupportedProvider)
1613    }
1614
1615    fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
1616        // Required data not present in static_files
1617        Err(ProviderError::UnsupportedProvider)
1618    }
1619}
1620
1621impl<N: FullNodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> BlockReader
1622    for StaticFileProvider<N>
1623{
1624    type Block = N::Block;
1625
1626    fn find_block_by_hash(
1627        &self,
1628        _hash: B256,
1629        _source: BlockSource,
1630    ) -> ProviderResult<Option<Self::Block>> {
1631        // Required data not present in static_files
1632        Err(ProviderError::UnsupportedProvider)
1633    }
1634
1635    fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1636        // Required data not present in static_files
1637        Err(ProviderError::UnsupportedProvider)
1638    }
1639
1640    fn pending_block(&self) -> ProviderResult<Option<SealedBlock<Self::Block>>> {
1641        // Required data not present in static_files
1642        Err(ProviderError::UnsupportedProvider)
1643    }
1644
1645    fn pending_block_with_senders(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1646        // Required data not present in static_files
1647        Err(ProviderError::UnsupportedProvider)
1648    }
1649
1650    fn pending_block_and_receipts(
1651        &self,
1652    ) -> ProviderResult<Option<(SealedBlock<Self::Block>, Vec<Self::Receipt>)>> {
1653        // Required data not present in static_files
1654        Err(ProviderError::UnsupportedProvider)
1655    }
1656
1657    fn recovered_block(
1658        &self,
1659        _id: BlockHashOrNumber,
1660        _transaction_kind: TransactionVariant,
1661    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1662        // Required data not present in static_files
1663        Err(ProviderError::UnsupportedProvider)
1664    }
1665
1666    fn sealed_block_with_senders(
1667        &self,
1668        _id: BlockHashOrNumber,
1669        _transaction_kind: TransactionVariant,
1670    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1671        // Required data not present in static_files
1672        Err(ProviderError::UnsupportedProvider)
1673    }
1674
1675    fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1676        // Required data not present in static_files
1677        Err(ProviderError::UnsupportedProvider)
1678    }
1679
1680    fn block_with_senders_range(
1681        &self,
1682        _range: RangeInclusive<BlockNumber>,
1683    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1684        Err(ProviderError::UnsupportedProvider)
1685    }
1686
1687    fn recovered_block_range(
1688        &self,
1689        _range: RangeInclusive<BlockNumber>,
1690    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1691        Err(ProviderError::UnsupportedProvider)
1692    }
1693}
1694
1695impl<N: NodePrimitives> WithdrawalsProvider for StaticFileProvider<N> {
1696    fn withdrawals_by_block(
1697        &self,
1698        id: BlockHashOrNumber,
1699        timestamp: u64,
1700    ) -> ProviderResult<Option<Withdrawals>> {
1701        if let Some(num) = id.as_number() {
1702            return self
1703                .get_segment_provider_from_block(StaticFileSegment::BlockMeta, num, None)
1704                .and_then(|provider| provider.withdrawals_by_block(id, timestamp))
1705                .or_else(|err| {
1706                    if let ProviderError::MissingStaticFileBlock(_, _) = err {
1707                        Ok(None)
1708                    } else {
1709                        Err(err)
1710                    }
1711                })
1712        }
1713        // Only accepts block number queries
1714        Err(ProviderError::UnsupportedProvider)
1715    }
1716}
1717
1718impl<N: FullNodePrimitives<BlockHeader: Value>> OmmersProvider for StaticFileProvider<N> {
1719    fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<Self::Header>>> {
1720        if let Some(num) = id.as_number() {
1721            return self
1722                .get_segment_provider_from_block(StaticFileSegment::BlockMeta, num, None)
1723                .and_then(|provider| provider.ommers(id))
1724                .or_else(|err| {
1725                    if let ProviderError::MissingStaticFileBlock(_, _) = err {
1726                        Ok(None)
1727                    } else {
1728                        Err(err)
1729                    }
1730                })
1731        }
1732        // Only accepts block number queries
1733        Err(ProviderError::UnsupportedProvider)
1734    }
1735}
1736
1737impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileProvider<N> {
1738    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1739        self.get_segment_provider_from_block(StaticFileSegment::BlockMeta, num, None)
1740            .and_then(|provider| provider.block_body_indices(num))
1741            .or_else(|err| {
1742                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1743                    Ok(None)
1744                } else {
1745                    Err(err)
1746                }
1747            })
1748    }
1749
1750    fn block_body_indices_range(
1751        &self,
1752        range: RangeInclusive<BlockNumber>,
1753    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1754        self.fetch_range_with_predicate(
1755            StaticFileSegment::BlockMeta,
1756            *range.start()..*range.end() + 1,
1757            |cursor, number| cursor.get_one::<BodyIndicesMask>(number.into()),
1758            |_| true,
1759        )
1760    }
1761}
1762
1763impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
1764    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
1765        match T::NAME {
1766            tables::CanonicalHeaders::NAME |
1767            tables::Headers::<Header>::NAME |
1768            tables::HeaderTerminalDifficulties::NAME => Ok(self
1769                .get_highest_static_file_block(StaticFileSegment::Headers)
1770                .map(|block| block + 1)
1771                .unwrap_or_default()
1772                as usize),
1773            tables::Receipts::<Receipt>::NAME => Ok(self
1774                .get_highest_static_file_tx(StaticFileSegment::Receipts)
1775                .map(|receipts| receipts + 1)
1776                .unwrap_or_default() as usize),
1777            tables::Transactions::<TransactionSigned>::NAME => Ok(self
1778                .get_highest_static_file_tx(StaticFileSegment::Transactions)
1779                .map(|txs| txs + 1)
1780                .unwrap_or_default()
1781                as usize),
1782            _ => Err(ProviderError::UnsupportedProvider),
1783        }
1784    }
1785}
1786
1787/// Calculates the tx hash for the given transaction and its id.
1788#[inline]
1789fn calculate_hash<T>(
1790    entry: (TxNumber, T),
1791    rlp_buf: &mut Vec<u8>,
1792) -> Result<(B256, TxNumber), Box<ProviderError>>
1793where
1794    T: Encodable2718,
1795{
1796    let (tx_id, tx) = entry;
1797    tx.encode_2718(rlp_buf);
1798    Ok((keccak256(rlp_buf), tx_id))
1799}