reth_provider/providers/
consistent.rs

1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3    providers::StaticFileProvider, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader,
4    BlockReader, BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, EvmEnvProvider,
5    HeaderProvider, ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
6    StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
7    TransactionsProvider, WithdrawalsProvider,
8};
9use alloy_consensus::BlockHeader;
10use alloy_eips::{
11    eip2718::Encodable2718,
12    eip4895::{Withdrawal, Withdrawals},
13    BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag, HashOrNumber,
14};
15use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256};
16use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
17use reth_chainspec::{ChainInfo, EthereumHardforks};
18use reth_db::models::BlockNumberAddress;
19use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices};
20use reth_evm::ConfigureEvmEnv;
21use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
22use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
23use reth_primitives::{
24    Account, BlockWithSenders, SealedBlockFor, SealedBlockWithSenders, SealedHeader, StorageEntry,
25    TransactionMeta,
26};
27use reth_primitives_traits::BlockBody;
28use reth_prune_types::{PruneCheckpoint, PruneSegment};
29use reth_stages_types::{StageCheckpoint, StageId};
30use reth_storage_api::{
31    BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, OmmersProvider,
32    StateProvider, StorageChangeSetReader,
33};
34use reth_storage_errors::provider::ProviderResult;
35use revm::{
36    db::states::PlainStorageRevert,
37    primitives::{BlockEnv, CfgEnvWithHandlerCfg},
38};
39use std::{
40    collections::{hash_map, HashMap},
41    ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
42    sync::Arc,
43};
44use tracing::trace;
45
/// Type that interacts with a snapshot view of the blockchain (storage and in-memory) at time of
/// instantiation, EXCEPT for pending, safe and finalized block which might change while holding
/// this provider.
///
/// The snapshot consists of a read-only database provider and the in-memory head block state,
/// both captured in [`Self::new`].
///
/// CAUTION: Avoid holding this provider for too long or the inner database transaction will
/// time-out.
#[derive(Debug)]
#[doc(hidden)] // triggers ICE for `cargo docs`
pub struct ConsistentProvider<N: ProviderNodeTypes> {
    /// Read-only storage provider obtained from the [`ProviderFactory`] at creation time.
    storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
    /// Head block at time of [`Self`] creation, or `None` if the in-memory state reported no
    /// head block at that time.
    head_block: Option<Arc<BlockState<N::Primitives>>>,
    /// In-memory canonical state. This is not a snapshot, and can change! Use with caution.
    canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
}
62
63impl<N: ProviderNodeTypes> ConsistentProvider<N> {
    /// Create a new provider using [`ProviderFactory`] and [`CanonicalInMemoryState`].
    ///
    /// Underneath it will take a snapshot by fetching [`CanonicalInMemoryState::head_state`] and
    /// [`ProviderFactory::database_provider_ro`] effectively maintaining one single snapshotted
    /// view of memory and database.
    ///
    /// # Errors
    ///
    /// Returns an error if the read-only database provider cannot be created.
    pub fn new(
        storage_provider_factory: ProviderFactory<N>,
        state: CanonicalInMemoryState<N::Primitives>,
    ) -> ProviderResult<Self> {
        // Each one provides a snapshot at the time of instantiation, but the order in which they
        // are acquired matters.
        //
        // If the database provider were acquired first, blocks could be flushed to disk before
        // the in-memory chain snapshot is taken. The database provider would then not see the
        // flushed blocks (since it works on an older view), while the in-memory state may have
        // already dropped them entirely, resulting in gaps in the range.
        let head_block = state.head_state();
        let storage_provider = storage_provider_factory.database_provider_ro()?;
        Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
    }
84
85    // Helper function to convert range bounds
86    fn convert_range_bounds<T>(
87        &self,
88        range: impl RangeBounds<T>,
89        end_unbounded: impl FnOnce() -> T,
90    ) -> (T, T)
91    where
92        T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
93    {
94        let start = match range.start_bound() {
95            Bound::Included(&n) => n,
96            Bound::Excluded(&n) => n + T::from(1u8),
97            Bound::Unbounded => T::from(0u8),
98        };
99
100        let end = match range.end_bound() {
101            Bound::Included(&n) => n,
102            Bound::Excluded(&n) => n - T::from(1u8),
103            Bound::Unbounded => end_unbounded(),
104        };
105
106        (start, end)
107    }
108
109    /// Storage provider for latest block
110    fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
111        trace!(target: "providers::blockchain", "Getting latest block state provider");
112
113        // use latest state provider if the head state exists
114        if let Some(state) = &self.head_block {
115            trace!(target: "providers::blockchain", "Using head state for latest state provider");
116            Ok(self.block_state_provider_ref(state)?.boxed())
117        } else {
118            trace!(target: "providers::blockchain", "Using database state for latest state provider");
119            Ok(self.storage_provider.latest())
120        }
121    }
122
123    fn history_by_block_hash_ref<'a>(
124        &'a self,
125        block_hash: BlockHash,
126    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
127        trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
128
129        self.get_in_memory_or_storage_by_block(
130            block_hash.into(),
131            |_| self.storage_provider.history_by_block_hash(block_hash),
132            |block_state| {
133                let state_provider = self.block_state_provider_ref(block_state)?;
134                Ok(Box::new(state_provider))
135            },
136        )
137    }
138
139    /// Returns a state provider indexed by the given block number or tag.
140    fn state_by_block_number_ref<'a>(
141        &'a self,
142        number: BlockNumber,
143    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
144        let hash =
145            self.block_hash(number)?.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
146        self.history_by_block_hash_ref(hash)
147    }
148
149    /// Return the last N blocks of state, recreating the [`ExecutionOutcome`].
150    ///
151    /// If the range is empty, or there are no blocks for the given range, then this returns `None`.
152    pub fn get_state(
153        &self,
154        range: RangeInclusive<BlockNumber>,
155    ) -> ProviderResult<Option<ExecutionOutcome<ReceiptTy<N>>>> {
156        if range.is_empty() {
157            return Ok(None)
158        }
159        let start_block_number = *range.start();
160        let end_block_number = *range.end();
161
162        // We are not removing block meta as it is used to get block changesets.
163        let mut block_bodies = Vec::new();
164        for block_num in range.clone() {
165            let block_body = self
166                .block_body_indices(block_num)?
167                .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
168            block_bodies.push((block_num, block_body))
169        }
170
171        // get transaction receipts
172        let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
173        else {
174            return Ok(None)
175        };
176        let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
177            return Ok(None)
178        };
179
180        let mut account_changeset = Vec::new();
181        for block_num in range.clone() {
182            let changeset =
183                self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
184            account_changeset.extend(changeset);
185        }
186
187        let mut storage_changeset = Vec::new();
188        for block_num in range {
189            let changeset = self.storage_changeset(block_num)?;
190            storage_changeset.extend(changeset);
191        }
192
193        let (state, reverts) =
194            self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;
195
196        let mut receipt_iter =
197            self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();
198
199        let mut receipts = Vec::with_capacity(block_bodies.len());
200        // loop break if we are at the end of the blocks.
201        for (_, block_body) in block_bodies {
202            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
203            for tx_num in block_body.tx_num_range() {
204                let receipt = receipt_iter
205                    .next()
206                    .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?;
207                block_receipts.push(Some(receipt));
208            }
209            receipts.push(block_receipts);
210        }
211
212        Ok(Some(ExecutionOutcome::new_init(
213            state,
214            reverts,
215            // We skip new contracts since we never delete them from the database
216            Vec::new(),
217            receipts.into(),
218            start_block_number,
219            Vec::new(),
220        )))
221    }
222
223    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
224    /// [`reth_db::PlainAccountState`] and [`reth_db::PlainStorageState`] tables, based on the given
225    /// storage and account changesets.
226    fn populate_bundle_state(
227        &self,
228        account_changeset: Vec<(u64, AccountBeforeTx)>,
229        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
230        block_range_end: BlockNumber,
231    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
232        let mut state: BundleStateInit = HashMap::new();
233        let mut reverts: RevertsInit = HashMap::new();
234        let state_provider = self.state_by_block_number_ref(block_range_end)?;
235
236        // add account changeset changes
237        for (block_number, account_before) in account_changeset.into_iter().rev() {
238            let AccountBeforeTx { info: old_info, address } = account_before;
239            match state.entry(address) {
240                hash_map::Entry::Vacant(entry) => {
241                    let new_info = state_provider.basic_account(address)?;
242                    entry.insert((old_info, new_info, HashMap::new()));
243                }
244                hash_map::Entry::Occupied(mut entry) => {
245                    // overwrite old account state.
246                    entry.get_mut().0 = old_info;
247                }
248            }
249            // insert old info into reverts.
250            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
251        }
252
253        // add storage changeset changes
254        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
255            let BlockNumberAddress((block_number, address)) = block_and_address;
256            // get account state or insert from plain state.
257            let account_state = match state.entry(address) {
258                hash_map::Entry::Vacant(entry) => {
259                    let present_info = state_provider.basic_account(address)?;
260                    entry.insert((present_info, present_info, HashMap::new()))
261                }
262                hash_map::Entry::Occupied(entry) => entry.into_mut(),
263            };
264
265            // match storage.
266            match account_state.2.entry(old_storage.key) {
267                hash_map::Entry::Vacant(entry) => {
268                    let new_storage_value =
269                        state_provider.storage(address, old_storage.key)?.unwrap_or_default();
270                    entry.insert((
271                        (old_storage.value, old_storage.is_private),
272                        (new_storage_value.value, old_storage.is_private),
273                    ));
274                }
275                hash_map::Entry::Occupied(mut entry) => {
276                    entry.get_mut().0 = (old_storage.value, old_storage.is_private);
277                }
278            };
279
280            reverts
281                .entry(block_number)
282                .or_default()
283                .entry(address)
284                .or_default()
285                .1
286                .push(old_storage);
287        }
288
289        Ok((state, reverts))
290    }
291
    /// Fetches a range of data from both in-memory state and persistent storage while a predicate
    /// is met.
    ///
    /// Works on the snapshot held by this provider (the captured head block and the storage
    /// provider). Splits the range into in-memory and storage sections, prioritizing recent
    /// in-memory blocks in case of overlaps. Storage items come first in the returned vector,
    /// followed by the in-memory items in ascending block order.
    ///
    /// * `fetch_db_range` function (`F`) provides access to the database provider, allowing the
    ///   user to retrieve the required items from the database using [`RangeInclusive`].
    /// * `map_block_state_item` function (`G`) provides each block of the range in the in-memory
    ///   state, allowing for selection or filtering for the desired data. Returning `None` stops
    ///   the iteration.
    /// * `predicate` (`P`) is handed to both functions; this method never calls it directly.
    ///
    /// An unbounded end of `range` resolves to the highest in-memory block or, without in-memory
    /// blocks, to the last block number of the database (`0` if that lookup fails).
    fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
        &self,
        range: impl RangeBounds<BlockNumber>,
        fetch_db_range: F,
        map_block_state_item: G,
        mut predicate: P,
    ) -> ProviderResult<Vec<T>>
    where
        F: FnOnce(
            &DatabaseProviderRO<N::DB, N>,
            RangeInclusive<BlockNumber>,
            &mut P,
        ) -> ProviderResult<Vec<T>>,
        G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
        P: FnMut(&T) -> bool,
    {
        // Both views were captured when this provider was created, so they are consistent with
        // each other. The chain is ordered from the highest block (first) to the lowest (last).
        let mut in_memory_chain =
            self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
        let db_provider = &self.storage_provider;

        let (start, end) = self.convert_range_bounds(range, || {
            // the first block is the highest one.
            in_memory_chain
                .first()
                .map(|b| b.number())
                .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
        });

        // Empty range.
        if start > end {
            return Ok(vec![])
        }

        // Split range into storage_range and in-memory range. If the in-memory range is not
        // necessary drop it early.
        //
        // The last block of `in_memory_chain` is the lowest block number.
        let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
            Some(lowest_memory_block) if lowest_memory_block <= end => {
                // `last()` returned `Some`, so the chain is not empty and `first()` cannot fail.
                let highest_memory_block =
                    in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");

                // Database will for a time overlap with in-memory-chain blocks. In
                // case of a re-org, it can mean that the database blocks are of a forked chain, and
                // so, we should prioritize the in-memory overlapped blocks.
                let in_memory_range =
                    lowest_memory_block.max(start)..=end.min(highest_memory_block);

                // If requested range is in the middle of the in-memory range, remove the necessary
                // lowest blocks
                in_memory_chain.truncate(
                    in_memory_chain
                        .len()
                        .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
                );

                // Everything below the lowest in-memory block has to come from storage.
                let storage_range =
                    (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);

                (Some((in_memory_chain, in_memory_range)), storage_range)
            }
            _ => {
                // Drop the in-memory chain so we don't hold blocks in memory.
                drop(in_memory_chain);

                (None, Some(start..=end))
            }
        };

        // NOTE(review): the capacity is derived from the requested bounds without an upper limit;
        // a very large caller-supplied range would request a very large allocation. Confirm that
        // callers bound their ranges.
        let mut items = Vec::with_capacity((end - start + 1) as usize);

        if let Some(storage_range) = storage_range {
            let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
            items.append(&mut db_items);

            // The predicate was not met, if the number of items differs from the expected. So, we
            // return what we have.
            if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
                return Ok(items)
            }
        }

        if let Some((in_memory_chain, in_memory_range)) = in_memory {
            // Walk the chain from the lowest remaining block upwards; `zip` stops at the end of
            // the requested in-memory range.
            for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
                debug_assert!(num == block.number());
                if let Some(item) = map_block_state_item(block, &mut predicate) {
                    items.push(item);
                } else {
                    break
                }
            }
        }

        Ok(items)
    }
