1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3 providers::StaticFileProvider, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader,
4 BlockReader, BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, HeaderProvider,
5 ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
6 StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
7 TransactionsProvider, WithdrawalsProvider,
8};
9use alloy_consensus::{transaction::TransactionMeta, BlockHeader};
10use alloy_eips::{
11 eip2718::Encodable2718, eip4895::Withdrawals, BlockHashOrNumber, BlockId, BlockNumHash,
12 BlockNumberOrTag, HashOrNumber,
13};
14use alloy_primitives::{
15 map::{hash_map, HashMap},
16 Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
17};
18use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
19use reth_chainspec::{ChainInfo, EthereumHardforks};
20use reth_db_api::models::{AccountBeforeTx, BlockNumberAddress, StoredBlockBodyIndices};
21use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
22use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
23use reth_primitives_traits::{
24 Account, BlockBody, RecoveredBlock, SealedBlock, SealedHeader, StorageEntry,
25};
26use reth_prune_types::{PruneCheckpoint, PruneSegment};
27use reth_stages_types::{StageCheckpoint, StageId};
28use reth_storage_api::{
29 BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, OmmersProvider,
30 StateProvider, StorageChangeSetReader,
31};
32use reth_storage_errors::provider::ProviderResult;
33use revm_database::states::PlainStorageRevert;
34use std::{
35 ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
36 sync::Arc,
37};
38use tracing::trace;
39
40#[derive(Debug)]
47#[doc(hidden)] pub struct ConsistentProvider<N: ProviderNodeTypes> {
49 storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
51 head_block: Option<Arc<BlockState<N::Primitives>>>,
53 canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
55}
56
57impl<N: ProviderNodeTypes> ConsistentProvider<N> {
58 pub fn new(
64 storage_provider_factory: ProviderFactory<N>,
65 state: CanonicalInMemoryState<N::Primitives>,
66 ) -> ProviderResult<Self> {
67 let head_block = state.head_state();
75 let storage_provider = storage_provider_factory.database_provider_ro()?;
76 Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
77 }
78
79 fn convert_range_bounds<T>(
81 &self,
82 range: impl RangeBounds<T>,
83 end_unbounded: impl FnOnce() -> T,
84 ) -> (T, T)
85 where
86 T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
87 {
88 let start = match range.start_bound() {
89 Bound::Included(&n) => n,
90 Bound::Excluded(&n) => n + T::from(1u8),
91 Bound::Unbounded => T::from(0u8),
92 };
93
94 let end = match range.end_bound() {
95 Bound::Included(&n) => n,
96 Bound::Excluded(&n) => n - T::from(1u8),
97 Bound::Unbounded => end_unbounded(),
98 };
99
100 (start, end)
101 }
102
103 fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
105 trace!(target: "providers::blockchain", "Getting latest block state provider");
106
107 if let Some(state) = &self.head_block {
109 trace!(target: "providers::blockchain", "Using head state for latest state provider");
110 Ok(self.block_state_provider_ref(state)?.boxed())
111 } else {
112 trace!(target: "providers::blockchain", "Using database state for latest state provider");
113 Ok(self.storage_provider.latest())
114 }
115 }
116
117 fn history_by_block_hash_ref<'a>(
118 &'a self,
119 block_hash: BlockHash,
120 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
121 trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
122
123 self.get_in_memory_or_storage_by_block(
124 block_hash.into(),
125 |_| self.storage_provider.history_by_block_hash(block_hash),
126 |block_state| {
127 let state_provider = self.block_state_provider_ref(block_state)?;
128 Ok(Box::new(state_provider))
129 },
130 )
131 }
132
133 fn state_by_block_number_ref<'a>(
135 &'a self,
136 number: BlockNumber,
137 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
138 let hash =
139 self.block_hash(number)?.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
140 self.history_by_block_hash_ref(hash)
141 }
142
143 pub fn get_state(
147 &self,
148 range: RangeInclusive<BlockNumber>,
149 ) -> ProviderResult<Option<ExecutionOutcome<ReceiptTy<N>>>> {
150 if range.is_empty() {
151 return Ok(None)
152 }
153 let start_block_number = *range.start();
154 let end_block_number = *range.end();
155
156 let mut block_bodies = Vec::new();
158 for block_num in range.clone() {
159 let block_body = self
160 .block_body_indices(block_num)?
161 .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
162 block_bodies.push((block_num, block_body))
163 }
164
165 let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
167 else {
168 return Ok(None)
169 };
170 let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
171 return Ok(None)
172 };
173
174 let mut account_changeset = Vec::new();
175 for block_num in range.clone() {
176 let changeset =
177 self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
178 account_changeset.extend(changeset);
179 }
180
181 let mut storage_changeset = Vec::new();
182 for block_num in range {
183 let changeset = self.storage_changeset(block_num)?;
184 storage_changeset.extend(changeset);
185 }
186
187 let (state, reverts) =
188 self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;
189
190 let mut receipt_iter =
191 self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();
192
193 let mut receipts = Vec::with_capacity(block_bodies.len());
194 for (_, block_body) in block_bodies {
196 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
197 for tx_num in block_body.tx_num_range() {
198 let receipt = receipt_iter
199 .next()
200 .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?;
201 block_receipts.push(receipt);
202 }
203 receipts.push(block_receipts);
204 }
205
206 Ok(Some(ExecutionOutcome::new_init(
207 state,
208 reverts,
209 Vec::new(),
211 receipts,
212 start_block_number,
213 Vec::new(),
214 )))
215 }
216
217 fn populate_bundle_state(
221 &self,
222 account_changeset: Vec<(u64, AccountBeforeTx)>,
223 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
224 block_range_end: BlockNumber,
225 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
226 let mut state: BundleStateInit = HashMap::default();
227 let mut reverts: RevertsInit = HashMap::default();
228 let state_provider = self.state_by_block_number_ref(block_range_end)?;
229
230 for (block_number, account_before) in account_changeset.into_iter().rev() {
232 let AccountBeforeTx { info: old_info, address } = account_before;
233 match state.entry(address) {
234 hash_map::Entry::Vacant(entry) => {
235 let new_info = state_provider.basic_account(&address)?;
236 entry.insert((old_info, new_info, HashMap::default()));
237 }
238 hash_map::Entry::Occupied(mut entry) => {
239 entry.get_mut().0 = old_info;
241 }
242 }
243 reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
245 }
246
247 for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
249 let BlockNumberAddress((block_number, address)) = block_and_address;
250 let account_state = match state.entry(address) {
252 hash_map::Entry::Vacant(entry) => {
253 let present_info = state_provider.basic_account(&address)?;
254 entry.insert((present_info, present_info, HashMap::default()))
255 }
256 hash_map::Entry::Occupied(entry) => entry.into_mut(),
257 };
258
259 match account_state.2.entry(old_storage.key) {
261 hash_map::Entry::Vacant(entry) => {
262 let new_storage_value =
263 state_provider.storage(address, old_storage.key)?.unwrap_or_default();
264 entry.insert((
265 (old_storage.value, old_storage.is_private),
266 (new_storage_value.value, old_storage.is_private),
267 ));
268 }
269 hash_map::Entry::Occupied(mut entry) => {
270 entry.get_mut().0 = (old_storage.value, old_storage.is_private);
271 }
272 };
273
274 reverts
275 .entry(block_number)
276 .or_default()
277 .entry(address)
278 .or_default()
279 .1
280 .push(old_storage);
281 }
282
283 Ok((state, reverts))
284 }
285
286 fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
298 &self,
299 range: impl RangeBounds<BlockNumber>,
300 fetch_db_range: F,
301 map_block_state_item: G,
302 mut predicate: P,
303 ) -> ProviderResult<Vec<T>>
304 where
305 F: FnOnce(
306 &DatabaseProviderRO<N::DB, N>,
307 RangeInclusive<BlockNumber>,
308 &mut P,
309 ) -> ProviderResult<Vec<T>>,
310 G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
311 P: FnMut(&T) -> bool,
312 {
313 let mut in_memory_chain =
321 self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
322 let db_provider = &self.storage_provider;
323
324 let (start, end) = self.convert_range_bounds(range, || {
325 in_memory_chain
327 .first()
328 .map(|b| b.number())
329 .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
330 });
331
332 if start > end {
333 return Ok(vec![])
334 }
335
336 let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
341 Some(lowest_memory_block) if lowest_memory_block <= end => {
342 let highest_memory_block =
343 in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");
344
345 let in_memory_range =
349 lowest_memory_block.max(start)..=end.min(highest_memory_block);
350
351 in_memory_chain.truncate(
354 in_memory_chain
355 .len()
356 .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
357 );
358
359 let storage_range =
360 (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);
361
362 (Some((in_memory_chain, in_memory_range)), storage_range)
363 }
364 _ => {
365 drop(in_memory_chain);
367
368 (None, Some(start..=end))
369 }
370 };
371
372 let mut items = Vec::with_capacity((end - start + 1) as usize);
373
374 if let Some(storage_range) = storage_range {
375 let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
376 items.append(&mut db_items);
377
378 if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
381 return Ok(items)
382 }
383 }
384
385 if let Some((in_memory_chain, in_memory_range)) = in_memory {
386 for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
387 debug_assert!(num == block.number());
388 if let Some(item) = map_block_state_item(block, &mut predicate) {
389 items.push(item);
390 } else {
391 break
392 }
393 }
394 }
395
396 Ok(items)
397 }
398
399 fn block_state_provider_ref(
401 &self,
402 state: &BlockState<N::Primitives>,
403 ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
404 let anchor_hash = state.anchor().hash;
405 let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
406 let in_memory = state.chain().map(|block_state| block_state.block()).collect();
407 Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
408 }
409
410 fn get_in_memory_or_storage_by_tx_range<S, M, R>(
416 &self,
417 range: impl RangeBounds<BlockNumber>,
418 fetch_from_db: S,
419 fetch_from_block_state: M,
420 ) -> ProviderResult<Vec<R>>
421 where
422 S: FnOnce(
423 &DatabaseProviderRO<N::DB, N>,
424 RangeInclusive<TxNumber>,
425 ) -> ProviderResult<Vec<R>>,
426 M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
427 {
428 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
429 let provider = &self.storage_provider;
430
431 let last_database_block_number = in_mem_chain
434 .last()
435 .map(|b| Ok(b.anchor().number))
436 .unwrap_or_else(|| provider.last_block_number())?;
437
438 let last_block_body_index = provider
441 .block_body_indices(last_database_block_number)?
442 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
443 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
444
445 let (start, end) = self.convert_range_bounds(range, || {
446 in_mem_chain
447 .iter()
448 .map(|b| b.block_ref().recovered_block().body().transactions().len() as u64)
449 .sum::<u64>() +
450 last_block_body_index.last_tx_num()
451 });
452
453 if start > end {
454 return Ok(vec![])
455 }
456
457 let mut tx_range = start..=end;
458
459 if *tx_range.end() < in_memory_tx_num {
462 return fetch_from_db(provider, tx_range);
463 }
464
465 let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);
466
467 if *tx_range.start() < in_memory_tx_num {
469 let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);
471
472 tx_range = in_memory_tx_num..=*tx_range.end();
474
475 items.extend(fetch_from_db(provider, db_range)?);
476 }
477
478 for block_state in in_mem_chain.iter().rev() {
480 let block_tx_count =
481 block_state.block_ref().recovered_block().body().transactions().len();
482 let remaining = (tx_range.end() - tx_range.start() + 1) as usize;
483
484 if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
487 in_memory_tx_num += block_tx_count as u64;
488 continue
489 }
490
491 let skip = (tx_range.start() - in_memory_tx_num) as usize;
493
494 items.extend(fetch_from_block_state(
495 skip..=skip + (remaining.min(block_tx_count - skip) - 1),
496 block_state,
497 )?);
498
499 in_memory_tx_num += block_tx_count as u64;
500
501 if in_memory_tx_num > *tx_range.end() {
503 break
504 }
505
506 tx_range = in_memory_tx_num..=*tx_range.end();
508 }
509
510 Ok(items)
511 }
512
513 fn get_in_memory_or_storage_by_tx<S, M, R>(
516 &self,
517 id: HashOrNumber,
518 fetch_from_db: S,
519 fetch_from_block_state: M,
520 ) -> ProviderResult<Option<R>>
521 where
522 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
523 M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
524 {
525 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
526 let provider = &self.storage_provider;
527
528 let last_database_block_number = in_mem_chain
531 .last()
532 .map(|b| Ok(b.anchor().number))
533 .unwrap_or_else(|| provider.last_block_number())?;
534
535 let last_block_body_index = provider
538 .block_body_indices(last_database_block_number)?
539 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
540 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
541
542 if let HashOrNumber::Number(id) = id {
545 if id < in_memory_tx_num {
546 return fetch_from_db(provider)
547 }
548 }
549
550 for block_state in in_mem_chain.iter().rev() {
552 let executed_block = block_state.block_ref();
553 let block = executed_block.recovered_block();
554
555 for tx_index in 0..block.body().transactions().len() {
556 match id {
557 HashOrNumber::Hash(tx_hash) => {
558 if tx_hash == block.body().transactions()[tx_index].trie_hash() {
559 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
560 }
561 }
562 HashOrNumber::Number(id) => {
563 if id == in_memory_tx_num {
564 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
565 }
566 }
567 }
568
569 in_memory_tx_num += 1;
570 }
571 }
572
573 if let HashOrNumber::Hash(_) = id {
575 return fetch_from_db(provider)
576 }
577
578 Ok(None)
579 }
580
581 pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
583 &self,
584 id: BlockHashOrNumber,
585 fetch_from_db: S,
586 fetch_from_block_state: M,
587 ) -> ProviderResult<R>
588 where
589 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
590 M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
591 {
592 if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
593 return fetch_from_block_state(block_state)
594 }
595 fetch_from_db(&self.storage_provider)
596 }
597}
598
599impl<N: ProviderNodeTypes> ConsistentProvider<N> {
600 #[inline]
609 pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
610 let latest = self.best_block_number()?;
611 if block_number > latest {
612 Err(ProviderError::HeaderNotFound(block_number.into()))
613 } else {
614 Ok(())
615 }
616 }
617}
618
619impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
620 type Primitives = N::Primitives;
621}
622
623impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
624 fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
625 self.storage_provider.static_file_provider()
626 }
627}
628
629impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
630 type Header = HeaderTy<N>;
631
632 fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
633 self.get_in_memory_or_storage_by_block(
634 (*block_hash).into(),
635 |db_provider| db_provider.header(block_hash),
636 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
637 )
638 }
639
640 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
641 self.get_in_memory_or_storage_by_block(
642 num.into(),
643 |db_provider| db_provider.header_by_number(num),
644 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())),
645 )
646 }
647
648 fn header_td(&self, hash: &BlockHash) -> ProviderResult<Option<U256>> {
649 if let Some(num) = self.block_number(*hash)? {
650 self.header_td_by_number(num)
651 } else {
652 Ok(None)
653 }
654 }
655
656 fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
657 let number = if self.head_block.as_ref().map(|b| b.block_on_chain(number.into())).is_some()
658 {
659 if let Some(last_finalized_num_hash) =
666 self.canonical_in_memory_state.get_finalized_num_hash()
667 {
668 last_finalized_num_hash.number
669 } else {
670 self.last_block_number()?
671 }
672 } else {
673 number
675 };
676 self.storage_provider.header_td_by_number(number)
677 }
678
679 fn headers_range(
680 &self,
681 range: impl RangeBounds<BlockNumber>,
682 ) -> ProviderResult<Vec<Self::Header>> {
683 self.get_in_memory_or_storage_by_block_range_while(
684 range,
685 |db_provider, range, _| db_provider.headers_range(range),
686 |block_state, _| Some(block_state.block_ref().recovered_block().header().clone()),
687 |_| true,
688 )
689 }
690
691 fn sealed_header(
692 &self,
693 number: BlockNumber,
694 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
695 self.get_in_memory_or_storage_by_block(
696 number.into(),
697 |db_provider| db_provider.sealed_header(number),
698 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_sealed_header())),
699 )
700 }
701
702 fn sealed_headers_range(
703 &self,
704 range: impl RangeBounds<BlockNumber>,
705 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
706 self.get_in_memory_or_storage_by_block_range_while(
707 range,
708 |db_provider, range, _| db_provider.sealed_headers_range(range),
709 |block_state, _| Some(block_state.block_ref().recovered_block().clone_sealed_header()),
710 |_| true,
711 )
712 }
713
714 fn sealed_headers_while(
715 &self,
716 range: impl RangeBounds<BlockNumber>,
717 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
718 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
719 self.get_in_memory_or_storage_by_block_range_while(
720 range,
721 |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
722 |block_state, predicate| {
723 let header = block_state.block_ref().recovered_block().sealed_header();
724 predicate(header).then(|| header.clone())
725 },
726 predicate,
727 )
728 }
729}
730
731impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
732 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
733 self.get_in_memory_or_storage_by_block(
734 number.into(),
735 |db_provider| db_provider.block_hash(number),
736 |block_state| Ok(Some(block_state.hash())),
737 )
738 }
739
740 fn canonical_hashes_range(
741 &self,
742 start: BlockNumber,
743 end: BlockNumber,
744 ) -> ProviderResult<Vec<B256>> {
745 self.get_in_memory_or_storage_by_block_range_while(
746 start..end,
747 |db_provider, inclusive_range, _| {
748 db_provider
749 .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
750 },
751 |block_state, _| Some(block_state.hash()),
752 |_| true,
753 )
754 }
755}
756
757impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
758 fn chain_info(&self) -> ProviderResult<ChainInfo> {
759 let best_number = self.best_block_number()?;
760 Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
761 }
762
763 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
764 self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
765 }
766
767 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
768 self.storage_provider.last_block_number()
769 }
770
771 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
772 self.get_in_memory_or_storage_by_block(
773 hash.into(),
774 |db_provider| db_provider.block_number(hash),
775 |block_state| Ok(Some(block_state.number())),
776 )
777 }
778}
779
780impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
781 fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
782 Ok(self.canonical_in_memory_state.pending_block_num_hash())
783 }
784
785 fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
786 Ok(self.canonical_in_memory_state.get_safe_num_hash())
787 }
788
789 fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
790 Ok(self.canonical_in_memory_state.get_finalized_num_hash())
791 }
792}
793
794impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
795 type Block = BlockTy<N>;
796
797 fn find_block_by_hash(
798 &self,
799 hash: B256,
800 source: BlockSource,
801 ) -> ProviderResult<Option<Self::Block>> {
802 if matches!(source, BlockSource::Canonical | BlockSource::Any) {
803 if let Some(block) = self.get_in_memory_or_storage_by_block(
804 hash.into(),
805 |db_provider| db_provider.find_block_by_hash(hash, BlockSource::Canonical),
806 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
807 )? {
808 return Ok(Some(block))
809 }
810 }
811
812 if matches!(source, BlockSource::Pending | BlockSource::Any) {
813 return Ok(self
814 .canonical_in_memory_state
815 .pending_block()
816 .filter(|b| b.hash() == hash)
817 .map(|b| b.into_block()))
818 }
819
820 Ok(None)
821 }
822
823 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
824 self.get_in_memory_or_storage_by_block(
825 id,
826 |db_provider| db_provider.block(id),
827 |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_block())),
828 )
829 }
830
831 fn pending_block(&self) -> ProviderResult<Option<SealedBlock<Self::Block>>> {
832 Ok(self.canonical_in_memory_state.pending_block())
833 }
834
835 fn pending_block_with_senders(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
836 Ok(self.canonical_in_memory_state.pending_recovered_block())
837 }
838
839 fn pending_block_and_receipts(
840 &self,
841 ) -> ProviderResult<Option<(SealedBlock<Self::Block>, Vec<Self::Receipt>)>> {
842 Ok(self.canonical_in_memory_state.pending_block_and_receipts())
843 }
844
845 fn recovered_block(
852 &self,
853 id: BlockHashOrNumber,
854 transaction_kind: TransactionVariant,
855 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
856 self.get_in_memory_or_storage_by_block(
857 id,
858 |db_provider| db_provider.recovered_block(id, transaction_kind),
859 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
860 )
861 }
862
863 fn sealed_block_with_senders(
864 &self,
865 id: BlockHashOrNumber,
866 transaction_kind: TransactionVariant,
867 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
868 self.get_in_memory_or_storage_by_block(
869 id,
870 |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
871 |block_state| Ok(Some(block_state.block().recovered_block().clone())),
872 )
873 }
874
875 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
876 self.get_in_memory_or_storage_by_block_range_while(
877 range,
878 |db_provider, range, _| db_provider.block_range(range),
879 |block_state, _| Some(block_state.block_ref().recovered_block().clone_block()),
880 |_| true,
881 )
882 }
883
884 fn block_with_senders_range(
885 &self,
886 range: RangeInclusive<BlockNumber>,
887 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
888 self.get_in_memory_or_storage_by_block_range_while(
889 range,
890 |db_provider, range, _| db_provider.block_with_senders_range(range),
891 |block_state, _| Some(block_state.block().recovered_block().clone()),
892 |_| true,
893 )
894 }
895
896 fn recovered_block_range(
897 &self,
898 range: RangeInclusive<BlockNumber>,
899 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
900 self.get_in_memory_or_storage_by_block_range_while(
901 range,
902 |db_provider, range, _| db_provider.recovered_block_range(range),
903 |block_state, _| Some(block_state.block().recovered_block().clone()),
904 |_| true,
905 )
906 }
907}
908
909impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
910 type Transaction = TxTy<N>;
911
912 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
913 self.get_in_memory_or_storage_by_tx(
914 tx_hash.into(),
915 |db_provider| db_provider.transaction_id(tx_hash),
916 |_, tx_number, _| Ok(Some(tx_number)),
917 )
918 }
919
920 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
921 self.get_in_memory_or_storage_by_tx(
922 id.into(),
923 |provider| provider.transaction_by_id(id),
924 |tx_index, _, block_state| {
925 Ok(block_state
926 .block_ref()
927 .recovered_block()
928 .body()
929 .transactions()
930 .get(tx_index)
931 .cloned())
932 },
933 )
934 }
935
936 fn transaction_by_id_unhashed(
937 &self,
938 id: TxNumber,
939 ) -> ProviderResult<Option<Self::Transaction>> {
940 self.get_in_memory_or_storage_by_tx(
941 id.into(),
942 |provider| provider.transaction_by_id_unhashed(id),
943 |tx_index, _, block_state| {
944 Ok(block_state
945 .block_ref()
946 .recovered_block()
947 .body()
948 .transactions()
949 .get(tx_index)
950 .cloned())
951 },
952 )
953 }
954
955 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
956 if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
957 return Ok(Some(tx))
958 }
959
960 self.storage_provider.transaction_by_hash(hash)
961 }
962
963 fn transaction_by_hash_with_meta(
964 &self,
965 tx_hash: TxHash,
966 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
967 if let Some((tx, meta)) =
968 self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
969 {
970 return Ok(Some((tx, meta)))
971 }
972
973 self.storage_provider.transaction_by_hash_with_meta(tx_hash)
974 }
975
976 fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
977 self.get_in_memory_or_storage_by_tx(
978 id.into(),
979 |provider| provider.transaction_block(id),
980 |_, _, block_state| Ok(Some(block_state.block_ref().recovered_block().number())),
981 )
982 }
983
984 fn transactions_by_block(
985 &self,
986 id: BlockHashOrNumber,
987 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
988 self.get_in_memory_or_storage_by_block(
989 id,
990 |provider| provider.transactions_by_block(id),
991 |block_state| {
992 Ok(Some(block_state.block_ref().recovered_block().body().transactions().to_vec()))
993 },
994 )
995 }
996
997 fn transactions_by_block_range(
998 &self,
999 range: impl RangeBounds<BlockNumber>,
1000 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1001 self.get_in_memory_or_storage_by_block_range_while(
1002 range,
1003 |db_provider, range, _| db_provider.transactions_by_block_range(range),
1004 |block_state, _| {
1005 Some(block_state.block_ref().recovered_block().body().transactions().to_vec())
1006 },
1007 |_| true,
1008 )
1009 }
1010
1011 fn transactions_by_tx_range(
1012 &self,
1013 range: impl RangeBounds<TxNumber>,
1014 ) -> ProviderResult<Vec<Self::Transaction>> {
1015 self.get_in_memory_or_storage_by_tx_range(
1016 range,
1017 |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
1018 |index_range, block_state| {
1019 Ok(block_state.block_ref().recovered_block().body().transactions()[index_range]
1020 .to_vec())
1021 },
1022 )
1023 }
1024
1025 fn senders_by_tx_range(
1026 &self,
1027 range: impl RangeBounds<TxNumber>,
1028 ) -> ProviderResult<Vec<Address>> {
1029 self.get_in_memory_or_storage_by_tx_range(
1030 range,
1031 |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
1032 |index_range, block_state| {
1033 Ok(block_state.block_ref().recovered_block.senders()[index_range].to_vec())
1034 },
1035 )
1036 }
1037
1038 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1039 self.get_in_memory_or_storage_by_tx(
1040 id.into(),
1041 |provider| provider.transaction_sender(id),
1042 |tx_index, _, block_state| {
1043 Ok(block_state.block_ref().recovered_block.senders().get(tx_index).copied())
1044 },
1045 )
1046 }
1047}
1048
1049impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
1050 type Receipt = ReceiptTy<N>;
1051
1052 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1053 self.get_in_memory_or_storage_by_tx(
1054 id.into(),
1055 |provider| provider.receipt(id),
1056 |tx_index, _, block_state| {
1057 Ok(block_state.executed_block_receipts().get(tx_index).cloned())
1058 },
1059 )
1060 }
1061
1062 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1063 for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
1064 let executed_block = block_state.block_ref();
1065 let block = executed_block.recovered_block();
1066 let receipts = block_state.executed_block_receipts();
1067
1068 debug_assert_eq!(
1070 block.body().transactions().len(),
1071 receipts.len(),
1072 "Mismatch between transaction and receipt count"
1073 );
1074
1075 if let Some(tx_index) =
1076 block.body().transactions_iter().position(|tx| tx.trie_hash() == hash)
1077 {
1078 return Ok(receipts.get(tx_index).cloned());
1080 }
1081 }
1082
1083 self.storage_provider.receipt_by_hash(hash)
1084 }
1085
1086 fn receipts_by_block(
1087 &self,
1088 block: BlockHashOrNumber,
1089 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1090 self.get_in_memory_or_storage_by_block(
1091 block,
1092 |db_provider| db_provider.receipts_by_block(block),
1093 |block_state| Ok(Some(block_state.executed_block_receipts())),
1094 )
1095 }
1096
1097 fn receipts_by_tx_range(
1098 &self,
1099 range: impl RangeBounds<TxNumber>,
1100 ) -> ProviderResult<Vec<Self::Receipt>> {
1101 self.get_in_memory_or_storage_by_tx_range(
1102 range,
1103 |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
1104 |index_range, block_state| {
1105 Ok(block_state.executed_block_receipts().drain(index_range).collect())
1106 },
1107 )
1108 }
1109}
1110
1111impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
1112 fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1113 match block {
1114 BlockId::Hash(rpc_block_hash) => {
1115 let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
1116 if receipts.is_none() && !rpc_block_hash.require_canonical.unwrap_or(false) {
1117 if let Some(state) = self
1118 .head_block
1119 .as_ref()
1120 .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
1121 {
1122 receipts = Some(state.executed_block_receipts());
1123 }
1124 }
1125 Ok(receipts)
1126 }
1127 BlockId::Number(num_tag) => match num_tag {
1128 BlockNumberOrTag::Pending => Ok(self
1129 .canonical_in_memory_state
1130 .pending_state()
1131 .map(|block_state| block_state.executed_block_receipts())),
1132 _ => {
1133 if let Some(num) = self.convert_block_number(num_tag)? {
1134 self.receipts_by_block(num.into())
1135 } else {
1136 Ok(None)
1137 }
1138 }
1139 },
1140 }
1141 }
1142}
1143
1144impl<N: ProviderNodeTypes> WithdrawalsProvider for ConsistentProvider<N> {
1145 fn withdrawals_by_block(
1146 &self,
1147 id: BlockHashOrNumber,
1148 timestamp: u64,
1149 ) -> ProviderResult<Option<Withdrawals>> {
1150 if !self.chain_spec().is_shanghai_active_at_timestamp(timestamp) {
1151 return Ok(None)
1152 }
1153
1154 self.get_in_memory_or_storage_by_block(
1155 id,
1156 |db_provider| db_provider.withdrawals_by_block(id, timestamp),
1157 |block_state| {
1158 Ok(block_state.block_ref().recovered_block().body().withdrawals().cloned())
1159 },
1160 )
1161 }
1162}
1163
1164impl<N: ProviderNodeTypes> OmmersProvider for ConsistentProvider<N> {
1165 fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<HeaderTy<N>>>> {
1166 self.get_in_memory_or_storage_by_block(
1167 id,
1168 |db_provider| db_provider.ommers(id),
1169 |block_state| {
1170 if self.chain_spec().is_paris_active_at_block(block_state.number()) {
1171 return Ok(Some(Vec::new()))
1172 }
1173
1174 Ok(block_state.block_ref().recovered_block().body().ommers().map(|o| o.to_vec()))
1175 },
1176 )
1177 }
1178}
1179
1180impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
1181 fn block_body_indices(
1182 &self,
1183 number: BlockNumber,
1184 ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1185 self.get_in_memory_or_storage_by_block(
1186 number.into(),
1187 |db_provider| db_provider.block_body_indices(number),
1188 |block_state| {
1189 let last_storage_block_number = block_state.anchor().number;
1191 let mut stored_indices = self
1192 .storage_provider
1193 .block_body_indices(last_storage_block_number)?
1194 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1195
1196 stored_indices.first_tx_num = stored_indices.next_tx_num();
1198 stored_indices.tx_count = 0;
1199
1200 for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1202 let block_tx_count =
1203 state.block_ref().recovered_block().body().transactions().len() as u64;
1204 if state.block_ref().recovered_block().number() == number {
1205 stored_indices.tx_count = block_tx_count;
1206 } else {
1207 stored_indices.first_tx_num += block_tx_count;
1208 }
1209 }
1210
1211 Ok(Some(stored_indices))
1212 },
1213 )
1214 }
1215
1216 fn block_body_indices_range(
1217 &self,
1218 range: RangeInclusive<BlockNumber>,
1219 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1220 range.map_while(|b| self.block_body_indices(b).transpose()).collect()
1221 }
1222}
1223
1224impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
1225 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1226 self.storage_provider.get_stage_checkpoint(id)
1227 }
1228
1229 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1230 self.storage_provider.get_stage_checkpoint_progress(id)
1231 }
1232
1233 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1234 self.storage_provider.get_all_checkpoints()
1235 }
1236}
1237
1238impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
1239 fn get_prune_checkpoint(
1240 &self,
1241 segment: PruneSegment,
1242 ) -> ProviderResult<Option<PruneCheckpoint>> {
1243 self.storage_provider.get_prune_checkpoint(segment)
1244 }
1245
1246 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
1247 self.storage_provider.get_prune_checkpoints()
1248 }
1249}
1250
1251impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
1252 type ChainSpec = N::ChainSpec;
1253
1254 fn chain_spec(&self) -> Arc<N::ChainSpec> {
1255 ChainSpecProvider::chain_spec(&self.storage_provider)
1256 }
1257}
1258
1259impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1260 fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1261 match id {
1262 BlockId::Number(num) => self.block_by_number_or_tag(num),
1263 BlockId::Hash(hash) => {
1264 if Some(true) == hash.require_canonical {
1269 self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1271 } else {
1272 self.block_by_hash(hash.block_hash)
1273 }
1274 }
1275 }
1276 }
1277
1278 fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1279 Ok(match id {
1280 BlockNumberOrTag::Latest => {
1281 Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1282 }
1283 BlockNumberOrTag::Finalized => {
1284 self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1285 }
1286 BlockNumberOrTag::Safe => {
1287 self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1288 }
1289 BlockNumberOrTag::Earliest => self.header_by_number(0)?,
1290 BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1291
1292 BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1293 })
1294 }
1295
1296 fn sealed_header_by_number_or_tag(
1297 &self,
1298 id: BlockNumberOrTag,
1299 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1300 match id {
1301 BlockNumberOrTag::Latest => {
1302 Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1303 }
1304 BlockNumberOrTag::Finalized => {
1305 Ok(self.canonical_in_memory_state.get_finalized_header())
1306 }
1307 BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1308 BlockNumberOrTag::Earliest => self
1309 .header_by_number(0)?
1310 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1311 BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1312 BlockNumberOrTag::Number(num) => self
1313 .header_by_number(num)?
1314 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal_slow(h)))),
1315 }
1316 }
1317
1318 fn sealed_header_by_id(
1319 &self,
1320 id: BlockId,
1321 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1322 Ok(match id {
1323 BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1324 BlockId::Hash(hash) => self.header(&hash.block_hash)?.map(SealedHeader::seal_slow),
1325 })
1326 }
1327
1328 fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1329 Ok(match id {
1330 BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1331 BlockId::Hash(hash) => self.header(&hash.block_hash)?,
1332 })
1333 }
1334
1335 fn ommers_by_id(&self, id: BlockId) -> ProviderResult<Option<Vec<HeaderTy<N>>>> {
1336 match id {
1337 BlockId::Number(num) => self.ommers_by_number_or_tag(num),
1338 BlockId::Hash(hash) => {
1339 self.ommers(BlockHashOrNumber::Hash(hash.block_hash))
1342 }
1343 }
1344 }
1345}
1346
1347impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1348 fn storage_changeset(
1349 &self,
1350 block_number: BlockNumber,
1351 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1352 if let Some(state) =
1353 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1354 {
1355 let changesets = state
1356 .block()
1357 .execution_output
1358 .bundle
1359 .reverts
1360 .clone()
1361 .to_plain_state_reverts()
1362 .storage
1363 .into_iter()
1364 .flatten()
1365 .flat_map(|revert: PlainStorageRevert| {
1366 revert.storage_revert.into_iter().map(move |(key, value)| {
1367 (
1368 BlockNumberAddress((block_number, revert.address)),
1369 StorageEntry {
1370 key: key.into(),
1371 value: value.to_previous_value().value,
1372 is_private: value.to_previous_value().is_private,
1373 },
1374 )
1375 })
1376 })
1377 .collect();
1378 Ok(changesets)
1379 } else {
1380 let storage_history_exists = self
1384 .storage_provider
1385 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1386 .and_then(|checkpoint| {
1387 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1392 })
1393 .unwrap_or(true);
1394
1395 if !storage_history_exists {
1396 return Err(ProviderError::StateAtBlockPruned(block_number))
1397 }
1398
1399 self.storage_provider.storage_changeset(block_number)
1400 }
1401 }
1402}
1403
1404impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1405 fn account_block_changeset(
1406 &self,
1407 block_number: BlockNumber,
1408 ) -> ProviderResult<Vec<AccountBeforeTx>> {
1409 if let Some(state) =
1410 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1411 {
1412 let changesets = state
1413 .block_ref()
1414 .execution_output
1415 .bundle
1416 .reverts
1417 .clone()
1418 .to_plain_state_reverts()
1419 .accounts
1420 .into_iter()
1421 .flatten()
1422 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1423 .collect();
1424 Ok(changesets)
1425 } else {
1426 let account_history_exists = self
1430 .storage_provider
1431 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1432 .and_then(|checkpoint| {
1433 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1438 })
1439 .unwrap_or(true);
1440
1441 if !account_history_exists {
1442 return Err(ProviderError::StateAtBlockPruned(block_number))
1443 }
1444
1445 self.storage_provider.account_block_changeset(block_number)
1446 }
1447 }
1448}
1449
1450impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1451 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
1453 let state_provider = self.latest_ref()?;
1455 state_provider.basic_account(address)
1456 }
1457}
1458
1459impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1460 type Receipt = ReceiptTy<N>;
1461
1462 fn get_state(
1472 &self,
1473 block: BlockNumber,
1474 ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1475 if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1476 let state = state.block_ref().execution_outcome().clone();
1477 Ok(Some(state))
1478 } else {
1479 Self::get_state(self, block..=block)
1480 }
1481 }
1482}
1483
1484#[cfg(test)]
1485mod tests {
1486 use crate::{
1487 providers::blockchain_provider::BlockchainProvider,
1488 test_utils::create_test_provider_factory, BlockWriter,
1489 };
1490 use alloy_eips::BlockHashOrNumber;
1491 use alloy_primitives::B256;
1492 use itertools::Itertools;
1493 use rand::Rng;
1494 use reth_chain_state::{ExecutedBlock, ExecutedBlockWithTrieUpdates, NewCanonicalChain};
1495 use reth_db_api::models::AccountBeforeTx;
1496 use reth_ethereum_primitives::Block;
1497 use reth_execution_types::ExecutionOutcome;
1498 use reth_primitives_traits::{RecoveredBlock, SealedBlock};
1499 use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader};
1500 use reth_testing_utils::generators::{
1501 self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
1502 };
1503 use revm_database::BundleState;
1504 use std::{
1505 ops::{Bound, Range, RangeBounds},
1506 sync::Arc,
1507 };
1508
1509 const TEST_BLOCKS_COUNT: usize = 5;
1510
1511 fn random_blocks(
1512 rng: &mut impl Rng,
1513 database_blocks: usize,
1514 in_memory_blocks: usize,
1515 requests_count: Option<Range<u8>>,
1516 withdrawals_count: Option<Range<u8>>,
1517 tx_count: impl RangeBounds<u8>,
1518 ) -> (Vec<SealedBlock<Block>>, Vec<SealedBlock<Block>>) {
1519 let block_range = (database_blocks + in_memory_blocks - 1) as u64;
1520
1521 let tx_start = match tx_count.start_bound() {
1522 Bound::Included(&n) | Bound::Excluded(&n) => n,
1523 Bound::Unbounded => u8::MIN,
1524 };
1525 let tx_end = match tx_count.end_bound() {
1526 Bound::Included(&n) | Bound::Excluded(&n) => n + 1,
1527 Bound::Unbounded => u8::MAX,
1528 };
1529
1530 let blocks = random_block_range(
1531 rng,
1532 0..=block_range,
1533 BlockRangeParams {
1534 parent: Some(B256::ZERO),
1535 tx_count: tx_start..tx_end,
1536 requests_count,
1537 withdrawals_count,
1538 },
1539 );
1540 let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
1541 (database_blocks.to_vec(), in_memory_blocks.to_vec())
1542 }
1543
1544 #[test]
1545 fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
1546 let mut rng = generators::rng();
1548 let factory = create_test_provider_factory();
1549
1550 let blocks = random_block_range(
1552 &mut rng,
1553 0..=10,
1554 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1555 );
1556 let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1557
1558 let provider_rw = factory.provider_rw()?;
1560 for block in database_blocks {
1561 provider_rw.insert_historical_block(
1562 block.clone().try_recover().expect("failed to seal block with senders"),
1563 )?;
1564 }
1565 provider_rw.commit()?;
1566
1567 let provider = BlockchainProvider::new(factory)?;
1569 let consistent_provider = provider.consistent_provider()?;
1570
1571 let first_db_block = database_blocks.first().unwrap();
1573 let first_in_mem_block = in_memory_blocks.first().unwrap();
1574 let last_in_mem_block = in_memory_blocks.last().unwrap();
1575
1576 assert_eq!(
1578 consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1579 None
1580 );
1581 assert_eq!(
1582 consistent_provider
1583 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1584 None
1585 );
1586 assert_eq!(
1588 consistent_provider
1589 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
1590 None
1591 );
1592
1593 let in_memory_block_senders =
1595 first_in_mem_block.senders().expect("failed to recover senders");
1596 let chain = NewCanonicalChain::Commit {
1597 new: vec![ExecutedBlockWithTrieUpdates::new(
1598 Arc::new(RecoveredBlock::new_sealed(
1599 first_in_mem_block.clone(),
1600 in_memory_block_senders,
1601 )),
1602 Default::default(),
1603 Default::default(),
1604 Default::default(),
1605 )],
1606 };
1607 consistent_provider.canonical_in_memory_state.update_chain(chain);
1608 let consistent_provider = provider.consistent_provider()?;
1609
1610 assert_eq!(
1612 consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
1613 Some(first_in_mem_block.clone().into_block())
1614 );
1615 assert_eq!(
1616 consistent_provider
1617 .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
1618 Some(first_in_mem_block.clone().into_block())
1619 );
1620
1621 assert_eq!(
1623 consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
1624 Some(first_db_block.clone().into_block())
1625 );
1626 assert_eq!(
1627 consistent_provider
1628 .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
1629 Some(first_db_block.clone().into_block())
1630 );
1631
1632 assert_eq!(
1634 consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
1635 None
1636 );
1637
1638 provider.canonical_in_memory_state.set_pending_block(ExecutedBlockWithTrieUpdates {
1640 block: ExecutedBlock {
1641 recovered_block: Arc::new(RecoveredBlock::new_sealed(
1642 last_in_mem_block.clone(),
1643 Default::default(),
1644 )),
1645 execution_output: Default::default(),
1646 hashed_state: Default::default(),
1647 },
1648 trie: Default::default(),
1649 });
1650
1651 assert_eq!(
1653 consistent_provider
1654 .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
1655 Some(last_in_mem_block.clone_block())
1656 );
1657
1658 Ok(())
1659 }
1660
1661 #[test]
1662 fn test_block_reader_block() -> eyre::Result<()> {
1663 let mut rng = generators::rng();
1665 let factory = create_test_provider_factory();
1666
1667 let blocks = random_block_range(
1669 &mut rng,
1670 0..=10,
1671 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
1672 );
1673 let (database_blocks, in_memory_blocks) = blocks.split_at(5);
1674
1675 let provider_rw = factory.provider_rw()?;
1677 for block in database_blocks {
1678 provider_rw.insert_historical_block(
1679 block.clone().try_recover().expect("failed to seal block with senders"),
1680 )?;
1681 }
1682 provider_rw.commit()?;
1683
1684 let provider = BlockchainProvider::new(factory)?;
1686 let consistent_provider = provider.consistent_provider()?;
1687
1688 let first_in_mem_block = in_memory_blocks.first().unwrap();
1690 let first_db_block = database_blocks.first().unwrap();
1692
1693 assert_eq!(
1695 consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1696 None
1697 );
1698 assert_eq!(
1699 consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1700 None
1701 );
1702
1703 let in_memory_block_senders =
1705 first_in_mem_block.senders().expect("failed to recover senders");
1706 let chain = NewCanonicalChain::Commit {
1707 new: vec![ExecutedBlockWithTrieUpdates::new(
1708 Arc::new(RecoveredBlock::new_sealed(
1709 first_in_mem_block.clone(),
1710 in_memory_block_senders,
1711 )),
1712 Default::default(),
1713 Default::default(),
1714 Default::default(),
1715 )],
1716 };
1717 consistent_provider.canonical_in_memory_state.update_chain(chain);
1718
1719 let consistent_provider = provider.consistent_provider()?;
1720
1721 assert_eq!(
1723 consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
1724 Some(first_in_mem_block.clone().into_block())
1725 );
1726 assert_eq!(
1727 consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
1728 Some(first_in_mem_block.clone().into_block())
1729 );
1730
1731 assert_eq!(
1733 consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
1734 Some(first_db_block.clone().into_block())
1735 );
1736 assert_eq!(
1737 consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
1738 Some(first_db_block.clone().into_block())
1739 );
1740
1741 Ok(())
1742 }
1743
1744 #[test]
1745 fn test_changeset_reader() -> eyre::Result<()> {
1746 let mut rng = generators::rng();
1747
1748 let (database_blocks, in_memory_blocks) =
1749 random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);
1750
1751 let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
1752 let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
1753 let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();
1754
1755 let accounts = random_eoa_accounts(&mut rng, 2);
1756
1757 let (database_changesets, database_state) = random_changeset_range(
1758 &mut rng,
1759 &database_blocks,
1760 accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
1761 0..0,
1762 0..0,
1763 );
1764 let (in_memory_changesets, in_memory_state) = random_changeset_range(
1765 &mut rng,
1766 &in_memory_blocks,
1767 database_state
1768 .iter()
1769 .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
1770 0..0,
1771 0..0,
1772 );
1773
1774 let factory = create_test_provider_factory();
1775
1776 let provider_rw = factory.provider_rw()?;
1777 provider_rw.append_blocks_with_state(
1778 database_blocks
1779 .into_iter()
1780 .map(|b| b.try_recover().expect("failed to seal block with senders"))
1781 .collect(),
1782 &ExecutionOutcome {
1783 bundle: BundleState::new(
1784 database_state.into_iter().map(|(address, (account, _))| {
1785 (address, None, Some(account.into()), Default::default())
1786 }),
1787 database_changesets
1788 .iter()
1789 .map(|block_changesets| {
1790 block_changesets.iter().map(|(address, account, _)| {
1791 (*address, Some(Some((*account).into())), [])
1792 })
1793 })
1794 .collect::<Vec<_>>(),
1795 Vec::new(),
1796 ),
1797 first_block: first_database_block,
1798 ..Default::default()
1799 },
1800 Default::default(),
1801 Default::default(),
1802 )?;
1803 provider_rw.commit()?;
1804
1805 let provider = BlockchainProvider::new(factory)?;
1806
1807 let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
1808 let chain = NewCanonicalChain::Commit {
1809 new: vec![in_memory_blocks
1810 .first()
1811 .map(|block| {
1812 let senders = block.senders().expect("failed to recover senders");
1813 ExecutedBlockWithTrieUpdates::new(
1814 Arc::new(RecoveredBlock::new_sealed(block.clone(), senders)),
1815 Arc::new(ExecutionOutcome {
1816 bundle: BundleState::new(
1817 in_memory_state.into_iter().map(|(address, (account, _))| {
1818 (address, None, Some(account.into()), Default::default())
1819 }),
1820 [in_memory_changesets.iter().map(|(address, account, _)| {
1821 (*address, Some(Some((*account).into())), Vec::new())
1822 })],
1823 [],
1824 ),
1825 first_block: first_in_memory_block,
1826 ..Default::default()
1827 }),
1828 Default::default(),
1829 Default::default(),
1830 )
1831 })
1832 .unwrap()],
1833 };
1834 provider.canonical_in_memory_state.update_chain(chain);
1835
1836 let consistent_provider = provider.consistent_provider()?;
1837
1838 assert_eq!(
1839 consistent_provider.account_block_changeset(last_database_block).unwrap(),
1840 database_changesets
1841 .into_iter()
1842 .next_back()
1843 .unwrap()
1844 .into_iter()
1845 .sorted_by_key(|(address, _, _)| *address)
1846 .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
1847 .collect::<Vec<_>>()
1848 );
1849 assert_eq!(
1850 consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
1851 in_memory_changesets
1852 .into_iter()
1853 .sorted_by_key(|(address, _, _)| *address)
1854 .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
1855 .collect::<Vec<_>>()
1856 );
1857
1858 Ok(())
1859 }
1860}