reth_provider/providers/static_file/manager.rs

1use super::{
2    metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
3    StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
4};
5use crate::{
6    to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, HeaderProvider,
7    ReceiptProvider, StageCheckpointReader, StatsReader, TransactionVariant, TransactionsProvider,
8    TransactionsProviderExt, WithdrawalsProvider,
9};
10use alloy_consensus::Header;
11use alloy_eips::{
12    eip2718::Encodable2718,
13    eip4895::{Withdrawal, Withdrawals},
14    BlockHashOrNumber,
15};
16use alloy_primitives::{keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256};
17use dashmap::DashMap;
18use notify::{RecommendedWatcher, RecursiveMode, Watcher};
19use parking_lot::RwLock;
20use reth_chainspec::{ChainInfo, ChainSpecProvider};
21use reth_db::{
22    lockfile::StorageLock,
23    static_file::{
24        iter_static_files, BlockHashMask, HeaderMask, HeaderWithHashMask, ReceiptMask,
25        StaticFileCursor, TDWithHashMask, TransactionMask,
26    },
27    table::{Decompress, Value},
28    tables,
29};
30use reth_db_api::{
31    cursor::DbCursorRO, models::StoredBlockBodyIndices, table::Table, transaction::DbTx,
32};
33use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION};
34use reth_node_types::{FullNodePrimitives, NodePrimitives};
35use reth_primitives::{
36    static_file::{
37        find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive,
38        DEFAULT_BLOCKS_PER_STATIC_FILE,
39    },
40    transaction::recover_signers,
41    BlockWithSenders, Receipt, SealedBlockFor, SealedBlockWithSenders, SealedHeader,
42    StaticFileSegment, TransactionMeta, TransactionSigned,
43};
44use reth_primitives_traits::SignedTransaction;
45use reth_stages_types::{PipelineTarget, StageId};
46use reth_storage_api::{BlockBodyIndicesProvider, DBProvider, OmmersProvider};
47use reth_storage_errors::provider::{ProviderError, ProviderResult};
48use std::{
49    collections::{hash_map::Entry, BTreeMap, HashMap},
50    fmt::Debug,
51    marker::PhantomData,
52    ops::{Deref, Range, RangeBounds, RangeInclusive},
53    path::{Path, PathBuf},
54    sync::{mpsc, Arc},
55};
56use strum::IntoEnumIterator;
57use tracing::{info, trace, warn};
58
/// Per-segment index used to find the static file holding a given transaction.
///
/// For every segment, the inner map is keyed by the last `TxNumber` stored in a static file and
/// maps it to that static file's block range.
type SegmentRanges = HashMap<StaticFileSegment, BTreeMap<TxNumber, SegmentRangeInclusive>>;
63
/// Access mode on a static file provider. RO/RW.
///
/// Defaults to [`StaticFileAccess::RO`]. A provider opened with [`StaticFileAccess::RW`] acquires
/// a [`StorageLock`] on its directory when it is created.
#[derive(Debug, Default, PartialEq, Eq)]
pub enum StaticFileAccess {
    /// Read-only access.
    #[default]
    RO,
    /// Read-write access.
    RW,
}
73
74impl StaticFileAccess {
75    /// Returns `true` if read-only access.
76    pub const fn is_read_only(&self) -> bool {
77        matches!(self, Self::RO)
78    }
79
80    /// Returns `true` if read-write access.
81    pub const fn is_read_write(&self) -> bool {
82        matches!(self, Self::RW)
83    }
84}
85
/// [`StaticFileProvider`] manages all existing [`StaticFileJarProvider`].
///
/// It is a cheap handle: clones share one [`StaticFileProviderInner`] through an [`Arc`], and the
/// inner state is reachable directly through `Deref`.
#[derive(Debug)]
pub struct StaticFileProvider<N>(pub(crate) Arc<StaticFileProviderInner<N>>);
89
90impl<N> Clone for StaticFileProvider<N> {
91    fn clone(&self) -> Self {
92        Self(self.0.clone())
93    }
94}
95
96impl<N: NodePrimitives> StaticFileProvider<N> {
97    /// Creates a new [`StaticFileProvider`].
98    fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
99        let provider = Self(Arc::new(StaticFileProviderInner::new(path, access)?));
100        provider.initialize_index()?;
101        Ok(provider)
102    }
103
104    /// Creates a new [`StaticFileProvider`] with read-only access.
105    ///
106    /// Set `watch_directory` to `true` to track the most recent changes in static files. Otherwise,
107    /// new data won't be detected or queryable.
108    pub fn read_only(path: impl AsRef<Path>, watch_directory: bool) -> ProviderResult<Self> {
109        let provider = Self::new(path, StaticFileAccess::RO)?;
110
111        if watch_directory {
112            provider.watch_directory();
113        }
114
115        Ok(provider)
116    }
117
118    /// Creates a new [`StaticFileProvider`] with read-write access.
119    pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
120        Self::new(path, StaticFileAccess::RW)
121    }
122
123    /// Watches the directory for changes and updates the in-memory index when modifications
124    /// are detected.
125    ///
126    /// This may be necessary, since a non-node process that owns a [`StaticFileProvider`] does not
127    /// receive `update_index` notifications from a node that appends/truncates data.
128    pub fn watch_directory(&self) {
129        let provider = self.clone();
130        std::thread::spawn(move || {
131            let (tx, rx) = std::sync::mpsc::channel();
132            let mut watcher = RecommendedWatcher::new(
133                move |res| tx.send(res).unwrap(),
134                notify::Config::default(),
135            )
136            .expect("failed to create watcher");
137
138            watcher
139                .watch(&provider.path, RecursiveMode::NonRecursive)
140                .expect("failed to watch path");
141
142            // Some backends send repeated modified events
143            let mut last_event_timestamp = None;
144
145            while let Ok(res) = rx.recv() {
146                match res {
147                    Ok(event) => {
148                        // We only care about modified data events
149                        if !matches!(
150                            event.kind,
151                            notify::EventKind::Modify(notify::event::ModifyKind::Data(_))
152                        ) {
153                            continue
154                        }
155
156                        // We only trigger a re-initialization if a configuration file was
157                        // modified. This means that a
158                        // static_file_provider.commit() was called on the node after
159                        // appending/truncating rows
160                        for segment in event.paths {
161                            // Ensure it's a file with the .conf extension
162                            #[allow(clippy::nonminimal_bool)]
163                            if !segment
164                                .extension()
165                                .is_some_and(|s| s.to_str() == Some(CONFIG_FILE_EXTENSION))
166                            {
167                                continue
168                            }
169
170                            // Ensure it's well formatted static file name
171                            if StaticFileSegment::parse_filename(
172                                &segment.file_stem().expect("qed").to_string_lossy(),
173                            )
174                            .is_none()
175                            {
176                                continue
177                            }
178
179                            // If we can read the metadata and modified timestamp, ensure this is
180                            // not an old or repeated event.
181                            if let Ok(current_modified_timestamp) =
182                                std::fs::metadata(&segment).and_then(|m| m.modified())
183                            {
184                                if last_event_timestamp.is_some_and(|last_timestamp| {
185                                    last_timestamp >= current_modified_timestamp
186                                }) {
187                                    continue
188                                }
189                                last_event_timestamp = Some(current_modified_timestamp);
190                            }
191
192                            info!(target: "providers::static_file", updated_file = ?segment.file_stem(), "re-initializing static file provider index");
193                            if let Err(err) = provider.initialize_index() {
194                                warn!(target: "providers::static_file", "failed to re-initialize index: {err}");
195                            }
196                            break
197                        }
198                    }
199
200                    Err(err) => warn!(target: "providers::watcher", "watch error: {err:?}"),
201                }
202            }
203        });
204    }
205}
206
207impl<N: NodePrimitives> Deref for StaticFileProvider<N> {
208    type Target = StaticFileProviderInner<N>;
209
210    fn deref(&self) -> &Self::Target {
211        &self.0
212    }
213}
214
/// [`StaticFileProviderInner`] manages all existing [`StaticFileJarProvider`].
///
/// It holds the cache of loaded jars, the in-memory block/transaction indexes over the static
/// files on disk, and the per-segment writers.
#[derive(Debug)]
pub struct StaticFileProviderInner<N> {
    /// Maintains a map which allows for concurrent access to different `NippyJars`, over different
    /// segments and ranges.
    ///
    /// Keyed by `(end block of the jar's fixed block range, segment)`.
    map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
    /// Max static file block for each segment.
    static_files_max_block: RwLock<HashMap<StaticFileSegment, u64>>,
    /// Available static file block ranges on disk, indexed by the last transaction number of each
    /// static file.
    static_files_tx_index: RwLock<SegmentRanges>,
    /// Directory where `static_files` are located.
    path: PathBuf,
    /// Maintains a writer set of [`StaticFileSegment`].
    writers: StaticFileWriters<N>,
    /// Metrics for the static files. `None` unless enabled with
    /// [`StaticFileProvider::with_metrics`].
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// Access rights of the provider.
    access: StaticFileAccess,
    /// Number of blocks per file. Set to `DEFAULT_BLOCKS_PER_STATIC_FILE` on creation.
    blocks_per_file: u64,
    /// Write lock for when access is [`StaticFileAccess::RW`].
    ///
    /// Never read; it is kept so the lock lives as long as the provider (presumably released on
    /// drop — confirm against `StorageLock`).
    _lock_file: Option<StorageLock>,
    /// Node primitives.
    _pd: PhantomData<N>,
}
240
241impl<N: NodePrimitives> StaticFileProviderInner<N> {
242    /// Creates a new [`StaticFileProviderInner`].
243    fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
244        let _lock_file = if access.is_read_write() {
245            StorageLock::try_acquire(path.as_ref())?.into()
246        } else {
247            None
248        };
249
250        let provider = Self {
251            map: Default::default(),
252            writers: Default::default(),
253            static_files_max_block: Default::default(),
254            static_files_tx_index: Default::default(),
255            path: path.as_ref().to_path_buf(),
256            metrics: None,
257            access,
258            blocks_per_file: DEFAULT_BLOCKS_PER_STATIC_FILE,
259            _lock_file,
260            _pd: Default::default(),
261        };
262
263        Ok(provider)
264    }
265
266    pub const fn is_read_only(&self) -> bool {
267        self.access.is_read_only()
268    }
269
270    /// Each static file has a fixed number of blocks. This gives out the range where the requested
271    /// block is positioned.
272    pub const fn find_fixed_range(&self, block: BlockNumber) -> SegmentRangeInclusive {
273        find_fixed_range(block, self.blocks_per_file)
274    }
275}
276
277impl<N: NodePrimitives> StaticFileProvider<N> {
278    /// Set a custom number of blocks per file.
279    #[cfg(any(test, feature = "test-utils"))]
280    pub fn with_custom_blocks_per_file(self, blocks_per_file: u64) -> Self {
281        let mut provider =
282            Arc::try_unwrap(self.0).expect("should be called when initializing only");
283        provider.blocks_per_file = blocks_per_file;
284        Self(Arc::new(provider))
285    }
286
287    /// Enables metrics on the [`StaticFileProvider`].
288    pub fn with_metrics(self) -> Self {
289        let mut provider =
290            Arc::try_unwrap(self.0).expect("should be called when initializing only");
291        provider.metrics = Some(Arc::new(StaticFileProviderMetrics::default()));
292        Self(Arc::new(provider))
293    }
294
295    /// Reports metrics for the static files.
296    pub fn report_metrics(&self) -> ProviderResult<()> {
297        let Some(metrics) = &self.metrics else { return Ok(()) };
298
299        let static_files =
300            iter_static_files(&self.path).map_err(|e| ProviderError::NippyJar(e.to_string()))?;
301        for (segment, ranges) in static_files {
302            let mut entries = 0;
303            let mut size = 0;
304
305            for (block_range, _) in &ranges {
306                let fixed_block_range = self.find_fixed_range(block_range.start());
307                let jar_provider = self
308                    .get_segment_provider(segment, || Some(fixed_block_range), None)?
309                    .ok_or_else(|| {
310                        ProviderError::MissingStaticFileBlock(segment, block_range.start())
311                    })?;
312
313                entries += jar_provider.rows();
314
315                let data_size = reth_fs_util::metadata(jar_provider.data_path())
316                    .map(|metadata| metadata.len())
317                    .unwrap_or_default();
318                let index_size = reth_fs_util::metadata(jar_provider.index_path())
319                    .map(|metadata| metadata.len())
320                    .unwrap_or_default();
321                let offsets_size = reth_fs_util::metadata(jar_provider.offsets_path())
322                    .map(|metadata| metadata.len())
323                    .unwrap_or_default();
324                let config_size = reth_fs_util::metadata(jar_provider.config_path())
325                    .map(|metadata| metadata.len())
326                    .unwrap_or_default();
327
328                size += data_size + index_size + offsets_size + config_size;
329            }
330
331            metrics.record_segment(segment, size, ranges.len(), entries);
332        }
333
334        Ok(())
335    }
336
337    /// Gets the [`StaticFileJarProvider`] of the requested segment and block.
338    pub fn get_segment_provider_from_block(
339        &self,
340        segment: StaticFileSegment,
341        block: BlockNumber,
342        path: Option<&Path>,
343    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
344        self.get_segment_provider(
345            segment,
346            || self.get_segment_ranges_from_block(segment, block),
347            path,
348        )?
349        .ok_or(ProviderError::MissingStaticFileBlock(segment, block))
350    }
351
352    /// Gets the [`StaticFileJarProvider`] of the requested segment and transaction.
353    pub fn get_segment_provider_from_transaction(
354        &self,
355        segment: StaticFileSegment,
356        tx: TxNumber,
357        path: Option<&Path>,
358    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
359        self.get_segment_provider(
360            segment,
361            || self.get_segment_ranges_from_transaction(segment, tx),
362            path,
363        )?
364        .ok_or(ProviderError::MissingStaticFileTx(segment, tx))
365    }
366
367    /// Gets the [`StaticFileJarProvider`] of the requested segment and block or transaction.
368    ///
369    /// `fn_range` should make sure the range goes through `find_fixed_range`.
370    pub fn get_segment_provider(
371        &self,
372        segment: StaticFileSegment,
373        fn_range: impl Fn() -> Option<SegmentRangeInclusive>,
374        path: Option<&Path>,
375    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
376        // If we have a path, then get the block range from its name.
377        // Otherwise, check `self.available_static_files`
378        let block_range = match path {
379            Some(path) => StaticFileSegment::parse_filename(
380                &path
381                    .file_name()
382                    .ok_or_else(|| {
383                        ProviderError::MissingStaticFilePath(segment, path.to_path_buf())
384                    })?
385                    .to_string_lossy(),
386            )
387            .and_then(|(parsed_segment, block_range)| {
388                if parsed_segment == segment {
389                    return Some(block_range)
390                }
391                None
392            }),
393            None => fn_range(),
394        };
395
396        // Return cached `LoadedJar` or insert it for the first time, and then, return it.
397        if let Some(block_range) = block_range {
398            return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?))
399        }
400
401        Ok(None)
402    }
403
404    /// Given a segment and block range it removes the cached provider from the map.
405    ///
406    /// CAUTION: cached provider should be dropped before calling this or IT WILL deadlock.
407    pub fn remove_cached_provider(
408        &self,
409        segment: StaticFileSegment,
410        fixed_block_range_end: BlockNumber,
411    ) {
412        self.map.remove(&(fixed_block_range_end, segment));
413    }
414
415    /// Given a segment and block, it deletes the jar and all files from the respective block range.
416    ///
417    /// CAUTION: destructive. Deletes files on disk.
418    pub fn delete_jar(&self, segment: StaticFileSegment, block: BlockNumber) -> ProviderResult<()> {
419        let fixed_block_range = self.find_fixed_range(block);
420        let key = (fixed_block_range.end(), segment);
421        let jar = if let Some((_, jar)) = self.map.remove(&key) {
422            jar.jar
423        } else {
424            NippyJar::<SegmentHeader>::load(&self.path.join(segment.filename(&fixed_block_range)))
425                .map_err(|e| ProviderError::NippyJar(e.to_string()))?
426        };
427
428        jar.delete().map_err(|e| ProviderError::NippyJar(e.to_string()))?;
429
430        let mut segment_max_block = None;
431        if fixed_block_range.start() > 0 {
432            segment_max_block = Some(fixed_block_range.start() - 1)
433        };
434        self.update_index(segment, segment_max_block)?;
435
436        Ok(())
437    }
438
439    /// Given a segment and block range it returns a cached
440    /// [`StaticFileJarProvider`]. TODO(joshie): we should check the size and pop N if there's too
441    /// many.
442    fn get_or_create_jar_provider(
443        &self,
444        segment: StaticFileSegment,
445        fixed_block_range: &SegmentRangeInclusive,
446    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
447        let key = (fixed_block_range.end(), segment);
448
449        // Avoid using `entry` directly to avoid a write lock in the common case.
450        trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Getting provider");
451        let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) {
452            trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Jar found in cache");
453            jar.into()
454        } else {
455            trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Creating jar from scratch");
456            let path = self.path.join(segment.filename(fixed_block_range));
457            let jar = NippyJar::load(&path).map_err(|e| ProviderError::NippyJar(e.to_string()))?;
458            self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
459        };
460
461        if let Some(metrics) = &self.metrics {
462            provider = provider.with_metrics(metrics.clone());
463        }
464        Ok(provider)
465    }
466
467    /// Gets a static file segment's block range from the provider inner block
468    /// index.
469    fn get_segment_ranges_from_block(
470        &self,
471        segment: StaticFileSegment,
472        block: u64,
473    ) -> Option<SegmentRangeInclusive> {
474        self.static_files_max_block
475            .read()
476            .get(&segment)
477            .filter(|max| **max >= block)
478            .map(|_| self.find_fixed_range(block))
479    }
480
481    /// Gets a static file segment's fixed block range from the provider inner
482    /// transaction index.
483    fn get_segment_ranges_from_transaction(
484        &self,
485        segment: StaticFileSegment,
486        tx: u64,
487    ) -> Option<SegmentRangeInclusive> {
488        let static_files = self.static_files_tx_index.read();
489        let segment_static_files = static_files.get(&segment)?;
490
491        // It's more probable that the request comes from a newer tx height, so we iterate
492        // the static_files in reverse.
493        let mut static_files_rev_iter = segment_static_files.iter().rev().peekable();
494
495        while let Some((tx_end, block_range)) = static_files_rev_iter.next() {
496            if tx > *tx_end {
497                // request tx is higher than highest static file tx
498                return None
499            }
500            let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
501            if tx_start <= tx {
502                return Some(self.find_fixed_range(block_range.end()))
503            }
504        }
505        None
506    }
507
508    /// Updates the inner transaction and block indexes alongside the internal cached providers in
509    /// `self.map`.
510    ///
511    /// Any entry higher than `segment_max_block` will be deleted from the previous structures.
512    ///
513    /// If `segment_max_block` is None it means there's no static file for this segment.
514    pub fn update_index(
515        &self,
516        segment: StaticFileSegment,
517        segment_max_block: Option<BlockNumber>,
518    ) -> ProviderResult<()> {
519        let mut max_block = self.static_files_max_block.write();
520        let mut tx_index = self.static_files_tx_index.write();
521
522        match segment_max_block {
523            Some(segment_max_block) => {
524                // Update the max block for the segment
525                max_block.insert(segment, segment_max_block);
526                let fixed_range = self.find_fixed_range(segment_max_block);
527
528                let jar = NippyJar::<SegmentHeader>::load(
529                    &self.path.join(segment.filename(&fixed_range)),
530                )
531                .map_err(|e| ProviderError::NippyJar(e.to_string()))?;
532
533                // Updates the tx index by first removing all entries which have a higher
534                // block_start than our current static file.
535                if let Some(tx_range) = jar.user_header().tx_range() {
536                    let tx_end = tx_range.end();
537
538                    // Current block range has the same block start as `fixed_range``, but block end
539                    // might be different if we are still filling this static file.
540                    if let Some(current_block_range) = jar.user_header().block_range().copied() {
541                        // Considering that `update_index` is called when we either append/truncate,
542                        // we are sure that we are handling the latest data
543                        // points.
544                        //
545                        // Here we remove every entry of the index that has a block start higher or
546                        // equal than our current one. This is important in the case
547                        // that we prune a lot of rows resulting in a file (and thus
548                        // a higher block range) deletion.
549                        tx_index
550                            .entry(segment)
551                            .and_modify(|index| {
552                                index.retain(|_, block_range| {
553                                    block_range.start() < fixed_range.start()
554                                });
555                                index.insert(tx_end, current_block_range);
556                            })
557                            .or_insert_with(|| BTreeMap::from([(tx_end, current_block_range)]));
558                    }
559                } else if segment.is_tx_based() {
560                    // The unwinded file has no more transactions/receipts. However, the highest
561                    // block is within this files' block range. We only retain
562                    // entries with block ranges before the current one.
563                    tx_index.entry(segment).and_modify(|index| {
564                        index.retain(|_, block_range| block_range.start() < fixed_range.start());
565                    });
566
567                    // If the index is empty, just remove it.
568                    if tx_index.get(&segment).is_some_and(|index| index.is_empty()) {
569                        tx_index.remove(&segment);
570                    }
571                }
572
573                // Update the cached provider.
574                self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);
575
576                // Delete any cached provider that no longer has an associated jar.
577                self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
578            }
579            None => {
580                tx_index.remove(&segment);
581                max_block.remove(&segment);
582            }
583        };
584
585        Ok(())
586    }
587
588    /// Initializes the inner transaction and block index
589    pub fn initialize_index(&self) -> ProviderResult<()> {
590        let mut max_block = self.static_files_max_block.write();
591        let mut tx_index = self.static_files_tx_index.write();
592
593        max_block.clear();
594        tx_index.clear();
595
596        for (segment, ranges) in
597            iter_static_files(&self.path).map_err(|e| ProviderError::NippyJar(e.to_string()))?
598        {
599            // Update last block for each segment
600            if let Some((block_range, _)) = ranges.last() {
601                max_block.insert(segment, block_range.end());
602            }
603
604            // Update tx -> block_range index
605            for (block_range, tx_range) in ranges {
606                if let Some(tx_range) = tx_range {
607                    let tx_end = tx_range.end();
608
609                    match tx_index.entry(segment) {
610                        Entry::Occupied(mut index) => {
611                            index.get_mut().insert(tx_end, block_range);
612                        }
613                        Entry::Vacant(index) => {
614                            index.insert(BTreeMap::from([(tx_end, block_range)]));
615                        }
616                    };
617                }
618            }
619        }
620
621        // If this is a re-initialization, we need to clear this as well
622        self.map.clear();
623
624        Ok(())
625    }
626
627    /// Ensures that any broken invariants which cannot be healed on the spot return a pipeline
628    /// target to unwind to.
629    ///
630    /// Two types of consistency checks are done for:
631    ///
632    /// 1) When a static file fails to commit but the underlying data was changed.
633    /// 2) When a static file was committed, but the required database transaction was not.
634    ///
635    /// For 1) it can self-heal if `self.access.is_read_only()` is set to `false`. Otherwise, it
636    /// will return an error.
637    /// For 2) the invariants below are checked, and if broken, might require a pipeline unwind
638    /// to heal.
639    ///
640    /// For each static file segment:
641    /// * the corresponding database table should overlap or have continuity in their keys
642    ///   ([`TxNumber`] or [`BlockNumber`]).
643    /// * its highest block should match the stage checkpoint block number if it's equal or higher
644    ///   than the corresponding database table last entry.
645    ///
646    /// Returns a [`Option`] of [`PipelineTarget::Unwind`] if any healing is further required.
647    ///
648    /// WARNING: No static file writer should be held before calling this function, otherwise it
649    /// will deadlock.
650    #[allow(clippy::while_let_loop)]
651    pub fn check_consistency<Provider>(
652        &self,
653        provider: &Provider,
654        has_receipt_pruning: bool,
655    ) -> ProviderResult<Option<PipelineTarget>>
656    where
657        Provider: DBProvider + BlockReader + StageCheckpointReader + ChainSpecProvider,
658    {
659        // OVM historical import is broken and does not work with this check. It's importing
660        // duplicated receipts resulting in having more receipts than the expected transaction
661        // range.
662        //
663        // If we detect an OVM import was done (block #1 <https://optimistic.etherscan.io/block/1>), skip it.
664        // More on [#11099](https://github.com/paradigmxyz/reth/pull/11099).
665        #[cfg(feature = "optimism")]
666        if reth_chainspec::EthChainSpec::chain(&provider.chain_spec()) ==
667            reth_chainspec::Chain::optimism_mainnet() &&
668            provider
669                .block_number(reth_optimism_primitives::bedrock::OVM_HEADER_1_HASH)?
670                .is_some()
671        {
672            info!(target: "reth::cli",
673                "Skipping storage verification for OP mainnet, expected inconsistency in OVM chain"
674            );
675            return Ok(None)
676        }
677
678        info!(target: "reth::cli", "Verifying storage consistency.");
679
680        let mut unwind_target: Option<BlockNumber> = None;
681        let mut update_unwind_target = |new_target: BlockNumber| {
682            if let Some(target) = unwind_target.as_mut() {
683                *target = (*target).min(new_target);
684            } else {
685                unwind_target = Some(new_target);
686            }
687        };
688
689        for segment in StaticFileSegment::iter() {
690            if has_receipt_pruning && segment.is_receipts() {
691                // Pruned nodes (including full node) do not store receipts as static files.
692                continue
693            }
694
695            let initial_highest_block = self.get_highest_static_file_block(segment);
696
697            //  File consistency is broken if:
698            //
699            // * appending data was interrupted before a config commit, then data file will be
700            //   truncated according to the config.
701            //
702            // * pruning data was interrupted before a config commit, then we have deleted data that
703            //   we are expected to still have. We need to check the Database and unwind everything
704            //   accordingly.
705            if self.access.is_read_only() {
706                self.check_segment_consistency(segment)?;
707            } else {
708                // Fetching the writer will attempt to heal any file level inconsistency.
709                self.latest_writer(segment)?;
710            }
711
712            // Only applies to block-based static files. (Headers)
713            //
714            // The updated `highest_block` may have decreased if we healed from a pruning
715            // interruption.
716            let mut highest_block = self.get_highest_static_file_block(segment);
717            if initial_highest_block != highest_block {
718                info!(
719                    target: "reth::providers::static_file",
720                    ?initial_highest_block,
721                    unwind_target = highest_block,
722                    ?segment,
723                    "Setting unwind target."
724                );
725                update_unwind_target(highest_block.unwrap_or_default());
726            }
727
728            // Only applies to transaction-based static files. (Receipts & Transactions)
729            //
730            // Make sure the last transaction matches the last block from its indices, since a heal
731            // from a pruning interruption might have decreased the number of transactions without
732            // being able to update the last block of the static file segment.
733            let highest_tx = self.get_highest_static_file_tx(segment);
734            if let Some(highest_tx) = highest_tx {
735                let mut last_block = highest_block.unwrap_or_default();
736                loop {
737                    if let Some(indices) = provider.block_body_indices(last_block)? {
738                        if indices.last_tx_num() <= highest_tx {
739                            break
740                        }
741                    } else {
742                        // If the block body indices can not be found, then it means that static
743                        // files is ahead of database, and the `ensure_invariants` check will fix
744                        // it by comparing with stage checkpoints.
745                        break
746                    }
747                    if last_block == 0 {
748                        break
749                    }
750                    last_block -= 1;
751
752                    info!(
753                        target: "reth::providers::static_file",
754                        highest_block = self.get_highest_static_file_block(segment),
755                        unwind_target = last_block,
756                        ?segment,
757                        "Setting unwind target."
758                    );
759                    highest_block = Some(last_block);
760                    update_unwind_target(last_block);
761                }
762            }
763
764            if let Some(unwind) = match segment {
765                StaticFileSegment::Headers => self.ensure_invariants::<_, tables::Headers>(
766                    provider,
767                    segment,
768                    highest_block,
769                    highest_block,
770                )?,
771                StaticFileSegment::Transactions => self
772                    .ensure_invariants::<_, tables::Transactions>(
773                        provider,
774                        segment,
775                        highest_tx,
776                        highest_block,
777                    )?,
778                StaticFileSegment::Receipts => self.ensure_invariants::<_, tables::Receipts>(
779                    provider,
780                    segment,
781                    highest_tx,
782                    highest_block,
783                )?,
784            } {
785                update_unwind_target(unwind);
786            }
787        }
788
789        Ok(unwind_target.map(PipelineTarget::Unwind))
790    }
791
792    /// Checks consistency of the latest static file segment and throws an error if at fault.
793    /// Read-only.
794    pub fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
795        if let Some(latest_block) = self.get_highest_static_file_block(segment) {
796            let file_path =
797                self.directory().join(segment.filename(&self.find_fixed_range(latest_block)));
798
799            let jar = NippyJar::<SegmentHeader>::load(&file_path)
800                .map_err(|e| ProviderError::NippyJar(e.to_string()))?;
801
802            NippyJarChecker::new(jar)
803                .check_consistency()
804                .map_err(|e| ProviderError::NippyJar(e.to_string()))?;
805        }
806        Ok(())
807    }
808
    /// Check invariants between a static file segment and its corresponding database table `T`
    /// and stage checkpoint.
    ///
    /// * The database table keys ([`TxNumber`] or [`BlockNumber`]) must overlap with, or directly
    ///   follow, the highest static file entry. A gap requests a pipeline unwind to the highest
    ///   static file block, returned as [`Some`].
    /// * If the database table's last entry is beyond the highest static file entry, the database
    ///   holds the newer data and [`None`] is returned without consulting the checkpoint.
    /// * Otherwise the static file is at or ahead of the table, and its highest block is compared
    ///   against the stage checkpoint (`Headers`, `Bodies` or `Execution`, depending on the
    ///   segment):
    ///   * If the checkpoint block is higher, a pipeline unwind to the static file block is
    ///     requested by returning [`Some`] with that target.
    ///   * If the checkpoint block is lower, the extra static file rows are pruned through the
    ///     segment's latest writer, committed, and [`None`] is returned.
    ///   * If they match, [`None`] is returned.
    ///
    /// `highest_static_file_entry` is the highest key stored in the segment (block number for
    /// headers, transaction number otherwise). Both `Option` arguments default to `0` when
    /// [`None`].
    ///
    /// # Errors
    ///
    /// Propagates database cursor, checkpoint, block body index and static file writer errors.
    fn ensure_invariants<Provider, T: Table<Key = u64>>(
        &self,
        provider: &Provider,
        segment: StaticFileSegment,
        highest_static_file_entry: Option<u64>,
        highest_static_file_block: Option<BlockNumber>,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider + BlockReader + StageCheckpointReader,
    {
        // NOTE(review): an empty segment (`None`) is treated as if entry/block 0 were present,
        // so a database table starting at 0 or 1 is considered contiguous — confirm intended.
        let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
        let highest_static_file_block = highest_static_file_block.unwrap_or_default();
        let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;

        if let Some((db_first_entry, _)) = db_cursor.first()? {
            // If there is a gap between the entry found in static file and
            // database, then we have most likely lost static file data and need to unwind so we can
            // load it again
            if !(db_first_entry <= highest_static_file_entry ||
                highest_static_file_entry + 1 == db_first_entry)
            {
                info!(
                    target: "reth::providers::static_file",
                    ?db_first_entry,
                    ?highest_static_file_entry,
                    unwind_target = highest_static_file_block,
                    ?segment,
                    "Setting unwind target."
                );
                return Ok(Some(highest_static_file_block))
            }

            // The database table extends past the static file, so the checkpoint comparison
            // below (which assumes static files are ahead) does not apply.
            if let Some((db_last_entry, _)) = db_cursor.last()? {
                if db_last_entry > highest_static_file_entry {
                    return Ok(None)
                }
            }
        }

        // If static file entry is ahead of the database entries, then ensure the checkpoint block
        // number matches.
        let checkpoint_block_number = provider
            .get_stage_checkpoint(match segment {
                StaticFileSegment::Headers => StageId::Headers,
                StaticFileSegment::Transactions => StageId::Bodies,
                StaticFileSegment::Receipts => StageId::Execution,
            })?
            .unwrap_or_default()
            .block_number;

        // If the checkpoint is ahead, then we lost static file data. May be data corruption.
        if checkpoint_block_number > highest_static_file_block {
            info!(
                target: "reth::providers::static_file",
                checkpoint_block_number,
                unwind_target = highest_static_file_block,
                ?segment,
                "Setting unwind target."
            );
            return Ok(Some(highest_static_file_block))
        }

        // If the checkpoint is behind, then we failed to do a database commit **but committed** to
        // static files on executing a stage, or the reverse on unwinding a stage.
        // All we need to do is to prune the extra static file rows.
        if checkpoint_block_number < highest_static_file_block {
            info!(
                target: "reth::providers",
                ?segment,
                from = highest_static_file_block,
                to = checkpoint_block_number,
                "Unwinding static file segment."
            );
            let mut writer = self.latest_writer(segment)?;
            if segment.is_headers() {
                // Headers are block-keyed: prune one row per block above the checkpoint.
                writer.prune_headers(highest_static_file_block - checkpoint_block_number)?;
            } else if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
                // Transaction-keyed segments: prune every row after the checkpoint block's last
                // transaction.
                // NOTE(review): this subtraction assumes `block.last_tx_num()` is not above
                // `highest_static_file_entry`; otherwise it underflows — confirm callers
                // guarantee it.
                let number = highest_static_file_entry - block.last_tx_num();
                if segment.is_receipts() {
                    writer.prune_receipts(number, checkpoint_block_number)?;
                } else {
                    writer.prune_transactions(number, checkpoint_block_number)?;
                }
            }
            writer.commit()?;
        }

        Ok(None)
    }