404
405    /// This uses a given [`BlockState`] to initialize a state provider for that block.
406    fn block_state_provider_ref(
407        &self,
408        state: &BlockState<N::Primitives>,
409    ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
410        let anchor_hash = state.anchor().hash;
411        let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
412        let in_memory = state.chain().map(|block_state| block_state.block()).collect();
413        Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
414    }
415
    /// Fetches data from either in-memory state or persistent storage for a range of transactions.
    ///
    /// Items fetched from storage come first in the returned vector, followed by the in-memory
    /// items from the lowest in-memory block to the highest.
    ///
    /// * `range`: bounds of the requested transaction numbers. NOTE(review): the parameter is
    ///   typed with the `BlockNumber` alias although it is used as a transaction number range.
    /// * `fetch_from_db`: has a `DatabaseProviderRO` and the storage specific range.
    /// * `fetch_from_block_state`: has a [`RangeInclusive`] of elements that should be fetched from
    ///   [`BlockState`]. [`RangeInclusive`] is necessary to handle partial look-ups of a block.
    ///   The range holds indices into the transactions of that block.
    ///
    /// # Errors
    ///
    /// Returns [`ProviderError::BlockBodyIndicesNotFound`] if the body indices of the last
    /// database block are missing, and propagates errors of both fetch functions.
    fn get_in_memory_or_storage_by_tx_range<S, M, R>(
        &self,
        range: impl RangeBounds<BlockNumber>,
        fetch_from_db: S,
        fetch_from_block_state: M,
    ) -> ProviderResult<Vec<R>>
    where
        S: FnOnce(
            &DatabaseProviderRO<N::DB, N>,
            RangeInclusive<TxNumber>,
        ) -> ProviderResult<Vec<R>>,
        M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
    {
        // In-memory chain of the captured head block; empty if there is no head block.
        let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
        let provider = &self.storage_provider;

        // Get the last block number stored in the storage which does NOT overlap with in-memory
        // chain.
        let last_database_block_number = in_mem_chain
            .last()
            .map(|b| Ok(b.anchor().number))
            .unwrap_or_else(|| provider.last_block_number())?;

        // Get the next tx number for the last block stored in the storage, which marks the start of
        // the in-memory state.
        let last_block_body_index = provider
            .block_body_indices(last_database_block_number)?
            .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
        let mut in_memory_tx_num = last_block_body_index.next_tx_num();

        // An unbounded end resolves to the last database tx number plus the number of in-memory
        // transactions.
        let (start, end) = self.convert_range_bounds(range, || {
            in_mem_chain
                .iter()
                .map(|b| b.block_ref().block().body.transactions().len() as u64)
                .sum::<u64>() +
                last_block_body_index.last_tx_num()
        });

        // Empty range.
        if start > end {
            return Ok(vec![])
        }

        let mut tx_range = start..=end;

        // If the range is entirely before the first in-memory transaction number, fetch from
        // storage
        if *tx_range.end() < in_memory_tx_num {
            return fetch_from_db(provider, tx_range);
        }

        // NOTE(review): the capacity is derived from the requested bounds without an upper limit;
        // confirm that callers bound their ranges.
        let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);

        // If the range spans storage and memory, get elements from storage first.
        if *tx_range.start() < in_memory_tx_num {
            // Determine the range that needs to be fetched from storage.
            let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);

            // Set the remaining transaction range for in-memory
            tx_range = in_memory_tx_num..=*tx_range.end();

            items.extend(fetch_from_db(provider, db_range)?);
        }

        // Iterate from the lowest block to the highest in-memory chain
        for block_state in in_mem_chain.iter().rev() {
            let block_tx_count = block_state.block_ref().block().body.transactions().len();
            let remaining = (tx_range.end() - tx_range.start() + 1) as usize;

            // If the transaction range start is equal or higher than the next block first
            // transaction, advance
            if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
                in_memory_tx_num += block_tx_count as u64;
                continue
            }

            // This should only be more than 0 once, in case of a partial range inside a block.
            let skip = (tx_range.start() - in_memory_tx_num) as usize;

            // Request at most the rest of this block, or fewer if the range ends inside it.
            items.extend(fetch_from_block_state(
                skip..=skip + (remaining.min(block_tx_count - skip) - 1),
                block_state,
            )?);

            in_memory_tx_num += block_tx_count as u64;

            // Break if the range has been fully processed
            if in_memory_tx_num > *tx_range.end() {
                break
            }

            // Set updated range
            tx_range = in_memory_tx_num..=*tx_range.end();
        }

        Ok(items)
    }
