use crate::{
    bundle_state::StorageRevertsIter,
    providers::{
        database::{chain::ChainStorage, metrics},
        static_file::StaticFileWriter,
        NodeTypesForProvider, StaticFileProvider,
    },
    to_range,
    traits::{
        AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
    },
    AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
    BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
    DBProvider, HashingWriter, HeaderProvider, HeaderSyncGapProvider, HistoricalStateProvider,
    HistoricalStateProviderRef, HistoryWriter, LatestStateProvider, LatestStateProviderRef,
    OriginalValuesKnown, ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RevertsInit,
    StageCheckpointReader, StateCommitmentProvider, StateProviderBox, StateWriter,
    StaticFileProviderFactory, StatsReader, StorageLocation, StorageReader, StorageTrieWriter,
    TransactionVariant, TransactionsProvider, TransactionsProviderExt, TrieWriter,
    WithdrawalsProvider,
};
use alloy_consensus::{
    transaction::{SignerRecoverable, TransactionMeta},
    BlockHeader, Header, TxReceipt,
};
use alloy_eips::{eip2718::Encodable2718, eip4895::Withdrawals, BlockHashOrNumber};
use alloy_primitives::{
    keccak256,
    map::{hash_map, B256Map, HashMap, HashSet},
    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
};
use itertools::Itertools;
use rayon::slice::ParallelSliceMut;
use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, EthereumHardforks};
use reth_db_api::{
    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
    database::Database,
    models::{
        sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
        ShardedKey, StoredBlockBodyIndices,
    },
    table::Table,
    tables,
    transaction::{DbTx, DbTxMut},
    BlockNumberList, DatabaseError, PlainAccountState, PlainStorageState,
};
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
use reth_primitives_traits::{
    Account, Block as _, BlockBody as _, Bytecode, GotExpected, NodePrimitives, RecoveredBlock,
    SealedBlock, SealedHeader, SignedTransaction, StorageEntry,
};
use reth_prune_types::{
    PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE,
};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{
    BlockBodyIndicesProvider, BlockBodyReader, NodePrimitivesProvider, OmmersProvider,
    StateProvider, StorageChangeSetReader, TryIntoHistoricalStateProvider,
};
use reth_storage_errors::provider::{ProviderResult, RootMismatch};
use reth_trie::{
    prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets},
    updates::{StorageTrieUpdates, TrieUpdates},
    HashedPostStateSorted, Nibbles, StateRoot, StoredNibbles,
};
use reth_trie_db::{DatabaseStateRoot, DatabaseStorageTrieCursor};
use revm_database::states::{
    PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
};
use std::{
    cmp::Ordering,
    collections::{BTreeMap, BTreeSet},
    fmt::Debug,
    ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
    sync::{mpsc, Arc},
};
use tracing::{debug, trace};

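/// A [`DatabaseProvider`] that holds a read-only database transaction.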
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;

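/// A [`DatabaseProvider`] that holds a read-write database transaction.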
#[derive(Debug)]
pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
    pub DatabaseProvider<<DB as Database>::TXMut, N>,
);

impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
    for DatabaseProviderRW<DB, N>
{
    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
        &self.0
    }
}

impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
    /// Commits the underlying database transaction.
    pub fn commit(self) -> ProviderResult<bool> {
        self.0.commit()
    }

    /// Consumes the provider and returns the underlying read-write transaction.
    pub fn into_tx(self) -> <DB as Database>::TXMut {
        self.0.into_tx()
    }
}

impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
    for DatabaseProvider<<DB as Database>::TXMut, N>
{
    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
        provider.0
    }
}

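/// A provider struct that fetches data from the database.
///
/// Wraps a database transaction (`TX`) together with the chain spec, the static file provider,
/// the prune modes and the node storage handler.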
#[derive(Debug)]
pub struct DatabaseProvider<TX, N: NodeTypes> {
    /// Database transaction.
    tx: TX,
    /// The chain spec.
    chain_spec: Arc<N::ChainSpec>,
    /// Static file provider.
    static_file_provider: StaticFileProvider<N::Primitives>,
    /// Pruning configuration.
    prune_modes: PruneModes,
    /// Node storage handler.
    storage: Arc<N::Storage>,
}

impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Returns a reference to the prune modes.
    pub const fn prune_modes_ref(&self) -> &PruneModes {
        &self.prune_modes
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Returns a state provider for the latest state.
    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
        trace!(target: "providers::db", "Returning latest state provider");
        Box::new(LatestStateProviderRef::new(self))
    }

    /// Returns a state provider with the historical state after the given block hash.
    pub fn history_by_block_hash<'a>(
        &'a self,
        block_hash: BlockHash,
    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
        let mut block_number =
            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
        if block_number == self.best_block_number().unwrap_or_default() &&
            block_number == self.last_block_number().unwrap_or_default()
        {
            return Ok(Box::new(LatestStateProviderRef::new(self)))
        }

        // +1 as the changeset that we want is the one that was applied after this block.
        block_number += 1;

        let account_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
        let storage_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;

        let mut state_provider = HistoricalStateProviderRef::new(self, block_number);

        // If account or storage history has been pruned, historical state can only be served
        // above the corresponding prune checkpoints.
        if let Some(prune_checkpoint_block_number) =
            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_account_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }
        if let Some(prune_checkpoint_block_number) =
            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_storage_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }

        Ok(Box::new(state_provider))
    }

    /// Sets the prune modes for the provider.
    #[cfg(feature = "test-utils")]
    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
        self.prune_modes = prune_modes;
    }
}

impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
    type Primitives = N::Primitives;
}

impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
        self.static_file_provider.clone()
    }
}

impl<TX: Debug + Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
    for DatabaseProvider<TX, N>
{
    type ChainSpec = N::ChainSpec;

    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
        self.chain_spec.clone()
    }
}

impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Creates a provider with an inner read-write transaction.
    pub const fn new_rw(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
    ) -> Self {
        Self { tx, chain_spec, static_file_provider, prune_modes, storage }
    }
}

impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
    fn as_ref(&self) -> &Self {
        self
    }
}

impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    /// Unwinds trie state for the given block range, recomputing the state root and verifying it
    /// against the parent block's state root before writing the resulting trie updates.
    pub fn unwind_trie_state_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<()> {
        let changed_accounts = self
            .tx
            .cursor_read::<tables::AccountChangeSets>()?
            .walk_range(range.clone())?
            .collect::<Result<Vec<_>, _>>()?;

        // Unwind account hashes and collect the changed accounts into the account prefix set.
        let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
        let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
        let mut destroyed_accounts = HashSet::default();
        for (hashed_address, account) in hashed_addresses {
            account_prefix_set.insert(Nibbles::unpack(hashed_address));
            if account.is_none() {
                destroyed_accounts.insert(hashed_address);
            }
        }

        // Unwind account history indices.
        self.unwind_account_history_indices(changed_accounts.iter())?;
        let storage_range = BlockNumberAddress::range(range.clone());

        let changed_storages = self
            .tx
            .cursor_read::<tables::StorageChangeSets>()?
            .walk_range(storage_range)?
            .collect::<Result<Vec<_>, _>>()?;

        // Unwind storage hashes and collect changed storage slots into per-account prefix sets.
        let mut storage_prefix_sets = B256Map::<PrefixSet>::default();
        let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
        for (hashed_address, hashed_slots) in storage_entries {
            account_prefix_set.insert(Nibbles::unpack(hashed_address));
            let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
            for slot in hashed_slots {
                storage_prefix_set.insert(Nibbles::unpack(slot));
            }
            storage_prefix_sets.insert(hashed_address, storage_prefix_set.freeze());
        }

        // Unwind storage history indices.
        self.unwind_storage_history_indices(changed_storages.iter().copied())?;

        // Recompute the state root over the unwound hashed state.
        let prefix_sets = TriePrefixSets {
            account_prefix_set: account_prefix_set.freeze(),
            storage_prefix_sets,
            destroyed_accounts,
        };
        let (new_state_root, trie_updates) = StateRoot::from_tx(&self.tx)
            .with_prefix_sets(prefix_sets)
            .root_with_updates()
            .map_err(reth_db_api::DatabaseError::from)?;

        let parent_number = range.start().saturating_sub(1);
        let parent_state_root = self
            .header_by_number(parent_number)?
            .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
            .state_root();

        // The recomputed root must match the parent block's state root.
        if new_state_root != parent_state_root {
            let parent_hash = self
                .block_hash(parent_number)?
                .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
            return Err(ProviderError::UnwindStateRootMismatch(Box::new(RootMismatch {
                root: GotExpected { got: new_state_root, expected: parent_state_root },
                block_number: parent_number,
                block_hash: parent_hash,
            })))
        }
        self.write_trie_updates(&trie_updates)?;

        Ok(())
    }

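    /// Removes receipts for all transactions from `from_tx` onwards, from the database and/or
    /// static files depending on `remove_from`.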
    fn remove_receipts_from(
        &self,
        from_tx: TxNumber,
        last_block: BlockNumber,
        remove_from: StorageLocation,
    ) -> ProviderResult<()> {
        if remove_from.database() {
            self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
        }

        if remove_from.static_files() && !self.prune_modes.has_receipts_pruning() {
            let static_file_receipt_num =
                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);

            let to_delete = static_file_receipt_num
                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
                .unwrap_or_default();

            self.static_file_provider
                .latest_writer(StaticFileSegment::Receipts)?
                .prune_receipts(to_delete, last_block)?;
        }

        Ok(())
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
    fn try_into_history_at_block(
        self,
        mut block_number: BlockNumber,
    ) -> ProviderResult<StateProviderBox> {
        // If the block number is the best block number, use the latest state provider.
        if block_number == self.best_block_number().unwrap_or_default() {
            return Ok(Box::new(LatestStateProvider::new(self)))
        }

        // +1 as the changeset that we want is the one that was applied after this block.
        block_number += 1;

        let account_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
        let storage_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;

        let mut state_provider = HistoricalStateProvider::new(self, block_number);

        // If account or storage history has been pruned, cap the lowest available historical
        // block at the corresponding prune checkpoints.
        if let Some(prune_checkpoint_block_number) =
            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_account_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }
        if let Some(prune_checkpoint_block_number) =
            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_storage_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }

        Ok(Box::new(state_provider))
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> StateCommitmentProvider for DatabaseProvider<TX, N> {
    type StateCommitment = N::StateCommitment;
}

impl<
        Tx: DbTx + DbTxMut + 'static,
        N: NodeTypesForProvider<Primitives: NodePrimitives<BlockHeader = Header>>,
    > DatabaseProvider<Tx, N>
{
    /// Inserts a historical block into the static files and the database.
    ///
    /// If the headers static file segment is still empty, placeholder headers are appended for
    /// all ancestor block numbers so the segment stays contiguous before this block's header and
    /// total difficulty are written.
    pub fn insert_historical_block(
        &self,
        block: RecoveredBlock<<Self as BlockWriter>::Block>,
    ) -> ProviderResult<StoredBlockBodyIndices> {
        let ttd = if block.number() == 0 {
            block.header().difficulty()
        } else {
            let parent_block_number = block.number() - 1;
            let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
            parent_ttd + block.header().difficulty()
        };

        let mut writer = self.static_file_provider.latest_writer(StaticFileSegment::Headers)?;

        // If the static file segment is empty, backfill placeholder headers so there are no gaps.
        let segment_header = writer.user_header();
        if segment_header.block_end().is_none() && segment_header.expected_block_start() == 0 {
            for block_number in 0..block.number() {
                let mut prev = block.clone_header();
                prev.number = block_number;
                writer.append_header(&prev, U256::ZERO, &B256::ZERO)?;
            }
        }

        writer.append_header(block.header(), ttd, &block.hash())?;

        self.insert_block(block, StorageLocation::Database)
    }
}

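/// Unwinds history index shards for a key, starting at `start_key` (typically the `u64::MAX`
/// shard) and walking backwards while the shard still belongs to the key.
///
/// Fully-unwound shards are deleted; once a shard containing block numbers below `block_number`
/// is reached, it is deleted as well and the block numbers that should be kept are returned so
/// the caller can re-insert them.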
fn unwind_history_shards<S, T, C>(
    cursor: &mut C,
    start_key: T::Key,
    block_number: BlockNumber,
    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
) -> ProviderResult<Vec<u64>>
where
    T: Table<Value = BlockNumberList>,
    T::Key: AsRef<ShardedKey<S>>,
    C: DbCursorRO<T> + DbCursorRW<T>,
{
    let mut item = cursor.seek_exact(start_key)?;
    while let Some((sharded_key, list)) = item {
        // If the shard does not belong to the key, stop.
        if !shard_belongs_to_key(&sharded_key) {
            break
        }
        cursor.delete_current()?;

        // If the first block in the shard is at or above the unwind point, the whole shard was
        // removed; continue with the previous shard.
        let first = list.iter().next().expect("List can't be empty");
        if first >= block_number {
            item = cursor.prev()?;
            continue
        } else if block_number <= sharded_key.as_ref().highest_block_number {
            // Partial shard: keep only the block numbers below the unwind point.
            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
        }
        return Ok(list.iter().collect::<Vec<_>>())
    }

    Ok(Vec::new())
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    /// Creates a provider with an inner read-only transaction.
    pub const fn new(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
    ) -> Self {
        Self { tx, chain_spec, static_file_provider, prune_modes, storage }
    }

    /// Consumes the provider and returns the underlying transaction.
    pub fn into_tx(self) -> TX {
        self.tx
    }

    /// Returns a mutable reference to the underlying transaction.
    pub const fn tx_mut(&mut self) -> &mut TX {
        &mut self.tx
    }

    /// Returns a reference to the underlying transaction.
    pub const fn tx_ref(&self) -> &TX {
        &self.tx
    }

    /// Returns a reference to the chain spec.
    pub fn chain_spec(&self) -> &N::ChainSpec {
        &self.chain_spec
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    fn transactions_by_tx_range_with_cursor<C>(
        &self,
        range: impl RangeBounds<TxNumber>,
        cursor: &mut C,
    ) -> ProviderResult<Vec<TxTy<N>>>
    where
        C: DbCursorRO<tables::Transactions<TxTy<N>>>,
    {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Transactions,
            to_range(range),
            |static_file, range, _| static_file.transactions_by_tx_range(range),
            |range, _| self.cursor_collect(cursor, range),
            |_| true,
        )
    }

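    /// Assembles a block for the given block hash or number, fetching the header via
    /// `header_by_number`, reading the body and senders from storage, and delegating the final
    /// construction to `construct_block`.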
    fn recovered_block<H, HF, B, BF>(
        &self,
        id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
        header_by_number: HF,
        construct_block: BF,
    ) -> ProviderResult<Option<B>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
    {
        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
        let Some(header) = header_by_number(block_number)? else { return Ok(None) };

        // If the body indices are not found, the transactions either do not exist in the
        // database yet or are not indexed, so the block cannot be assembled.
        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };

        let tx_range = body.tx_num_range();

        let (transactions, senders) = if tx_range.is_empty() {
            (vec![], vec![])
        } else {
            (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
        };

        let body = self
            .storage
            .reader()
            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
            .pop()
            .ok_or(ProviderError::InvalidStorageOutput)?;

        construct_block(header, body, senders)
    }

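    /// Returns a range of blocks, assembling each one from its header, body and transaction
    /// number range via the `assemble_block` closure.
    ///
    /// Headers are fetched through `headers_range` so callers can choose between plain and
    /// sealed headers.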
    fn block_range<F, H, HF, R>(
        &self,
        range: RangeInclusive<BlockNumber>,
        headers_range: HF,
        mut assemble_block: F,
    ) -> ProviderResult<Vec<R>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
    {
        if range.is_empty() {
            return Ok(Vec::new())
        }

        let len = range.end().saturating_sub(*range.start()) as usize;
        let mut blocks = Vec::with_capacity(len);

        let headers = headers_range(range.clone())?;
        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;

        // Pair each block's transaction number range with its header. If the body indices are
        // not found, the transactions either do not exist in the database yet or are not
        // indexed; such blocks cannot be assembled.
        let present_headers = self
            .block_body_indices_range(range)?
            .into_iter()
            .map(|b| b.tx_num_range())
            .zip(headers)
            .collect::<Vec<_>>();

        let mut inputs = Vec::new();
        for (tx_range, header) in &present_headers {
            let transactions = if tx_range.is_empty() {
                Vec::new()
            } else {
                self.transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)?
            };

            inputs.push((header.as_ref(), transactions));
        }

        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;

        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
            blocks.push(assemble_block(header, body, tx_range)?);
        }

        Ok(blocks)
    }

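    /// Returns a range of blocks along with their senders, recovering any sender that is missing
    /// from the `TransactionSenders` table directly from the transaction signature.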
    fn block_with_senders_range<H, HF, B, BF>(
        &self,
        range: RangeInclusive<BlockNumber>,
        headers_range: HF,
        assemble_block: BF,
    ) -> ProviderResult<Vec<B>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
    {
        let mut senders_cursor = self.tx.cursor_read::<tables::TransactionSenders>()?;

        self.block_range(range, headers_range, |header, body, tx_range| {
            let senders = if tx_range.is_empty() {
                Vec::new()
            } else {
                // Fetch known senders from the database.
                let known_senders =
                    senders_cursor
                        .walk_range(tx_range.clone())?
                        .collect::<Result<HashMap<_, _>, _>>()?;

                let mut senders = Vec::with_capacity(body.transactions().len());
                for (tx_num, tx) in tx_range.zip(body.transactions()) {
                    match known_senders.get(&tx_num) {
                        None => {
                            // Recover the sender from the signature if it is not stored.
                            let sender = tx.recover_signer_unchecked()?;
                            senders.push(sender);
                        }
                        Some(sender) => senders.push(*sender),
                    }
                }

                senders
            };

            assemble_block(header, body, senders)
        })
    }

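    /// Populates a [`BundleStateInit`] and [`RevertsInit`] from the given account and storage
    /// changesets, looking up the current plain state through the provided cursors.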
    fn populate_bundle_state<A, S>(
        &self,
        account_changeset: Vec<(u64, AccountBeforeTx)>,
        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
        plain_accounts_cursor: &mut A,
        plain_storage_cursor: &mut S,
    ) -> ProviderResult<(BundleStateInit, RevertsInit)>
    where
        A: DbCursorRO<PlainAccountState>,
        S: DbDupCursorRO<PlainStorageState>,
    {
        // Plain state: (old account, new account, storage slots) per address.
        let mut state: BundleStateInit = HashMap::default();

        // Reverts: per-block, per-address old account info and storage entries.
        let mut reverts: RevertsInit = HashMap::default();

        // Apply account changeset changes, newest first.
        for (block_number, account_before) in account_changeset.into_iter().rev() {
            let AccountBeforeTx { info: old_info, address } = account_before;
            match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((old_info, new_info, HashMap::default()));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    // Overwrite the old account state with the earlier value.
                    entry.get_mut().0 = old_info;
                }
            }
            // Insert the old info into the reverts.
            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
        }

        // Apply storage changeset changes, newest first.
        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
            let BlockNumberAddress((block_number, address)) = block_and_address;
            // Get the account state, or initialize it from the plain state.
            let account_state = match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((present_info, present_info, HashMap::default()))
                }
                hash_map::Entry::Occupied(entry) => entry.into_mut(),
            };

            // Insert the old storage value, or overwrite the earlier one.
            match account_state.2.entry(old_storage.key) {
                hash_map::Entry::Vacant(entry) => {
                    let new_storage = plain_storage_cursor
                        .seek_by_key_subkey(address, old_storage.key)?
                        .filter(|storage| storage.key == old_storage.key)
                        .unwrap_or_default();
                    entry.insert((old_storage.value, new_storage.value));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().0 = old_storage.value;
                }
            };

            reverts
                .entry(block_number)
                .or_default()
                .entry(address)
                .or_default()
                .1
                .push(old_storage);
        }

        Ok((state, reverts))
    }
}

impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
    /// Commits the underlying database transaction.
    pub fn commit(self) -> ProviderResult<bool> {
        Ok(self.tx.commit()?)
    }

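    /// Removes the shard at the given key from the database, if it exists, and returns its block
    /// number list so the caller can re-shard and re-insert it.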
    fn take_shard<T>(
        &self,
        cursor: &mut <TX as DbTxMut>::CursorMut<T>,
        key: T::Key,
    ) -> ProviderResult<Vec<u64>>
    where
        T: Table<Value = BlockNumberList>,
    {
        if let Some((_, list)) = cursor.seek_exact(key)? {
            // Delete the old shard so the updated one can be inserted.
            cursor.delete_current()?;
            let list = list.iter().collect::<Vec<_>>();
            return Ok(list)
        }
        Ok(Vec::new())
    }

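    /// Appends new history indices to the history table `T`.
    ///
    /// Loads the last (`u64::MAX`) shard for each key, appends the new block numbers, and
    /// re-splits the result into shards of at most `NUM_OF_INDICES_IN_SHARD` entries, keying the
    /// final shard with `u64::MAX`.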
    fn append_history_index<P, T>(
        &self,
        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
    ) -> ProviderResult<()>
    where
        P: Copy,
        T: Table<Value = BlockNumberList>,
    {
        let mut cursor = self.tx.cursor_write::<T>()?;
        for (partial_key, indices) in index_updates {
            let mut last_shard =
                self.take_shard::<T>(&mut cursor, sharded_key_factory(partial_key, u64::MAX))?;
            last_shard.extend(indices);
            // Chunk the indices and insert them in shards of bounded size.
            let mut chunks = last_shard.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD).peekable();
            while let Some(list) = chunks.next() {
                let highest_block_number = if chunks.peek().is_some() {
                    *list.last().expect("`chunks` does not return empty list")
                } else {
                    // The last shard is always keyed by `u64::MAX`.
                    u64::MAX
                };
                cursor.insert(
                    sharded_key_factory(partial_key, highest_block_number),
                    &BlockNumberList::new_pre_sorted(list.iter().copied()),
                )?;
            }
        }
        Ok(())
    }
}

impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
        Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
    }
}

impl<TX: DbTx, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
    fn changed_accounts_with_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<BTreeSet<Address>> {
        self.tx
            .cursor_read::<tables::AccountChangeSets>()?
            .walk_range(range)?
            .map(|entry| {
                entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
            })
            .collect()
    }

    fn basic_accounts(
        &self,
        iter: impl IntoIterator<Item = Address>,
    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
        let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
        Ok(iter
            .into_iter()
            .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
            .collect::<Result<Vec<_>, _>>()?)
    }

    fn changed_accounts_and_blocks_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
        let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;

        let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
            BTreeMap::new(),
            |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
                let (index, account) = entry?;
                accounts.entry(account.address).or_default().push(index);
                Ok(accounts)
            },
        )?;

        Ok(account_transitions)
    }
}

impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
    fn storage_changeset(
        &self,
        block_number: BlockNumber,
    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
        let range = block_number..=block_number;
        let storage_range = BlockNumberAddress::range(range);
        self.tx
            .cursor_dup_read::<tables::StorageChangeSets>()?
            .walk_range(storage_range)?
            .map(|result| -> ProviderResult<_> { Ok(result?) })
            .collect()
    }
}

impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
    fn account_block_changeset(
        &self,
        block_number: BlockNumber,
    ) -> ProviderResult<Vec<AccountBeforeTx>> {
        let range = block_number..=block_number;
        self.tx
            .cursor_read::<tables::AccountChangeSets>()?
            .walk_range(range)?
            .map(|result| -> ProviderResult<_> {
                let (_, account_before) = result?;
                Ok(account_before)
            })
            .collect()
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
    for DatabaseProvider<TX, N>
{
    type Header = HeaderTy<N>;

    fn local_tip_header(
        &self,
        highest_uninterrupted_block: BlockNumber,
    ) -> ProviderResult<SealedHeader<Self::Header>> {
        let static_file_provider = self.static_file_provider();

        // Make sure the headers static file is at the same height; if it is ahead, a previous
        // run was interrupted and the extra headers must be unwound.
        let next_static_file_block_num = static_file_provider
            .get_highest_static_file_block(StaticFileSegment::Headers)
            .map(|id| id + 1)
            .unwrap_or_default();
        let next_block = highest_uninterrupted_block + 1;

        match next_static_file_block_num.cmp(&next_block) {
            // The static files are ahead of the database: unwind the extra headers.
            Ordering::Greater => {
                let mut static_file_producer =
                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
                // Commit the prune right away, since this is a database/static-file inconsistency.
                static_file_producer.commit()?
            }
            Ordering::Less => {
                // There are either missing or corrupted static files.
                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
            }
            Ordering::Equal => {}
        }

        let local_head = static_file_provider
            .sealed_header(highest_uninterrupted_block)?
            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;

        Ok(local_head)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
    type Header = HeaderTy<N>;

    fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
        if let Some(num) = self.block_number(*block_hash)? {
            Ok(self.header_by_number(num)?)
        } else {
            Ok(None)
        }
    }

    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Headers,
            num,
            |static_file| static_file.header_by_number(num),
            || Ok(self.tx.get::<tables::Headers<Self::Header>>(num)?),
        )
    }

    fn header_td(&self, block_hash: &BlockHash) -> ProviderResult<Option<U256>> {
        if let Some(num) = self.block_number(*block_hash)? {
            self.header_td_by_number(num)
        } else {
            Ok(None)
        }
    }

    fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
        if self.chain_spec.is_paris_active_at_block(number) {
            if let Some(td) = self.chain_spec.final_paris_total_difficulty() {
                // After the merge, total difficulty is fixed at the final Paris total difficulty.
                return Ok(Some(td))
            }
        }

        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Headers,
            number,
            |static_file| static_file.header_td_by_number(number),
            || Ok(self.tx.get::<tables::HeaderTerminalDifficulties>(number)?.map(|td| td.0)),
        )
    }

    fn headers_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<Self::Header>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Headers,
            to_range(range),
            |static_file, range, _| static_file.headers_range(range),
            |range, _| self.cursor_read_collect::<tables::Headers<Self::Header>>(range),
            |_| true,
        )
    }

    fn sealed_header(
        &self,
        number: BlockNumber,
    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Headers,
            number,
            |static_file| static_file.sealed_header(number),
            || {
                if let Some(header) = self.header_by_number(number)? {
                    let hash = self
                        .block_hash(number)?
                        .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
                    Ok(Some(SealedHeader::new(header, hash)))
                } else {
                    Ok(None)
                }
            },
        )
    }

    fn sealed_headers_while(
        &self,
        range: impl RangeBounds<BlockNumber>,
        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Headers,
            to_range(range),
            |static_file, range, predicate| static_file.sealed_headers_while(range, predicate),
            |range, mut predicate| {
                let mut headers = vec![];
                for entry in
                    self.tx.cursor_read::<tables::Headers<Self::Header>>()?.walk_range(range)?
                {
                    let (number, header) = entry?;
                    let hash = self
                        .block_hash(number)?
                        .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
                    let sealed = SealedHeader::new(header, hash);
                    if !predicate(&sealed) {
                        break
                    }
                    headers.push(sealed);
                }
                Ok(headers)
            },
            predicate,
        )
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Headers,
            number,
            |static_file| static_file.block_hash(number),
            || Ok(self.tx.get::<tables::CanonicalHeaders>(number)?),
        )
    }

    fn canonical_hashes_range(
        &self,
        start: BlockNumber,
        end: BlockNumber,
    ) -> ProviderResult<Vec<B256>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Headers,
            start..end,
            |static_file, range, _| static_file.canonical_hashes_range(range.start, range.end),
            |range, _| self.cursor_read_collect::<tables::CanonicalHeaders>(range),
            |_| true,
        )
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
    fn chain_info(&self) -> ProviderResult<ChainInfo> {
        let best_number = self.best_block_number()?;
        let best_hash = self.block_hash(best_number)?.unwrap_or_default();
        Ok(ChainInfo { best_hash, best_number })
    }

    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
        // The best block number tracks the checkpoint of the `Finish` stage.
        Ok(self
            .get_stage_checkpoint(StageId::Finish)?
            .map(|checkpoint| checkpoint.block_number)
            .unwrap_or_default())
    }

    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
        Ok(self
            .tx
            .cursor_read::<tables::CanonicalHeaders>()?
            .last()?
            .map(|(num, _)| num)
            .max(
                self.static_file_provider.get_highest_static_file_block(StaticFileSegment::Headers),
            )
            .unwrap_or_default())
    }

    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
        Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
    type Block = BlockTy<N>;

    fn find_block_by_hash(
        &self,
        hash: B256,
        source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        if source.is_canonical() {
            self.block(hash.into())
        } else {
            Ok(None)
        }
    }

    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        if let Some(number) = self.convert_hash_or_number(id)? {
            if let Some(header) = self.header_by_number(number)? {
                // If the transactions are missing or not indexed, the block cannot be assembled.
                let Some(transactions) = self.transactions_by_block(number.into())? else {
                    return Ok(None)
                };

                let body = self
                    .storage
                    .reader()
                    .read_block_bodies(self, vec![(&header, transactions)])?
                    .pop()
                    .ok_or(ProviderError::InvalidStorageOutput)?;

                return Ok(Some(Self::Block::new(header, body)))
            }
        }

        Ok(None)
    }

    fn pending_block(&self) -> ProviderResult<Option<SealedBlock<Self::Block>>> {
        Ok(None)
    }

    fn pending_block_with_senders(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Ok(None)
    }

    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(SealedBlock<Self::Block>, Vec<Self::Receipt>)>> {
        Ok(None)
    }

    fn recovered_block(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.header_by_number(block_number),
            |header, body, senders| {
                Self::Block::new(header, body)
                    // The unchecked variant skips the signature s-value check so historical
                    // pre-EIP-2 transactions are still accepted.
                    .try_into_recovered_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    fn sealed_block_with_senders(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.sealed_header(block_number),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    .try_with_senders_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        self.block_range(
            range,
            |range| self.headers_range(range),
            |header, body, _| Ok(Self::Block::new(header, body)),
        )
    }

    fn block_with_senders_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.headers_range(range),
            |header, body, senders| {
                Self::Block::new(header, body)
                    .try_into_recovered_unchecked(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    fn recovered_block_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.sealed_headers_range(range),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    .try_with_senders(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
    for DatabaseProvider<TX, N>
{
    /// Recovers transaction hashes by walking the `Transactions` table and hashing entries in
    /// parallel. The returned list is unsorted.
    fn transaction_hashes_by_range(
        &self,
        tx_range: Range<TxNumber>,
    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Transactions,
            tx_range,
            |static_file, range, _| static_file.transaction_hashes_by_range(range),
            |tx_range, _| {
                let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
                let tx_range_size = tx_range.clone().count();
                let tx_walker = tx_cursor.walk_range(tx_range)?;

                let chunk_size = (tx_range_size / rayon::current_num_threads()).max(1);
                let mut channels = Vec::with_capacity(chunk_size);
                let mut transaction_count = 0;

                #[inline]
                fn calculate_hash<T>(
                    entry: Result<(TxNumber, T), DatabaseError>,
                    rlp_buf: &mut Vec<u8>,
                ) -> Result<(B256, TxNumber), Box<ProviderError>>
                where
                    T: Encodable2718,
                {
                    let (tx_id, tx) = entry.map_err(|e| Box::new(e.into()))?;
                    tx.encode_2718(rlp_buf);
                    Ok((keccak256(rlp_buf), tx_id))
                }

                for chunk in &tx_walker.chunks(chunk_size) {
                    let (tx, rx) = mpsc::channel();
                    channels.push(rx);

                    // Collect the chunk so it can be moved into the rayon task.
                    let chunk: Vec<_> = chunk.collect();
                    transaction_count += chunk.len();

                    // Spawn the hashing work onto the global rayon pool; results are sent back
                    // over the channel and gathered below.
                    rayon::spawn(move || {
                        let mut rlp_buf = Vec::with_capacity(128);
                        for entry in chunk {
                            rlp_buf.clear();
                            let _ = tx.send(calculate_hash(entry, &mut rlp_buf));
                        }
                    });
                }
                let mut tx_list = Vec::with_capacity(transaction_count);

                // Drain each channel and append the (hash, tx number) pairs, unsorted.
                for channel in channels {
                    while let Ok(tx) = channel.recv() {
                        let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
                        tx_list.push((tx_hash, tx_id));
                    }
                }

                Ok(tx_list)
            },
            |_| true,
        )
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
    type Transaction = TxTy<N>;

    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
        Ok(self.tx.get::<tables::TransactionHashNumbers>(tx_hash)?)
    }

    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Transactions,
            id,
            |static_file| static_file.transaction_by_id(id),
            || Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
        )
    }

    fn transaction_by_id_unhashed(
        &self,
        id: TxNumber,
    ) -> ProviderResult<Option<Self::Transaction>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Transactions,
            id,
            |static_file| static_file.transaction_by_id_unhashed(id),
            || Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
        )
    }

    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
        if let Some(id) = self.transaction_id(hash)? {
            Ok(self.transaction_by_id_unhashed(id)?)
        } else {
            Ok(None)
        }
    }

    fn transaction_by_hash_with_meta(
        &self,
        tx_hash: TxHash,
    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
        let mut transaction_cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
        if let Some(transaction_id) = self.transaction_id(tx_hash)? {
            if let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? {
                if let Some(block_number) =
                    transaction_cursor.seek(transaction_id).map(|b| b.map(|(_, bn)| bn))?
                {
                    if let Some(sealed_header) = self.sealed_header(block_number)? {
                        let (header, block_hash) = sealed_header.split();
                        if let Some(block_body) = self.block_body_indices(block_number)? {
                            // The tx index within its block is the offset from the block's first
                            // tx number.
                            let index = transaction_id - block_body.first_tx_num();

                            let meta = TransactionMeta {
                                tx_hash,
                                index,
                                block_hash,
                                block_number,
                                base_fee: header.base_fee_per_gas(),
                                excess_blob_gas: header.excess_blob_gas(),
                                timestamp: header.timestamp(),
                            };

                            return Ok(Some((transaction, meta)))
                        }
                    }
                }
            }
        }

        Ok(None)
    }

    fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        let mut cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
        Ok(cursor.seek(id)?.map(|(_, bn)| bn))
    }

    fn transactions_by_block(
        &self,
        id: BlockHashOrNumber,
    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;

        if let Some(block_number) = self.convert_hash_or_number(id)? {
            if let Some(body) = self.block_body_indices(block_number)? {
                let tx_range = body.tx_num_range();
                return if tx_range.is_empty() {
                    Ok(Some(Vec::new()))
                } else {
                    Ok(Some(self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)?))
                }
            }
        }
        Ok(None)
    }

    fn transactions_by_block_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
        let range = to_range(range);
        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;

        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
            .into_iter()
            .map(|body| {
                let tx_num_range = body.tx_num_range();
                if tx_num_range.is_empty() {
                    Ok(Vec::new())
                } else {
                    Ok(self
                        .transactions_by_tx_range_with_cursor(tx_num_range, &mut tx_cursor)?
                        .into_iter()
                        .collect())
                }
            })
            .collect()
    }

    fn transactions_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Self::Transaction>> {
        self.transactions_by_tx_range_with_cursor(
            range,
            &mut self.tx.cursor_read::<tables::Transactions<_>>()?,
        )
    }

    fn senders_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Address>> {
        self.cursor_read_collect::<tables::TransactionSenders>(range)
    }

    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
        Ok(self.tx.get::<tables::TransactionSenders>(id)?)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
    type Receipt = ReceiptTy<N>;

    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Receipts,
            id,
            |static_file| static_file.receipt(id),
            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
        )
    }

    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
        if let Some(id) = self.transaction_id(hash)? {
            self.receipt(id)
        } else {
            Ok(None)
        }
    }

    fn receipts_by_block(
        &self,
        block: BlockHashOrNumber,
    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
        if let Some(number) = self.convert_hash_or_number(block)? {
            if let Some(body) = self.block_body_indices(number)? {
                let tx_range = body.tx_num_range();
                return if tx_range.is_empty() {
                    Ok(Some(Vec::new()))
                } else {
                    self.receipts_by_tx_range(tx_range).map(Some)
                }
            }
        }
        Ok(None)
    }

    fn receipts_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Self::Receipt>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Receipts,
            to_range(range),
            |static_file, range, _| static_file.receipts_by_tx_range(range),
            |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
            |_| true,
        )
    }
}

impl<TX: DbTx + 'static, N: NodeTypes<ChainSpec: EthereumHardforks>> WithdrawalsProvider
    for DatabaseProvider<TX, N>
{
    fn withdrawals_by_block(
        &self,
        id: BlockHashOrNumber,
        timestamp: u64,
    ) -> ProviderResult<Option<Withdrawals>> {
        if self.chain_spec.is_shanghai_active_at_timestamp(timestamp) {
            if let Some(number) = self.convert_hash_or_number(id)? {
                return self.static_file_provider.get_with_static_file_or_database(
                    StaticFileSegment::BlockMeta,
                    number,
                    |static_file| static_file.withdrawals_by_block(number.into(), timestamp),
                    || {
                        // Past Shanghai every block has a withdrawals list, even if it is empty.
                        let withdrawals = self
                            .tx
                            .get::<tables::BlockWithdrawals>(number)
                            .map(|w| w.map(|w| w.withdrawals))?
                            .unwrap_or_default();
                        Ok(Some(withdrawals))
                    },
                )
            }
        }
        Ok(None)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> OmmersProvider for DatabaseProvider<TX, N> {
    fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<Self::Header>>> {
        if let Some(number) = self.convert_hash_or_number(id)? {
            // After the Paris (merge) block there are no ommers.
            if self.chain_spec.is_paris_active_at_block(number) {
                return Ok(Some(Vec::new()))
            }

            return self.static_file_provider.get_with_static_file_or_database(
                StaticFileSegment::BlockMeta,
                number,
                |static_file| static_file.ommers(id),
                || Ok(self.tx.get::<tables::BlockOmmers<Self::Header>>(number)?.map(|o| o.ommers)),
            )
        }

        Ok(None)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
    for DatabaseProvider<TX, N>
{
    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::BlockMeta,
            num,
            |static_file| static_file.block_body_indices(num),
            || Ok(self.tx.get::<tables::BlockBodyIndices>(num)?),
        )
    }

    fn block_body_indices_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::BlockMeta,
            *range.start()..*range.end() + 1,
            |static_file, range, _| {
                static_file.block_body_indices_range(range.start..=range.end.saturating_sub(1))
            },
            |range, _| self.cursor_read_collect::<tables::BlockBodyIndices>(range),
            |_| true,
        )
    }
}

impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
        Ok(self.tx.get::<tables::StageCheckpoints>(id.to_string())?)
    }

    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
        Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
    }

    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
        self.tx
            .cursor_read::<tables::StageCheckpoints>()?
            .walk(None)?
            .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
            .map_err(ProviderError::Database)
    }
}

impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
    fn save_stage_checkpoint(
        &self,
        id: StageId,
        checkpoint: StageCheckpoint,
    ) -> ProviderResult<()> {
        Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
    }

    fn save_stage_checkpoint_progress(
        &self,
        id: StageId,
        checkpoint: Vec<u8>,
    ) -> ProviderResult<()> {
        Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
    }

    fn update_pipeline_stages(
        &self,
        block_number: BlockNumber,
        drop_stage_checkpoint: bool,
    ) -> ProviderResult<()> {
        // Iterate over all stages and update their checkpoints to the given block number.
        let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
        for stage_id in StageId::ALL {
            let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
            cursor.upsert(
                stage_id.to_string(),
                &StageCheckpoint {
                    block_number,
                    ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
                },
            )?;
        }

        Ok(())
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
    fn plain_state_storages(
        &self,
        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
        let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;

        addresses_with_keys
            .into_iter()
            .map(|(address, storage)| {
                storage
                    .into_iter()
                    .map(|key| -> ProviderResult<_> {
                        Ok(plain_storage
                            .seek_by_key_subkey(address, key)?
                            .filter(|v| v.key == key)
                            .unwrap_or_else(|| StorageEntry { key, ..Default::default() }))
                    })
                    .collect::<ProviderResult<Vec<_>>>()
                    .map(|storage| (address, storage))
            })
            .collect::<ProviderResult<Vec<(_, _)>>>()
    }

    fn changed_storages_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
        self.tx
            .cursor_read::<tables::StorageChangeSets>()?
            .walk_range(BlockNumberAddress::range(range))?
            .try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
                let (BlockNumberAddress((_, address)), storage_entry) = entry?;
                accounts.entry(address).or_default().insert(storage_entry.key);
                Ok(accounts)
            })
    }

    fn changed_storages_and_blocks_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
        let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;

        let storage_changeset_lists =
            changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
                BTreeMap::new(),
                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, entry| -> ProviderResult<_> {
                    let (index, storage) = entry?;
                    storages
                        .entry((index.address(), storage.key))
                        .or_default()
                        .push(index.block_number());
                    Ok(storages)
                },
            )?;

        Ok(storage_changeset_lists)
    }
}

1774impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
1775 for DatabaseProvider<TX, N>
1776{
1777 type Receipt = ReceiptTy<N>;
1778
1779 fn write_state(
1780 &self,
1781 execution_outcome: &ExecutionOutcome<Self::Receipt>,
1782 is_value_known: OriginalValuesKnown,
1783 write_receipts_to: StorageLocation,
1784 ) -> ProviderResult<()> {
1785 let first_block = execution_outcome.first_block();
1786 let block_count = execution_outcome.len() as u64;
1787 let last_block = execution_outcome.last_block();
1788 let block_range = first_block..=last_block;
1789
1790 let tip = self.last_block_number()?.max(last_block);
1791
1792 let (plain_state, reverts) =
1793 execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
1794
1795 self.write_state_reverts(reverts, first_block)?;
1796 self.write_state_changes(plain_state)?;
1797
1798 let block_indices: Vec<_> = self
1800 .block_body_indices_range(block_range)?
1801 .into_iter()
1802 .map(|b| b.first_tx_num)
1803 .collect();
1804
1805 if block_indices.len() < block_count as usize {
1807 let missing_blocks = block_count - block_indices.len() as u64;
1808 return Err(ProviderError::BlockBodyIndicesNotFound(
1809 last_block.saturating_sub(missing_blocks - 1),
1810 ));
1811 }
1812
1813 let has_receipts_pruning = self.prune_modes.has_receipts_pruning();
1814
1815 let mut receipts_cursor = (write_receipts_to.database() || has_receipts_pruning)
1820 .then(|| self.tx.cursor_write::<tables::Receipts<Self::Receipt>>())
1821 .transpose()?;
1822
1823 let mut receipts_static_writer = (write_receipts_to.static_files() &&
1827 !has_receipts_pruning)
1828 .then(|| self.static_file_provider.get_writer(first_block, StaticFileSegment::Receipts))
1829 .transpose()?;
1830
1831 let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
1832 let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
1833
1834 let prunable_receipts =
1837 PruneMode::Distance(MINIMUM_PRUNING_DISTANCE).should_prune(first_block, tip);
1838
1839 let mut allowed_addresses: HashSet<Address, _> = HashSet::new();
1841 for (_, addresses) in contract_log_pruner.range(..first_block) {
1842 allowed_addresses.extend(addresses.iter().copied());
1843 }
1844
1845 for (idx, (receipts, first_tx_index)) in
1846 execution_outcome.receipts.iter().zip(block_indices).enumerate()
1847 {
1848 let block_number = first_block + idx as u64;
1849
1850 if let Some(writer) = receipts_static_writer.as_mut() {
1852 writer.increment_block(block_number)?;
1853 }
1854
1855 if prunable_receipts &&
1857 self.prune_modes
1858 .receipts
1859 .is_some_and(|mode| mode.should_prune(block_number, tip))
1860 {
1861 continue
1862 }
1863
1864 if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
1866 allowed_addresses.extend(new_addresses.iter().copied());
1867 }
1868
1869 for (idx, receipt) in receipts.iter().enumerate() {
1870 let receipt_idx = first_tx_index + idx as u64;
1871 if prunable_receipts &&
1874 has_contract_log_filter &&
1875 !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
1876 {
1877 continue
1878 }
1879
1880 if let Some(writer) = &mut receipts_static_writer {
1881 writer.append_receipt(receipt_idx, receipt)?;
1882 }
1883
1884 if let Some(cursor) = &mut receipts_cursor {
1885 cursor.append(receipt_idx, receipt)?;
1886 }
1887 }
1888 }
1889
1890 Ok(())
1891 }
1892
1893 fn write_state_reverts(
1894 &self,
1895 reverts: PlainStateReverts,
1896 first_block: BlockNumber,
1897 ) -> ProviderResult<()> {
1898 tracing::trace!("Writing storage changes");
1900 let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1901 let mut storage_changeset_cursor =
1902 self.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
1903 for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
1904 let block_number = first_block + block_index as BlockNumber;
1905
1906 tracing::trace!(block_number, "Writing block change");
1907 storage_changes.par_sort_unstable_by_key(|a| a.address);
1909 for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
1910 let storage_id = BlockNumberAddress((block_number, address));
1911
1912 let mut storage = storage_revert
1913 .into_iter()
1914 .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
1915 .collect::<Vec<_>>();
1916 storage.par_sort_unstable_by_key(|a| a.0);
1918
1919 let mut wiped_storage = Vec::new();
1923 if wiped {
1924 tracing::trace!(?address, "Wiping storage");
1925 if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
1926 wiped_storage.push((entry.key, entry.into()));
1927 while let Some(entry) = storages_cursor.next_dup_val()? {
1928 wiped_storage.push((entry.key, entry.into()))
1929 }
1930 }
1931 }
1932
1933 tracing::trace!(?address, ?storage, "Writing storage reverts");
1934 for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
1935 storage_changeset_cursor.append_dup(storage_id, StorageEntry { key, value })?;
1936 }
1937 }
1938 }
1939
1940 tracing::trace!("Writing account changes");
1942 let mut account_changeset_cursor =
1943 self.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?;
1944
1945 for (block_index, mut account_block_reverts) in reverts.accounts.into_iter().enumerate() {
1946 let block_number = first_block + block_index as BlockNumber;
1947 account_block_reverts.par_sort_by_key(|a| a.0);
1949
1950 for (address, info) in account_block_reverts {
1951 account_changeset_cursor.append_dup(
1952 block_number,
1953 AccountBeforeTx { address, info: info.map(Into::into) },
1954 )?;
1955 }
1956 }
1957
1958 Ok(())
1959 }
1960
1961 fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
1962 changes.accounts.par_sort_by_key(|a| a.0);
1965 changes.storage.par_sort_by_key(|a| a.address);
1966 changes.contracts.par_sort_by_key(|a| a.0);
1967
1968 tracing::trace!(len = changes.accounts.len(), "Writing new account state");
1970 let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
1971 for (address, account) in changes.accounts {
1973 if let Some(account) = account {
1974 tracing::trace!(?address, "Updating plain state account");
1975 accounts_cursor.upsert(address, &account.into())?;
1976 } else if accounts_cursor.seek_exact(address)?.is_some() {
1977 tracing::trace!(?address, "Deleting plain state account");
1978 accounts_cursor.delete_current()?;
1979 }
1980 }
1981
1982 tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
1984 let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
1985 for (hash, bytecode) in changes.contracts {
1986 bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
1987 }
1988
1989 tracing::trace!(len = changes.storage.len(), "Writing new storage state");
1991 let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1992 for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
1993 if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
1995 storages_cursor.delete_current_duplicates()?;
1996 }
1997 let mut storage = storage
1999 .into_iter()
2000 .map(|(k, value)| StorageEntry { key: k.into(), value })
2001 .collect::<Vec<_>>();
2002 storage.par_sort_unstable_by_key(|a| a.key);
2004
2005 for entry in storage {
2006 tracing::trace!(?address, ?entry.key, "Updating plain state storage");
2007 if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? {
2008 if db_entry.key == entry.key {
2009 storages_cursor.delete_current()?;
2010 }
2011 }
2012
2013 if !entry.to_flagged_storage().is_zero() {
2014 storages_cursor.upsert(address, &entry)?;
2015 }
2016 }
2017 }
2018
2019 Ok(())
2020 }
2021
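// Mirrors the plain-state write for the hashed tables: `HashedAccounts` and `HashedStorages`
// are keyed by `keccak256` of the address / slot, wiped storages are cleared before their slots
// are rewritten, and zero-valued slots are removed rather than stored.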
2022 fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
2023 let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
2025 for (hashed_address, account) in hashed_state.accounts().accounts_sorted() {
2026 if let Some(account) = account {
2027 hashed_accounts_cursor.upsert(hashed_address, &account)?;
2028 } else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
2029 hashed_accounts_cursor.delete_current()?;
2030 }
2031 }
2032
2033 let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
2035 let mut hashed_storage_cursor =
2036 self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2037 for (hashed_address, storage) in sorted_storages {
2038 if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
2039 hashed_storage_cursor.delete_current_duplicates()?;
2040 }
2041
2042 for (hashed_slot, value) in storage.storage_slots_sorted() {
2043 let entry = StorageEntry { key: hashed_slot, value };
2044 if let Some(db_entry) =
2045 hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)?
2046 {
2047 if db_entry.key == entry.key {
2048 hashed_storage_cursor.delete_current()?;
2049 }
2050 }
2051
2052 if !entry.value.is_zero() {
2053 hashed_storage_cursor.upsert(*hashed_address, &entry)?;
2054 }
2055 }
2056 }
2057
2058 Ok(())
2059 }
2060
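// Unwinds plain state for all blocks above `block`: the account and storage changesets for the
// range are taken from the database, replayed through `populate_bundle_state` to recover the
// pre-range values, written back to the plain-state tables, and the corresponding receipts are
// removed from `remove_receipts_from`.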
2061 fn remove_state_above(
2083 &self,
2084 block: BlockNumber,
2085 remove_receipts_from: StorageLocation,
2086 ) -> ProviderResult<()> {
2087 let range = block + 1..=self.last_block_number()?;
2088
2089 if range.is_empty() {
2090 return Ok(());
2091 }
2092
2093 let block_bodies = self.block_body_indices_range(range.clone())?;
2095
2096 let from_transaction_num =
2098 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2099
2100 let storage_range = BlockNumberAddress::range(range.clone());
2101
2102 let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2103 let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2104
2105 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2110 let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2111
2112 let (state, _) = self.populate_bundle_state(
2113 account_changeset,
2114 storage_changeset,
2115 &mut plain_accounts_cursor,
2116 &mut plain_storage_cursor,
2117 )?;
2118
2119 for (address, (old_account, new_account, storage)) in &state {
2121 if old_account != new_account {
2123 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2124 if let Some(account) = old_account {
2125 plain_accounts_cursor.upsert(*address, account)?;
2126 } else if existing_entry.is_some() {
2127 plain_accounts_cursor.delete_current()?;
2128 }
2129 }
2130
2131 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2133 let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2134 if plain_storage_cursor
2137 .seek_by_key_subkey(*address, *storage_key)?
2138 .filter(|s| s.key == *storage_key)
2139 .is_some()
2140 {
2141 plain_storage_cursor.delete_current()?
2142 }
2143
2144 if !old_storage_value.is_zero() {
2146 plain_storage_cursor.upsert(*address, &storage_entry)?;
2147 }
2148 }
2149 }
2150
2151 self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
2152
2153 Ok(())
2154 }
2155
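// Same unwind as `remove_state_above`, but additionally collects the range's receipts (from
// static files or the database, whichever holds them) and returns the removed state as an
// `ExecutionOutcome`, so the caller can reuse it, e.g. when reorged blocks are moved back into
// an in-memory chain.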
2156 fn take_state_above(
2178 &self,
2179 block: BlockNumber,
2180 remove_receipts_from: StorageLocation,
2181 ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2182 let range = block + 1..=self.last_block_number()?;
2183
2184 if range.is_empty() {
2185 return Ok(ExecutionOutcome::default())
2186 }
2187 let start_block_number = *range.start();
2188
2189 let block_bodies = self.block_body_indices_range(range.clone())?;
2191
2192 let from_transaction_num =
2194 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2195 let to_transaction_num =
2196 block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2197
2198 let storage_range = BlockNumberAddress::range(range.clone());
2199
2200 let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2201 let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2202
2203 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2208 let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2209
2210 let (state, reverts) = self.populate_bundle_state(
2213 account_changeset,
2214 storage_changeset,
2215 &mut plain_accounts_cursor,
2216 &mut plain_storage_cursor,
2217 )?;
2218
2219 for (address, (old_account, new_account, storage)) in &state {
2221 if old_account != new_account {
2223 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2224 if let Some(account) = old_account {
2225 plain_accounts_cursor.upsert(*address, account)?;
2226 } else if existing_entry.is_some() {
2227 plain_accounts_cursor.delete_current()?;
2228 }
2229 }
2230
2231 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2233 let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2234 if plain_storage_cursor
2237 .seek_by_key_subkey(*address, *storage_key)?
2238 .filter(|s| s.key == *storage_key)
2239 .is_some()
2240 {
2241 plain_storage_cursor.delete_current()?
2242 }
2243
2244 if !old_storage_value.is_zero() {
2246 plain_storage_cursor.upsert(*address, &storage_entry)?;
2247 }
2248 }
2249 }
2250
2251 let mut receipts_iter = self
2253 .static_file_provider
2254 .get_range_with_static_file_or_database(
2255 StaticFileSegment::Receipts,
2256 from_transaction_num..to_transaction_num + 1,
2257 |static_file, range, _| {
2258 static_file
2259 .receipts_by_tx_range(range.clone())
2260 .map(|r| range.into_iter().zip(r).collect())
2261 },
2262 |range, _| {
2263 self.tx
2264 .cursor_read::<tables::Receipts<Self::Receipt>>()?
2265 .walk_range(range)?
2266 .map(|r| r.map_err(Into::into))
2267 .collect()
2268 },
2269 |_| true,
2270 )?
2271 .into_iter()
2272 .peekable();
2273
2274 let mut receipts = Vec::with_capacity(block_bodies.len());
2275 for block_body in block_bodies {
2277 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
2278 for num in block_body.tx_num_range() {
2279 if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
2280 block_receipts.push(receipts_iter.next().unwrap().1);
2281 }
2282 }
2283 receipts.push(block_receipts);
2284 }
2285
2286 self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
2287
2288 Ok(ExecutionOutcome::new_init(
2289 state,
2290 reverts,
2291 Vec::new(),
2292 receipts,
2293 start_block_number,
2294 Vec::new(),
2295 ))
2296 }
2297}
2298
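// Writes account-trie node changes: removed nodes and updated nodes are merged into a single
// list, sorted by nibble path, and applied through one `AccountsTrie` cursor; storage-trie
// changes are then delegated to `write_storage_trie_updates`. The return value is the number of
// trie entries touched.
//
// Illustrative sketch of the intended pairing with state-root computation (assumes a read-write
// `provider` in scope; error conversion elided):
//
// ```ignore
// let (root, updates) = StateRoot::from_tx(provider.tx_ref()).root_with_updates()?;
// let entries = provider.write_trie_updates(&updates)?;
// ```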
2299impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
2300 fn write_trie_updates(&self, trie_updates: &TrieUpdates) -> ProviderResult<usize> {
2302 if trie_updates.is_empty() {
2303 return Ok(0)
2304 }
2305
2306 let mut num_entries = 0;
2308
2309 let mut account_updates = trie_updates
2311 .removed_nodes_ref()
2312 .iter()
2313 .filter_map(|n| {
2314 (!trie_updates.account_nodes_ref().contains_key(n)).then_some((n, None))
2315 })
2316 .collect::<Vec<_>>();
2317 account_updates.extend(
2318 trie_updates.account_nodes_ref().iter().map(|(nibbles, node)| (nibbles, Some(node))),
2319 );
2320 account_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
2322
2323 let tx = self.tx_ref();
2324 let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
2325 for (key, updated_node) in account_updates {
2326 let nibbles = StoredNibbles(key.clone());
2327 match updated_node {
2328 Some(node) => {
2329 if !nibbles.0.is_empty() {
2330 num_entries += 1;
2331 account_trie_cursor.upsert(nibbles, node)?;
2332 }
2333 }
2334 None => {
2335 num_entries += 1;
2336 if account_trie_cursor.seek_exact(nibbles)?.is_some() {
2337 account_trie_cursor.delete_current()?;
2338 }
2339 }
2340 }
2341 }
2342
2343 num_entries += self.write_storage_trie_updates(trie_updates.storage_tries_ref())?;
2344
2345 Ok(num_entries)
2346 }
2347}
2348
2349impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
2350 fn write_storage_trie_updates(
2353 &self,
2354 storage_tries: &B256Map<StorageTrieUpdates>,
2355 ) -> ProviderResult<usize> {
2356 let mut num_entries = 0;
2357 let mut storage_tries = Vec::from_iter(storage_tries);
2358 storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
2359 let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2360 for (hashed_address, storage_trie_updates) in storage_tries {
2361 let mut db_storage_trie_cursor =
2362 DatabaseStorageTrieCursor::new(cursor, *hashed_address);
2363 num_entries +=
2364 db_storage_trie_cursor.write_storage_trie_updates(storage_trie_updates)?;
2365 cursor = db_storage_trie_cursor.cursor;
2366 }
2367
2368 Ok(num_entries)
2369 }
2370
2371 fn write_individual_storage_trie_updates(
2372 &self,
2373 hashed_address: B256,
2374 updates: &StorageTrieUpdates,
2375 ) -> ProviderResult<usize> {
2376 if updates.is_empty() {
2377 return Ok(0)
2378 }
2379
2380 let cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2381 let mut trie_db_cursor = DatabaseStorageTrieCursor::new(cursor, hashed_address);
2382 Ok(trie_db_cursor.write_storage_trie_updates(updates)?)
2383 }
2384}
2385
2386impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
2387 fn unwind_account_hashing<'a>(
2388 &self,
2389 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2390 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
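// Collect the changesets into hashed-address -> pre-block account. The changesets arrive in
// ascending block order, so the iterator is reversed before collecting into the `BTreeMap`: the
// oldest entry for each address is inserted last and wins, i.e. the account state as it was
// before the unwound range.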
2391 let hashed_accounts = changesets
2395 .into_iter()
2396 .map(|(_, e)| (keccak256(e.address), e.info))
2397 .collect::<Vec<_>>()
2398 .into_iter()
2399 .rev()
2400 .collect::<BTreeMap<_, _>>();
2401
2402 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2404 for (hashed_address, account) in &hashed_accounts {
2405 if let Some(account) = account {
2406 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2407 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2408 hashed_accounts_cursor.delete_current()?;
2409 }
2410 }
2411
2412 Ok(hashed_accounts)
2413 }
2414
2415 fn unwind_account_hashing_range(
2416 &self,
2417 range: impl RangeBounds<BlockNumber>,
2418 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2419 let changesets = self
2420 .tx
2421 .cursor_read::<tables::AccountChangeSets>()?
2422 .walk_range(range)?
2423 .collect::<Result<Vec<_>, _>>()?;
2424 self.unwind_account_hashing(changesets.iter())
2425 }
2426
2427 fn insert_account_for_hashing(
2428 &self,
2429 changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
2430 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2431 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2432 let hashed_accounts =
2433 changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
2434 for (hashed_address, account) in &hashed_accounts {
2435 if let Some(account) = account {
2436 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2437 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2438 hashed_accounts_cursor.delete_current()?;
2439 }
2440 }
2441 Ok(hashed_accounts)
2442 }
2443
2444 fn unwind_storage_hashing(
2445 &self,
2446 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2447 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2448 let mut hashed_storages = changesets
2450 .into_iter()
2451 .map(|(BlockNumberAddress((_, address)), storage_entry)| {
2452 (keccak256(address), keccak256(storage_entry.key), storage_entry.value)
2453 })
2454 .collect::<Vec<_>>();
2455 hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
2456
2457 let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
2459 HashMap::with_capacity_and_hasher(hashed_storages.len(), Default::default());
2460 let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2461 for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
2462 hashed_storage_keys.entry(hashed_address).or_default().insert(key);
2463
2464 if hashed_storage
2465 .seek_by_key_subkey(hashed_address, key)?
2466 .filter(|entry| entry.key == key)
2467 .is_some()
2468 {
2469 hashed_storage.delete_current()?;
2470 }
2471
2472 if !value.is_zero() {
2473 hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
2474 }
2475 }
2476 Ok(hashed_storage_keys)
2477 }
2478
2479 fn unwind_storage_hashing_range(
2480 &self,
2481 range: impl RangeBounds<BlockNumberAddress>,
2482 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2483 let changesets = self
2484 .tx
2485 .cursor_read::<tables::StorageChangeSets>()?
2486 .walk_range(range)?
2487 .collect::<Result<Vec<_>, _>>()?;
2488 self.unwind_storage_hashing(changesets.into_iter())
2489 }
2490
2491 fn insert_storage_for_hashing(
2492 &self,
2493 storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
2494 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2495 let hashed_storages =
2497 storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
2498 let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
2499 map.insert(keccak256(entry.key), entry.value);
2500 map
2501 });
2502 map.insert(keccak256(address), storage);
2503 map
2504 });
2505
2506 let hashed_storage_keys =
2507 HashMap::from_iter(hashed_storages.iter().map(|(hashed_address, entries)| {
2508 (*hashed_address, BTreeSet::from_iter(entries.keys().copied()))
2509 }));
2510
2511 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2512 hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
2515 storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
2516 if hashed_storage_cursor
2517 .seek_by_key_subkey(hashed_address, key)?
2518 .filter(|entry| entry.key == key)
2519 .is_some()
2520 {
2521 hashed_storage_cursor.delete_current()?;
2522 }
2523
2524 if !value.is_zero() {
2525 hashed_storage_cursor.upsert(
2526 hashed_address,
2527 &StorageEntry { key, value, ..Default::default() },
2528 )?;
2529 }
2530 Ok(())
2531 })
2532 })?;
2533
2534 Ok(hashed_storage_keys)
2535 }
2536
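// Recomputes hashed state and the state root for a block range: first hash the changed storages
// and accounts into the hashed tables (recording prefix sets of touched paths), then compute the
// state root restricted to those prefix sets and compare it against `expected_state_root`,
// persisting the resulting trie updates on success.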
2537 fn insert_hashes(
2538 &self,
2539 range: RangeInclusive<BlockNumber>,
2540 end_block_hash: B256,
2541 expected_state_root: B256,
2542 ) -> ProviderResult<()> {
2543 let mut account_prefix_set = PrefixSetMut::default();
2545 let mut storage_prefix_sets: HashMap<B256, PrefixSetMut> = HashMap::default();
2546 let mut destroyed_accounts = HashSet::default();
2547
2548 let mut durations_recorder = metrics::DurationsRecorder::default();
2549
2550 {
2552 let lists = self.changed_storages_with_range(range.clone())?;
2553 let storages = self.plain_state_storages(lists)?;
2554 let storage_entries = self.insert_storage_for_hashing(storages)?;
2555 for (hashed_address, hashed_slots) in storage_entries {
2556 account_prefix_set.insert(Nibbles::unpack(hashed_address));
2557 for slot in hashed_slots {
2558 storage_prefix_sets
2559 .entry(hashed_address)
2560 .or_default()
2561 .insert(Nibbles::unpack(slot));
2562 }
2563 }
2564 }
2565 durations_recorder.record_relative(metrics::Action::InsertStorageHashing);
2566
2567 {
2569 let lists = self.changed_accounts_with_range(range.clone())?;
2570 let accounts = self.basic_accounts(lists)?;
2571 let hashed_addresses = self.insert_account_for_hashing(accounts)?;
2572 for (hashed_address, account) in hashed_addresses {
2573 account_prefix_set.insert(Nibbles::unpack(hashed_address));
2574 if account.is_none() {
2575 destroyed_accounts.insert(hashed_address);
2576 }
2577 }
2578 }
2579 durations_recorder.record_relative(metrics::Action::InsertAccountHashing);
2580
2581 {
2583 let prefix_sets = TriePrefixSets {
2586 account_prefix_set: account_prefix_set.freeze(),
2587 storage_prefix_sets: storage_prefix_sets
2588 .into_iter()
2589 .map(|(k, v)| (k, v.freeze()))
2590 .collect(),
2591 destroyed_accounts,
2592 };
2593 let (state_root, trie_updates) = StateRoot::from_tx(&self.tx)
2594 .with_prefix_sets(prefix_sets)
2595 .root_with_updates()
2596 .map_err(reth_db_api::DatabaseError::from)?;
2597 if state_root != expected_state_root {
2598 return Err(ProviderError::StateRootMismatch(Box::new(RootMismatch {
2599 root: GotExpected { got: state_root, expected: expected_state_root },
2600 block_number: *range.end(),
2601 block_hash: end_block_hash,
2602 })))
2603 }
2604 self.write_trie_updates(&trie_updates)?;
2605 }
2606 durations_recorder.record_relative(metrics::Action::InsertMerkleTree);
2607
2608 debug!(target: "providers::db", ?range, actions = ?durations_recorder.actions, "Inserted hashes");
2609
2610 Ok(())
2611 }
2612}
2613
2614impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
2615 fn unwind_account_history_indices<'a>(
2616 &self,
2617 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2618 ) -> ProviderResult<usize> {
2619 let mut last_indices = changesets
2620 .into_iter()
2621 .map(|(index, account)| (account.address, *index))
2622 .collect::<Vec<_>>();
2623 last_indices.sort_by_key(|(a, _)| *a);
2624
2625 let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
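// History indices are stored as sharded `BlockNumberList`s per address. Starting from the shard
// keyed with `ShardedKey::last(address)` (the sentinel for the most recent shard),
// `unwind_history_shards` strips block numbers belonging to the unwound range and returns the
// remaining portion of the boundary shard.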
2627 for &(address, rem_index) in &last_indices {
2628 let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
2629 &mut cursor,
2630 ShardedKey::last(address),
2631 rem_index,
2632 |sharded_key| sharded_key.key == address,
2633 )?;
2634
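// Reinsert the surviving portion of the boundary shard under the `last` sentinel key so the
// history index stays consistent for later reads and appends.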
2635 if !partial_shard.is_empty() {
2638 cursor.insert(
2639 ShardedKey::last(address),
2640 &BlockNumberList::new_pre_sorted(partial_shard),
2641 )?;
2642 }
2643 }
2644
2645 let changesets = last_indices.len();
2646 Ok(changesets)
2647 }
2648
2649 fn unwind_account_history_indices_range(
2650 &self,
2651 range: impl RangeBounds<BlockNumber>,
2652 ) -> ProviderResult<usize> {
2653 let changesets = self
2654 .tx
2655 .cursor_read::<tables::AccountChangeSets>()?
2656 .walk_range(range)?
2657 .collect::<Result<Vec<_>, _>>()?;
2658 self.unwind_account_history_indices(changesets.iter())
2659 }
2660
2661 fn insert_account_history_index(
2662 &self,
2663 account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
2664 ) -> ProviderResult<()> {
2665 self.append_history_index::<_, tables::AccountsHistory>(
2666 account_transitions,
2667 ShardedKey::new,
2668 )
2669 }
2670
2671 fn unwind_storage_history_indices(
2672 &self,
2673 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2674 ) -> ProviderResult<usize> {
2675 let mut storage_changesets = changesets
2676 .into_iter()
2677 .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
2678 .collect::<Vec<_>>();
2679 storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
2680
2681 let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
2682 for &(address, storage_key, rem_index) in &storage_changesets {
2683 let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
2684 &mut cursor,
2685 StorageShardedKey::last(address, storage_key),
2686 rem_index,
2687 |storage_sharded_key| {
2688 storage_sharded_key.address == address &&
2689 storage_sharded_key.sharded_key.key == storage_key
2690 },
2691 )?;
2692
2693 if !partial_shard.is_empty() {
2696 cursor.insert(
2697 StorageShardedKey::last(address, storage_key),
2698 &BlockNumberList::new_pre_sorted(partial_shard),
2699 )?;
2700 }
2701 }
2702
2703 let changesets = storage_changesets.len();
2704 Ok(changesets)
2705 }
2706
2707 fn unwind_storage_history_indices_range(
2708 &self,
2709 range: impl RangeBounds<BlockNumberAddress>,
2710 ) -> ProviderResult<usize> {
2711 let changesets = self
2712 .tx
2713 .cursor_read::<tables::StorageChangeSets>()?
2714 .walk_range(range)?
2715 .collect::<Result<Vec<_>, _>>()?;
2716 self.unwind_storage_history_indices(changesets.into_iter())
2717 }
2718
2719 fn insert_storage_history_index(
2720 &self,
2721 storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
2722 ) -> ProviderResult<()> {
2723 self.append_history_index::<_, tables::StoragesHistory>(
2724 storage_transitions,
2725 |(address, storage_key), highest_block_number| {
2726 StorageShardedKey::new(address, storage_key, highest_block_number)
2727 },
2728 )
2729 }
2730
2731 fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
2732 {
2734 let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
2735 self.insert_account_history_index(indices)?;
2736 }
2737
2738 {
2740 let indices = self.changed_storages_and_blocks_with_range(range)?;
2741 self.insert_storage_history_index(indices)?;
2742 }
2743
2744 Ok(())
2745 }
2746}
2747
2748impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
2749 for DatabaseProvider<TX, N>
2750{
2751 fn take_block_and_execution_above(
2752 &self,
2753 block: BlockNumber,
2754 remove_from: StorageLocation,
2755 ) -> ProviderResult<Chain<Self::Primitives>> {
2756 let range = block + 1..=self.last_block_number()?;
2757
2758 self.unwind_trie_state_range(range.clone())?;
2759
2760 let execution_state = self.take_state_above(block, remove_from)?;
2762
2763 let blocks = self.recovered_block_range(range)?;
2764
2765 self.remove_blocks_above(block, remove_from)?;
2768
2769 self.update_pipeline_stages(block, true)?;
2771
2772 Ok(Chain::new(blocks, execution_state, None))
2773 }
2774
2775 fn remove_block_and_execution_above(
2776 &self,
2777 block: BlockNumber,
2778 remove_from: StorageLocation,
2779 ) -> ProviderResult<()> {
2780 let range = block + 1..=self.last_block_number()?;
2781
2782 self.unwind_trie_state_range(range)?;
2783
2784 self.remove_state_above(block, remove_from)?;
2786
2787 self.remove_blocks_above(block, remove_from)?;
2790
2791 self.update_pipeline_stages(block, true)?;
2793
2794 Ok(())
2795 }
2796}
2797
2798impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
2799 for DatabaseProvider<TX, N>
2800{
2801 type Block = BlockTy<N>;
2802 type Receipt = ReceiptTy<N>;
2803
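/// Inserts a single recovered block: writes the canonical hash, header, and total difficulty to
/// the database and/or static files (per `write_to`), records the hash-to-number mapping,
/// assigns consecutive transaction numbers (persisting senders and hash lookups unless pruned),
/// and defers the body to `append_block_bodies`. Returns the block's stored body indices.
///
/// Sketch of a typical call site (illustrative only; assumes an existing `ProviderFactory`
/// named `factory` and a `RecoveredBlock` named `block`, and elides error conversion):
///
/// ```ignore
/// let provider_rw = factory.provider_rw()?;
/// let tx_count = block.body().transaction_count() as u64;
/// let indices = provider_rw.insert_block(block, StorageLocation::Database)?;
/// assert_eq!(indices.tx_count, tx_count);
/// provider_rw.commit()?;
/// ```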
2804 fn insert_block(
2825 &self,
2826 block: RecoveredBlock<Self::Block>,
2827 write_to: StorageLocation,
2828 ) -> ProviderResult<StoredBlockBodyIndices> {
2829 let block_number = block.number();
2830
2831 let mut durations_recorder = metrics::DurationsRecorder::default();
2832
2833 let ttd = if block_number == 0 {
2835 block.header().difficulty()
2836 } else {
2837 let parent_block_number = block_number - 1;
2838 let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
2839 durations_recorder.record_relative(metrics::Action::GetParentTD);
2840 parent_ttd + block.header().difficulty()
2841 };
2842
2843 if write_to.database() {
2844 self.tx.put::<tables::CanonicalHeaders>(block_number, block.hash())?;
2845 durations_recorder.record_relative(metrics::Action::InsertCanonicalHeaders);
2846
2847 self.tx.put::<tables::Headers<HeaderTy<N>>>(block_number, block.header().clone())?;
2849 durations_recorder.record_relative(metrics::Action::InsertHeaders);
2850
2851 self.tx.put::<tables::HeaderTerminalDifficulties>(block_number, ttd.into())?;
2852 durations_recorder.record_relative(metrics::Action::InsertHeaderTerminalDifficulties);
2853 }
2854
2855 if write_to.static_files() {
2856 let mut writer =
2857 self.static_file_provider.get_writer(block_number, StaticFileSegment::Headers)?;
2858 writer.append_header(block.header(), ttd, &block.hash())?;
2859 }
2860
2861 self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
2862 durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
2863
2864 let mut next_tx_num = self
2865 .tx
2866 .cursor_read::<tables::TransactionBlocks>()?
2867 .last()?
2868 .map(|(n, _)| n + 1)
2869 .unwrap_or_default();
2870 durations_recorder.record_relative(metrics::Action::GetNextTxNum);
2871 let first_tx_num = next_tx_num;
2872
2873 let tx_count = block.body().transaction_count() as u64;
2874
2875 for (transaction, sender) in block.body().transactions_iter().zip(block.senders_iter()) {
2877 let hash = transaction.tx_hash();
2878
2879 if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
2880 self.tx.put::<tables::TransactionSenders>(next_tx_num, *sender)?;
2881 }
2882
2883 if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
2884 self.tx.put::<tables::TransactionHashNumbers>(*hash, next_tx_num)?;
2885 }
2886 next_tx_num += 1;
2887 }
2888
2889 self.append_block_bodies(vec![(block_number, Some(block.into_body()))], write_to)?;
2890
2891 debug!(
2892 target: "providers::db",
2893 ?block_number,
2894 actions = ?durations_recorder.actions,
2895 "Inserted block"
2896 );
2897
2898 Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
2899 }
2900
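// Appends block bodies starting at the first given block number: allocates consecutive
// transaction numbers, writes `BlockBodyIndices`, maps each block's last transaction number to
// the block (when it has transactions), stores the transactions in static files and/or the
// database (per `write_to`), and finally hands the bodies to the chain-specific storage writer
// for any additional body tables.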
2901 fn append_block_bodies(
2902 &self,
2903 bodies: Vec<(BlockNumber, Option<BodyTy<N>>)>,
2904 write_to: StorageLocation,
2905 ) -> ProviderResult<()> {
2906 let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
2907
2908 let mut tx_static_writer = write_to
2910 .static_files()
2911 .then(|| {
2912 self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)
2913 })
2914 .transpose()?;
2915
2916 let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
2917 let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
2918
2919 let mut tx_cursor = write_to
2921 .database()
2922 .then(|| self.tx.cursor_write::<tables::Transactions<TxTy<N>>>())
2923 .transpose()?;
2924
2925 let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
2927
2928 for (block_number, body) in &bodies {
2929 if let Some(writer) = tx_static_writer.as_mut() {
2931 writer.increment_block(*block_number)?;
2932 }
2933
2934 let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
2935 let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
2936
2937 let mut durations_recorder = metrics::DurationsRecorder::default();
2938
2939 block_indices_cursor.append(*block_number, &block_indices)?;
2941
2942 durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
2943
2944 let Some(body) = body else { continue };
2945
2946 if !body.transactions().is_empty() {
2948 tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
2949 durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
2950 }
2951
2952 for transaction in body.transactions() {
2954 if let Some(writer) = tx_static_writer.as_mut() {
2955 writer.append_transaction(next_tx_num, transaction)?;
2956 }
2957 if let Some(cursor) = tx_cursor.as_mut() {
2958 cursor.append(next_tx_num, transaction)?;
2959 }
2960
2961 next_tx_num += 1;
2963 }
2964
2965 debug!(
2966 target: "providers::db",
2967 ?block_number,
2968 actions = ?durations_recorder.actions,
2969 "Inserted block body"
2970 );
2971 }
2972
2973 self.storage.writer().write_block_bodies(self, bodies, write_to)?;
2974
2975 Ok(())
2976 }
2977
2978 fn remove_blocks_above(
2979 &self,
2980 block: BlockNumber,
2981 remove_from: StorageLocation,
2982 ) -> ProviderResult<()> {
2983 for hash in self.canonical_hashes_range(block + 1, self.last_block_number()? + 1)? {
2984 self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
2985 }
2986
2987 self.remove::<tables::CanonicalHeaders>(block + 1..)?;
2990 self.remove::<tables::Headers<HeaderTy<N>>>(block + 1..)?;
2991 self.remove::<tables::HeaderTerminalDifficulties>(block + 1..)?;
2992
2993 let unwind_tx_from = self
2995 .block_body_indices(block)?
2996 .map(|b| b.next_tx_num())
2997 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
2998
2999 let unwind_tx_to = self
3001 .tx
3002 .cursor_read::<tables::BlockBodyIndices>()?
3003 .last()?
3004 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
3006 .1
3007 .last_tx_num();
3008
3009 if unwind_tx_from <= unwind_tx_to {
3010 for (hash, _) in self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))? {
3011 self.tx.delete::<tables::TransactionHashNumbers>(hash, None)?;
3012 }
3013 }
3014
3015 self.remove::<tables::TransactionSenders>(unwind_tx_from..)?;
3016
3017 self.remove_bodies_above(block, remove_from)?;
3018
3019 Ok(())
3020 }
3021
3022 fn remove_bodies_above(
3023 &self,
3024 block: BlockNumber,
3025 remove_from: StorageLocation,
3026 ) -> ProviderResult<()> {
3027 self.storage.writer().remove_block_bodies_above(self, block, remove_from)?;
3028
3029 let unwind_tx_from = self
3031 .block_body_indices(block)?
3032 .map(|b| b.next_tx_num())
3033 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3034
3035 self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3036 self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3037
3038 if remove_from.database() {
3039 self.remove::<tables::Transactions<TxTy<N>>>(unwind_tx_from..)?;
3040 }
3041
3042 if remove_from.static_files() {
3043 let static_file_tx_num = self
3044 .static_file_provider
3045 .get_highest_static_file_tx(StaticFileSegment::Transactions);
3046
3047 let to_delete = static_file_tx_num
3048 .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3049 .unwrap_or_default();
3050
3051 self.static_file_provider
3052 .latest_writer(StaticFileSegment::Transactions)?
3053 .prune_transactions(to_delete, block)?;
3054 }
3055
3056 Ok(())
3057 }
3058
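// Appends a batch of recovered blocks together with their execution output: each block is
// inserted into the database, then the execution state, hashed state, and trie updates are
// written, history indices are updated for the whole range, and the pipeline stage checkpoints
// are advanced to the last block.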
3059 fn append_blocks_with_state(
3061 &self,
3062 blocks: Vec<RecoveredBlock<Self::Block>>,
3063 execution_outcome: &ExecutionOutcome<Self::Receipt>,
3064 hashed_state: HashedPostStateSorted,
3065 trie_updates: TrieUpdates,
3066 ) -> ProviderResult<()> {
3067 if blocks.is_empty() {
3068 debug!(target: "providers::db", "Attempted to append empty block range");
3069 return Ok(())
3070 }
3071
3072 let first_number = blocks.first().unwrap().number();
3073
3074 let last = blocks.last().unwrap();
3075 let last_block_number = last.number();
3076
3077 let mut durations_recorder = metrics::DurationsRecorder::default();
3078
3079 for block in blocks {
3081 self.insert_block(block, StorageLocation::Database)?;
3082 durations_recorder.record_relative(metrics::Action::InsertBlock);
3083 }
3084
3085 self.write_state(execution_outcome, OriginalValuesKnown::No, StorageLocation::Database)?;
3086 durations_recorder.record_relative(metrics::Action::InsertState);
3087
3088 self.write_hashed_state(&hashed_state)?;
3090 self.write_trie_updates(&trie_updates)?;
3091 durations_recorder.record_relative(metrics::Action::InsertHashes);
3092
3093 self.update_history_indices(first_number..=last_block_number)?;
3094 durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3095
3096 self.update_pipeline_stages(last_block_number, false)?;
3098 durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3099
3100 debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3101
3102 Ok(())
3103 }
3104}
3105
3106impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3107 fn get_prune_checkpoint(
3108 &self,
3109 segment: PruneSegment,
3110 ) -> ProviderResult<Option<PruneCheckpoint>> {
3111 Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3112 }
3113
3114 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3115 Ok(self
3116 .tx
3117 .cursor_read::<tables::PruneCheckpoints>()?
3118 .walk(None)?
3119 .collect::<Result<_, _>>()?)
3120 }
3121}
3122
3123impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3124 fn save_prune_checkpoint(
3125 &self,
3126 segment: PruneSegment,
3127 checkpoint: PruneCheckpoint,
3128 ) -> ProviderResult<()> {
3129 Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3130 }
3131}
3132
3133impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3134 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3135 let db_entries = self.tx.entries::<T>()?;
3136 let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3137 Ok(entries) => entries,
3138 Err(ProviderError::UnsupportedProvider) => 0,
3139 Err(err) => return Err(err),
3140 };
3141
3142 Ok(db_entries + static_file_entries)
3143 }
3144}
3145
3146impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3147 fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3148 let mut finalized_blocks = self
3149 .tx
3150 .cursor_read::<tables::ChainState>()?
3151 .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3152 .take(1)
3153 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3154
3155 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3156 Ok(last_finalized_block_number)
3157 }
3158
3159 fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3160 let mut safe_blocks = self
3161 .tx
3162 .cursor_read::<tables::ChainState>()?
3163 .walk(Some(tables::ChainStateKey::LastSafeBlockBlock))?
3164 .take(1)
3165 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3166
3167 let last_safe_block_number = safe_blocks.pop_first().map(|pair| pair.1);
3168 Ok(last_safe_block_number)
3169 }
3170}
3171
3172impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3173 fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3174 Ok(self
3175 .tx
3176 .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3177 }
3178
3179 fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3180 Ok(self
3181 .tx
3182 .put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlockBlock, block_number)?)
3183 }
3184}
3185
3186impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3187 type Tx = TX;
3188
3189 fn tx_ref(&self) -> &Self::Tx {
3190 &self.tx
3191 }
3192
3193 fn tx_mut(&mut self) -> &mut Self::Tx {
3194 &mut self.tx
3195 }
3196
3197 fn into_tx(self) -> Self::Tx {
3198 self.tx
3199 }
3200
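// Note: this resolves to what is presumably an inherent `prune_modes_ref` method on
// `DatabaseProvider` (inherent methods take precedence over trait methods during method
// resolution), so the call below is not self-recursive.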
3201 fn prune_modes_ref(&self) -> &PruneModes {
3202 self.prune_modes_ref()
3203 }
3204}