1use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
2use crate::{
3 providers::StaticFileProvider, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader,
4 BlockReader, BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, EvmEnvProvider,
5 HeaderProvider, ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
6 StageCheckpointReader, StateReader, StaticFileProviderFactory, TransactionVariant,
7 TransactionsProvider, WithdrawalsProvider,
8};
9use alloy_consensus::BlockHeader;
10use alloy_eips::{
11 eip2718::Encodable2718,
12 eip4895::{Withdrawal, Withdrawals},
13 BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag, HashOrNumber,
14};
15use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256};
16use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProviderRef};
17use reth_chainspec::{ChainInfo, EthereumHardforks};
18use reth_db::models::BlockNumberAddress;
19use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices};
20use reth_evm::ConfigureEvmEnv;
21use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
22use reth_node_types::{BlockTy, HeaderTy, ReceiptTy, TxTy};
23use reth_primitives::{
24 Account, BlockWithSenders, SealedBlockFor, SealedBlockWithSenders, SealedHeader, StorageEntry,
25 TransactionMeta,
26};
27use reth_primitives_traits::BlockBody;
28use reth_prune_types::{PruneCheckpoint, PruneSegment};
29use reth_stages_types::{StageCheckpoint, StageId};
30use reth_storage_api::{
31 BlockBodyIndicesProvider, DatabaseProviderFactory, NodePrimitivesProvider, OmmersProvider,
32 StateProvider, StorageChangeSetReader,
33};
34use reth_storage_errors::provider::ProviderResult;
35use revm::{
36 db::states::PlainStorageRevert,
37 primitives::{BlockEnv, CfgEnvWithHandlerCfg},
38};
39use std::{
40 collections::{hash_map, HashMap},
41 ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
42 sync::Arc,
43};
44use tracing::trace;
45
/// Provider that answers queries from a database provider combined with a snapshot of the
/// in-memory canonical chain.
///
/// Both the database provider and the in-memory head are captured once, in `new`, and reused
/// for every query made through the same instance.
#[derive(Debug)]
#[doc(hidden)] pub struct ConsistentProvider<N: ProviderNodeTypes> {
    /// Database provider obtained from the factory via `database_provider_ro`.
    storage_provider: <ProviderFactory<N> as DatabaseProviderFactory>::Provider,
    /// Head of the in-memory chain as returned by `head_state()` at construction time, if any.
    head_block: Option<Arc<BlockState<N::Primitives>>>,
    /// Handle to the canonical in-memory state; used for pending / safe / finalized lookups.
    canonical_in_memory_state: CanonicalInMemoryState<N::Primitives>,
}
62
63impl<N: ProviderNodeTypes> ConsistentProvider<N> {
64 pub fn new(
70 storage_provider_factory: ProviderFactory<N>,
71 state: CanonicalInMemoryState<N::Primitives>,
72 ) -> ProviderResult<Self> {
73 let head_block = state.head_state();
81 let storage_provider = storage_provider_factory.database_provider_ro()?;
82 Ok(Self { storage_provider, head_block, canonical_in_memory_state: state })
83 }
84
85 fn convert_range_bounds<T>(
87 &self,
88 range: impl RangeBounds<T>,
89 end_unbounded: impl FnOnce() -> T,
90 ) -> (T, T)
91 where
92 T: Copy + Add<Output = T> + Sub<Output = T> + From<u8>,
93 {
94 let start = match range.start_bound() {
95 Bound::Included(&n) => n,
96 Bound::Excluded(&n) => n + T::from(1u8),
97 Bound::Unbounded => T::from(0u8),
98 };
99
100 let end = match range.end_bound() {
101 Bound::Included(&n) => n,
102 Bound::Excluded(&n) => n - T::from(1u8),
103 Bound::Unbounded => end_unbounded(),
104 };
105
106 (start, end)
107 }
108
109 fn latest_ref<'a>(&'a self) -> ProviderResult<Box<dyn StateProvider + 'a>> {
111 trace!(target: "providers::blockchain", "Getting latest block state provider");
112
113 if let Some(state) = &self.head_block {
115 trace!(target: "providers::blockchain", "Using head state for latest state provider");
116 Ok(self.block_state_provider_ref(state)?.boxed())
117 } else {
118 trace!(target: "providers::blockchain", "Using database state for latest state provider");
119 Ok(self.storage_provider.latest())
120 }
121 }
122
123 fn history_by_block_hash_ref<'a>(
124 &'a self,
125 block_hash: BlockHash,
126 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
127 trace!(target: "providers::blockchain", ?block_hash, "Getting history by block hash");
128
129 self.get_in_memory_or_storage_by_block(
130 block_hash.into(),
131 |_| self.storage_provider.history_by_block_hash(block_hash),
132 |block_state| {
133 let state_provider = self.block_state_provider_ref(block_state)?;
134 Ok(Box::new(state_provider))
135 },
136 )
137 }
138
139 fn state_by_block_number_ref<'a>(
141 &'a self,
142 number: BlockNumber,
143 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
144 let hash =
145 self.block_hash(number)?.ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
146 self.history_by_block_hash_ref(hash)
147 }
148
149 pub fn get_state(
153 &self,
154 range: RangeInclusive<BlockNumber>,
155 ) -> ProviderResult<Option<ExecutionOutcome<ReceiptTy<N>>>> {
156 if range.is_empty() {
157 return Ok(None)
158 }
159 let start_block_number = *range.start();
160 let end_block_number = *range.end();
161
162 let mut block_bodies = Vec::new();
164 for block_num in range.clone() {
165 let block_body = self
166 .block_body_indices(block_num)?
167 .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
168 block_bodies.push((block_num, block_body))
169 }
170
171 let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
173 else {
174 return Ok(None)
175 };
176 let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
177 return Ok(None)
178 };
179
180 let mut account_changeset = Vec::new();
181 for block_num in range.clone() {
182 let changeset =
183 self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
184 account_changeset.extend(changeset);
185 }
186
187 let mut storage_changeset = Vec::new();
188 for block_num in range {
189 let changeset = self.storage_changeset(block_num)?;
190 storage_changeset.extend(changeset);
191 }
192
193 let (state, reverts) =
194 self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;
195
196 let mut receipt_iter =
197 self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();
198
199 let mut receipts = Vec::with_capacity(block_bodies.len());
200 for (_, block_body) in block_bodies {
202 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
203 for tx_num in block_body.tx_num_range() {
204 let receipt = receipt_iter
205 .next()
206 .ok_or_else(|| ProviderError::ReceiptNotFound(tx_num.into()))?;
207 block_receipts.push(Some(receipt));
208 }
209 receipts.push(block_receipts);
210 }
211
212 Ok(Some(ExecutionOutcome::new_init(
213 state,
214 reverts,
215 Vec::new(),
217 receipts.into(),
218 start_block_number,
219 Vec::new(),
220 )))
221 }
222
223 fn populate_bundle_state(
227 &self,
228 account_changeset: Vec<(u64, AccountBeforeTx)>,
229 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
230 block_range_end: BlockNumber,
231 ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
232 let mut state: BundleStateInit = HashMap::new();
233 let mut reverts: RevertsInit = HashMap::new();
234 let state_provider = self.state_by_block_number_ref(block_range_end)?;
235
236 for (block_number, account_before) in account_changeset.into_iter().rev() {
238 let AccountBeforeTx { info: old_info, address } = account_before;
239 match state.entry(address) {
240 hash_map::Entry::Vacant(entry) => {
241 let new_info = state_provider.basic_account(address)?;
242 entry.insert((old_info, new_info, HashMap::new()));
243 }
244 hash_map::Entry::Occupied(mut entry) => {
245 entry.get_mut().0 = old_info;
247 }
248 }
249 reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
251 }
252
253 for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
255 let BlockNumberAddress((block_number, address)) = block_and_address;
256 let account_state = match state.entry(address) {
258 hash_map::Entry::Vacant(entry) => {
259 let present_info = state_provider.basic_account(address)?;
260 entry.insert((present_info, present_info, HashMap::new()))
261 }
262 hash_map::Entry::Occupied(entry) => entry.into_mut(),
263 };
264
265 match account_state.2.entry(old_storage.key) {
267 hash_map::Entry::Vacant(entry) => {
268 let new_storage_value =
269 state_provider.storage(address, old_storage.key)?.unwrap_or_default();
270 entry.insert((
271 (old_storage.value, old_storage.is_private),
272 (new_storage_value.value, old_storage.is_private),
273 ));
274 }
275 hash_map::Entry::Occupied(mut entry) => {
276 entry.get_mut().0 = (old_storage.value, old_storage.is_private);
277 }
278 };
279
280 reverts
281 .entry(block_number)
282 .or_default()
283 .entry(address)
284 .or_default()
285 .1
286 .push(old_storage);
287 }
288
289 Ok((state, reverts))
290 }
291
292 fn get_in_memory_or_storage_by_block_range_while<T, F, G, P>(
304 &self,
305 range: impl RangeBounds<BlockNumber>,
306 fetch_db_range: F,
307 map_block_state_item: G,
308 mut predicate: P,
309 ) -> ProviderResult<Vec<T>>
310 where
311 F: FnOnce(
312 &DatabaseProviderRO<N::DB, N>,
313 RangeInclusive<BlockNumber>,
314 &mut P,
315 ) -> ProviderResult<Vec<T>>,
316 G: Fn(&BlockState<N::Primitives>, &mut P) -> Option<T>,
317 P: FnMut(&T) -> bool,
318 {
319 let mut in_memory_chain =
327 self.head_block.as_ref().map(|b| b.chain().collect::<Vec<_>>()).unwrap_or_default();
328 let db_provider = &self.storage_provider;
329
330 let (start, end) = self.convert_range_bounds(range, || {
331 in_memory_chain
333 .first()
334 .map(|b| b.number())
335 .unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
336 });
337
338 if start > end {
339 return Ok(vec![])
340 }
341
342 let (in_memory, storage_range) = match in_memory_chain.last().as_ref().map(|b| b.number()) {
347 Some(lowest_memory_block) if lowest_memory_block <= end => {
348 let highest_memory_block =
349 in_memory_chain.first().as_ref().map(|b| b.number()).expect("qed");
350
351 let in_memory_range =
355 lowest_memory_block.max(start)..=end.min(highest_memory_block);
356
357 in_memory_chain.truncate(
360 in_memory_chain
361 .len()
362 .saturating_sub(start.saturating_sub(lowest_memory_block) as usize),
363 );
364
365 let storage_range =
366 (lowest_memory_block > start).then(|| start..=lowest_memory_block - 1);
367
368 (Some((in_memory_chain, in_memory_range)), storage_range)
369 }
370 _ => {
371 drop(in_memory_chain);
373
374 (None, Some(start..=end))
375 }
376 };
377
378 let mut items = Vec::with_capacity((end - start + 1) as usize);
379
380 if let Some(storage_range) = storage_range {
381 let mut db_items = fetch_db_range(db_provider, storage_range.clone(), &mut predicate)?;
382 items.append(&mut db_items);
383
384 if items.len() as u64 != storage_range.end() - storage_range.start() + 1 {
387 return Ok(items)
388 }
389 }
390
391 if let Some((in_memory_chain, in_memory_range)) = in_memory {
392 for (num, block) in in_memory_range.zip(in_memory_chain.into_iter().rev()) {
393 debug_assert!(num == block.number());
394 if let Some(item) = map_block_state_item(block, &mut predicate) {
395 items.push(item);
396 } else {
397 break
398 }
399 }
400 }
401
402 Ok(items)
403 }
404
405 fn block_state_provider_ref(
407 &self,
408 state: &BlockState<N::Primitives>,
409 ) -> ProviderResult<MemoryOverlayStateProviderRef<'_, N::Primitives>> {
410 let anchor_hash = state.anchor().hash;
411 let latest_historical = self.history_by_block_hash_ref(anchor_hash)?;
412 let in_memory = state.chain().map(|block_state| block_state.block()).collect();
413 Ok(MemoryOverlayStateProviderRef::new(latest_historical, in_memory))
414 }
415
416 fn get_in_memory_or_storage_by_tx_range<S, M, R>(
422 &self,
423 range: impl RangeBounds<BlockNumber>,
424 fetch_from_db: S,
425 fetch_from_block_state: M,
426 ) -> ProviderResult<Vec<R>>
427 where
428 S: FnOnce(
429 &DatabaseProviderRO<N::DB, N>,
430 RangeInclusive<TxNumber>,
431 ) -> ProviderResult<Vec<R>>,
432 M: Fn(RangeInclusive<usize>, &BlockState<N::Primitives>) -> ProviderResult<Vec<R>>,
433 {
434 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
435 let provider = &self.storage_provider;
436
437 let last_database_block_number = in_mem_chain
440 .last()
441 .map(|b| Ok(b.anchor().number))
442 .unwrap_or_else(|| provider.last_block_number())?;
443
444 let last_block_body_index = provider
447 .block_body_indices(last_database_block_number)?
448 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
449 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
450
451 let (start, end) = self.convert_range_bounds(range, || {
452 in_mem_chain
453 .iter()
454 .map(|b| b.block_ref().block().body.transactions().len() as u64)
455 .sum::<u64>() +
456 last_block_body_index.last_tx_num()
457 });
458
459 if start > end {
460 return Ok(vec![])
461 }
462
463 let mut tx_range = start..=end;
464
465 if *tx_range.end() < in_memory_tx_num {
468 return fetch_from_db(provider, tx_range);
469 }
470
471 let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);
472
473 if *tx_range.start() < in_memory_tx_num {
475 let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);
477
478 tx_range = in_memory_tx_num..=*tx_range.end();
480
481 items.extend(fetch_from_db(provider, db_range)?);
482 }
483
484 for block_state in in_mem_chain.iter().rev() {
486 let block_tx_count = block_state.block_ref().block().body.transactions().len();
487 let remaining = (tx_range.end() - tx_range.start() + 1) as usize;
488
489 if *tx_range.start() >= in_memory_tx_num + block_tx_count as u64 {
492 in_memory_tx_num += block_tx_count as u64;
493 continue
494 }
495
496 let skip = (tx_range.start() - in_memory_tx_num) as usize;
498
499 items.extend(fetch_from_block_state(
500 skip..=skip + (remaining.min(block_tx_count - skip) - 1),
501 block_state,
502 )?);
503
504 in_memory_tx_num += block_tx_count as u64;
505
506 if in_memory_tx_num > *tx_range.end() {
508 break
509 }
510
511 tx_range = in_memory_tx_num..=*tx_range.end();
513 }
514
515 Ok(items)
516 }
517
518 fn get_in_memory_or_storage_by_tx<S, M, R>(
521 &self,
522 id: HashOrNumber,
523 fetch_from_db: S,
524 fetch_from_block_state: M,
525 ) -> ProviderResult<Option<R>>
526 where
527 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<Option<R>>,
528 M: Fn(usize, TxNumber, &BlockState<N::Primitives>) -> ProviderResult<Option<R>>,
529 {
530 let in_mem_chain = self.head_block.iter().flat_map(|b| b.chain()).collect::<Vec<_>>();
531 let provider = &self.storage_provider;
532
533 let last_database_block_number = in_mem_chain
536 .last()
537 .map(|b| Ok(b.anchor().number))
538 .unwrap_or_else(|| provider.last_block_number())?;
539
540 let last_block_body_index = provider
543 .block_body_indices(last_database_block_number)?
544 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
545 let mut in_memory_tx_num = last_block_body_index.next_tx_num();
546
547 if let HashOrNumber::Number(id) = id {
550 if id < in_memory_tx_num {
551 return fetch_from_db(provider)
552 }
553 }
554
555 for block_state in in_mem_chain.iter().rev() {
557 let executed_block = block_state.block_ref();
558 let block = executed_block.block();
559
560 for tx_index in 0..block.body.transactions().len() {
561 match id {
562 HashOrNumber::Hash(tx_hash) => {
563 if tx_hash == block.body.transactions()[tx_index].trie_hash() {
564 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
565 }
566 }
567 HashOrNumber::Number(id) => {
568 if id == in_memory_tx_num {
569 return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
570 }
571 }
572 }
573
574 in_memory_tx_num += 1;
575 }
576 }
577
578 if let HashOrNumber::Hash(_) = id {
580 return fetch_from_db(provider)
581 }
582
583 Ok(None)
584 }
585
586 pub(crate) fn get_in_memory_or_storage_by_block<S, M, R>(
588 &self,
589 id: BlockHashOrNumber,
590 fetch_from_db: S,
591 fetch_from_block_state: M,
592 ) -> ProviderResult<R>
593 where
594 S: FnOnce(&DatabaseProviderRO<N::DB, N>) -> ProviderResult<R>,
595 M: Fn(&BlockState<N::Primitives>) -> ProviderResult<R>,
596 {
597 if let Some(Some(block_state)) = self.head_block.as_ref().map(|b| b.block_on_chain(id)) {
598 return fetch_from_block_state(block_state)
599 }
600 fetch_from_db(&self.storage_provider)
601 }
602}
603
604impl<N: ProviderNodeTypes> ConsistentProvider<N> {
605 #[inline]
614 pub(crate) fn ensure_canonical_block(&self, block_number: BlockNumber) -> ProviderResult<()> {
615 let latest = self.best_block_number()?;
616 if block_number > latest {
617 Err(ProviderError::HeaderNotFound(block_number.into()))
618 } else {
619 Ok(())
620 }
621 }
622}
623
// Exposes the node's primitive types through the provider.
impl<N: ProviderNodeTypes> NodePrimitivesProvider for ConsistentProvider<N> {
    type Primitives = N::Primitives;
}
627
628impl<N: ProviderNodeTypes> StaticFileProviderFactory for ConsistentProvider<N> {
629 fn static_file_provider(&self) -> StaticFileProvider<N::Primitives> {
630 self.storage_provider.static_file_provider()
631 }
632}
633
634impl<N: ProviderNodeTypes> HeaderProvider for ConsistentProvider<N> {
635 type Header = HeaderTy<N>;
636
637 fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
638 self.get_in_memory_or_storage_by_block(
639 (*block_hash).into(),
640 |db_provider| db_provider.header(block_hash),
641 |block_state| Ok(Some(block_state.block_ref().block().header.header().clone())),
642 )
643 }
644
645 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
646 self.get_in_memory_or_storage_by_block(
647 num.into(),
648 |db_provider| db_provider.header_by_number(num),
649 |block_state| Ok(Some(block_state.block_ref().block().header.header().clone())),
650 )
651 }
652
653 fn header_td(&self, hash: &BlockHash) -> ProviderResult<Option<U256>> {
654 if let Some(num) = self.block_number(*hash)? {
655 self.header_td_by_number(num)
656 } else {
657 Ok(None)
658 }
659 }
660
661 fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
662 let number = if self.head_block.as_ref().map(|b| b.block_on_chain(number.into())).is_some()
663 {
664 if let Some(last_finalized_num_hash) =
671 self.canonical_in_memory_state.get_finalized_num_hash()
672 {
673 last_finalized_num_hash.number
674 } else {
675 self.last_block_number()?
676 }
677 } else {
678 number
680 };
681 self.storage_provider.header_td_by_number(number)
682 }
683
684 fn headers_range(
685 &self,
686 range: impl RangeBounds<BlockNumber>,
687 ) -> ProviderResult<Vec<Self::Header>> {
688 self.get_in_memory_or_storage_by_block_range_while(
689 range,
690 |db_provider, range, _| db_provider.headers_range(range),
691 |block_state, _| Some(block_state.block_ref().block().header.header().clone()),
692 |_| true,
693 )
694 }
695
696 fn sealed_header(
697 &self,
698 number: BlockNumber,
699 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
700 self.get_in_memory_or_storage_by_block(
701 number.into(),
702 |db_provider| db_provider.sealed_header(number),
703 |block_state| Ok(Some(block_state.block_ref().block().header.clone())),
704 )
705 }
706
707 fn sealed_headers_range(
708 &self,
709 range: impl RangeBounds<BlockNumber>,
710 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
711 self.get_in_memory_or_storage_by_block_range_while(
712 range,
713 |db_provider, range, _| db_provider.sealed_headers_range(range),
714 |block_state, _| Some(block_state.block_ref().block().header.clone()),
715 |_| true,
716 )
717 }
718
719 fn sealed_headers_while(
720 &self,
721 range: impl RangeBounds<BlockNumber>,
722 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
723 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
724 self.get_in_memory_or_storage_by_block_range_while(
725 range,
726 |db_provider, range, predicate| db_provider.sealed_headers_while(range, predicate),
727 |block_state, predicate| {
728 let header = &block_state.block_ref().block().header;
729 predicate(header).then(|| header.clone())
730 },
731 predicate,
732 )
733 }
734}
735
736impl<N: ProviderNodeTypes> BlockHashReader for ConsistentProvider<N> {
737 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
738 self.get_in_memory_or_storage_by_block(
739 number.into(),
740 |db_provider| db_provider.block_hash(number),
741 |block_state| Ok(Some(block_state.hash())),
742 )
743 }
744
745 fn canonical_hashes_range(
746 &self,
747 start: BlockNumber,
748 end: BlockNumber,
749 ) -> ProviderResult<Vec<B256>> {
750 self.get_in_memory_or_storage_by_block_range_while(
751 start..end,
752 |db_provider, inclusive_range, _| {
753 db_provider
754 .canonical_hashes_range(*inclusive_range.start(), *inclusive_range.end() + 1)
755 },
756 |block_state, _| Some(block_state.hash()),
757 |_| true,
758 )
759 }
760}
761
762impl<N: ProviderNodeTypes> BlockNumReader for ConsistentProvider<N> {
763 fn chain_info(&self) -> ProviderResult<ChainInfo> {
764 let best_number = self.best_block_number()?;
765 Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
766 }
767
768 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
769 self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
770 }
771
772 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
773 self.storage_provider.last_block_number()
774 }
775
776 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
777 self.get_in_memory_or_storage_by_block(
778 hash.into(),
779 |db_provider| db_provider.block_number(hash),
780 |block_state| Ok(Some(block_state.number())),
781 )
782 }
783}
784
785impl<N: ProviderNodeTypes> BlockIdReader for ConsistentProvider<N> {
786 fn pending_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
787 Ok(self.canonical_in_memory_state.pending_block_num_hash())
788 }
789
790 fn safe_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
791 Ok(self.canonical_in_memory_state.get_safe_num_hash())
792 }
793
794 fn finalized_block_num_hash(&self) -> ProviderResult<Option<BlockNumHash>> {
795 Ok(self.canonical_in_memory_state.get_finalized_num_hash())
796 }
797}
798
799impl<N: ProviderNodeTypes> BlockReader for ConsistentProvider<N> {
800 type Block = BlockTy<N>;
801
802 fn find_block_by_hash(
803 &self,
804 hash: B256,
805 source: BlockSource,
806 ) -> ProviderResult<Option<Self::Block>> {
807 match source {
808 BlockSource::Any | BlockSource::Canonical => {
809 self.get_in_memory_or_storage_by_block(
812 hash.into(),
813 |db_provider| db_provider.find_block_by_hash(hash, source),
814 |block_state| Ok(Some(block_state.block_ref().block().clone().unseal())),
815 )
816 }
817 BlockSource::Pending => {
818 Ok(self.canonical_in_memory_state.pending_block().map(|block| block.unseal()))
819 }
820 }
821 }
822
823 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
824 self.get_in_memory_or_storage_by_block(
825 id,
826 |db_provider| db_provider.block(id),
827 |block_state| Ok(Some(block_state.block_ref().block().clone().unseal())),
828 )
829 }
830
831 fn pending_block(&self) -> ProviderResult<Option<SealedBlockFor<Self::Block>>> {
832 Ok(self.canonical_in_memory_state.pending_block())
833 }
834
835 fn pending_block_with_senders(
836 &self,
837 ) -> ProviderResult<Option<SealedBlockWithSenders<Self::Block>>> {
838 Ok(self.canonical_in_memory_state.pending_block_with_senders())
839 }
840
841 fn pending_block_and_receipts(
842 &self,
843 ) -> ProviderResult<Option<(SealedBlockFor<Self::Block>, Vec<Self::Receipt>)>> {
844 Ok(self.canonical_in_memory_state.pending_block_and_receipts())
845 }
846
847 fn block_with_senders(
854 &self,
855 id: BlockHashOrNumber,
856 transaction_kind: TransactionVariant,
857 ) -> ProviderResult<Option<BlockWithSenders<Self::Block>>> {
858 self.get_in_memory_or_storage_by_block(
859 id,
860 |db_provider| db_provider.block_with_senders(id, transaction_kind),
861 |block_state| Ok(Some(block_state.block_with_senders())),
862 )
863 }
864
865 fn sealed_block_with_senders(
866 &self,
867 id: BlockHashOrNumber,
868 transaction_kind: TransactionVariant,
869 ) -> ProviderResult<Option<SealedBlockWithSenders<Self::Block>>> {
870 self.get_in_memory_or_storage_by_block(
871 id,
872 |db_provider| db_provider.sealed_block_with_senders(id, transaction_kind),
873 |block_state| Ok(Some(block_state.sealed_block_with_senders())),
874 )
875 }
876
877 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
878 self.get_in_memory_or_storage_by_block_range_while(
879 range,
880 |db_provider, range, _| db_provider.block_range(range),
881 |block_state, _| Some(block_state.block_ref().block().clone().unseal()),
882 |_| true,
883 )
884 }
885
886 fn block_with_senders_range(
887 &self,
888 range: RangeInclusive<BlockNumber>,
889 ) -> ProviderResult<Vec<BlockWithSenders<Self::Block>>> {
890 self.get_in_memory_or_storage_by_block_range_while(
891 range,
892 |db_provider, range, _| db_provider.block_with_senders_range(range),
893 |block_state, _| Some(block_state.block_with_senders()),
894 |_| true,
895 )
896 }
897
898 fn sealed_block_with_senders_range(
899 &self,
900 range: RangeInclusive<BlockNumber>,
901 ) -> ProviderResult<Vec<SealedBlockWithSenders<Self::Block>>> {
902 self.get_in_memory_or_storage_by_block_range_while(
903 range,
904 |db_provider, range, _| db_provider.sealed_block_with_senders_range(range),
905 |block_state, _| Some(block_state.sealed_block_with_senders()),
906 |_| true,
907 )
908 }
909}
910
911impl<N: ProviderNodeTypes> TransactionsProvider for ConsistentProvider<N> {
912 type Transaction = TxTy<N>;
913
914 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
915 self.get_in_memory_or_storage_by_tx(
916 tx_hash.into(),
917 |db_provider| db_provider.transaction_id(tx_hash),
918 |_, tx_number, _| Ok(Some(tx_number)),
919 )
920 }
921
922 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
923 self.get_in_memory_or_storage_by_tx(
924 id.into(),
925 |provider| provider.transaction_by_id(id),
926 |tx_index, _, block_state| {
927 Ok(block_state
928 .block_ref()
929 .block()
930 .body
931 .transactions()
932 .get(tx_index)
933 .cloned()
934 .map(Into::into))
935 },
936 )
937 }
938
939 fn transaction_by_id_unhashed(
940 &self,
941 id: TxNumber,
942 ) -> ProviderResult<Option<Self::Transaction>> {
943 self.get_in_memory_or_storage_by_tx(
944 id.into(),
945 |provider| provider.transaction_by_id_unhashed(id),
946 |tx_index, _, block_state| {
947 Ok(block_state
948 .block_ref()
949 .block()
950 .body
951 .transactions()
952 .get(tx_index)
953 .cloned()
954 .map(Into::into))
955 },
956 )
957 }
958
959 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
960 if let Some(tx) = self.head_block.as_ref().and_then(|b| b.transaction_on_chain(hash)) {
961 return Ok(Some(tx))
962 }
963
964 self.storage_provider.transaction_by_hash(hash)
965 }
966
967 fn transaction_by_hash_with_meta(
968 &self,
969 tx_hash: TxHash,
970 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
971 if let Some((tx, meta)) =
972 self.head_block.as_ref().and_then(|b| b.transaction_meta_on_chain(tx_hash))
973 {
974 return Ok(Some((tx, meta)))
975 }
976
977 self.storage_provider.transaction_by_hash_with_meta(tx_hash)
978 }
979
980 fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
981 self.get_in_memory_or_storage_by_tx(
982 id.into(),
983 |provider| provider.transaction_block(id),
984 |_, _, block_state| Ok(Some(block_state.block_ref().block().number())),
985 )
986 }
987
988 fn transactions_by_block(
989 &self,
990 id: BlockHashOrNumber,
991 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
992 self.get_in_memory_or_storage_by_block(
993 id,
994 |provider| provider.transactions_by_block(id),
995 |block_state| Ok(Some(block_state.block_ref().block().body.transactions().to_vec())),
996 )
997 }
998
999 fn transactions_by_block_range(
1000 &self,
1001 range: impl RangeBounds<BlockNumber>,
1002 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1003 self.get_in_memory_or_storage_by_block_range_while(
1004 range,
1005 |db_provider, range, _| db_provider.transactions_by_block_range(range),
1006 |block_state, _| Some(block_state.block_ref().block().body.transactions().to_vec()),
1007 |_| true,
1008 )
1009 }
1010
1011 fn transactions_by_tx_range(
1012 &self,
1013 range: impl RangeBounds<TxNumber>,
1014 ) -> ProviderResult<Vec<Self::Transaction>> {
1015 self.get_in_memory_or_storage_by_tx_range(
1016 range,
1017 |db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
1018 |index_range, block_state| {
1019 Ok(block_state.block_ref().block().body.transactions()[index_range].to_vec())
1020 },
1021 )
1022 }
1023
1024 fn senders_by_tx_range(
1025 &self,
1026 range: impl RangeBounds<TxNumber>,
1027 ) -> ProviderResult<Vec<Address>> {
1028 self.get_in_memory_or_storage_by_tx_range(
1029 range,
1030 |db_provider, db_range| db_provider.senders_by_tx_range(db_range),
1031 |index_range, block_state| Ok(block_state.block_ref().senders[index_range].to_vec()),
1032 )
1033 }
1034
1035 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1036 self.get_in_memory_or_storage_by_tx(
1037 id.into(),
1038 |provider| provider.transaction_sender(id),
1039 |tx_index, _, block_state| Ok(block_state.block_ref().senders.get(tx_index).copied()),
1040 )
1041 }
1042}
1043
1044impl<N: ProviderNodeTypes> ReceiptProvider for ConsistentProvider<N> {
1045 type Receipt = ReceiptTy<N>;
1046
1047 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1048 self.get_in_memory_or_storage_by_tx(
1049 id.into(),
1050 |provider| provider.receipt(id),
1051 |tx_index, _, block_state| {
1052 Ok(block_state.executed_block_receipts().get(tx_index).cloned())
1053 },
1054 )
1055 }
1056
1057 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1058 for block_state in self.head_block.iter().flat_map(|b| b.chain()) {
1059 let executed_block = block_state.block_ref();
1060 let block = executed_block.block();
1061 let receipts = block_state.executed_block_receipts();
1062
1063 debug_assert_eq!(
1065 block.body.transactions().len(),
1066 receipts.len(),
1067 "Mismatch between transaction and receipt count"
1068 );
1069
1070 if let Some(tx_index) =
1071 block.body.transactions().iter().position(|tx| tx.trie_hash() == hash)
1072 {
1073 return Ok(receipts.get(tx_index).cloned());
1075 }
1076 }
1077
1078 self.storage_provider.receipt_by_hash(hash)
1079 }
1080
1081 fn receipts_by_block(
1082 &self,
1083 block: BlockHashOrNumber,
1084 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1085 self.get_in_memory_or_storage_by_block(
1086 block,
1087 |db_provider| db_provider.receipts_by_block(block),
1088 |block_state| Ok(Some(block_state.executed_block_receipts())),
1089 )
1090 }
1091
1092 fn receipts_by_tx_range(
1093 &self,
1094 range: impl RangeBounds<TxNumber>,
1095 ) -> ProviderResult<Vec<Self::Receipt>> {
1096 self.get_in_memory_or_storage_by_tx_range(
1097 range,
1098 |db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
1099 |index_range, block_state| {
1100 Ok(block_state.executed_block_receipts().drain(index_range).collect())
1101 },
1102 )
1103 }
1104}
1105
1106impl<N: ProviderNodeTypes> ReceiptProviderIdExt for ConsistentProvider<N> {
1107 fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1108 match block {
1109 BlockId::Hash(rpc_block_hash) => {
1110 let mut receipts = self.receipts_by_block(rpc_block_hash.block_hash.into())?;
1111 if receipts.is_none() && !rpc_block_hash.require_canonical.unwrap_or(false) {
1112 if let Some(state) = self
1113 .head_block
1114 .as_ref()
1115 .and_then(|b| b.block_on_chain(rpc_block_hash.block_hash.into()))
1116 {
1117 receipts = Some(state.executed_block_receipts());
1118 }
1119 }
1120 Ok(receipts)
1121 }
1122 BlockId::Number(num_tag) => match num_tag {
1123 BlockNumberOrTag::Pending => Ok(self
1124 .canonical_in_memory_state
1125 .pending_state()
1126 .map(|block_state| block_state.executed_block_receipts())),
1127 _ => {
1128 if let Some(num) = self.convert_block_number(num_tag)? {
1129 self.receipts_by_block(num.into())
1130 } else {
1131 Ok(None)
1132 }
1133 }
1134 },
1135 }
1136 }
1137}
1138
1139impl<N: ProviderNodeTypes> WithdrawalsProvider for ConsistentProvider<N> {
1140 fn withdrawals_by_block(
1141 &self,
1142 id: BlockHashOrNumber,
1143 timestamp: u64,
1144 ) -> ProviderResult<Option<Withdrawals>> {
1145 if !self.chain_spec().is_shanghai_active_at_timestamp(timestamp) {
1146 return Ok(None)
1147 }
1148
1149 self.get_in_memory_or_storage_by_block(
1150 id,
1151 |db_provider| db_provider.withdrawals_by_block(id, timestamp),
1152 |block_state| Ok(block_state.block_ref().block().body.withdrawals().cloned()),
1153 )
1154 }
1155
1156 fn latest_withdrawal(&self) -> ProviderResult<Option<Withdrawal>> {
1157 let best_block_num = self.best_block_number()?;
1158
1159 self.get_in_memory_or_storage_by_block(
1160 best_block_num.into(),
1161 |db_provider| db_provider.latest_withdrawal(),
1162 |block_state| {
1163 Ok(block_state
1164 .block_ref()
1165 .block()
1166 .body
1167 .withdrawals()
1168 .cloned()
1169 .and_then(|mut w| w.pop()))
1170 },
1171 )
1172 }
1173}
1174
1175impl<N: ProviderNodeTypes> OmmersProvider for ConsistentProvider<N> {
1176 fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<HeaderTy<N>>>> {
1177 self.get_in_memory_or_storage_by_block(
1178 id,
1179 |db_provider| db_provider.ommers(id),
1180 |block_state| {
1181 if self.chain_spec().final_paris_total_difficulty(block_state.number()).is_some() {
1182 return Ok(Some(Vec::new()))
1183 }
1184
1185 Ok(block_state.block_ref().block().body.ommers().map(|o| o.to_vec()))
1186 },
1187 )
1188 }
1189}
1190
1191impl<N: ProviderNodeTypes> BlockBodyIndicesProvider for ConsistentProvider<N> {
1192 fn block_body_indices(
1193 &self,
1194 number: BlockNumber,
1195 ) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1196 self.get_in_memory_or_storage_by_block(
1197 number.into(),
1198 |db_provider| db_provider.block_body_indices(number),
1199 |block_state| {
1200 let last_storage_block_number = block_state.anchor().number;
1202 let mut stored_indices = self
1203 .storage_provider
1204 .block_body_indices(last_storage_block_number)?
1205 .ok_or(ProviderError::BlockBodyIndicesNotFound(last_storage_block_number))?;
1206
1207 stored_indices.first_tx_num = stored_indices.next_tx_num();
1209 stored_indices.tx_count = 0;
1210
1211 for state in block_state.chain().collect::<Vec<_>>().into_iter().rev() {
1213 let block_tx_count = state.block_ref().block.body.transactions().len() as u64;
1214 if state.block_ref().block().number() == number {
1215 stored_indices.tx_count = block_tx_count;
1216 } else {
1217 stored_indices.first_tx_num += block_tx_count;
1218 }
1219 }
1220
1221 Ok(Some(stored_indices))
1222 },
1223 )
1224 }
1225}
1226
// Stage checkpoints are read straight from `storage_provider`; none of these methods consult
// the in-memory state.
impl<N: ProviderNodeTypes> StageCheckpointReader for ConsistentProvider<N> {
    /// Returns the checkpoint of stage `id` as reported by the storage provider.
    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
        self.storage_provider.get_stage_checkpoint(id)
    }

    /// Returns the progress bytes of stage `id` as reported by the storage provider.
    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
        self.storage_provider.get_stage_checkpoint_progress(id)
    }

    /// Returns all `(name, checkpoint)` pairs reported by the storage provider.
    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
        self.storage_provider.get_all_checkpoints()
    }
}
1240
1241impl<N: ProviderNodeTypes> EvmEnvProvider<HeaderTy<N>> for ConsistentProvider<N> {
1242 fn env_with_header<EvmConfig>(
1243 &self,
1244 header: &HeaderTy<N>,
1245 evm_config: EvmConfig,
1246 ) -> ProviderResult<(CfgEnvWithHandlerCfg, BlockEnv)>
1247 where
1248 EvmConfig: ConfigureEvmEnv<Header = HeaderTy<N>>,
1249 {
1250 let total_difficulty = self
1251 .header_td_by_number(header.number())?
1252 .ok_or_else(|| ProviderError::HeaderNotFound(header.number().into()))?;
1253 Ok(evm_config.cfg_and_block_env(header, total_difficulty))
1254 }
1255}
1256
// Prune checkpoints are read straight from `storage_provider`; no in-memory state is consulted.
impl<N: ProviderNodeTypes> PruneCheckpointReader for ConsistentProvider<N> {
    /// Returns the prune checkpoint of `segment` as reported by the storage provider.
    fn get_prune_checkpoint(
        &self,
        segment: PruneSegment,
    ) -> ProviderResult<Option<PruneCheckpoint>> {
        self.storage_provider.get_prune_checkpoint(segment)
    }