517
518    /// Fetches data from either in-memory state or persistent storage by transaction
519    /// [`HashOrNumber`].
520    fn get_in_memory_or_storage_by_tx<S, M, R>(
521        &self,
522        id: HashOrNumber,
523        fetch_from_db: S,
524        fetch_from_block_state: M,
525    ) -> ProviderResult<Option<R>>
526    where
527        S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
528        M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
529    {
530        let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
531        let provider = &self.storage_provider;
532
533        // Get the last block number stored in the database which does NOT overlap with in-memory
534        // chain.
535        let last_database_block_number = in_mem_chain
536            .last()
537            .map(|b| Ok(b.anchor().number))
538            .unwrap_or_else(|| provider.last_block_number())?;
539
540        // Get the next tx number for the last block stored in the database and consider it the
541        // first tx number of the in-memory state
542        let last_block_body_index = provider
543            .block_body_indices(last_database_block_number)?
544            .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
545        let mut in_memory_tx_num = last_block_body_index.next_tx_num();
546
547        // If the transaction number is less than the first in-memory transaction number, make a
548        // database lookup
549        if let HashOrNumber::Number(id) = id {
550            if id < in_memory_tx_num {
551                return fetch_from_db(provider)
552            }
553        }
554
555        // Iterate from the lowest block to the highest
556        for block_state in in_mem_chain.iter().rev() {
557            let executed_block = block_state.block_ref();
558            let block = executed_block.block();
559
560            for tx_index in 0..block.body.transactions().len() {
561                match id {
562                    HashOrNumber::Hash(tx_hash) => {
563                        if tx_hash == block.body.transactions()[tx_index].trie_hash() {
564                            return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
565                        }
566                    }
567                    HashOrNumber::Number(id) => {
568                        if id == in_memory_tx_num {
569                            return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
570                        }
571                    }
572                }
573
574                in_memory_tx_num += 1;
575            }
576        }
577
578        // Not found in-memory, so check database.
579        if let HashOrNumber::Hash(_) = id {
580            return fetch_from_db(provider)
581        }
582
583        Ok(None)
584    }
585
586    /// Fetches data from either in-memory state or persistent storage by [`BlockHashOrNumber`].
587    pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
588        &self,
589        id: BlockHashOrNumber,
590        fetch_from_db: S,
591        fetch_from_block_state: M,
592    ) -> ProviderResult<R>
593    where
594        S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
595        M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
596    {
597        if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
598            return fetch_from_block_state(block_state)
599        }
600        fetch_from_db(&self.storage_provider)
601    }
602}
603
604impl<N: ProviderNodeTypes> ConsistentProvider<N> {
605    /// Ensures that the given block number is canonical (synced)
606    ///
607    /// This is a helper for guarding the `HistoricalStateProvider` against block numbers that are
608    /// out of range and would lead to invalid results, mainly during initial sync.
609    ///
610    /// Verifying the `block_number` would be expensive since we need to lookup sync table
611    /// Instead, we ensure that the `block_number` is within the range of the
612    /// [`Self::best_block_number`] which is updated when a block is synced.
613    #[inline]
614    pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
615        let latest = self.best_block_number()?;
616        if block_number > latest {
617            Err(ProviderError::HeaderNotFound(block_number.into()))
618        } else {
619            Ok(())
620        }
621    }
622}
623
// The provider uses the primitives of the node types `N` it is generic over.
impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
    type Primitives = N::Primitives;
}
627
impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
    /// Returns the static file provider of the underlying storage provider.
    fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
        self.storage_provider.static_file_provider()
    }
}
633
634impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
635    type Header = HeaderTy<N>;
636
637    fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
638        self.get_in_memory_or_storage_by_block(
639            (*block_hash).into(),
640            |db_provider| db_provider.header(block_hash),
641            |block_state| Ok(Some(block_state.block_ref().block().header.header().clone())),
642        )
643    }
644
645    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
646        self.get_in_memory_or_storage_by_block(
647            num.into(),
648            |db_provider| db_provider.header_by_number(num),
649            |block_state| Ok(Some(block_state.block_ref().block().header.header().clone())),
650        )
651    }
652
653    fn header_td(&self, hash: &BlockHash) -> ProviderResult<Option<U256>> {
654        if let Some(num) = self.block_number(*hash)? {
655            self.header_td_by_number(num)
656        } else {
657            Ok(None)
658        }
659    }
660
661    fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
662        let number = if self.head_block.as_ref().map(|b| b.block_on_chain(number.into())).is_some()
663        {
664            // If the block exists in memory, we should return a TD for it.
665            //
666            // The canonical in memory state should only store post-merge blocks. Post-merge blocks
667            // have zero difficulty. This means we can use the total difficulty for the last
668            // finalized block number if present (so that we are not affected by reorgs), if not the
669            // last number in the database will be used.
670            if let Some(last_finalized_num_hash) =
671                self.canonical_in_memory_state.get_finalized_num_hash()
672            {
673                last_finalized_num_hash.number
674            } else {
675                self.last_block_number()?
676            }
677        } else {
678            // Otherwise, return what we have on disk for the input block
679            number
680        };
681        self.storage_provider.header_td_by_number(number)
682    }
683
    /// Returns the headers of the blocks in `range`, taken from the database and the in-memory
    /// chain.
    fn headers_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<Self::Header>> {
        self.get_in_memory_or_storage_by_block_range_while(
            range,
            |db_provider, range, _| db_provider.headers_range(range),
            |block_state, _| Some(block_state.block_ref().block().header.header().clone()),
            // No filtering: every header of the range is wanted.
            |_| true,
        )
    }
