reth_provider/providers/
consistent.rs

1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3    providers::StaticFileProvider, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader,
4    BlockReader, BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider,
5    ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
6    StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
7    TransactionsProvider, WithdrawalsProvider,
8};
9use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
10use alloy_eips::{
11    eip2718::Encodable2718, eip4895::Withdrawals, BlockHashOrNumber, BlockId, BlockNumHash,
12    BlockNumberOrTag, HashOrNumber,
13};
14use alloy_primitives::{
15    map::{hash_map, HashMap},
16    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
17};
18use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
19use reth_chainspec::{ChainInfo, EthereumHardforks};
20use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
21use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
22use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
23use reth_primitives_traits::{
24    Account, BlockBody, RecoveredBlock, SealedBlock, SealedHeader, StorageEntry,
25};
26use reth_prune_types::{PruneCheckpoint, PruneSegment};
27use reth_stages_types::{StageCheckpoint, StageId};
28use reth_storage_api::{
29    BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, OmmersProvider,
30    StateProvider, StorageChangeSetReader,
31};
32use reth_storage_errors::provider::ProviderResult;
33use revm_database::states::PlainStorageRevert;
34use std::{
35    ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
36    sync::Arc,
37};
38use tracing::trace;
39
40/// Type that interacts with a snapshot view of the blockchain (storage and in-memory) at time of
41/// instantiation, EXCEPT for pending, safe and finalized block which might change while holding
42/// this provider.
43///
44/// CAUTION: Avoid holding this provider for too long or the inner database transaction will
45/// time-out.
46#[derive(Debug)]
47#[doc(hidden)] // triggers ICE for `cargo docs`
48pub struct ConsistentProvider<N: ProviderNodeTypes> {
49    /// Storage provider.
50    storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
51    /// Head block at time of [`Self`] creation
52    head_block: Option<Arc<BlockState<N::Primitives>>>,
53    /// In-memory canonical state. This is not a snapshot, and can change! Use with caution.
54    canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
55}
56
57impl<N: ProviderNodeTypes> ConsistentProvider<N> {
58    /// Create a new provider using [`ProviderFactory`] and [`CanonicalInMemoryState`],
59    ///
60    /// Underneath it will take a snapshot by fetching [`CanonicalInMemoryState::head_state`] and
61    /// [`ProviderFactory::database_provider_ro`] effectively maintaining one single snapshotted
62    /// view of memory and database.
63    pub fn new(
64        storage_provider_factory: ProviderFactory<N>,
65        state: CanonicalInMemoryState<N::Primitives>,
66    ) -> ProviderResult<Self> {
67        // Each one provides a snapshot at the time of instantiation, but its order matters.
68        //
69        // If we acquire first the database provider, it's possible that before the in-memory chain
70        // snapshot is instantiated, it will flush blocks to disk. This would
71        // mean that our database provider would not have access to the flushed blocks (since it's
72        // working under an older view), while the in-memory state may have deleted them
73        // entirely. Resulting in gaps on the range.
74        let head_block = state.head_state();
75        let storage_provider = storage_provider_factory.database_provider_ro()?;
76        Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
77    }
78
79    // Helper function to convert range bounds
80    fn convert_range_bounds<T>(
81        &self,
82        range: impl RangeBounds<T>,
83        end_unbounded: impl FnOnce() -> T,
84    ) -> (T, T)
85    where
86        T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
87    {
88        let start = match range.start_bound() {
89            Bound::Included(&n) => n,
90            Bound::Excluded(&n) => n + T::from(1u8),
91            Bound::Unbounded => T::from(0u8),
92        };
93
94        let end = match range.end_bound() {
95            Bound::Included(&n) => n,
96            Bound::Excluded(&n) => n - T::from(1u8),
97            Bound::Unbounded => end_unbounded(),
98        };
99
100        (start, end)
101    }
102
103    /// Storage provider for latest block
104    fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
105        trace!(target: "providers::blockchain", "Getting latest block state provider");
106
107        // use latest state provider if the head state exists
108        if let Some(state) = &self.head_block {
109            trace!(target: "providers::blockchain", "Using head state for latest state provider");
110            Ok(self.block_state_provider_ref(state)?.boxed())
111        } else {
112            trace!(target: "providers::blockchain", "Using database state for latest state provider");
113            Ok(self.storage_provider.latest())
114        }
115    }
116
117    fn history_by_block_hash_ref<'a>(
118        &'a self,
119        block_hash: BlockHash,
120    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
121        trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
122
123        self.get_in_memory_or_storage_by_block(
124            block_hash.into(),
125            |_| self.storage_provider.history_by_block_hash(block_hash),
126            |block_state| {
127                let state_provider = self.block_state_provider_ref(block_state)?;
128                Ok(Box::new(state_provider))
129            },
130        )
131    }
132
133    /// Returns a state provider indexed by the given block number or tag.
134    fn state_by_block_number_ref<'a>(
135        &'a self,
136        number: BlockNumber,
137    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
138        let hash =
139            self.block_hash(number)?.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
140        self.history_by_block_hash_ref(hash)
141    }
142
143    /// Return the last N blocks of state, recreating the [`ExecutionOutcome`].
144    ///
145    /// If the range is empty, or there are no blocks for the given range, then this returns `None`.
146    pub fn get_state(
147        &self,
148        range: RangeInclusive<BlockNumber>,
149    ) -> ProviderResult<Option<ExecutionOutcome<ReceiptTy<N>>>> {
150        if range.is_empty() {
151            return Ok(None)
152        }
153        let start_block_number = *range.start();
154        let end_block_number = *range.end();
155
156        // We are not removing block meta as it is used to get block changesets.
157        let mut block_bodies = Vec::new();
158        for block_num in range.clone() {
159            let block_body = self
160                .block_body_indices(block_num)?
161                .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
162            block_bodies.push((block_num, block_body))
163        }
164
165        // get transaction receipts
166        let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
167        else {
168            return Ok(None)
169        };
170        let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
171            return Ok(None)
172        };
173
174        let mut account_changeset = Vec::new();
175        for block_num in range.clone() {
176            let changeset =
177                self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
178            account_changeset.extend(changeset);
179        }
180
181        let mut storage_changeset = Vec::new();
182        for block_num in range {
183            let changeset = self.storage_changeset(block_num)?;
184            storage_changeset.extend(changeset);
185        }
186
187        let (state, reverts) =
188            self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;
189
190        let mut receipt_iter =
191            self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();
192
193        let mut receipts = Vec::with_capacity(block_bodies.len());
194        // loop break if we are at the end of the blocks.
195        for (_, block_body) in block_bodies {
196            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
197            for tx_num in block_body.tx_num_range() {
198                let receipt = receipt_iter
199                    .next()
200                    .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?;
201                block_receipts.push(receipt);
202            }
203            receipts.push(block_receipts);
204        }
205
206        Ok(Some(ExecutionOutcome::new_init(
207            state,
208            reverts,
209            // We skip new contracts since we never delete them from the database
210            Vec::new(),
211            receipts,
212            start_block_number,
213            Vec::new(),
214        )))
215    }
216
217    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
218    /// [`reth_db::PlainAccountState`] and [`reth_db::PlainStorageState`] tables, based on the given
219    /// storage and account changesets.
220    fn populate_bundle_state(
221        &self,
222        account_changeset: Vec<(u64, AccountBeforeTx)>,
223        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
224        block_range_end: BlockNumber,
225    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
226        let mut state: BundleStateInit = HashMap::default();
227        let mut reverts: RevertsInit = HashMap::default();
228        let state_provider = self.state_by_block_number_ref(block_range_end)?;
229
230        // add account changeset changes
231        for (block_number, account_before) in account_changeset.into_iter().rev() {
232            let AccountBeforeTx { info: old_info, address } = account_before;
233            match state.entry(address) {
234                hash_map::Entry::Vacant(entry) => {
235                    let new_info = state_provider.basic_account(&address)?;
236                    entry.insert((old_info, new_info, HashMap::default()));
237                }
238                hash_map::Entry::Occupied(mut entry) => {
239                    // overwrite old account state.
240                    entry.get_mut().0 = old_info;
241                }
242            }
243            // insert old info into reverts.
244            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
245        }
246
247        // add storage changeset changes
248        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
249            let BlockNumberAddress((block_number, address)) = block_and_address;
250            // get account state or insert from plain state.
251            let account_state = match state.entry(address) {
252                hash_map::Entry::Vacant(entry) => {
253                    let present_info = state_provider.basic_account(&address)?;
254                    entry.insert((present_info, present_info, HashMap::default()))
255                }
256                hash_map::Entry::Occupied(entry) => entry.into_mut(),
257            };
258
259            // match storage.
260            match account_state.2.entry(old_storage.key) {
261                hash_map::Entry::Vacant(entry) => {
262                    let new_storage_value =
263                        state_provider.storage(address, old_storage.key)?.unwrap_or_default();
264                    entry.insert((
265                        (old_storage.value, old_storage.is_private),
266                        (new_storage_value.value, old_storage.is_private),
267                    ));
268                }
269                hash_map::Entry::Occupied(mut entry) => {
270                    entry.get_mut().0 = (old_storage.value, old_storage.is_private);
271                }
272            };
273
274            reverts
275                .entry(block_number)
276                .or_default()
277                .entry(address)
278                .or_default()
279                .1
280                .push(old_storage);
281        }
282
283        Ok((state, reverts))
284    }
285
286    /// Fetches a range of data from both in-memory state and persistent storage while a predicate
287    /// is met.
288    ///
289    /// Creates a snapshot of the in-memory chain state and database provider to prevent
290    /// inconsistencies. Splits the range into in-memory and storage sections, prioritizing
291    /// recent in-memory blocks in case of overlaps.
292    ///
293    /// * `fetch_db_range` function (`F`) provides access to the database provider, allowing the
294    ///   user to retrieve the required items from the database using [`RangeInclusive`].
295    /// * `map_block_state_item` function (`G`) provides each block of the range in the in-memory
296    ///   state, allowing for selection or filtering for the desired data.
297    fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
298        &self,
299        range: impl RangeBounds<BlockNumber>,
300        fetch_db_range: F,
301        map_block_state_item: G,
302        mut predicate: P,
303    ) -> ProviderResult<Vec<T>>
304    where
305        F: FnOnce(
306            &DatabaseProviderRO<N::DB, N>,
307            RangeInclusive<BlockNumber>,
308            &mut P,
309        ) -> ProviderResult<Vec<T>>,
310        G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
311        P: FnMut(&T) -> bool,
312    {
313        // Each one provides a snapshot at the time of instantiation, but its order matters.
314        //
315        // If we acquire first the database provider, it's possible that before the in-memory chain
316        // snapshot is instantiated, it will flush blocks to disk. This would
317        // mean that our database provider would not have access to the flushed blocks (since it's
318        // working under an older view), while the in-memory state may have deleted them
319        // entirely. Resulting in gaps on the range.
320        let mut in_memory_chain =
321            self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
322        let db_provider = &self.storage_provider;
323
324        let (start, end) = self.convert_range_bounds(range, || {
325            // the first block is the highest one.
326            in_memory_chain
327                .first()
328                .map(|b| b.number())
329                .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
330        });
331
332        if start > end {
333            return Ok(vec![])
334        }
335
336        // Split range into storage_range and in-memory range. If the in-memory range is not
337        // necessary drop it early.
338        //
339        // The last block of `in_memory_chain` is the lowest block number.
340        let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
341            Some(lowest_memory_block) if lowest_memory_block <= end => {
342                let highest_memory_block =
343                    in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");
344
345                // Database will for a time overlap with in-memory-chain blocks. In
346                // case of a re-org, it can mean that the database blocks are of a forked chain, and
347                // so, we should prioritize the in-memory overlapped blocks.
348                let in_memory_range =
349                    lowest_memory_block.max(start)..=end.min(highest_memory_block);
350
351                // If requested range is in the middle of the in-memory range, remove the necessary
352                // lowest blocks
353                in_memory_chain.truncate(
354                    in_memory_chain
355                        .len()
356                        .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
357                );
358
359                let storage_range =
360                    (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);
361
362                (Some((in_memory_chain, in_memory_range)), storage_range)
363            }
364            _ => {
365                // Drop the in-memory chain so we don't hold blocks in memory.
366                drop(in_memory_chain);
367
368                (None, Some(start..=end))
369            }
370        };
371
372        let mut items = Vec::with_capacity((end - start + 1) as usize);
373
374        if let Some(storage_range) = storage_range {
375            let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
376            items.append(&mut db_items);
377
378            // The predicate was not met, if the number of items differs from the expected. So, we
379            // return what we have.
380            if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
381                return Ok(items)
382            }
383        }
384
385        if let Some((in_memory_chain, in_memory_range)) = in_memory {
386            for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
387                debug_assert!(num == block.number());
388                if let Some(item) = map_block_state_item(block, &mut predicate) {
389                    items.push(item);
390                } else {
391                    break
392                }
393            }
394        }
395
396        Ok(items)
397    }
398
399    /// This uses a given [`BlockState`] to initialize a state provider for that block.
400    fn block_state_provider_ref(
401        &self,
402        state: &BlockState<N::Primitives>,
403    ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
404        let anchor_hash = state.anchor().hash;
405        let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
406        let in_memory = state.chain().map(|block_state| block_state.block()).collect();
407        Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
408    }
409
410    /// Fetches data from either in-memory state or persistent storage for a range of transactions.
411    ///
412    /// * `fetch_from_db`: has a `DatabaseProviderRO` and the storage specific range.
413    /// * `fetch_from_block_state`: has a [`RangeInclusive`] of elements that should be fetched from
414    ///   [`BlockState`]. [`RangeInclusive`] is necessary to handle partial look-ups of a block.
415    fn get_in_memory_or_storage_by_tx_range<S, M, R>(
416        &self,
417        range: impl RangeBounds<BlockNumber>,
418        fetch_from_db: S,
419        fetch_from_block_state: M,
420    ) -> ProviderResult<Vec<R>>
421    where
422        S: FnOnce(
423            &DatabaseProviderRO<N::DB, N>,
424            RangeInclusive<TxNumber>,
425        ) -> ProviderResult<Vec<R>>,
426        M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
427    {
428        let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
429        let provider = &self.storage_provider;
430
431        // Get the last block number stored in the storage which does NOT overlap with in-memory
432        // chain.
433        let last_database_block_number = in_mem_chain
434            .last()
435            .map(|b| Ok(b.anchor().number))
436            .unwrap_or_else(|| provider.last_block_number())?;
437
438        // Get the next tx number for the last block stored in the storage, which marks the start of
439        // the in-memory state.
440        let last_block_body_index = provider
441            .block_body_indices(last_database_block_number)?
442            .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
443        let mut in_memory_tx_num = last_block_body_index.next_tx_num();
444
445        let (start, end) = self.convert_range_bounds(range, || {
446            in_mem_chain
447                .iter()
448                .map(|b| b.block_ref().recovered_block().body().transactions().len() as u64)
449                .sum::<u64>() +
450                last_block_body_index.last_tx_num()
451        });
452
453        if start > end {
454            return Ok(vec![])
455        }
456
457        let mut tx_range = start..=end;
458
459        // If the range is entirely before the first in-memory transaction number, fetch from
460        // storage
461        if *tx_range.end() < in_memory_tx_num {
462            return fetch_from_db(provider, tx_range);
463        }
464
465        let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);
466
467        // If the range spans storage and memory, get elements from storage first.
468        if *tx_range.start() < in_memory_tx_num {
469            // Determine the range that needs to be fetched from storage.
470            let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);
471
472            // Set the remaining transaction range for in-memory
473            tx_range = in_memory_tx_num..=*tx_range.end();
474
475            items.extend(fetch_from_db(provider, db_range)?);
476        }
477
478        // Iterate from the lowest block to the highest in-memory chain
479        for block_state in in_mem_chain.iter().rev() {
480            let block_tx_count =
481                block_state.block_ref().recovered_block().body().transactions().len();
482            let remaining = (tx_range.end() - tx_range.start() + 1) as usize;
483
484            // If the transaction range start is equal or higher than the next block first
485            // transaction, advance
486            if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
487                in_memory_tx_num += block_tx_count as u64;
488                continue
489            }
490
491            // This should only be more than 0 once, in case of a partial range inside a block.
492            let skip = (tx_range.start() - in_memory_tx_num) as usize;
493
494            items.extend(fetch_from_block_state(
495                skip..=skip + (remaining.min(block_tx_count - skip) - 1),
496                block_state,
497            )?);
498
499            in_memory_tx_num += block_tx_count as u64;
500
501            // Break if the range has been fully processed
502            if in_memory_tx_num > *tx_range.end() {
503                break
504            }
505
506            // Set updated range
507            tx_range = in_memory_tx_num..=*tx_range.end();
508        }
509
510        Ok(items)
511    }
512
513    /// Fetches data from either in-memory state or persistent storage by transaction
514    /// [`HashOrNumber`].
515    fn get_in_memory_or_storage_by_tx<S, M, R>(
516        &self,
517        id: HashOrNumber,
518        fetch_from_db: S,
519        fetch_from_block_state: M,
520    ) -> ProviderResult<Option<R>>
521    where
522        S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
523        M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
524    {
525        let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
526        let provider = &self.storage_provider;
527
528        // Get the last block number stored in the database which does NOT overlap with in-memory
529        // chain.
530        let last_database_block_number = in_mem_chain
531            .last()
532            .map(|b| Ok(b.anchor().number))
533            .unwrap_or_else(|| provider.last_block_number())?;
534
535        // Get the next tx number for the last block stored in the database and consider it the
536        // first tx number of the in-memory state
537        let last_block_body_index = provider
538            .block_body_indices(last_database_block_number)?
539            .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
540        let mut in_memory_tx_num = last_block_body_index.next_tx_num();
541
542        // If the transaction number is less than the first in-memory transaction number, make a
543        // database lookup
544        if let HashOrNumber::Number(id) = id {
545            if id < in_memory_tx_num {
546                return fetch_from_db(provider)
547            }
548        }
549
550        // Iterate from the lowest block to the highest
551        for block_state in in_mem_chain.iter().rev() {
552            let executed_block = block_state.block_ref();
553            let block = executed_block.recovered_block();
554
555            for tx_index in 0..block.body().transactions().len() {
556                match id {
557                    HashOrNumber::Hash(tx_hash) => {
558                        if tx_hash == block.body().transactions()[tx_index].trie_hash() {
559                            return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
560                        }
561                    }
562                    HashOrNumber::Number(id) => {
563                        if id == in_memory_tx_num {
564                            return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
565                        }
566                    }
567                }
568
569                in_memory_tx_num += 1;
570            }
571        }
572
573        // Not found in-memory, so check database.
574        if let HashOrNumber::Hash(_) = id {
575            return fetch_from_db(provider)
576        }
577
578        Ok(None)
579    }
580
581    /// Fetches data from either in-memory state or persistent storage by [`BlockHashOrNumber`].
582    pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
583        &self,
584        id: BlockHashOrNumber,
585        fetch_from_db: S,
586        fetch_from_block_state: M,
587    ) -> ProviderResult<R>
588    where
589        S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
590        M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
591    {
592        if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
593            return fetch_from_block_state(block_state)
594        }
595        fetch_from_db(&self.storage_provider)
596    }
597}
598
599impl<N: ProviderNodeTypes> ConsistentProvider<N> {
600    /// Ensures that the given block number is canonical (synced)
601    ///
602    /// This is a helper for guarding the `HistoricalStateProvider` against block numbers that are
603    /// out of range and would lead to invalid results, mainly during initial sync.
604    ///
605    /// Verifying the `block_number` would be expensive since we need to lookup sync table
606    /// Instead, we ensure that the `block_number` is within the range of the
607    /// [`Self::best_block_number`] which is updated when a block is synced.
608    #[inline]
609    pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
610        let latest = self.best_block_number()?;
611        if block_number > latest {
612            Err(ProviderError::HeaderNotFound(block_number.into()))
613        } else {
614            Ok(())
615        }
616    }
617}
618
619impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
620    type Primitives = N::Primitives;
621}
622
623impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
624    fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
625        self.storage_provider.static_file_provider()
626    }
627}
628
629impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
630    type Header = HeaderTy<N>;
631
632    fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
633        self.get_in_memory_or_storage_by_block(
634            (*block_hash).into(),
635            |db_provider| db_provider.header(block_hash),
636            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
637        )
638    }
639
640    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
641        self.get_in_memory_or_storage_by_block(
642            num.into(),
643            |db_provider| db_provider.header_by_number(num),
644            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
645        )
646    }
647
648    fn header_td(&self, hash: &BlockHash) -> ProviderResult<Option<U256>> {
649        if let Some(num) = self.block_number(*hash)? {
650            self.header_td_by_number(num)
651        } else {
652            Ok(None)
653        }
654    }
655
656    fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
657        let number = if self.head_block.as_ref().map(|b| b.block_on_chain(number.into())).is_some()
658        {
659            // If the block exists in memory, we should return a TD for it.
660            //
661            // The canonical in memory state should only store post-merge blocks. Post-merge blocks
662            // have zero difficulty. This means we can use the total difficulty for the last
663            // finalized block number if present (so that we are not affected by reorgs), if not the
664            // last number in the database will be used.
665            if let Some(last_finalized_num_hash) =
666                self.canonical_in_memory_state.get_finalized_num_hash()
667            {
668                last_finalized_num_hash.number
669            } else {
670                self.last_block_number()?
671            }
672        } else {
673            // Otherwise, return what we have on disk for the input block
674            number
675        };
676        self.storage_provider.header_td_by_number(number)
677    }
678
679    fn headers_range(
680        &self,
681        range: impl RangeBounds<BlockNumber>,
682    ) -> ProviderResult<Vec<Self::Header>> {
683        self.get_in_memory_or_storage_by_block_range_while(
684            range,
685            |db_provider, range, _| db_provider.headers_range(range),
686            |block_state, _| Some(block_state.block_ref().recovered_block().header().clone()),
687            |_| true,
688        )
689    }
690
691    fn sealed_header(
692        &self,
693        number: BlockNumber,
694    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
695        self.get_in_memory_or_storage_by_block(
696            number.into(),
697            |db_provider| db_provider.sealed_header(number),
698            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_sealed_header())),
699        )
700    }
701
702    fn sealed_headers_range(
703        &self,
704        range: impl RangeBounds<BlockNumber>,
705    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
706        self.get_in_memory_or_storage_by_block_range_while(
707            range,
708            |db_provider, range, _| db_provider.sealed_headers_range(range),
709            |block_state, _| Some(block_state.block_ref().recovered_block().clone_sealed_header()),
710            |_| true,
711        )
712    }
713
714    fn sealed_headers_while(
715        &self,
716        range: impl RangeBounds<BlockNumber>,
717        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
718    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
719        self.get_in_memory_or_storage_by_block_range_while(
720            range,
721            |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
722            |block_state, predicate| {
723                let header = block_state.block_ref().recovered_block().sealed_header();
724                predicate(header).then(|| header.clone())
725            },
726            predicate,
727        )
728    }
729}
730
731impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
732    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
733        self.get_in_memory_or_storage_by_block(
734            number.into(),
735            |db_provider| db_provider.block_hash(number),
736            |block_state| Ok(Some(block_state.hash())),
737        )
738    }
739
740    fn canonical_hashes_range(
741        &self,
742        start: BlockNumber,
743        end: BlockNumber,
744    ) -> ProviderResult<Vec<B256>> {
745        self.get_in_memory_or_storage_by_block_range_while(
746            start..end,
747            |db_provider, inclusive_range, _| {
748                db_provider
749                    .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
750            },
751            |block_state, _| Some(block_state.hash()),
752            |_| true,
753        )
754    }
755}
756
757impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
758    fn chain_info(&self) -> ProviderResult<ChainInfo> {
759        let best_number = self.best_block_number()?;
760        Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
761    }
762
763    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
764        self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
765    }
766
767    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
768        self.storage_provider.last_block_number()
769    }
770
771    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
772        self.get_in_memory_or_storage_by_block(
773            hash.into(),
774            |db_provider| db_provider.block_number(hash),
775            |block_state| Ok(Some(block_state.number())),
776        )
777    }
778}
779
780impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
781    fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
782        Ok(self.canonical_in_memory_state.pending_block_num_hash())
783    }
784
785    fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
786        Ok(self.canonical_in_memory_state.get_safe_num_hash())
787    }
788
789    fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
790        Ok(self.canonical_in_memory_state.get_finalized_num_hash())
791    }
792}
793
794impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
795    type Block = BlockTy<N>;
796
797    fn find_block_by_hash(
798        &self,
799        hash: B256,
800        source: BlockSource,
801    ) -> ProviderResult<Option<Self::Block>> {
802        if matches!(source, BlockSource::Canonical | BlockSource::Any) {
803            if let Some(block) = self.get_in_memory_or_storage_by_block(
804                hash.into(),
805                |db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
806                |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
807            )? {
808                return Ok(Some(block))
809            }
810        }
811
812        if matches!(source, BlockSource::Pending | BlockSource::Any) {
813            return Ok(self
814                .canonical_in_memory_state
815                .pending_block()
816                .filter(|b| b.hash() == hash)
817                .map(|b| b.into_block()))
818        }
819
820        Ok(None)
821    }
822
823    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
824        self.get_in_memory_or_storage_by_block(
825            id,
826            |db_provider| db_provider.block(id),
827            |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
828        )
829    }
830
831    fn pending_block(&self) -> ProviderResult<Option<SealedBlock<Self::Block>>> {
832        Ok(self.canonical_in_memory_state.pending_block())
833    }
834
835    fn pending_block_with_senders(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
836        Ok(self.canonical_in_memory_state.pending_recovered_block())
837    }
838
839    fn pending_block_and_receipts(
840        &self,
841    ) -> ProviderResult<Option<(SealedBlock<Self::Block>, Vec<Self::Receipt>)>> {
842        Ok(self.canonical_in_memory_state.pending_block_and_receipts())
843    }
844
845    /// Returns the block with senders with matching number or hash from database.
846    ///
847    /// **NOTE: If [`TransactionVariant::NoHash`] is provided then the transactions have invalid
848    /// hashes, since they would need to be calculated on the spot, and we want fast querying.**
849    ///
850    /// Returns `None` if block is not found.
851    fn recovered_block(
852        &self,
853        id: BlockHashOrNumber,
854        transaction_kind: TransactionVariant,
855    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
856        self.get_in_memory_or_storage_by_block(
857            id,
858            |db_provider| db_provider.recovered_block(id, transaction_kind),
859            |block_state| Ok(Some(block_state.block().recovered_block().clone())),
860        )
861    }
862
863    fn sealed_block_with_senders(
864        &self,
865        id: BlockHashOrNumber,
866        transaction_kind: TransactionVariant,
867    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
868        self.get_in_memory_or_storage_by_block(
869            id,
870            |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
871            |block_state| Ok(Some(block_state.block().recovered_block().clone())),
872        )
873    }
874
875    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
876        self.get_in_memory_or_storage_by_block_range_while(
877            range,
878            |db_provider, range, _| db_provider.block_range(range),
879            |block_state, _| Some(block_state.block_ref().recovered_block().clone_block()),
880            |_| true,
881        )
882    }
883
884    fn block_with_senders_range(
885        &self,
886        range: RangeInclusive<BlockNumber>,
887    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
888        self.get_in_memory_or_storage_by_block_range_while(
889            range,
890            |db_provider, range, _| db_provider.block_with_senders_range(range),
891            |block_state, _| Some(block_state.block().recovered_block().clone()),
892            |_| true,
893        )
894    }
895
896    fn recovered_block_range(
897        &self,
898        range: RangeInclusive<BlockNumber>,
899    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
900        self.get_in_memory_or_storage_by_block_range_while(
901            range,
902            |db_provider, range, _| db_provider.recovered_block_range(range),
903            |block_state, _| Some(block_state.block().recovered_block().clone()),
904            |_| true,
905        )
906    }
907}
908
909impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
910    type Transaction = TxTy<N>;
911
912    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
913        self.get_in_memory_or_storage_by_tx(
914            tx_hash.into(),
915            |db_provider| db_provider.transaction_id(tx_hash),
916            |_, tx_number, _| Ok(Some(tx_number)),
917        )
918    }
919
920    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
921        self.get_in_memory_or_storage_by_tx(
922            id.into(),
923            |provider| provider.transaction_by_id(id),
924            |tx_index, _, block_state| {
925                Ok(block_state
926                    .block_ref()
927                    .recovered_block()
928                    .body()
929                    .transactions()
930                    .get(tx_index)
931                    .cloned())
932            },
933        )
934    }
935
936    fn transaction_by_id_unhashed(
937        &self,
938        id: TxNumber,
939    ) -> ProviderResult<Option<Self::Transaction>> {
940        self.get_in_memory_or_storage_by_tx(
941            id.into(),
942            |provider| provider.transaction_by_id_unhashed(id),
943            |tx_index, _, block_state| {
944                Ok(block_state
945                    .block_ref()
946                    .recovered_block()
947                    .body()
948                    .transactions()
949                    .get(tx_index)
950                    .cloned())
951            },
952        )
953    }
954
955    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
956        if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
957            return Ok(Some(tx))
958        }
959
960        self.storage_provider.transaction_by_hash(hash)
961    }
962
963    fn transaction_by_hash_with_meta(
964        &self,
965        tx_hash: TxHash,
966    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
967        if let Some((tx, meta)) =
968            self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
969        {
970            return Ok(Some((tx, meta)))
971        }
972
973        self.storage_provider.transaction_by_hash_with_meta(tx_hash)
974    }
975
976    fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
977        self.get_in_memory_or_storage_by_tx(
978            id.into(),
979            |provider| provider.transaction_block(id),
980            |_, _, block_state| Ok(Some(block_state.block_ref().recovered_block().number())),
981        )
982    }
983
984    fn transactions_by_block(
985        &self,
986        id: BlockHashOrNumber,
987    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
988        self.get_in_memory_or_storage_by_block(
989            id,
990            |provider| provider.transactions_by_block(id),
991            |block_state| {
992                Ok(Some(block_state.block_ref().recovered_block().body().transactions().to_vec()))
993            },
994        )
995    }
996
997    fn transactions_by_block_range(
998        &self,
999        range: impl RangeBounds<BlockNumber>,
1000    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1001        self.get_in_memory_or_storage_by_block_range_while(
1002            range,
1003            |db_provider, range, _| db_provider.transactions_by_block_range(range),
1004            |block_state, _| {
1005                Some(block_state.block_ref().recovered_block().body().transactions().to_vec())
1006            },
1007            |_| true,
1008        )
1009    }
1010
1011    fn transactions_by_tx_range(
1012        &self,
1013        range: impl RangeBounds<TxNumber>,
1014    ) -> ProviderResult<Vec<Self::Transaction>> {
1015        self.get_in_memory_or_storage_by_tx_range(
1016            range,
1017            |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
1018            |index_range, block_state| {
1019                Ok(block_state.block_ref().recovered_block().body().transactions()[index_range]
1020                    .to_vec())
1021            },
1022        )
1023    }
1024
1025    fn senders_by_tx_range(
1026        &self,
1027        range: impl RangeBounds<TxNumber>,
1028    ) -> ProviderResult<Vec<Address>> {
1029        self.get_in_memory_or_storage_by_tx_range(
1030            range,
1031            |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
1032            |index_range, block_state| {
1033                Ok(block_state.block_ref().recovered_block.senders()[index_range].to_vec())
1034            },
1035        )
1036    }
1037
1038    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1039        self.get_in_memory_or_storage_by_tx(
1040            id.into(),
1041            |provider| provider.transaction_sender(id),
1042            |tx_index, _, block_state| {
1043                Ok(block_state.block_ref().recovered_block.senders().get(tx_index).copied())
1044            },
1045        )
1046    }
1047}
1048
1049impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
1050    type Receipt = ReceiptTy<N>;
1051
1052    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1053        self.get_in_memory_or_storage_by_tx(
1054            id.into(),
1055            |provider| provider.receipt(id),
1056            |tx_index, _, block_state| {
1057                Ok(block_state.executed_block_receipts().get(tx_index).cloned())
1058            },
1059        )
1060    }
1061
1062    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1063        for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
1064            let executed_block = block_state.block_ref();
1065            let block = executed_block.recovered_block();
1066            let receipts = block_state.executed_block_receipts();
1067
1068            // assuming 1:1 correspondence between transactions and receipts
1069            debug_assert_eq!(
1070                block.body().transactions().len(),
1071                receipts.len(),
1072                "Mismatch between transaction and receipt count"
1073            );
1074
1075            if let Some(tx_index) =
1076                block.body().transactions_iter().position(|tx| tx.trie_hash() == hash)
1077            {
1078                // safe to use tx_index for receipts due to 1:1 correspondence
1079                return Ok(receipts.get(tx_index).cloned());
1080            }
1081        }
1082
1083        self.storage_provider.receipt_by_hash(hash)
1084    }
1085
1086    fn receipts_by_block(
1087        &self,
1088        block: BlockHashOrNumber,
1089    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1090        self.get_in_memory_or_storage_by_block(
1091            block,
1092            |db_provider| db_provider.receipts_by_block(block),
1093            |block_state| Ok(Some(block_state.executed_block_receipts())),
1094        )
1095    }
1096
1097    fn receipts_by_tx_range(
1098        &self,
1099        range: impl RangeBounds<TxNumber>,
1100    ) -> ProviderResult<Vec<Self::Receipt>> {
1101        self.get_in_memory_or_storage_by_tx_range(
1102            range,
1103            |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
1104            |index_range, block_state| {
1105                Ok(block_state.executed_block_receipts().drain(index_range).collect())
1106            },
1107        )
1108    }
1109}
1110
1111impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
1112    fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1113        match block {
1114            BlockId::Hash(rpc_block_hash) => {
1115                let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
1116                if receipts.is_none() && !rpc_block_hash.require_canonical.unwrap_or(false) {
1117                    if let Some(state) = self
1118                        .head_block
1119                        .as_ref()
1120                        .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
1121                    {
1122                        receipts = Some(state.executed_block_receipts());
1123                    }
1124                }
1125                Ok(receipts)
1126            }
1127            BlockId::Number(num_tag) => match num_tag {
1128                BlockNumberOrTag::Pending => Ok(self
1129                    .canonical_in_memory_state
1130                    .pending_state()
1131                    .map(|block_state| block_state.executed_block_receipts())),
1132                _ => {
1133                    if let Some(num) = self.convert_block_number(num_tag)? {
1134                        self.receipts_by_block(num.into())
1135                    } else {
1136                        Ok(None)
1137                    }
1138                }
1139            },
1140        }
1141    }
1142}
1143
1144impl<N: ProviderNodeTypes> WithdrawalsProvider for ConsistentProvider<N> {
1145    fn withdrawals_by_block(
1146        &self,
1147        id: BlockHashOrNumber,
1148        timestamp: u64,
1149    ) -> ProviderResult<Option<Withdrawals>> {
1150        if !self.chain_spec().is_shanghai_active_at_timestamp(timestamp) {
1151            return Ok(None)
1152        }
1153
1154        self.get_in_memory_or_storage_by_block(
1155            id,
1156            |db_provider| db_provider.withdrawals_by_block(id, timestamp),
1157            |block_state| {
1158                Ok(block_state.block_ref().recovered_block().body().withdrawals().cloned())
1159            },
1160        )
1161    }
1162}
1163
1164impl<N: ProviderNodeTypes> OmmersProvider for ConsistentProvider<N> {
1165    fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<HeaderTy<N>>>> {
1166        self.get_in_memory_or_storage_by_block(
1167            id,
1168            |db_provider| db_provider.ommers(id),
1169            |block_state| {
1170                if self.chain_spec().is_paris_active_at_block(block_state.number()) {
1171                    return Ok(Some(Vec::new()))
1172                }
1173
1174                Ok(block_state.block_ref().recovered_block().body().ommers().map(|o| o.to_vec()))
1175            },
1176        )
1177    }
1178}
1179
1180impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
1181    fn block_body_indices(
1182        &self,
1183        number: BlockNumber,
1184    ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1185        self.get_in_memory_or_storage_by_block(
1186            number.into(),
1187            |db_provider| db_provider.block_body_indices(number),
1188            |block_state| {
1189                // Find the last block indices on database
1190                let last_storage_block_number = block_state.anchor().number;
1191                let mut stored_indices = self
1192                    .storage_provider
1193                    .block_body_indices(last_storage_block_number)?
1194                    .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1195
1196                // Prepare our block indices
1197                stored_indices.first_tx_num = stored_indices.next_tx_num();
1198                stored_indices.tx_count = 0;
1199
1200                // Iterate from the lowest block in memory until our target block
1201                for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1202                    let block_tx_count =
1203                        state.block_ref().recovered_block().body().transactions().len() as u64;
1204                    if state.block_ref().recovered_block().number() == number {
1205                        stored_indices.tx_count = block_tx_count;
1206                    } else {
1207                        stored_indices.first_tx_num += block_tx_count;
1208                    }
1209                }
1210
1211                Ok(Some(stored_indices))
1212            },
1213        )
1214    }
1215
1216    fn block_body_indices_range(
1217        &self,
1218        range: RangeInclusive<BlockNumber>,
1219    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1220        range.map_while(|b| self.block_body_indices(b).transpose()).collect()
1221    }
1222}
1223
1224impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
1225    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1226        self.storage_provider.get_stage_checkpoint(id)
1227    }
1228
1229    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1230        self.storage_provider.get_stage_checkpoint_progress(id)
1231    }
1232
1233    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1234        self.storage_provider.get_all_checkpoints()
1235    }
1236}
1237
1238impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
1239    fn get_prune_checkpoint(
1240        &self,
1241        segment: PruneSegment,
1242    ) -> ProviderResult<Option<PruneCheckpoint>> {
1243        self.storage_provider.get_prune_checkpoint(segment)
1244    }
1245
1246    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
1247        self.storage_provider.get_prune_checkpoints()
1248    }
1249}
1250
1251impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
1252    type ChainSpec = N::ChainSpec;
1253
1254    fn chain_spec(&self) -> Arc<N::ChainSpec> {
1255        ChainSpecProvider::chain_spec(&self.storage_provider)
1256    }
1257}
1258
1259impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1260    fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1261        match id {
1262            BlockId::Number(num) => self.block_by_number_or_tag(num),
1263            BlockId::Hash(hash) => {
1264                // TODO: should we only apply this for the RPCs that are listed in EIP-1898?
1265                // so not at the provider level?
1266                // if we decide to do this at a higher level, then we can make this an automatic
1267                // trait impl
1268                if Some(true) == hash.require_canonical {
1269                    // check the database, canonical blocks are only stored in the database
1270                    self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1271                } else {
1272                    self.block_by_hash(hash.block_hash)
1273                }
1274            }
1275        }
1276    }
1277
1278    fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1279        Ok(match id {
1280            BlockNumberOrTag::Latest => {
1281                Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1282            }
1283            BlockNumberOrTag::Finalized => {
1284                self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1285            }
1286            BlockNumberOrTag::Safe => {
1287                self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1288            }
1289            BlockNumberOrTag::Earliest => self.header_by_number(0)?,
1290            BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1291
1292            BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1293        })
1294    }
1295
1296    fn sealed_header_by_number_or_tag(
1297        &self,
1298        id: BlockNumberOrTag,
1299    ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1300        match id {
1301            BlockNumberOrTag::Latest => {
1302                Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1303            }
1304            BlockNumberOrTag::Finalized => {
1305                Ok(self.canonical_in_memory_state.get_finalized_header())
1306            }
1307            BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1308            BlockNumberOrTag::Earliest => self
1309                .header_by_number(0)?
1310                .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1311            BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1312            BlockNumberOrTag::Number(num) => self
1313                .header_by_number(num)?
1314                .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1315        }
1316    }
1317
1318    fn sealed_header_by_id(
1319        &self,
1320        id: BlockId,
1321    ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1322        Ok(match id {
1323            BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1324            BlockId::Hash(hash) => self.header(&hash.block_hash)?.map(SealedHeader::seal_slow),
1325        })
1326    }
1327
1328    fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1329        Ok(match id {
1330            BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1331            BlockId::Hash(hash) => self.header(&hash.block_hash)?,
1332        })
1333    }
1334
1335    fn ommers_by_id(&self, id: BlockId) -> ProviderResult<Option<Vec<HeaderTy<N>>>> {
1336        match id {
1337            BlockId::Number(num) => self.ommers_by_number_or_tag(num),
1338            BlockId::Hash(hash) => {
1339                // TODO: EIP-1898 question, see above
1340                // here it is not handled
1341                self.ommers(BlockHashOrNumber::Hash(hash.block_hash))
1342            }
1343        }
1344    }
1345}
1346
1347impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1348    fn storage_changeset(
1349        &self,
1350        block_number: BlockNumber,
1351    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1352        if let Some(state) =
1353            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1354        {
1355            let changesets = state
1356                .block()
1357                .execution_output
1358                .bundle
1359                .reverts
1360                .clone()
1361                .to_plain_state_reverts()
1362                .storage
1363                .into_iter()
1364                .flatten()
1365                .flat_map(|revert: PlainStorageRevert| {
1366                    revert.storage_revert.into_iter().map(move |(key, value)| {
1367                        (
1368                            BlockNumberAddress((block_number, revert.address)),
1369                            StorageEntry {
1370                                key: key.into(),
1371                                value: value.to_previous_value().value,
1372                                is_private: value.to_previous_value().is_private,
1373                            },
1374                        )
1375                    })
1376                })
1377                .collect();
1378            Ok(changesets)
1379        } else {
1380            // Perform checks on whether or not changesets exist for the block.
1381
1382            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1383            let storage_history_exists = self
1384                .storage_provider
1385                .get_prune_checkpoint(PruneSegment::StorageHistory)?
1386                .and_then(|checkpoint| {
1387                    // return true if the block number is ahead of the prune checkpoint.
1388                    //
1389                    // The checkpoint stores the highest pruned block number, so we should make
1390                    // sure the block_number is strictly greater.
1391                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1392                })
1393                .unwrap_or(true);
1394
1395            if !storage_history_exists {
1396                return Err(ProviderError::StateAtBlockPruned(block_number))
1397            }
1398
1399            self.storage_provider.storage_changeset(block_number)
1400        }
1401    }
1402}
1403
1404impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1405    fn account_block_changeset(
1406        &self,
1407        block_number: BlockNumber,
1408    ) -> ProviderResult<Vec<AccountBeforeTx>> {
1409        if let Some(state) =
1410            self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1411        {
1412            let changesets = state
1413                .block_ref()
1414                .execution_output
1415                .bundle
1416                .reverts
1417                .clone()
1418                .to_plain_state_reverts()
1419                .accounts
1420                .into_iter()
1421                .flatten()
1422                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1423                .collect();
1424            Ok(changesets)
1425        } else {
1426            // Perform checks on whether or not changesets exist for the block.
1427
1428            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
1429            let account_history_exists = self
1430                .storage_provider
1431                .get_prune_checkpoint(PruneSegment::AccountHistory)?
1432                .and_then(|checkpoint| {
1433                    // return true if the block number is ahead of the prune checkpoint.
1434                    //
1435                    // The checkpoint stores the highest pruned block number, so we should make
1436                    // sure the block_number is strictly greater.
1437                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1438                })
1439                .unwrap_or(true);
1440
1441            if !account_history_exists {
1442                return Err(ProviderError::StateAtBlockPruned(block_number))
1443            }
1444
1445            self.storage_provider.account_block_changeset(block_number)
1446        }
1447    }
1448}
1449
1450impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1451    /// Get basic account information.
1452    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1453        // use latest state provider
1454        let state_provider = self.latest_ref()?;
1455        state_provider.basic_account(address)
1456    }
1457}
1458
1459impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1460    type Receipt = ReceiptTy<N>;
1461
1462    /// Re-constructs the [`ExecutionOutcome`] from in-memory and database state, if necessary.
1463    ///
1464    /// If data for the block does not exist, this will return [`None`].
1465    ///
1466    /// NOTE: This cannot be called safely in a loop outside of the blockchain tree thread. This is
1467    /// because the [`CanonicalInMemoryState`] could change during a reorg, causing results to be
1468    /// inconsistent. Currently this can safely be called within the blockchain tree thread,
1469    /// because the tree thread is responsible for modifying the [`CanonicalInMemoryState`] in the
1470    /// first place.
1471    fn get_state(
1472        &self,
1473        block: BlockNumber,
1474    ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1475        if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1476            let state = state.block_ref().execution_outcome().clone();
1477            Ok(Some(state))
1478        } else {
1479            Self::get_state(self, block..=block)
1480        }
1481    }
1482}
1483
1484#[cfg(test)]
1485mod tests {
1486    use crate::{
1487        providers::blockchain_provider::BlockchainProvider,
1488        test_utils::create_test_provider_factory, BlockWriter,
1489    };
1490    use alloy_eips::BlockHashOrNumber;
1491    use alloy_primitives::B256;
1492    use itertools::Itertools;
1493    use rand::Rng;
1494    use reth_chain_state::{ExecutedBlock, ExecutedBlockWithTrieUpdates, NewCanonicalChain};
1495    use reth_db_api::models::AccountBeforeTx;
1496    use reth_ethereum_primitives::Block;
1497    use reth_execution_types::ExecutionOutcome;
1498    use reth_primitives_traits::{RecoveredBlock, SealedBlock};
1499    use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader};
1500    use reth_testing_utils::generators::{
1501        self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
1502    };
1503    use revm_database::BundleState;
1504    use std::{
1505        ops::{Bound, Range, RangeBounds},
1506        sync::Arc,
1507    };
1508
1509    const TEST_BLOCKS_COUNT: usize = 5;
1510
1511    fn random_blocks(
1512        rng: &mut impl Rng,
1513        database_blocks: usize,
1514        in_memory_blocks: usize,
1515        requests_count: Option<Range<u8>>,
1516        withdrawals_count: Option<Range<u8>>,
1517        tx_count: impl RangeBounds<u8>,
1518    ) -> (Vec<SealedBlock<Block>>, Vec<SealedBlock<Block>>) {
1519        let block_range = (database_blocks + in_memory_blocks - 1) as u64;
1520
1521        let tx_start = match tx_count.start_bound() {
1522            Bound::Included(&n) | Bound::Excluded(&n) => n,
1523            Bound::Unbounded => u8::MIN,
1524        };
1525        let tx_end = match tx_count.end_bound() {
1526            Bound::Included(&n) | Bound::Excluded(&n) => n + 1,
1527            Bound::Unbounded => u8::MAX,
1528        };
1529
1530        let blocks = random_block_range(
1531            rng,
1532            0..=block_range,
1533            BlockRangeParams {
1534                parent: Some(B256::ZERO),
1535                tx_count: tx_start..tx_end,
1536                requests_count,
1537                withdrawals_count,
1538            },
1539        );
1540        let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
1541        (database_blocks.to_vec(), in_memory_blocks.to_vec())
1542    }
1543
1544    #[test]
1545    fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
1546        // Initialize random number generator and provider factory
1547        let mut rng = generators::rng();
1548        let factory = create_test_provider_factory();
1549
1550        // Generate 10 random blocks and split into database and in-memory blocks
1551        let blocks = random_block_range(
1552            &mut rng,
1553            0..=10,
1554            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1555        );
1556        let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1557
1558        // Insert first 5 blocks into the database
1559        let provider_rw = factory.provider_rw()?;
1560        for block in database_blocks {
1561            provider_rw.insert_historical_block(
1562                block.clone().try_recover().expect("failed to seal block with senders"),
1563            )?;
1564        }
1565        provider_rw.commit()?;
1566
1567        // Create a new provider
1568        let provider = BlockchainProvider::new(factory)?;
1569        let consistent_provider = provider.consistent_provider()?;
1570
1571        // Useful blocks
1572        let first_db_block = database_blocks.first().unwrap();
1573        let first_in_mem_block = in_memory_blocks.first().unwrap();
1574        let last_in_mem_block = in_memory_blocks.last().unwrap();
1575
1576        // No block in memory before setting in memory state
1577        assert_eq!(
1578            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1579            None
1580        );
1581        assert_eq!(
1582            consistent_provider
1583                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1584            None
1585        );
1586        // No pending block in memory
1587        assert_eq!(
1588            consistent_provider
1589                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
1590            None
1591        );
1592
1593        // Insert first block into the in-memory state
1594        let in_memory_block_senders =
1595            first_in_mem_block.senders().expect("failed to recover senders");
1596        let chain = NewCanonicalChain::Commit {
1597            new: vec![ExecutedBlockWithTrieUpdates::new(
1598                Arc::new(RecoveredBlock::new_sealed(
1599                    first_in_mem_block.clone(),
1600                    in_memory_block_senders,
1601                )),
1602                Default::default(),
1603                Default::default(),
1604                Default::default(),
1605            )],
1606        };
1607        consistent_provider.canonical_in_memory_state.update_chain(chain);
1608        let consistent_provider = provider.consistent_provider()?;
1609
1610        // Now the block should be found in memory
1611        assert_eq!(
1612            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1613            Some(first_in_mem_block.clone().into_block())
1614        );
1615        assert_eq!(
1616            consistent_provider
1617                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1618            Some(first_in_mem_block.clone().into_block())
1619        );
1620
1621        // Find the first block in database by hash
1622        assert_eq!(
1623            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
1624            Some(first_db_block.clone().into_block())
1625        );
1626        assert_eq!(
1627            consistent_provider
1628                .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
1629            Some(first_db_block.clone().into_block())
1630        );
1631
1632        // No pending block in database
1633        assert_eq!(
1634            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
1635            None
1636        );
1637
1638        // Insert the last block into the pending state
1639        provider.canonical_in_memory_state.set_pending_block(ExecutedBlockWithTrieUpdates {
1640            block: ExecutedBlock {
1641                recovered_block: Arc::new(RecoveredBlock::new_sealed(
1642                    last_in_mem_block.clone(),
1643                    Default::default(),
1644                )),
1645                execution_output: Default::default(),
1646                hashed_state: Default::default(),
1647            },
1648            trie: Default::default(),
1649        });
1650
1651        // Now the last block should be found in memory
1652        assert_eq!(
1653            consistent_provider
1654                .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
1655            Some(last_in_mem_block.clone_block())
1656        );
1657
1658        Ok(())
1659    }
1660
1661    #[test]
1662    fn test_block_reader_block() -> eyre::Result<()> {
1663        // Initialize random number generator and provider factory
1664        let mut rng = generators::rng();
1665        let factory = create_test_provider_factory();
1666
1667        // Generate 10 random blocks and split into database and in-memory blocks
1668        let blocks = random_block_range(
1669            &mut rng,
1670            0..=10,
1671            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1672        );
1673        let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1674
1675        // Insert first 5 blocks into the database
1676        let provider_rw = factory.provider_rw()?;
1677        for block in database_blocks {
1678            provider_rw.insert_historical_block(
1679                block.clone().try_recover().expect("failed to seal block with senders"),
1680            )?;
1681        }
1682        provider_rw.commit()?;
1683
1684        // Create a new provider
1685        let provider = BlockchainProvider::new(factory)?;
1686        let consistent_provider = provider.consistent_provider()?;
1687
1688        // First in memory block
1689        let first_in_mem_block = in_memory_blocks.first().unwrap();
1690        // First database block
1691        let first_db_block = database_blocks.first().unwrap();
1692
1693        // First in memory block should not be found yet as not integrated to the in-memory state
1694        assert_eq!(
1695            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1696            None
1697        );
1698        assert_eq!(
1699            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1700            None
1701        );
1702
1703        // Insert first block into the in-memory state
1704        let in_memory_block_senders =
1705            first_in_mem_block.senders().expect("failed to recover senders");
1706        let chain = NewCanonicalChain::Commit {
1707            new: vec![ExecutedBlockWithTrieUpdates::new(
1708                Arc::new(RecoveredBlock::new_sealed(
1709                    first_in_mem_block.clone(),
1710                    in_memory_block_senders,
1711                )),
1712                Default::default(),
1713                Default::default(),
1714                Default::default(),
1715            )],
1716        };
1717        consistent_provider.canonical_in_memory_state.update_chain(chain);
1718
1719        let consistent_provider = provider.consistent_provider()?;
1720
1721        // First in memory block should be found
1722        assert_eq!(
1723            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1724            Some(first_in_mem_block.clone().into_block())
1725        );
1726        assert_eq!(
1727            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1728            Some(first_in_mem_block.clone().into_block())
1729        );
1730
1731        // First database block should be found
1732        assert_eq!(
1733            consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
1734            Some(first_db_block.clone().into_block())
1735        );
1736        assert_eq!(
1737            consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
1738            Some(first_db_block.clone().into_block())
1739        );
1740
1741        Ok(())
1742    }
1743
1744    #[test]
1745    fn test_changeset_reader() -> eyre::Result<()> {
1746        let mut rng = generators::rng();
1747
1748        let (database_blocks, in_memory_blocks) =
1749            random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);
1750
1751        let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
1752        let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
1753        let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();
1754
1755        let accounts = random_eoa_accounts(&mut rng, 2);
1756
1757        let (database_changesets, database_state) = random_changeset_range(
1758            &mut rng,
1759            &database_blocks,
1760            accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
1761            0..0,
1762            0..0,
1763        );
1764        let (in_memory_changesets, in_memory_state) = random_changeset_range(
1765            &mut rng,
1766            &in_memory_blocks,
1767            database_state
1768                .iter()
1769                .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
1770            0..0,
1771            0..0,
1772        );
1773
1774        let factory = create_test_provider_factory();
1775
1776        let provider_rw = factory.provider_rw()?;
1777        provider_rw.append_blocks_with_state(
1778            database_blocks
1779                .into_iter()
1780                .map(|b| b.try_recover().expect("failed to seal block with senders"))
1781                .collect(),
1782            &ExecutionOutcome {
1783                bundle: BundleState::new(
1784                    database_state.into_iter().map(|(address, (account, _))| {
1785                        (address, None, Some(account.into()), Default::default())
1786                    }),
1787                    database_changesets
1788                        .iter()
1789                        .map(|block_changesets| {
1790                            block_changesets.iter().map(|(address, account, _)| {
1791                                (*address, Some(Some((*account).into())), [])
1792                            })
1793                        })
1794                        .collect::<Vec<_>>(),
1795                    Vec::new(),
1796                ),
1797                first_block: first_database_block,
1798                ..Default::default()
1799            },
1800            Default::default(),
1801            Default::default(),
1802        )?;
1803        provider_rw.commit()?;
1804
1805        let provider = BlockchainProvider::new(factory)?;
1806
1807        let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
1808        let chain = NewCanonicalChain::Commit {
1809            new: vec![in_memory_blocks
1810                .first()
1811                .map(|block| {
1812                    let senders = block.senders().expect("failed to recover senders");
1813                    ExecutedBlockWithTrieUpdates::new(
1814                        Arc::new(RecoveredBlock::new_sealed(block.clone(), senders)),
1815                        Arc::new(ExecutionOutcome {
1816                            bundle: BundleState::new(
1817                                in_memory_state.into_iter().map(|(address, (account, _))| {
1818                                    (address, None, Some(account.into()), Default::default())
1819                                }),
1820                                [in_memory_changesets.iter().map(|(address, account, _)| {
1821                                    (*address, Some(Some((*account).into())), Vec::new())
1822                                })],
1823                                [],
1824                            ),
1825                            first_block: first_in_memory_block,
1826                            ..Default::default()
1827                        }),
1828                        Default::default(),
1829                        Default::default(),
1830                    )
1831                })
1832                .unwrap()],
1833        };
1834        provider.canonical_in_memory_state.update_chain(chain);
1835
1836        let consistent_provider = provider.consistent_provider()?;
1837
1838        assert_eq!(
1839            consistent_provider.account_block_changeset(last_database_block).unwrap(),
1840            database_changesets
1841                .into_iter()
1842                .next_back()
1843                .unwrap()
1844                .into_iter()
1845                .sorted_by_key(|(address, _, _)| *address)
1846                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
1847                .collect::<Vec<_>>()
1848        );
1849        assert_eq!(
1850            consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
1851            in_memory_changesets
1852                .into_iter()
1853                .sorted_by_key(|(address, _, _)| *address)
1854                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
1855                .collect::<Vec<_>>()
1856        );
1857
1858        Ok(())
1859    }
1860}