    /// Returns all `(segment, checkpoint)` pairs reported by the storage provider.
    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
        self.storage_provider.get_prune_checkpoints()
    }
}
1269
impl<N: ProviderNodeTypes> ChainSpecProvider for ConsistentProvider<N> {
    type ChainSpec = N::ChainSpec;

    /// Returns the chain spec held by the storage provider.
    fn chain_spec(&self) -> Arc<N::ChainSpec> {
        // Fully-qualified call into the storage provider's `ChainSpecProvider` implementation.
        // NOTE(review): presumably spelled out so that it cannot resolve to another
        // `chain_spec` method on the storage provider - confirm before simplifying.
        ChainSpecProvider::chain_spec(&self.storage_provider)
    }
}
1277
1278impl<N: ProviderNodeTypes> BlockReaderIdExt for ConsistentProvider<N> {
1279 fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Self::Block>> {
1280 match id {
1281 BlockId::Number(num) => self.block_by_number_or_tag(num),
1282 BlockId::Hash(hash) => {
1283 if Some(true) == hash.require_canonical {
1288 self.find_block_by_hash(hash.block_hash, BlockSource::Canonical)
1290 } else {
1291 self.block_by_hash(hash.block_hash)
1292 }
1293 }
1294 }
1295 }
1296
1297 fn header_by_number_or_tag(&self, id: BlockNumberOrTag) -> ProviderResult<Option<HeaderTy<N>>> {
1298 Ok(match id {
1299 BlockNumberOrTag::Latest => {
1300 Some(self.canonical_in_memory_state.get_canonical_head().unseal())
1301 }
1302 BlockNumberOrTag::Finalized => {
1303 self.canonical_in_memory_state.get_finalized_header().map(|h| h.unseal())
1304 }
1305 BlockNumberOrTag::Safe => {
1306 self.canonical_in_memory_state.get_safe_header().map(|h| h.unseal())
1307 }
1308 BlockNumberOrTag::Earliest => self.header_by_number(0)?,
1309 BlockNumberOrTag::Pending => self.canonical_in_memory_state.pending_header(),
1310
1311 BlockNumberOrTag::Number(num) => self.header_by_number(num)?,
1312 })
1313 }
1314
1315 fn sealed_header_by_number_or_tag(
1316 &self,
1317 id: BlockNumberOrTag,
1318 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1319 match id {
1320 BlockNumberOrTag::Latest => {
1321 Ok(Some(self.canonical_in_memory_state.get_canonical_head()))
1322 }
1323 BlockNumberOrTag::Finalized => {
1324 Ok(self.canonical_in_memory_state.get_finalized_header())
1325 }
1326 BlockNumberOrTag::Safe => Ok(self.canonical_in_memory_state.get_safe_header()),
1327 BlockNumberOrTag::Earliest => self
1328 .header_by_number(0)?
1329 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal(h)))),
1330 BlockNumberOrTag::Pending => Ok(self.canonical_in_memory_state.pending_sealed_header()),
1331 BlockNumberOrTag::Number(num) => self
1332 .header_by_number(num)?
1333 .map_or_else(|| Ok(None), |h| Ok(Some(SealedHeader::seal(h)))),
1334 }
1335 }
1336
1337 fn sealed_header_by_id(
1338 &self,
1339 id: BlockId,
1340 ) -> ProviderResult<Option<SealedHeader<HeaderTy<N>>>> {
1341 Ok(match id {
1342 BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?,
1343 BlockId::Hash(hash) => self.header(&hash.block_hash)?.map(SealedHeader::seal),
1344 })
1345 }
1346
1347 fn header_by_id(&self, id: BlockId) -> ProviderResult<Option<HeaderTy<N>>> {
1348 Ok(match id {
1349 BlockId::Number(num) => self.header_by_number_or_tag(num)?,
1350 BlockId::Hash(hash) => self.header(&hash.block_hash)?,
1351 })
1352 }
1353
1354 fn ommers_by_id(&self, id: BlockId) -> ProviderResult<Option<Vec<HeaderTy<N>>>> {
1355 match id {
1356 BlockId::Number(num) => self.ommers_by_number_or_tag(num),
1357 BlockId::Hash(hash) => {
1358 self.ommers(BlockHashOrNumber::Hash(hash.block_hash))
1361 }
1362 }
1363 }
1364}
1365
1366impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {
1367 fn storage_changeset(
1368 &self,
1369 block_number: BlockNumber,
1370 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
1371 if let Some(state) =
1372 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1373 {
1374 let changesets = state
1375 .block()
1376 .execution_output
1377 .bundle
1378 .reverts
1379 .clone()
1380 .to_plain_state_reverts()
1381 .storage
1382 .into_iter()
1383 .flatten()
1384 .flat_map(|revert: PlainStorageRevert| {
1385 revert.storage_revert.into_iter().map(move |(key, value)| {
1386 (
1387 BlockNumberAddress((block_number, revert.address)),
1388 StorageEntry {
1389 key: key.into(),
1390 value: value.to_previous_value().value,
1391 is_private: value.to_previous_value().is_private,
1392 },
1393 )
1394 })
1395 })
1396 .collect();
1397 Ok(changesets)
1398 } else {
1399 let storage_history_exists = self
1403 .storage_provider
1404 .get_prune_checkpoint(PruneSegment::StorageHistory)?
1405 .and_then(|checkpoint| {
1406 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1411 })
1412 .unwrap_or(true);
1413
1414 if !storage_history_exists {
1415 return Err(ProviderError::StateAtBlockPruned(block_number))
1416 }
1417
1418 self.storage_provider.storage_changeset(block_number)
1419 }
1420 }
1421}
1422
1423impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
1424 fn account_block_changeset(
1425 &self,
1426 block_number: BlockNumber,
1427 ) -> ProviderResult<Vec<AccountBeforeTx>> {
1428 if let Some(state) =
1429 self.head_block.as_ref().and_then(|b| b.block_on_chain(block_number.into()))
1430 {
1431 let changesets = state
1432 .block_ref()
1433 .execution_output
1434 .bundle
1435 .reverts
1436 .clone()
1437 .to_plain_state_reverts()
1438 .accounts
1439 .into_iter()
1440 .flatten()
1441 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
1442 .collect();
1443 Ok(changesets)
1444 } else {
1445 let account_history_exists = self
1449 .storage_provider
1450 .get_prune_checkpoint(PruneSegment::AccountHistory)?
1451 .and_then(|checkpoint| {
1452 checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
1457 })
1458 .unwrap_or(true);
1459
1460 if !account_history_exists {
1461 return Err(ProviderError::StateAtBlockPruned(block_number))
1462 }
1463
1464 self.storage_provider.account_block_changeset(block_number)
1465 }
1466 }
1467}
1468
1469impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
1470 fn basic_account(&self, address: Address) -> ProviderResult<Option<Account>> {
1472 let state_provider = self.latest_ref()?;
1474 state_provider.basic_account(address)
1475 }
1476}
1477
1478impl<N: ProviderNodeTypes> StateReader for ConsistentProvider<N> {
1479 type Receipt = ReceiptTy<N>;
1480
1481 fn get_state(
1491 &self,
1492 block: BlockNumber,
1493 ) -> ProviderResult<Option<ExecutionOutcome<Self::Receipt>>> {
1494 if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) {
1495 let state = state.block_ref().execution_outcome().clone();
1496 Ok(Some(state))
1497 } else {
1498 Self::get_state(self, block..=block)
1499 }
1500 }
1501}
1502
1503#[cfg(test)]
1504mod tests {
1505 use crate::{
1506 providers::blockchain_provider::BlockchainProvider2,
1507 test_utils::create_test_provider_factory, BlockWriter,
1508 };
1509 use alloy_eips::BlockHashOrNumber;
1510 use alloy_primitives::B256;
1511 use itertools::Itertools;
1512 use rand::Rng;
1513 use reth_chain_state::{ExecutedBlock, NewCanonicalChain};
1514 use reth_db::models::AccountBeforeTx;
1515 use reth_execution_types::ExecutionOutcome;
1516 use reth_primitives::SealedBlock;
1517 use reth_storage_api::{BlockReader, BlockSource, ChangeSetReader};
1518 use reth_testing_utils::generators::{
1519 self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,
1520 };
1521 use revm::db::BundleState;
1522 use std::{
1523 ops::{Bound, Range, RangeBounds},
1524 sync::Arc,
1525 };
1526
1527 const TEST_BLOCKS_COUNT: usize = 5;
1528
1529 fn random_blocks(
1530 rng: &mut impl Rng,
1531 database_blocks: usize,
1532 in_memory_blocks: usize,
1533 requests_count: Option<Range<u8>>,
1534 withdrawals_count: Option<Range<u8>>,
1535 tx_count: impl RangeBounds<u8>,
1536 ) -> (Vec<SealedBlock>, Vec<SealedBlock>) {
1537 let block_range = (database_blocks + in_memory_blocks - 1) as u64;
1538
1539 let tx_start = match tx_count.start_bound() {
1540 Bound::Included(&n) | Bound::Excluded(&n) => n,
1541 Bound::Unbounded => u8::MIN,
1542 };
1543 let tx_end = match tx_count.end_bound() {
1544 Bound::Included(&n) | Bound::Excluded(&n) => n + 1,
1545 Bound::Unbounded => u8::MAX,
1546 };
1547
1548 let blocks = random_block_range(
1549 rng,
1550 0..=block_range,
1551 BlockRangeParams {
1552 parent: Some(B256::ZERO),
1553 tx_count: tx_start..tx_end,
1554 requests_count,
1555 withdrawals_count,
1556 },
1557 );
1558 let (database_blocks, in_memory_blocks) = blocks.split_at(database_blocks);
1559 (database_blocks.to_vec(), in_memory_blocks.to_vec())
1560 }
1561
    /// Checks `find_block_by_hash` for every [`BlockSource`] against blocks that live in the
    /// database, in the in-memory canonical chain, and in the pending slot.
    #[test]
    fn test_block_reader_find_block_by_hash() -> eyre::Result<()> {
        // Random number generator and an empty provider factory.
        let mut rng = generators::rng();
        let factory = create_test_provider_factory();

        // Generate blocks 0 through 10; the first five go to the database, the rest are kept
        // for the in-memory part of the test.
        let blocks = random_block_range(
            &mut rng,
            0..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        let (database_blocks, in_memory_blocks) = blocks.split_at(5);

        // Persist the database blocks.
        let provider_rw = factory.provider_rw()?;
        for block in database_blocks {
            provider_rw.insert_historical_block(
                block.clone().seal_with_senders().expect("failed to seal block with senders"),
            )?;
        }
        provider_rw.commit()?;

        // Create the provider under test.
        let provider = BlockchainProvider2::new(factory)?;
        let consistent_provider = provider.consistent_provider()?;

        // Blocks used by the assertions below.
        let first_db_block = database_blocks.first().unwrap();
        let first_in_mem_block = in_memory_blocks.first().unwrap();
        let last_in_mem_block = in_memory_blocks.last().unwrap();

        // The in-memory block has not been inserted anywhere yet, so no source finds it.
        assert_eq!(
            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
            None
        );
        assert_eq!(
            consistent_provider
                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
            None
        );
        assert_eq!(
            consistent_provider
                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Pending)?,
            None
        );

        // Commit the first in-memory block to the canonical in-memory chain.
        let in_memory_block_senders =
            first_in_mem_block.senders().expect("failed to recover senders");
        let chain = NewCanonicalChain::Commit {
            new: vec![ExecutedBlock::new(
                Arc::new(first_in_mem_block.clone()),
                Arc::new(in_memory_block_senders),
                Default::default(),
                Default::default(),
                Default::default(),
            )],
        };
        consistent_provider.canonical_in_memory_state.update_chain(chain);
        // Take a fresh consistent provider after the chain update (presumably because an
        // existing one keeps the head it was created with).
        let consistent_provider = provider.consistent_provider()?;

        // The in-memory block is now found through `Any` and `Canonical`.
        assert_eq!(
            consistent_provider.find_block_by_hash(first_in_mem_block.hash(), BlockSource::Any)?,
            Some(first_in_mem_block.clone().into())
        );
        assert_eq!(
            consistent_provider
                .find_block_by_hash(first_in_mem_block.hash(), BlockSource::Canonical)?,
            Some(first_in_mem_block.clone().into())
        );

        // The database block is found through `Any` and `Canonical` as well.
        assert_eq!(
            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Any)?,
            Some(first_db_block.clone().into())
        );
        assert_eq!(
            consistent_provider
                .find_block_by_hash(first_db_block.hash(), BlockSource::Canonical)?,
            Some(first_db_block.clone().into())
        );

        // A database block is not a pending block.
        assert_eq!(
            consistent_provider.find_block_by_hash(first_db_block.hash(), BlockSource::Pending)?,
            None
        );

        // Install the last in-memory block as the pending block.
        provider.canonical_in_memory_state.set_pending_block(ExecutedBlock {
            block: Arc::new(last_in_mem_block.clone()),
            senders: Default::default(),
            execution_output: Default::default(),
            hashed_state: Default::default(),
            trie: Default::default(),
        });

        // The pending block is found through `Pending`, using the consistent provider that was
        // created before the pending block was set.
        assert_eq!(
            consistent_provider
                .find_block_by_hash(last_in_mem_block.hash(), BlockSource::Pending)?,
            Some(last_in_mem_block.clone().into())
        );

        Ok(())
    }