912
913    /// Gets the highest static file block if it exists for a static file segment.
914    ///
915    /// If there is nothing on disk for the given segment, this will return [`None`].
916    pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
917        self.static_files_max_block.read().get(&segment).copied()
918    }
919
920    /// Gets the highest static file transaction.
921    ///
922    /// If there is nothing on disk for the given segment, this will return [`None`].
923    pub fn get_highest_static_file_tx(&self, segment: StaticFileSegment) -> Option<TxNumber> {
924        self.static_files_tx_index
925            .read()
926            .get(&segment)
927            .and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
928    }
929
930    /// Gets the highest static file block for all segments.
931    pub fn get_highest_static_files(&self) -> HighestStaticFiles {
932        HighestStaticFiles {
933            headers: self.get_highest_static_file_block(StaticFileSegment::Headers),
934            receipts: self.get_highest_static_file_block(StaticFileSegment::Receipts),
935            transactions: self.get_highest_static_file_block(StaticFileSegment::Transactions),
936        }
937    }
938
939    /// Iterates through segment `static_files` in reverse order, executing a function until it
940    /// returns some object. Useful for finding objects by [`TxHash`] or [`BlockHash`].
941    pub fn find_static_file<T>(
942        &self,
943        segment: StaticFileSegment,
944        func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
945    ) -> ProviderResult<Option<T>> {
946        if let Some(highest_block) = self.get_highest_static_file_block(segment) {
947            let mut range = self.find_fixed_range(highest_block);
948            while range.end() > 0 {
949                if let Some(res) = func(self.get_or_create_jar_provider(segment, &range)?)? {
950                    return Ok(Some(res))
951                }
952                range = SegmentRangeInclusive::new(
953                    range.start().saturating_sub(self.blocks_per_file),
954                    range.end().saturating_sub(self.blocks_per_file),
955                );
956            }
957        }
958
959        Ok(None)
960    }
961
962    /// Fetches data within a specified range across multiple static files.
963    ///
964    /// This function iteratively retrieves data using `get_fn` for each item in the given range.
965    /// It continues fetching until the end of the range is reached or the provided `predicate`
966    /// returns false.
967    pub fn fetch_range_with_predicate<T, F, P>(
968        &self,
969        segment: StaticFileSegment,
970        range: Range<u64>,
971        mut get_fn: F,
972        mut predicate: P,
973    ) -> ProviderResult<Vec<T>>
974    where
975        F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
976        P: FnMut(&T) -> bool,
977    {
978        let get_provider = |start: u64| match segment {
979            StaticFileSegment::Headers => {
980                self.get_segment_provider_from_block(segment, start, None)
981            }
982            StaticFileSegment::Transactions | StaticFileSegment::Receipts => {
983                self.get_segment_provider_from_transaction(segment, start, None)
984            }
985        };
986
987        let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);
988        let mut provider = get_provider(range.start)?;
989        let mut cursor = provider.cursor()?;
990
991        // advances number in range
992        'outer: for number in range {
993            // The `retrying` flag ensures a single retry attempt per `number`. If `get_fn` fails to
994            // access data in two different static files, it halts further attempts by returning
995            // an error, effectively preventing infinite retry loops.
996            let mut retrying = false;
997
998            // advances static files if `get_fn` returns None
999            'inner: loop {
1000                match get_fn(&mut cursor, number)? {
1001                    Some(res) => {
1002                        if !predicate(&res) {
1003                            break 'outer
1004                        }
1005                        result.push(res);
1006                        break 'inner
1007                    }
1008                    None => {
1009                        if retrying {
1010                            warn!(
1011                                target: "provider::static_file",
1012                                ?segment,
1013                                ?number,
1014                                "Could not find block or tx number on a range request"
1015                            );
1016
1017                            let err = if segment.is_headers() {
1018                                ProviderError::MissingStaticFileBlock(segment, number)
1019                            } else {
1020                                ProviderError::MissingStaticFileTx(segment, number)
1021                            };
1022                            return Err(err)
1023                        }
1024                        // There is a very small chance of hitting a deadlock if two consecutive
1025                        // static files share the same bucket in the
1026                        // internal dashmap and we don't drop the current provider
1027                        // before requesting the next one.
1028                        drop(cursor);
1029                        drop(provider);
1030                        provider = get_provider(number)?;
1031                        cursor = provider.cursor()?;
1032                        retrying = true;
1033                    }
1034                }
1035            }
1036        }
1037
1038        Ok(result)
1039    }
1040
1041    /// Fetches data within a specified range across multiple static files.
1042    ///
1043    /// Returns an iterator over the data
1044    pub fn fetch_range_iter<'a, T, F>(
1045        &'a self,
1046        segment: StaticFileSegment,
1047        range: Range<u64>,
1048        get_fn: F,
1049    ) -> ProviderResult<impl Iterator<Item = ProviderResult<T>> + 'a>
1050    where
1051        F: Fn(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>> + 'a,
1052        T: std::fmt::Debug,
1053    {
1054        let get_provider = move |start: u64| match segment {
1055            StaticFileSegment::Headers => {
1056                self.get_segment_provider_from_block(segment, start, None)
1057            }
1058            StaticFileSegment::Transactions | StaticFileSegment::Receipts => {
1059                self.get_segment_provider_from_transaction(segment, start, None)
1060            }
1061        };
1062
1063        let mut provider = Some(get_provider(range.start)?);
1064        Ok(range.filter_map(move |number| {
1065            match get_fn(&mut provider.as_ref().expect("qed").cursor().ok()?, number).transpose() {
1066                Some(result) => Some(result),
1067                None => {
1068                    // There is a very small chance of hitting a deadlock if two consecutive static
1069                    // files share the same bucket in the internal dashmap and
1070                    // we don't drop the current provider before requesting the
1071                    // next one.
1072                    provider.take();
1073                    provider = Some(get_provider(number).ok()?);
1074                    get_fn(&mut provider.as_ref().expect("qed").cursor().ok()?, number).transpose()
1075                }
1076            }
1077        }))
1078    }
1079
1080    /// Returns directory where `static_files` are located.
1081    pub fn directory(&self) -> &Path {
1082        &self.path
1083    }
1084
1085    /// Retrieves data from the database or static file, wherever it's available.
1086    ///
1087    /// # Arguments
1088    /// * `segment` - The segment of the static file to check against.
1089    /// * `index_key` - Requested index key, usually a block or transaction number.
1090    /// * `fetch_from_static_file` - A closure that defines how to fetch the data from the static
1091    ///   file provider.
1092    /// * `fetch_from_database` - A closure that defines how to fetch the data from the database
1093    ///   when the static file doesn't contain the required data or is not available.
1094    pub fn get_with_static_file_or_database<T, FS, FD>(
1095        &self,
1096        segment: StaticFileSegment,
1097        number: u64,
1098        fetch_from_static_file: FS,
1099        fetch_from_database: FD,
1100    ) -> ProviderResult<Option<T>>
1101    where
1102        FS: Fn(&Self) -> ProviderResult<Option<T>>,
1103        FD: Fn() -> ProviderResult<Option<T>>,
1104    {
1105        // If there is, check the maximum block or transaction number of the segment.
1106        let static_file_upper_bound = match segment {
1107            StaticFileSegment::Headers => self.get_highest_static_file_block(segment),
1108            StaticFileSegment::Transactions | StaticFileSegment::Receipts => {
1109                self.get_highest_static_file_tx(segment)
1110            }
1111        };
1112
1113        if static_file_upper_bound
1114            .is_some_and(|static_file_upper_bound| static_file_upper_bound >= number)
1115        {
1116            return fetch_from_static_file(self)
1117        }
1118        fetch_from_database()
1119    }
1120
1121    /// Gets data within a specified range, potentially spanning different `static_files` and
1122    /// database.
1123    ///
1124    /// # Arguments
1125    /// * `segment` - The segment of the static file to query.
1126    /// * `block_range` - The range of data to fetch.
1127    /// * `fetch_from_static_file` - A function to fetch data from the `static_file`.
1128    /// * `fetch_from_database` - A function to fetch data from the database.
1129    /// * `predicate` - A function used to evaluate each item in the fetched data. Fetching is
1130    ///   terminated when this function returns false, thereby filtering the data based on the
1131    ///   provided condition.
1132    pub fn get_range_with_static_file_or_database<T, P, FS, FD>(
1133        &self,
1134        segment: StaticFileSegment,
1135        mut block_or_tx_range: Range<u64>,
1136        fetch_from_static_file: FS,
1137        mut fetch_from_database: FD,
1138        mut predicate: P,
1139    ) -> ProviderResult<Vec<T>>
1140    where
1141        FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
1142        FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
1143        P: FnMut(&T) -> bool,
1144    {
1145        let mut data = Vec::new();
1146
1147        // If there is, check the maximum block or transaction number of the segment.
1148        if let Some(static_file_upper_bound) = match segment {
1149            StaticFileSegment::Headers => self.get_highest_static_file_block(segment),
1150            StaticFileSegment::Transactions | StaticFileSegment::Receipts => {
1151                self.get_highest_static_file_tx(segment)
1152            }
1153        } {
1154            if block_or_tx_range.start <= static_file_upper_bound {
1155                let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
1156                data.extend(fetch_from_static_file(
1157                    self,
1158                    block_or_tx_range.start..end,
1159                    &mut predicate,
1160                )?);
1161                block_or_tx_range.start = end;
1162            }
1163        }
1164
1165        if block_or_tx_range.end > block_or_tx_range.start {
1166            data.extend(fetch_from_database(block_or_tx_range, predicate)?)
1167        }
1168
1169        Ok(data)
1170    }
1171
1172    /// Returns `static_files` directory
1173    #[cfg(any(test, feature = "test-utils"))]
1174    pub fn path(&self) -> &Path {
1175        &self.path
1176    }
1177
1178    /// Returns `static_files` transaction index
1179    #[cfg(any(test, feature = "test-utils"))]
1180    pub fn tx_index(&self) -> &RwLock<SegmentRanges> {
1181        &self.static_files_tx_index
1182    }
1183}
1184
/// Helper trait to manage the different [`StaticFileProviderRW`]s of an `Arc<StaticFileProvider>`.
pub trait StaticFileWriter {
    /// The primitives type used by the static file provider.
    type Primitives: Send + Sync + 'static;