695
696    fn sealed_header(
697        &self,
698        number: BlockNumber,
699    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
700        self.get_in_memory_or_storage_by_block(
701            number.into(),
702            |db_provider| db_provider.sealed_header(number),
703            |block_state| Ok(Some(block_state.block_ref().block().header.clone())),
704        )
705    }
706
    /// Returns the sealed headers of the blocks in `range`, taken from the database and the
    /// in-memory chain.
    fn sealed_headers_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
        self.get_in_memory_or_storage_by_block_range_while(
            range,
            |db_provider, range, _| db_provider.sealed_headers_range(range),
            |block_state, _| Some(block_state.block_ref().block().header.clone()),
            // No filtering: every header of the range is wanted.
            |_| true,
        )
    }
718
    /// Returns the sealed headers of the blocks in `range` for as long as `predicate` holds.
    ///
    /// The predicate is forwarded to the database lookup and applied to every in-memory header;
    /// the first in-memory header that fails it ends the collection.
    fn sealed_headers_while(
        &self,
        range: impl RangeBounds<BlockNumber>,
        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
        self.get_in_memory_or_storage_by_block_range_while(
            range,
            |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
            |block_state, predicate| {
                let header = &block_state.block_ref().block().header;
                // Only clone the header if it passes the predicate.
                predicate(header).then(|| header.clone())
            },
            predicate,
        )
    }