1672
    /// Checks `block` lookups by hash and by number for a database block and for a block that
    /// is committed to the in-memory canonical chain during the test.
    #[test]
    fn test_block_reader_block() -> eyre::Result<()> {
        // Random number generator and an empty provider factory.
        let mut rng = generators::rng();
        let factory = create_test_provider_factory();

        // Generate blocks 0 through 10; the first five go to the database, the rest are kept
        // for the in-memory part of the test.
        let blocks = random_block_range(
            &mut rng,
            0..=10,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 0..1, ..Default::default() },
        );
        let (database_blocks, in_memory_blocks) = blocks.split_at(5);

        // Persist the database blocks.
        let provider_rw = factory.provider_rw()?;
        for block in database_blocks {
            provider_rw.insert_historical_block(
                block.clone().seal_with_senders().expect("failed to seal block with senders"),
            )?;
        }
        provider_rw.commit()?;

        // Create the provider under test.
        let provider = BlockchainProvider2::new(factory)?;
        let consistent_provider = provider.consistent_provider()?;

        // Blocks used by the assertions below.
        let first_in_mem_block = in_memory_blocks.first().unwrap();
        let first_db_block = database_blocks.first().unwrap();

        // The in-memory block has not been inserted yet, so neither lookup finds it.
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
            None
        );
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
            None
        );

        // Commit the first in-memory block to the canonical in-memory chain.
        let in_memory_block_senders =
            first_in_mem_block.senders().expect("failed to recover senders");
        let chain = NewCanonicalChain::Commit {
            new: vec![ExecutedBlock::new(
                Arc::new(first_in_mem_block.clone()),
                Arc::new(in_memory_block_senders),
                Default::default(),
                Default::default(),
                Default::default(),
            )],
        };
        consistent_provider.canonical_in_memory_state.update_chain(chain);

        // Take a fresh consistent provider after the chain update (presumably because an
        // existing one keeps the head it was created with).
        let consistent_provider = provider.consistent_provider()?;

        // The in-memory block is now found by hash and by number.
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Hash(first_in_mem_block.hash()))?,
            Some(first_in_mem_block.clone().into())
        );
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Number(first_in_mem_block.number))?,
            Some(first_in_mem_block.clone().into())
        );

        // The database block is found by hash and by number.
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Hash(first_db_block.hash()))?,
            Some(first_db_block.clone().into())
        );
        assert_eq!(
            consistent_provider.block(BlockHashOrNumber::Number(first_db_block.number))?,
            Some(first_db_block.clone().into())
        );

        Ok(())
    }