    /// Returns a mutable reference to a [`StaticFileProviderRW`] of a [`StaticFileSegment`].
    ///
    /// `block` is the block number the writer is requested for.
    fn get_writer(
        &self,
        block: BlockNumber,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;

    /// Returns a mutable reference to a [`StaticFileProviderRW`] of the latest
    /// [`StaticFileSegment`].
    fn latest_writer(
        &self,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;

    /// Commits all changes of all [`StaticFileProviderRW`] of all [`StaticFileSegment`].
    fn commit(&self) -> ProviderResult<()>;
}
1207
1208impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
1209    type Primitives = N;
1210
1211    fn get_writer(
1212        &self,
1213        block: BlockNumber,
1214        segment: StaticFileSegment,
1215    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1216        if self.access.is_read_only() {
1217            return Err(ProviderError::ReadOnlyStaticFileAccess)
1218        }
1219
1220        trace!(target: "provider::static_file", ?block, ?segment, "Getting static file writer.");
1221        self.writers.get_or_create(segment, || {
1222            StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
1223        })
1224    }
1225
1226    fn latest_writer(
1227        &self,
1228        segment: StaticFileSegment,
1229    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1230        self.get_writer(self.get_highest_static_file_block(segment).unwrap_or_default(), segment)
1231    }
1232
1233    fn commit(&self) -> ProviderResult<()> {
1234        self.writers.commit()
1235    }
1236}
1237
1238impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvider<N> {
1239    type Header = N::BlockHeader;
1240
1241    fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
1242        self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
1243            Ok(jar_provider
1244                .cursor()?
1245                .get_two::<HeaderWithHashMask<Self::Header>>(block_hash.into())?
1246                .and_then(|(header, hash)| {
1247                    if &hash == block_hash {
1248                        return Some(header)
1249                    }
1250                    None
1251                }))
1252        })
1253    }
1254
1255    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1256        self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1257            .and_then(|provider| provider.header_by_number(num))
1258            .or_else(|err| {
1259                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1260                    Ok(None)
1261                } else {
1262                    Err(err)
1263                }
1264            })
1265    }
1266
1267    fn header_td(&self, block_hash: &BlockHash) -> ProviderResult<Option<U256>> {
1268        self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
1269            Ok(jar_provider
1270                .cursor()?
1271                .get_two::<TDWithHashMask>(block_hash.into())?
1272                .and_then(|(td, hash)| (&hash == block_hash).then_some(td.0)))
1273        })
1274    }
1275
1276    fn header_td_by_number(&self, num: BlockNumber) -> ProviderResult<Option<U256>> {
1277        self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1278            .and_then(|provider| provider.header_td_by_number(num))
1279            .or_else(|err| {
1280                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1281                    Ok(None)
1282                } else {
1283                    Err(err)
1284                }
1285            })
1286    }
1287
1288    fn headers_range(
1289        &self,
1290        range: impl RangeBounds<BlockNumber>,
1291    ) -> ProviderResult<Vec<Self::Header>> {
1292        self.fetch_range_with_predicate(
1293            StaticFileSegment::Headers,
1294            to_range(range),
1295            |cursor, number| cursor.get_one::<HeaderMask<Self::Header>>(number.into()),
1296            |_| true,
1297        )
1298    }
1299
1300    fn sealed_header(
1301        &self,
1302        num: BlockNumber,
1303    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1304        self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1305            .and_then(|provider| provider.sealed_header(num))
1306            .or_else(|err| {
1307                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1308                    Ok(None)
1309                } else {
1310                    Err(err)
1311                }
1312            })
1313    }
1314
1315    fn sealed_headers_while(
1316        &self,
1317        range: impl RangeBounds<BlockNumber>,
1318        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1319    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1320        self.fetch_range_with_predicate(
1321            StaticFileSegment::Headers,
1322            to_range(range),
1323            |cursor, number| {
1324                Ok(cursor
1325                    .get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
1326                    .map(|(header, hash)| SealedHeader::new(header, hash)))
1327            },
1328            predicate,
1329        )
1330    }
1331}
1332
1333impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
1334    fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
1335        self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)?.block_hash(num)
1336    }
1337
1338    fn canonical_hashes_range(
1339        &self,
1340        start: BlockNumber,
1341        end: BlockNumber,
1342    ) -> ProviderResult<Vec<B256>> {
1343        self.fetch_range_with_predicate(
1344            StaticFileSegment::Headers,
1345            start..end,
1346            |cursor, number| cursor.get_one::<BlockHashMask>(number.into()),
1347            |_| true,
1348        )
1349    }
1350}
1351
1352impl<N: NodePrimitives<SignedTx: Value + SignedTransaction, Receipt: Value>> ReceiptProvider
1353    for StaticFileProvider<N>
1354{
1355    type Receipt = N::Receipt;
1356
1357    fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1358        self.get_segment_provider_from_transaction(StaticFileSegment::Receipts, num, None)
1359            .and_then(|provider| provider.receipt(num))
1360            .or_else(|err| {
1361                if let ProviderError::MissingStaticFileTx(_, _) = err {
1362                    Ok(None)
1363                } else {
1364                    Err(err)
1365                }
1366            })
1367    }
1368
1369    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1370        if let Some(num) = self.transaction_id(hash)? {
1371            return self.receipt(num)
1372        }
1373        Ok(None)
1374    }
1375
1376    fn receipts_by_block(
1377        &self,
1378        _block: BlockHashOrNumber,
1379    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1380        unreachable!()
1381    }
1382
1383    fn receipts_by_tx_range(
1384        &self,
1385        range: impl RangeBounds<TxNumber>,
1386    ) -> ProviderResult<Vec<Self::Receipt>> {
1387        self.fetch_range_with_predicate(
1388            StaticFileSegment::Receipts,
1389            to_range(range),
1390            |cursor, number| cursor.get_one::<ReceiptMask<Self::Receipt>>(number.into()),
1391            |_| true,
1392        )
1393    }
1394}
1395
1396impl<N: FullNodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>>
1397    TransactionsProviderExt for StaticFileProvider<N>
1398{
1399    fn transaction_hashes_by_range(
1400        &self,
1401        tx_range: Range<TxNumber>,
1402    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1403        let tx_range_size = (tx_range.end - tx_range.start) as usize;
1404
1405        // Transactions are different size, so chunks will not all take the same processing time. If
1406        // chunks are too big, there will be idle threads waiting for work. Choosing an
1407        // arbitrary smaller value to make sure it doesn't happen.
1408        let chunk_size = 100;
1409
1410        // iterator over the chunks
1411        let chunks = tx_range
1412            .clone()
1413            .step_by(chunk_size)
1414            .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end));
1415        let mut channels = Vec::with_capacity(tx_range_size.div_ceil(chunk_size));
1416
1417        for chunk_range in chunks {
1418            let (channel_tx, channel_rx) = mpsc::channel();
1419            channels.push(channel_rx);
1420
1421            let manager = self.clone();
1422
1423            // Spawn the task onto the global rayon pool
1424            // This task will send the results through the channel after it has calculated
1425            // the hash.
1426            rayon::spawn(move || {
1427                let mut rlp_buf = Vec::with_capacity(128);
1428                let _ = manager.fetch_range_with_predicate(
1429                    StaticFileSegment::Transactions,
1430                    chunk_range,
1431                    |cursor, number| {
1432                        Ok(cursor
1433                            .get_one::<TransactionMask<Self::Transaction>>(number.into())?
1434                            .map(|transaction| {
1435                                rlp_buf.clear();
1436                                let _ = channel_tx
1437                                    .send(calculate_hash((number, transaction), &mut rlp_buf));
1438                            }))
1439                    },
1440                    |_| true,
1441                );
1442            });
1443        }
1444
1445        let mut tx_list = Vec::with_capacity(tx_range_size);
1446
1447        // Iterate over channels and append the tx hashes unsorted
1448        for channel in channels {
1449            while let Ok(tx) = channel.recv() {
1450                let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
1451                tx_list.push((tx_hash, tx_id));
1452            }
1453        }
1454
1455        Ok(tx_list)
1456    }
1457}
1458
1459impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsProvider
1460    for StaticFileProvider<N>
1461{
1462    type Transaction = N::SignedTx;
1463
1464    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1465        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1466            let mut cursor = jar_provider.cursor()?;
1467            if cursor
1468                .get_one::<TransactionMask<Self::Transaction>>((&tx_hash).into())?
1469                .and_then(|tx| (tx.trie_hash() == tx_hash).then_some(tx))
1470                .is_some()
1471            {
1472                Ok(cursor.number())
1473            } else {
1474                Ok(None)
1475            }
1476        })
1477    }
1478
1479    fn transaction_by_id(&self, num: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1480        self.get_segment_provider_from_transaction(StaticFileSegment::Transactions, num, None)
1481            .and_then(|provider| provider.transaction_by_id(num))
1482            .or_else(|err| {
1483                if let ProviderError::MissingStaticFileTx(_, _) = err {
1484                    Ok(None)
1485                } else {
1486                    Err(err)
1487                }
1488            })
1489    }
1490
1491    fn transaction_by_id_unhashed(
1492        &self,
1493        num: TxNumber,
1494    ) -> ProviderResult<Option<Self::Transaction>> {
1495        self.get_segment_provider_from_transaction(StaticFileSegment::Transactions, num, None)
1496            .and_then(|provider| provider.transaction_by_id_unhashed(num))
1497            .or_else(|err| {
1498                if let ProviderError::MissingStaticFileTx(_, _) = err {
1499                    Ok(None)
1500                } else {
1501                    Err(err)
1502                }
1503            })
1504    }
1505
1506    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1507        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1508            Ok(jar_provider
1509                .cursor()?
1510                .get_one::<TransactionMask<Self::Transaction>>((&hash).into())?
1511                .and_then(|tx| (tx.trie_hash() == hash).then_some(tx)))
1512        })
1513    }
1514
1515    fn transaction_by_hash_with_meta(
1516        &self,
1517        _hash: TxHash,
1518    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1519        // Required data not present in static_files
1520        Err(ProviderError::UnsupportedProvider)
1521    }
1522
1523    fn transaction_block(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1524        // Required data not present in static_files
1525        Err(ProviderError::UnsupportedProvider)
1526    }
1527
1528    fn transactions_by_block(
1529        &self,
1530        _block_id: BlockHashOrNumber,
1531    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1532        // Required data not present in static_files
1533        Err(ProviderError::UnsupportedProvider)
1534    }
1535
1536    fn transactions_by_block_range(
1537        &self,
1538        _range: impl RangeBounds<BlockNumber>,
1539    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1540        // Required data not present in static_files
1541        Err(ProviderError::UnsupportedProvider)
1542    }
1543
1544    fn transactions_by_tx_range(
1545        &self,
1546        range: impl RangeBounds<TxNumber>,
1547    ) -> ProviderResult<Vec<Self::Transaction>> {
1548        self.fetch_range_with_predicate(
1549            StaticFileSegment::Transactions,
1550            to_range(range),
1551            |cursor, number| cursor.get_one::<TransactionMask<Self::Transaction>>(number.into()),
1552            |_| true,
1553        )
1554    }
1555
1556    fn senders_by_tx_range(
1557        &self,
1558        range: impl RangeBounds<TxNumber>,
1559    ) -> ProviderResult<Vec<Address>> {
1560        let txes = self.transactions_by_tx_range(range)?;
1561        recover_signers(&txes, txes.len()).ok_or(ProviderError::SenderRecoveryError)
1562    }
1563
1564    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1565        Ok(self.transaction_by_id_unhashed(id)?.and_then(|tx| tx.recover_signer()))
1566    }
1567}
1568
/* The trait impls that follow cannot be served from static-file data alone, but they must exist to satisfy trait bounds. */
1570
1571impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
1572    fn chain_info(&self) -> ProviderResult<ChainInfo> {
1573        // Required data not present in static_files
1574        Err(ProviderError::UnsupportedProvider)
1575    }
1576
1577    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1578        // Required data not present in static_files
1579        Err(ProviderError::UnsupportedProvider)
1580    }
1581
1582    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1583        // Required data not present in static_files
1584        Err(ProviderError::UnsupportedProvider)
1585    }
1586
1587    fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
1588        // Required data not present in static_files
1589        Err(ProviderError::UnsupportedProvider)
1590    }
1591}
1592
1593impl<N: FullNodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> BlockReader
1594    for StaticFileProvider<N>
1595{
1596    type Block = N::Block;
1597
1598    fn find_block_by_hash(
1599        &self,
1600        _hash: B256,
1601        _source: BlockSource,
1602    ) -> ProviderResult<Option<Self::Block>> {
1603        // Required data not present in static_files
1604        Err(ProviderError::UnsupportedProvider)
1605    }
1606
1607    fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1608        // Required data not present in static_files
1609        Err(ProviderError::UnsupportedProvider)
1610    }
1611
1612    fn pending_block(&self) -> ProviderResult<Option<SealedBlockFor<Self::Block>>> {
1613        // Required data not present in static_files
1614        Err(ProviderError::UnsupportedProvider)
1615    }
1616
1617    fn pending_block_with_senders(
1618        &self,
1619    ) -> ProviderResult<Option<SealedBlockWithSenders<Self::Block>>> {
1620        // Required data not present in static_files
1621        Err(ProviderError::UnsupportedProvider)
1622    }
1623
1624    fn pending_block_and_receipts(
1625        &self,
1626    ) -> ProviderResult<Option<(SealedBlockFor<Self::Block>, Vec<Self::Receipt>)>> {
1627        // Required data not present in static_files
1628        Err(ProviderError::UnsupportedProvider)
1629    }
1630
1631    fn block_with_senders(
1632        &self,
1633        _id: BlockHashOrNumber,
1634        _transaction_kind: TransactionVariant,
1635    ) -> ProviderResult<Option<BlockWithSenders<Self::Block>>> {
1636        // Required data not present in static_files
1637        Err(ProviderError::UnsupportedProvider)
1638    }
1639
1640    fn sealed_block_with_senders(
1641        &self,
1642        _id: BlockHashOrNumber,
1643        _transaction_kind: TransactionVariant,
1644    ) -> ProviderResult<Option<SealedBlockWithSenders<Self::Block>>> {
1645        // Required data not present in static_files
1646        Err(ProviderError::UnsupportedProvider)
1647    }
1648
1649    fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1650        // Required data not present in static_files
1651        Err(ProviderError::UnsupportedProvider)
1652    }
1653
1654    fn block_with_senders_range(
1655        &self,
1656        _range: RangeInclusive<BlockNumber>,
1657    ) -> ProviderResult<Vec<BlockWithSenders<Self::Block>>> {
1658        Err(ProviderError::UnsupportedProvider)
1659    }
1660
1661    fn sealed_block_with_senders_range(
1662        &self,
1663        _range: RangeInclusive<BlockNumber>,
1664    ) -> ProviderResult<Vec<SealedBlockWithSenders<Self::Block>>> {
1665        Err(ProviderError::UnsupportedProvider)
1666    }
1667}
1668
1669impl<N: NodePrimitives> WithdrawalsProvider for StaticFileProvider<N> {
1670    fn withdrawals_by_block(
1671        &self,
1672        _id: BlockHashOrNumber,
1673        _timestamp: u64,
1674    ) -> ProviderResult<Option<Withdrawals>> {
1675        // Required data not present in static_files
1676        Err(ProviderError::UnsupportedProvider)
1677    }
1678
1679    fn latest_withdrawal(&self) -> ProviderResult<Option<Withdrawal>> {
1680        // Required data not present in static_files
1681        Err(ProviderError::UnsupportedProvider)
1682    }
1683}
1684
1685impl<N: FullNodePrimitives<BlockHeader: Value>> OmmersProvider for StaticFileProvider<N> {
1686    fn ommers(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Vec<Self::Header>>> {
1687        // Required data not present in static_files
1688        Err(ProviderError::UnsupportedProvider)
1689    }
1690}
1691
1692impl<N: Send + Sync> BlockBodyIndicesProvider for StaticFileProvider<N> {
1693    fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1694        // Required data not present in static_files
1695        Err(ProviderError::UnsupportedProvider)
1696    }
1697}
1698
1699impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
1700    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
1701        match T::NAME {
1702            tables::CanonicalHeaders::NAME |
1703            tables::Headers::<Header>::NAME |
1704            tables::HeaderTerminalDifficulties::NAME => Ok(self
1705                .get_highest_static_file_block(StaticFileSegment::Headers)
1706                .map(|block| block + 1)
1707                .unwrap_or_default()
1708                as usize),
1709            tables::Receipts::<Receipt>::NAME => Ok(self
1710                .get_highest_static_file_tx(StaticFileSegment::Receipts)
1711                .map(|receipts| receipts + 1)
1712                .unwrap_or_default() as usize),
1713            tables::Transactions::<TransactionSigned>::NAME => Ok(self
1714                .get_highest_static_file_tx(StaticFileSegment::Transactions)
1715                .map(|txs| txs + 1)
1716                .unwrap_or_default()
1717                as usize),
1718            _ => Err(ProviderError::UnsupportedProvider),
1719        }
1720    }
1721}
1722
1723/// Calculates the tx hash for the given transaction and its id.
1724#[inline]
1725fn calculate_hash<T>(
1726    entry: (TxNumber, T),
1727    rlp_buf: &mut Vec<u8>,
1728) -> Result<(B256, TxNumber), Box<ProviderError>>
1729where
1730    T: Encodable2718,
1731{
1732    let (tx_id, tx) = entry;
1733    tx.encode_2718(rlp_buf);
1734    Ok((keccak256(rlp_buf), tx_id))
1735}