734}
735
736impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
737    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
738        self.get_in_memory_or_storage_by_block(
739            number.into(),
740            |db_provider| db_provider.block_hash(number),
741            |block_state| Ok(Some(block_state.hash())),
742        )
743    }
744
745    fn canonical_hashes_range(
746        &self,
747        start: BlockNumber,
748        end: BlockNumber,
749    ) -> ProviderResult<Vec<B256>> {
750        self.get_in_memory_or_storage_by_block_range_while(
751            start..end,
752            |db_provider, inclusive_range, _| {
753                db_provider
754                    .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
755            },
756            |block_state, _| Some(block_state.hash()),
757            |_| true,
758        )
759    }
760}
761
762impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
763    fn chain_info(&self) -> ProviderResult<ChainInfo> {
764        let best_number = self.best_block_number()?;
765        Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
766    }
767
768    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
769        self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
770    }
771
772    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
773        self.storage_provider.last_block_number()
774    }
775
776    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
777        self.get_in_memory_or_storage_by_block(
778            hash.into(),
779            |db_provider| db_provider.block_number(hash),
780            |block_state| Ok(Some(block_state.number())),
781        )
782    }
783}
784
785impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
786    fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
787        Ok(self.canonical_in_memory_state.pending_block_num_hash())
788    }
789
790    fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
791        Ok(self.canonical_in_memory_state.get_safe_num_hash())
792    }
793
794    fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
795        Ok(self.canonical_in_memory_state.get_finalized_num_hash())
796    }
797}
798
799impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
800    type Block = BlockTy<N>;
801
802    fn find_block_by_hash(
803        &self,
804        hash: B256,
805        source: BlockSource,
806    ) -> ProviderResult<Option<Self::Block>> {
807        match source {
808            BlockSource::Any | BlockSource::Canonical => {
809                // Note: it's fine to return the unsealed block because the caller already has
810                // the hash
811                self.get_in_memory_or_storage_by_block(
812                    hash.into(),
813                    |db_provider| db_provider.find_block_by_hash(hash, source),
814                    |block_state| Ok(Some(block_state.block_ref().block().clone().unseal())),
815                )
816            }
817            BlockSource::Pending => {
818                Ok(self.canonical_in_memory_state.pending_block().map(|block| block.unseal()))
819            }
820        }
821    }
822
823    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
824        self.get_in_memory_or_storage_by_block(
825            id,
826            |db_provider| db_provider.block(id),
827            |block_state| Ok(Some(block_state.block_ref().block().clone().unseal())),
828        )
829    }
830
831    fn pending_block(&self) -> ProviderResult<Option<SealedBlockFor<Self::Block>>> {
832        Ok(self.canonical_in_memory_state.pending_block())
833    }
834
835    fn pending_block_with_senders(
836        &self,
837    ) -> ProviderResult<Option<SealedBlockWithSenders<Self::Block>>> {
838        Ok(self.canonical_in_memory_state.pending_block_with_senders())
839    }
840
841    fn pending_block_and_receipts(
842        &self,
843    ) -> ProviderResult<Option<(SealedBlockFor<Self::Block>, Vec<Self::Receipt>)>> {
844        Ok(self.canonical_in_memory_state.pending_block_and_receipts())
845    }
846
847    /// Returns the block with senders with matching number or hash from database.
848    ///
849    /// **NOTE: If [`TransactionVariant::NoHash`] is provided then the transactions have invalid
850    /// hashes, since they would need to be calculated on the spot, and we want fast querying.**
851    ///
852    /// Returns `None` if block is not found.
853    fn block_with_senders(
854        &self,
855        id: BlockHashOrNumber,
856        transaction_kind: TransactionVariant,
857    ) -> ProviderResult<Option<BlockWithSenders<Self::Block>>> {
858        self.get_in_memory_or_storage_by_block(
859            id,
860            |db_provider| db_provider.block_with_senders(id, transaction_kind),
861            |block_state| Ok(Some(block_state.block_with_senders())),
862        )
863    }
864
865    fn sealed_block_with_senders(
866        &self,
867        id: BlockHashOrNumber,
868        transaction_kind: TransactionVariant,
869    ) -> ProviderResult<Option<SealedBlockWithSenders<Self::Block>>> {
870        self.get_in_memory_or_storage_by_block(
871            id,
872            |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
873            |block_state| Ok(Some(block_state.sealed_block_with_senders())),
874        )
875    }
876
877    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
878        self.get_in_memory_or_storage_by_block_range_while(
879            range,
880            |db_provider, range, _| db_provider.block_range(range),
881            |block_state, _| Some(block_state.block_ref().block().clone().unseal()),
882            |_| true,
883        )
884    }
885
886    fn block_with_senders_range(
887        &self,
888        range: RangeInclusive<BlockNumber>,
889    ) -> ProviderResult<Vec<BlockWithSenders<Self::Block>>> {
890        self.get_in_memory_or_storage_by_block_range_while(
891            range,
892            |db_provider, range, _| db_provider.block_with_senders_range(range),
893            |block_state, _| Some(block_state.block_with_senders()),
894            |_| true,
895        )
896    }
897
898    fn sealed_block_with_senders_range(
899        &self,
900        range: RangeInclusive<BlockNumber>,
901    ) -> ProviderResult<Vec<SealedBlockWithSenders<Self::Block>>> {
902        self.get_in_memory_or_storage_by_block_range_while(
903            range,
904            |db_provider, range, _| db_provider.sealed_block_with_senders_range(range),
905            |block_state, _| Some(block_state.sealed_block_with_senders()),
906            |_| true,
907        )
908    }
909}
910
911impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
912    type Transaction = TxTy<N>;
913
914    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
915        self.get_in_memory_or_storage_by_tx(
916            tx_hash.into(),
917            |db_provider| db_provider.transaction_id(tx_hash),
918            |_, tx_number, _| Ok(Some(tx_number)),
919        )
920    }
921
922    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
923        self.get_in_memory_or_storage_by_tx(
924            id.into(),
925            |provider| provider.transaction_by_id(id),
926            |tx_index, _, block_state| {
927                Ok(block_state
928                    .block_ref()
929                    .block()
930                    .body
931                    .transactions()
932                    .get(tx_index)
933                    .cloned()
934                    .map(Into::into))
935            },
936        )
937    }
938
939    fn transaction_by_id_unhashed(
940        &self,
941        id: TxNumber,
942    ) -> ProviderResult<Option<Self::Transaction>> {
943        self.get_in_memory_or_storage_by_tx(
944            id.into(),
945            |provider| provider.transaction_by_id_unhashed(id),
946            |tx_index, _, block_state| {
947                Ok(block_state
948                    .block_ref()
949                    .block()
950                    .body
951                    .transactions()
952                    .get(tx_index)
953                    .cloned()
954                    .map(Into::into))
955            },
956        )
957    }
958
959    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
960        if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
961            return Ok(Some(tx))
962        }
963
964        self.storage_provider.transaction_by_hash(hash)
965    }
966
967    fn transaction_by_hash_with_meta(
968        &self,
969        tx_hash: TxHash,
970    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
971        if let Some((tx, meta)) =
972            self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
973        {
974            return Ok(Some((tx, meta)))
975        }
976
977        self.storage_provider.transaction_by_hash_with_meta(tx_hash)
978    }
979
980    fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
981        self.get_in_memory_or_storage_by_tx(
982            id.into(),
983            |provider| provider.transaction_block(id),
984            |_, _, block_state| Ok(Some(block_state.block_ref().block().number())),
985        )
986    }
987
988    fn transactions_by_block(
989        &self,
990        id: BlockHashOrNumber,
991    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
992        self.get_in_memory_or_storage_by_block(
993            id,
994            |provider| provider.transactions_by_block(id),
995            |block_state| Ok(Some(block_state.block_ref().block().body.transactions().to_vec())),
996        )
997    }
998
999    fn transactions_by_block_range(
1000        &self,
1001        range: impl RangeBounds<BlockNumber>,
1002    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1003        self.get_in_memory_or_storage_by_block_range_while(
1004            range,
1005            |db_provider, range, _| db_provider.transactions_by_block_range(range),
1006            |block_state, _| Some(block_state.block_ref().block().body.transactions().to_vec()),
1007            |_| true,
1008        )
1009    }
1010
1011    fn transactions_by_tx_range(
1012        &self,
1013        range: impl RangeBounds<TxNumber>,
1014    ) -> ProviderResult<Vec<Self::Transaction>> {
1015        self.get_in_memory_or_storage_by_tx_range(
1016            range,
1017            |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
1018            |index_range, block_state| {
1019                Ok(block_state.block_ref().block().body.transactions()[index_range].to_vec())
1020            },
1021        )
1022    }
1023
1024    fn senders_by_tx_range(
1025        &self,
1026        range: impl RangeBounds<TxNumber>,
1027    ) -> ProviderResult<Vec<Address>> {
1028        self.get_in_memory_or_storage_by_tx_range(
1029            range,
1030            |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
1031            |index_range, block_state| Ok(block_state.block_ref().senders[index_range].to_vec()),
1032        )
1033    }
1034
1035    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1036        self.get_in_memory_or_storage_by_tx(
1037            id.into(),
1038            |provider| provider.transaction_sender(id),
1039            |tx_index, _, block_state| Ok(block_state.block_ref().senders.get(tx_index).copied()),
1040        )
1041    }
1042}
1043
1044impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
1045    type Receipt = ReceiptTy<N>;
1046
1047    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1048        self.get_in_memory_or_storage_by_tx(
1049            id.into(),
1050            |provider| provider.receipt(id),
1051            |tx_index, _, block_state| {
1052                Ok(block_state.executed_block_receipts().get(tx_index).cloned())
1053            },
1054        )
1055    }
1056
1057    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1058        for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
1059            let executed_block = block_state.block_ref();
1060            let block = executed_block.block();
1061            let receipts = block_state.executed_block_receipts();
1062
1063            // assuming 1:1 correspondence between transactions and receipts
1064            debug_assert_eq!(
1065                block.body.transactions().len(),
1066                receipts.len(),
1067                "Mismatch between transaction and receipt count"
1068            );
1069
1070            if let Some(tx_index) =
1071                block.body.transactions().iter().position(|tx| tx.trie_hash() == hash)
1072            {
1073                // safe to use tx_index for receipts due to 1:1 correspondence
1074                return Ok(receipts.get(tx_index).cloned());
1075            }
1076        }
1077
1078        self.storage_provider.receipt_by_hash(hash)
1079    }
1080
1081    fn receipts_by_block(
1082        &self,
1083        block: BlockHashOrNumber,
1084    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1085        self.get_in_memory_or_storage_by_block(
1086            block,
1087            |db_provider| db_provider.receipts_by_block(block),
1088            |block_state| Ok(Some(block_state.executed_block_receipts())),
1089        )
1090    }
1091
1092    fn receipts_by_tx_range(
1093        &self,
1094        range: impl RangeBounds<TxNumber>,
1095    ) -> ProviderResult<Vec<Self::Receipt>> {
1096        self.get_in_memory_or_storage_by_tx_range(
1097            range,
1098            |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
1099            |index_range, block_state| {
1100                Ok(block_state.executed_block_receipts().drain(index_range).collect())
1101            },
1102        )
1103    }
1104}
1105
1106impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
1107    fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1108        match block {
1109            BlockId::Hash(rpc_block_hash) => {
1110                let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
1111                if receipts.is_none() && !rpc_block_hash.require_canonical.unwrap_or(false) {
1112                    if let Some(state) = self
1113                        .head_block
1114                        .as_ref()
1115                        .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
1116                    {
1117                        receipts = Some(state.executed_block_receipts());
1118                    }
1119                }
1120                Ok(receipts)
1121            }
1122            BlockId::Number(num_tag) => match num_tag {
1123                BlockNumberOrTag::Pending => Ok(self
1124                    .canonical_in_memory_state
1125                    .pending_state()
1126                    .map(|block_state| block_state.executed_block_receipts())),
1127                _ => {
1128                    if let Some(num) = self.convert_block_number(num_tag)? {
1129                        self.receipts_by_block(num.into())
1130                    } else {
1131                        Ok(None)
1132                    }
1133                }
1134            },
1135        }
1136    }
1137}
1138
1139impl<N: ProviderNodeTypes> WithdrawalsProvider for ConsistentProvider<N> {
1140    fn withdrawals_by_block(
1141        &self,
1142        id: BlockHashOrNumber,
1143        timestamp: u64,
1144    ) -> ProviderResult<Option<Withdrawals>> {
1145        if !self.chain_spec().is_shanghai_active_at_timestamp(timestamp) {
1146            return Ok(None)
1147        }
1148
1149        self.get_in_memory_or_storage_by_block(
1150            id,
1151            |db_provider| db_provider.withdrawals_by_block(id, timestamp),
1152            |block_state| Ok(block_state.block_ref().block().body.withdrawals().cloned()),
1153        )
1154    }
1155
1156    fn latest_withdrawal(&self) -> ProviderResult<Option<Withdrawal>> {
1157        let best_block_num = self.best_block_number()?;
1158
1159        self.get_in_memory_or_storage_by_block(
1160            best_block_num.into(),
1161            |db_provider| db_provider.latest_withdrawal(),
1162            |block_state| {
1163                Ok(block_state
1164                    .block_ref()
1165                    .block()
1166                    .body
1167                    .withdrawals()
1168                    .cloned()
1169                    .and_then(|mut w| w.pop()))
1170            },
1171        )
1172    }
1173}
1174
1175impl<N: ProviderNodeTypes> OmmersProvider for ConsistentProvider<N> {
1176    fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<HeaderTy<N>>>> {
1177        self.get_in_memory_or_storage_by_block(
1178            id,
1179            |db_provider| db_provider.ommers(id),
1180            |block_state| {
1181                if self.chain_spec().final_paris_total_difficulty(block_state.number()).is_some() {
1182                    return Ok(Some(Vec::new()))
1183                }
1184
1185                Ok(block_state.block_ref().block().body.ommers().map(|o| o.to_vec()))
1186            },
1187        )
1188    }
1189}
1190
1191impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
1192    fn block_body_indices(
1193        &self,
1194        number: BlockNumber,
1195    ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1196        self.get_in_memory_or_storage_by_block(
1197            number.into(),
1198            |db_provider| db_provider.block_body_indices(number),
1199            |block_state| {
1200                // Find the last block indices on database
1201                let last_storage_block_number = block_state.anchor().number;
1202                let mut stored_indices = self
1203                    .storage_provider
1204                    .block_body_indices(last_storage_block_number)?
1205                    .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1206
1207                // Prepare our block indices
1208                stored_indices.first_tx_num = stored_indices.next_tx_num();
1209                stored_indices.tx_count = 0;
1210
1211                // Iterate from the lowest block in memory until our target block
1212                for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1213                    let block_tx_count = state.block_ref().block.body.transactions().len() as u64;
1214                    if state.block_ref().block().number() == number {
1215                        stored_indices.tx_count = block_tx_count;
1216                    } else {
1217                        stored_indices.first_tx_num += block_tx_count;
1218                    }
1219                }
1220
1221                Ok(Some(stored_indices))
1222            },
1223        )
1224    }
1225}
1226
1227impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
1228    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1229        self.storage_provider.get_stage_checkpoint(id)
1230    }
1231
1232    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1233        self.storage_provider.get_stage_checkpoint_progress(id)
1234    }
1235
1236    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1237        self.storage_provider.get_all_checkpoints()
1238    }
1239}
1240
1241impl<N: ProviderNodeTypes> EvmEnvProvider<HeaderTy<N>> for ConsistentProvider<N> {
1242    fn env_with_header<EvmConfig>(
1243        &self,
1244        header: &HeaderTy<N>,
1245        evm_config: EvmConfig,
1246    ) -> ProviderResult<(CfgEnvWithHandlerCfg, BlockEnv)>
1247    where
1248        EvmConfig: ConfigureEvmEnv<Header = HeaderTy<N>>,
1249    {
1250        let total_difficulty = self
1251            .header_td_by_number(header.number())?
1252            .ok_or_else(|| ProviderError::HeaderNotFound(header.number().into()))?;
1253        Ok(evm_config.cfg_and_block_env(header, total_difficulty))
1254    }
1255}
1256
1257impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
1258    fn get_prune_checkpoint(
1259        &self,
1260        segment: PruneSegment,
1261    ) -> ProviderResult<Option<PruneCheckpoint>> {
1262        self.storage_provider.get_prune_checkpoint(segment)
1263    }
1264
1265    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
1266        self.storage_provider.get_prune_checkpoints()
1267    }
1268}
1269
1270impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
1271    type ChainSpec = N::ChainSpec;
1272
1273    fn chain_spec(&self) -> Arc<N::ChainSpec> {
1274        ChainSpecProvider::chain_spec(&self.storage_provider)
1275    }
1276}
1277
1278impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1279    fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1280        match id {
1281            BlockId::Number(num) => self.block_by_number_or_tag(num),
1282            BlockId::Hash(hash) => {
1283                // TODO: should we only apply this for the RPCs that are listed in EIP-1898?
1284                // so not at the provider level?
1285                // if we decide to do this at a higher level, then we can make this an automatic
1286                // trait impl
1287                if Some(true) == hash.require_canonical {
1288                    // check the database, canonical blocks are only stored in the database
1289                    self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1290                } else {
1291                    self.block_by_hash(hash.block_hash)
1292                }
1293            }
1294        }
1295    }
1296
1297    fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1298        Ok(match id {
1299            BlockNumberOrTag::Latest => {
1300                Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1301            }
1302            BlockNumberOrTag::Finalized => {
1303                self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1304            }
1305            BlockNumberOrTag::Safe => {
1306                self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1307            }
1308            BlockNumberOrTag::Earliest => self.header_by_number(0)?,
1309            BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1310
1311            BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1312        })
1313    }
1314
1315    fn sealed_header_by_number_or_tag(
1316        &self,
1317        id: BlockNumberOrTag,
1318    ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1319        match id {
1320            BlockNumberOrTag::Latest => {
1321                Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1322            }
1323            BlockNumberOrTag::Finalized => {
1324                Ok(self.canonical_in_memory_state.get_finalized_header())
1325            }
1326            BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1327            BlockNumberOrTag::Earliest => self
1328                .header_by_number(0)?
1329                .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal(h)))),
1330            BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1331            BlockNumberOrTag::Number(num) => self
1332                .header_by_number(num)?
1333                .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal(h)))),
1334        }
1335    }
1336
1337    fn sealed_header_by_id(
1338        &self,
1339        id: BlockId,
1340    ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1341        Ok(match id {
1342            BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1343            BlockId::Hash(hash) => self.header(&hash.block_hash)?.map(SealedHeader::seal),
1344        })
1345    }
1346
1347    fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1348        Ok(match id {
1349            BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1350            BlockId::Hash(hash) => self.header(&hash.block_hash)?,
1351        })
1352    }
1353
1354    fn ommers_by_id(&self, id: BlockId) -> ProviderResult<Option<Vec<HeaderTy<N>>>> {
1355        match id {
1356            BlockId::Number(num) => self.ommers_by_number_or_tag(num),
1357            BlockId::Hash(hash) => {
1358                // TODO: EIP-1898 question, see above
1359                // here it is not handled
1360                self.ommers(BlockHashOrNumber::Hash(hash.block_hash))
1361            }
1362        }
1363    }
1364}
1365
1366impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1367    fn storage_changeset(
1368        &self,
1369        block_number: BlockNumber,
1370    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1371        if let Some(state) =
1372            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1373        {
1374            let changesets = state
1375                .block()
1376                .execution_output
1377                .bundle
1378                .reverts
1379                .clone()
1380                .to_plain_state_reverts()
1381                .storage
1382                .into_iter()
1383                .flatten()
1384                .flat_map(|revert: PlainStorageRevert| {
1385                    revert.storage_revert.into_iter().map(move |(key, value)| {
1386                        (
1387                            BlockNumberAddress((block_number, revert.address)),
1388                            StorageEntry {
1389                                key: key.into(),
1390                                value: value.to_previous_value().value,
1391                                is_private: value.to_previous_value().is_private,
1392                            },
1393                        )
1394                    })
1395                })
1396                .collect();
1397            Ok(changesets)
1398        } else {
1399            // Perform checks on whether or not changesets exist for the block.
1400
1401            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1402            let storage_history_exists = self
1403                .storage_provider
1404                .get_prune_checkpoint(PruneSegment::StorageHistory)?
1405                .and_then(|checkpoint| {
1406                    // return true if the block number is ahead of the prune checkpoint.
1407                    //
1408                    // The checkpoint stores the highest pruned block number, so we should make
1409                    // sure the block_number is strictly greater.
1410                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1411                })
1412                .unwrap_or(true);
1413
1414            if !storage_history_exists {
1415                return Err(ProviderError::StateAtBlockPruned(block_number))
1416            }
1417
1418            self.storage_provider.storage_changeset(block_number)
1419        }
1420    }
1421}
1422
1423impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1424    fn account_block_changeset(
1425        &self,
1426        block_number: BlockNumber,
1427    ) -> ProviderResult<Vec<AccountBeforeTx>> {
1428        if let Some(state) =
1429            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1430        {
1431            let changesets = state
1432                .block_ref()
1433                .execution_output
1434                .bundle
1435                .reverts
1436                .clone()
1437                .to_plain_state_reverts()
1438                .accounts
1439                .into_iter()
1440                .flatten()
1441                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1442                .collect();
1443            Ok(changesets)
1444        } else {
1445            // Perform checks on whether or not changesets exist for the block.
1446
1447            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1448            let account_history_exists = self
1449                .storage_provider
1450                .get_prune_checkpoint(PruneSegment::AccountHistory)?
1451                .and_then(|checkpoint| {
1452                    // return true if the block number is ahead of the prune checkpoint.
1453                    //
1454                    // The checkpoint stores the highest pruned block number, so we should make
1455                    // sure the block_number is strictly greater.
1456                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1457                })
1458                .unwrap_or(true);
1459
1460            if !account_history_exists {
1461                return Err(ProviderError::StateAtBlockPruned(block_number))
1462            }
1463
1464            self.storage_provider.account_block_changeset(block_number)
1465        }
1466    }
1467}
1468
1469impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1470    /// Get basic account information.
1471    fn basic_account(&self, address: Address) -> ProviderResult<Option<Account>> {
1472        // use latest state provider
1473        let state_provider = self.latest_ref()?;
1474        state_provider.basic_account(address)
1475    }
1476}
1477
1478impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1479    type Receipt = ReceiptTy<N>;
1480
1481    /// Re-constructs the [`ExecutionOutcome`] from in-memory and database state, if necessary.
1482    ///
1483    /// If data for the block does not exist, this will return [`None`].
1484    ///
1485    /// NOTE: This cannot be called safely in a loop outside of the blockchain tree thread. This is
1486    /// because the [`CanonicalInMemoryState`] could change during a reorg, causing results to be
1487    /// inconsistent. Currently this can safely be called within the blockchain tree thread,
1488    /// because the tree thread is responsible for modifying the [`CanonicalInMemoryState`] in the
1489    /// first place.
1490    fn get_state(
1491        &self,
1492        block: BlockNumber,
1493    ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1494        if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1495            let state = state.block_ref().execution_outcome().clone();
1496            Ok(Some(state))
1497        } else {
1498            Self::get_state(self, block..=block)
1499        }
1500    }
1501}
1502
1503#[cfg(test)]
1504mod tests {
1505    use crate::{
1506        providers::blockchain_provider::BlockchainProvider2,
1507        test_utils::create_test_provider_factory, BlockWriter,
1508    };
1509    use alloy_eips::BlockHashOrNumber;
1510    use alloy_primitives::B256;
1511    use itertools::Itertools;
1512    use rand::Rng;
1513    use reth_chain_state::{ExecutedBlock, NewCanonicalChain};
1514    use reth_db::models::AccountBeforeTx;
1515    use reth_execution_types::ExecutionOutcome;
1516    use reth_primitives::SealedBlock;
1517    use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader};
1518    use reth_testing_utils::generators::{
1519        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
1520    };
1521    use revm::db::BundleState;
1522    use std::{
1523        ops::{Bound, Range, RangeBounds},
1524        sync::Arc,
1525    };
1526
    /// Block count that tests hand to `random_blocks` as the number of database blocks.
    const TEST_BLOCKS_COUNT: usize = 5;