1753
    /// Checks `account_block_changeset` for the last database block and for a block that only
    /// exists in the in-memory canonical chain.
    #[test]
    fn test_changeset_reader() -> eyre::Result<()> {
        let mut rng = generators::rng();

        // `TEST_BLOCKS_COUNT` database blocks followed by a single in-memory block.
        let (database_blocks, in_memory_blocks) =
            random_blocks(&mut rng, TEST_BLOCKS_COUNT, 1, None, None, 0..1);

        // Block numbers used for the execution outcomes and the assertions below.
        let first_database_block = database_blocks.first().map(|block| block.number).unwrap();
        let last_database_block = database_blocks.last().map(|block| block.number).unwrap();
        let first_in_memory_block = in_memory_blocks.first().map(|block| block.number).unwrap();

        let accounts = random_eoa_accounts(&mut rng, 2);

        // Random changesets for the database blocks, starting from the generated accounts with
        // no storage entries.
        let (database_changesets, database_state) = random_changeset_range(
            &mut rng,
            &database_blocks,
            accounts.into_iter().map(|(address, account)| (address, (account, Vec::new()))),
            0..0,
            0..0,
        );
        // Random changesets for the in-memory block, continuing from the state produced by the
        // database blocks.
        let (in_memory_changesets, in_memory_state) = random_changeset_range(
            &mut rng,
            &in_memory_blocks,
            database_state
                .iter()
                .map(|(address, (account, storage))| (*address, (*account, storage.clone()))),
            0..0,
            0..0,
        );

        let factory = create_test_provider_factory();

        // Persist the database blocks together with an execution outcome whose reverts are
        // built from the generated per-block account changesets.
        let provider_rw = factory.provider_rw()?;
        provider_rw.append_blocks_with_state(
            database_blocks
                .into_iter()
                .map(|b| b.seal_with_senders().expect("failed to seal block with senders"))
                .collect(),
            ExecutionOutcome {
                bundle: BundleState::new(
                    database_state.into_iter().map(|(address, (account, _))| {
                        (address, None, Some(account.into()), Default::default())
                    }),
                    database_changesets
                        .iter()
                        .map(|block_changesets| {
                            block_changesets.iter().map(|(address, account, _)| {
                                (*address, Some(Some((*account).into())), [])
                            })
                        })
                        .collect::<Vec<_>>(),
                    Vec::new(),
                ),
                first_block: first_database_block,
                ..Default::default()
            },
            Default::default(),
            Default::default(),
        )?;
        provider_rw.commit()?;

        let provider = BlockchainProvider2::new(factory)?;

        // Only one in-memory block was generated, so take its changeset and commit the block
        // with a matching execution outcome to the canonical in-memory chain.
        let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap();
        let chain = NewCanonicalChain::Commit {
            new: vec![in_memory_blocks
                .first()
                .map(|block| {
                    let senders = block.senders().expect("failed to recover senders");
                    ExecutedBlock::new(
                        Arc::new(block.clone()),
                        Arc::new(senders),
                        Arc::new(ExecutionOutcome {
                            bundle: BundleState::new(
                                in_memory_state.into_iter().map(|(address, (account, _))| {
                                    (address, None, Some(account.into()), Default::default())
                                }),
                                [in_memory_changesets.iter().map(|(address, account, _)| {
                                    (*address, Some(Some((*account).into())), Vec::new())
                                })],
                                [],
                            ),
                            first_block: first_in_memory_block,
                            ..Default::default()
                        }),
                        Default::default(),
                        Default::default(),
                    )
                })
                .unwrap()],
        };
        provider.canonical_in_memory_state.update_chain(chain);

        // The consistent provider is created after the chain update.
        let consistent_provider = provider.consistent_provider()?;

        // Changeset of the last database block; the expected entries are sorted by address.
        assert_eq!(
            consistent_provider.account_block_changeset(last_database_block).unwrap(),
            database_changesets
                .into_iter()
                .last()
                .unwrap()
                .into_iter()
                .sorted_by_key(|(address, _, _)| *address)
                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
                .collect::<Vec<_>>()
        );
        // Changeset of the in-memory block; the expected entries are sorted by address.
        assert_eq!(
            consistent_provider.account_block_changeset(first_in_memory_block).unwrap(),
            in_memory_changesets
                .into_iter()
                .sorted_by_key(|(address, _, _)| *address)
                .map(|(address, account, _)| AccountBeforeTx { address, info: Some(account) })
                .collect::<Vec<_>>()
        );

        Ok(())
    }
1871}