1528
1529    fn random_blocks(
1530        rng: &mut impl Rng,
1531        database_blocks: usize,
1532        in_memory_blocks: usize,
1533        requests_count: Option<Range<u8>>,
1534        withdrawals_count: Option<Range<u8>>,
1535        tx_count: impl RangeBounds<u8>,
1536    ) -> (Vec<SealedBlock>, Vec<SealedBlock>) {
1537        let block_range = (database_blocks + in_memory_blocks - 1) as u64;
1538
1539        let tx_start = match tx_count.start_bound() {
1540            Bound::Included(&n) | Bound::Excluded(&n) => n,
1541            Bound::Unbounded => u8::MIN,
1542        };
1543        let tx_end = match tx_count.end_bound() {
1544            Bound::Included(&n) | Bound::Excluded(&n) => n + 1,
1545            Bound::Unbounded => u8::MAX,
1546        };
1547
1548        let blocks = random_block_range(
1549            rng,
1550            0..=block_range,
1551            BlockRangeParams {
1552                parent: Some(B256::ZERO),
1553                tx_count: tx_start..tx_end,
1554                requests_count,
1555                withdrawals_count,
1556            },
1557        );
1558        let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
1559        (database_blocks.to_vec(), in_memory_blocks.to_vec())
1560    }
1561
1562    #[test]
1563    fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
1564        // Initialize random number generator and provider factory
1565        let mut rng = generators::rng();
1566        let factory = create_test_provider_factory();
1567
1568        // Generate 10 random blocks and split into database and in-memory blocks
1569        let blocks = random_block_range(
1570            &mut rng,
1571            0..=10,
1572            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1573        );
1574        let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1575
1576        // Insert first 5 blocks into the database
1577        let provider_rw = factory.provider_rw()?;
1578        for block in database_blocks {
1579            provider_rw.insert_historical_block(
1580                block.clone().seal_with_senders().expect("failed to seal block with senders"),
1581            )?;
1582        }
1583        provider_rw.commit()?;
1584
1585        // Create a new provider
1586        let provider = BlockchainProvider2::new(factory)?;
1587        let consistent_provider = provider.consistent_provider()?;
1588
1589        // Useful blocks
1590        let first_db_block = database_blocks.first().unwrap();
1591        let first_in_mem_block = in_memory_blocks.first().unwrap();
1592        let last_in_mem_block = in_memory_blocks.last().unwrap();
1593
1594        // No block in memory before setting in memory state
1595        assert_eq!(
1596            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1597            None
1598        );
1599        assert_eq!(
1600            consistent_provider
1601                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1602            None
1603        );
1604        // No pending block in memory
1605        assert_eq!(
1606            consistent_provider
1607                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
1608            None
1609        );
1610
1611        // Insert first block into the in-memory state
1612        let in_memory_block_senders =
1613            first_in_mem_block.senders().expect("failed to recover senders");
1614        let chain = NewCanonicalChain::Commit {
1615            new: vec![ExecutedBlock::new(
1616                Arc::new(first_in_mem_block.clone()),
1617                Arc::new(in_memory_block_senders),
1618                Default::default(),
1619                Default::default(),
1620                Default::default(),
1621            )],
1622        };
1623        consistent_provider.canonical_in_memory_state.update_chain(chain);
1624        let consistent_provider = provider.consistent_provider()?;
1625
1626        // Now the block should be found in memory
1627        assert_eq!(
1628            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1629            Some(first_in_mem_block.clone().into())
1630        );
1631        assert_eq!(
1632            consistent_provider
1633                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1634            Some(first_in_mem_block.clone().into())
1635        );
1636
1637        // Find the first block in database by hash
1638        assert_eq!(
1639            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
1640            Some(first_db_block.clone().into())
1641        );
1642        assert_eq!(
1643            consistent_provider
1644                .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
1645            Some(first_db_block.clone().into())
1646        );
1647
1648        // No pending block in database
1649        assert_eq!(
1650            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
1651            None
1652        );
1653
1654        // Insert the last block into the pending state
1655        provider.canonical_in_memory_state.set_pending_block(ExecutedBlock {
1656            block: Arc::new(last_in_mem_block.clone()),
1657            senders: Default::default(),
1658            execution_output: Default::default(),
1659            hashed_state: Default::default(),
1660            trie: Default::default(),
1661        });
1662
1663        // Now the last block should be found in memory
1664        assert_eq!(
1665            consistent_provider
1666                .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
1667            Some(last_in_mem_block.clone().into())
1668        );
1669
1670        Ok(())
1671    }
1672
1673    #[test]
1674    fn test_block_reader_block() -> eyre::Result<()> {
1675        // Initialize random number generator and provider factory
1676        let mut rng = generators::rng();
1677        let factory = create_test_provider_factory();
1678
1679        // Generate 10 random blocks and split into database and in-memory blocks
1680        let blocks = random_block_range(
1681            &mut rng,
1682            0..=10,
1683            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1684        );
1685        let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1686
1687        // Insert first 5 blocks into the database
1688        let provider_rw = factory.provider_rw()?;
1689        for block in database_blocks {
1690            provider_rw.insert_historical_block(
1691                block.clone().seal_with_senders().expect("failed to seal block with senders"),
1692            )?;
1693        }
1694        provider_rw.commit()?;
1695
1696        // Create a new provider
1697        let provider = BlockchainProvider2::new(factory)?;
1698        let consistent_provider = provider.consistent_provider()?;
1699
1700        // First in memory block
1701        let first_in_mem_block = in_memory_blocks.first().unwrap();
1702        // First database block
1703        let first_db_block = database_blocks.first().unwrap();
1704
1705        // First in memory block should not be found yet as not integrated to the in-memory state
1706        assert_eq!(
1707            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1708            None
1709        );
1710        assert_eq!(
1711            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1712            None
1713        );
1714
1715        // Insert first block into the in-memory state
1716        let in_memory_block_senders =
1717            first_in_mem_block.senders().expect("failed to recover senders");
1718        let chain = NewCanonicalChain::Commit {
1719            new: vec![ExecutedBlock::new(
1720                Arc::new(first_in_mem_block.clone()),
1721                Arc::new(in_memory_block_senders),
1722                Default::default(),
1723                Default::default(),
1724                Default::default(),
1725            )],
1726        };
1727        consistent_provider.canonical_in_memory_state.update_chain(chain);
1728
1729        let consistent_provider = provider.consistent_provider()?;
1730
1731        // First in memory block should be found
1732        assert_eq!(
1733            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1734            Some(first_in_mem_block.clone().into())
1735        );
1736        assert_eq!(
1737            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1738            Some(first_in_mem_block.clone().into())
1739        );
1740
1741        // First database block should be found
1742        assert_eq!(
1743            consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
1744            Some(first_db_block.clone().into())
1745        );
1746        assert_eq!(
1747            consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
1748            Some(first_db_block.clone().into())
1749        );
1750
1751        Ok(())
1752    }
1753
1754    #[test]
1755    fn test_changeset_reader() -> eyre::Result<()> {
1756        let mut rng = generators::rng();
1757
1758        let (database_blocks, in_memory_blocks) =
1759            random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);
1760
1761        let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
1762        let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
1763        let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();
1764
1765        let accounts = random_eoa_accounts(&mut rng, 2);
1766
1767        let (database_changesets, database_state) = random_changeset_range(
1768            &mut rng,
1769            &database_blocks,
1770            accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
1771            0..0,
1772            0..0,
1773        );
1774        let (in_memory_changesets, in_memory_state) = random_changeset_range(
1775            &mut rng,
1776            &in_memory_blocks,
1777            database_state
1778                .iter()
1779                .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
1780            0..0,
1781            0..0,
1782        );
1783
1784        let factory = create_test_provider_factory();
1785
1786        let provider_rw = factory.provider_rw()?;
1787        provider_rw.append_blocks_with_state(
1788            database_blocks
1789                .into_iter()
1790                .map(|b| b.seal_with_senders().expect("failed to seal block with senders"))
1791                .collect(),
1792            ExecutionOutcome {
1793                bundle: BundleState::new(
1794                    database_state.into_iter().map(|(address, (account, _))| {
1795                        (address, None, Some(account.into()), Default::default())
1796                    }),
1797                    database_changesets
1798                        .iter()
1799                        .map(|block_changesets| {
1800                            block_changesets.iter().map(|(address, account, _)| {
1801                                (*address, Some(Some((*account).into())), [])
1802                            })
1803                        })
1804                        .collect::<Vec<_>>(),
1805                    Vec::new(),
1806                ),
1807                first_block: first_database_block,
1808                ..Default::default()
1809            },
1810            Default::default(),
1811            Default::default(),
1812        )?;
1813        provider_rw.commit()?;
1814
1815        let provider = BlockchainProvider2::new(factory)?;
1816
1817        let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
1818        let chain = NewCanonicalChain::Commit {
1819            new: vec![in_memory_blocks
1820                .first()
1821                .map(|block| {
1822                    let senders = block.senders().expect("failed to recover senders");
1823                    ExecutedBlock::new(
1824                        Arc::new(block.clone()),
1825                        Arc::new(senders),
1826                        Arc::new(ExecutionOutcome {
1827                            bundle: BundleState::new(
1828                                in_memory_state.into_iter().map(|(address, (account, _))| {
1829                                    (address, None, Some(account.into()), Default::default())
1830                                }),
1831                                [in_memory_changesets.iter().map(|(address, account, _)| {
1832                                    (*address, Some(Some((*account).into())), Vec::new())
1833                                })],
1834                                [],
1835                            ),
1836                            first_block: first_in_memory_block,
1837                            ..Default::default()
1838                        }),
1839                        Default::default(),
1840                        Default::default(),
1841                    )
1842                })
1843                .unwrap()],
1844        };
1845        provider.canonical_in_memory_state.update_chain(chain);
1846
1847        let consistent_provider = provider.consistent_provider()?;
1848
1849        assert_eq!(
1850            consistent_provider.account_block_changeset(last_database_block).unwrap(),
1851            database_changesets
1852                .into_iter()
1853                .last()
1854                .unwrap()
1855                .into_iter()
1856                .sorted_by_key(|(address, _, _)| *address)
1857                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
1858                .collect::<Vec<_>>()
1859        );
1860        assert_eq!(
1861            consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
1862            in_memory_changesets
1863                .into_iter()
1864                .sorted_by_key(|(address, _, _)| *address)
1865                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
1866                .collect::<Vec<_>>()
1867        );
1868
1869        Ok(())
1870    }
1871}