use crate::{
    bundle_state::StorageRevertsIter,
    providers::{
        database::{chain::ChainStorage, metrics},
        static_file::StaticFileWriter,
        NodeTypesForProvider, StaticFileProvider,
    },
    to_range,
    traits::{
        AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
    },
    AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
    BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
    DBProvider, HashingWriter, HeaderProvider, HeaderSyncGapProvider, HistoricalStateProvider,
    HistoricalStateProviderRef, HistoryWriter, LatestStateProvider, LatestStateProviderRef,
    OriginalValuesKnown, ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RevertsInit,
    StageCheckpointReader, StateCommitmentProvider, StateProviderBox, StateWriter,
    StaticFileProviderFactory, StatsReader, StorageLocation, StorageReader, StorageTrieWriter,
    TransactionVariant, TransactionsProvider, TransactionsProviderExt, TrieWriter,
    WithdrawalsProvider,
};
use alloy_consensus::{
    transaction::{SignerRecoverable, TransactionMeta},
    BlockHeader, Header, TxReceipt,
};
use alloy_eips::{eip2718::Encodable2718, eip4895::Withdrawals, BlockHashOrNumber};
use alloy_primitives::{
    keccak256,
    map::{hash_map, B256Map, HashMap, HashSet},
    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
};
use itertools::Itertools;
use rayon::slice::ParallelSliceMut;
use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, EthereumHardforks};
use reth_db_api::{
    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
    database::Database,
    models::{
        sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
        ShardedKey, StoredBlockBodyIndices,
    },
    table::Table,
    tables,
    transaction::{DbTx, DbTxMut},
    BlockNumberList, DatabaseError, PlainAccountState, PlainStorageState,
};
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
use reth_primitives_traits::{
    Account, Block as _, BlockBody as _, Bytecode, GotExpected, NodePrimitives, RecoveredBlock,
    SealedBlock, SealedHeader, SignedTransaction, StorageEntry,
};
use reth_prune_types::{
    PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE,
};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{
    BlockBodyIndicesProvider, BlockBodyReader, NodePrimitivesProvider, OmmersProvider,
    StateProvider, StorageChangeSetReader, TryIntoHistoricalStateProvider,
};
use reth_storage_errors::provider::{ProviderResult, RootMismatch};
use reth_trie::{
    prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets},
    updates::{StorageTrieUpdates, TrieUpdates},
    HashedPostStateSorted, Nibbles, StateRoot, StoredNibbles,
};
use reth_trie_db::{DatabaseStateRoot, DatabaseStorageTrieCursor};
use revm_database::states::{
    PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
};
use revm_state::FlaggedStorage;
use std::{
    cmp::Ordering,
    collections::{BTreeMap, BTreeSet},
    fmt::Debug,
    ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
    sync::{mpsc, Arc},
};
use tracing::{debug, trace};

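/// A [`DatabaseProvider`] backed by a read-only database transaction.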
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;

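/// A [`DatabaseProvider`] backed by a read-write database transaction.
///
/// Dereferences to the inner [`DatabaseProvider`], so all read-only provider APIs are available
/// on it as well.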
#[derive(Debug)]
pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
    pub DatabaseProvider<<DB as Database>::TXMut, N>,
);

impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
    for DatabaseProviderRW<DB, N>
{
    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
        &self.0
    }
}

impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
    pub fn commit(self) -> ProviderResult<bool> {
        self.0.commit()
    }

    pub fn into_tx(self) -> <DB as Database>::TXMut {
        self.0.into_tx()
    }
}

impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
    for DatabaseProvider<<DB as Database>::TXMut, N>
{
    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
        provider.0
    }
}

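/// A provider struct that fetches data from the database, wrapping a database transaction (`TX`)
/// together with the chain spec, the static file provider, the configured prune modes, and the
/// node's storage implementation.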
#[derive(Debug)]
pub struct DatabaseProvider<TX, N: NodeTypes> {
    tx: TX,
    chain_spec: Arc<N::ChainSpec>,
    static_file_provider: StaticFileProvider<N::Primitives>,
    prune_modes: PruneModes,
    storage: Arc<N::Storage>,
}

impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
    pub const fn prune_modes_ref(&self) -> &PruneModes {
        &self.prune_modes
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
        trace!(target: "providers::db", "Returning latest state provider");
        Box::new(LatestStateProviderRef::new(self))
    }

    pub fn history_by_block_hash<'a>(
        &'a self,
        block_hash: BlockHash,
    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
        let mut block_number =
            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
        if block_number == self.best_block_number().unwrap_or_default() &&
            block_number == self.last_block_number().unwrap_or_default()
        {
            return Ok(Box::new(LatestStateProviderRef::new(self)))
        }

        block_number += 1;

        let account_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
        let storage_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;

        let mut state_provider = HistoricalStateProviderRef::new(self, block_number);

        if let Some(prune_checkpoint_block_number) =
            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_account_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }
        if let Some(prune_checkpoint_block_number) =
            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_storage_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }

        Ok(Box::new(state_provider))
    }

    #[cfg(feature = "test-utils")]
    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
        self.prune_modes = prune_modes;
    }
}

impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
    type Primitives = N::Primitives;
}

impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
        self.static_file_provider.clone()
    }
}

impl<TX: Debug + Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
    for DatabaseProvider<TX, N>
{
    type ChainSpec = N::ChainSpec;

    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
        self.chain_spec.clone()
    }
}

impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
    pub const fn new_rw(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
    ) -> Self {
        Self { tx, chain_spec, static_file_provider, prune_modes, storage }
    }
}

impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
    fn as_ref(&self) -> &Self {
        self
    }
}

impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
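    /// Unwinds trie state for the given block range: reverts hashed account/storage tables and
    /// history indices, recomputes the state root from the changed prefixes, and compares it
    /// against the state root of the block preceding the range, returning an error on mismatch.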
    pub fn unwind_trie_state_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<()> {
        let changed_accounts = self
            .tx
            .cursor_read::<tables::AccountChangeSets>()?
            .walk_range(range.clone())?
            .collect::<Result<Vec<_>, _>>()?;

        let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
        let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
        let mut destroyed_accounts = HashSet::default();
        for (hashed_address, account) in hashed_addresses {
            account_prefix_set.insert(Nibbles::unpack(hashed_address));
            if account.is_none() {
                destroyed_accounts.insert(hashed_address);
            }
        }

        self.unwind_account_history_indices(changed_accounts.iter())?;
        let storage_range = BlockNumberAddress::range(range.clone());

        let changed_storages = self
            .tx
            .cursor_read::<tables::StorageChangeSets>()?
            .walk_range(storage_range)?
            .collect::<Result<Vec<_>, _>>()?;

        let mut storage_prefix_sets = B256Map::<PrefixSet>::default();
        let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
        for (hashed_address, hashed_slots) in storage_entries {
            account_prefix_set.insert(Nibbles::unpack(hashed_address));
            let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
            for slot in hashed_slots {
                storage_prefix_set.insert(Nibbles::unpack(slot));
            }
            storage_prefix_sets.insert(hashed_address, storage_prefix_set.freeze());
        }

        self.unwind_storage_history_indices(changed_storages.iter().copied())?;

        let prefix_sets = TriePrefixSets {
            account_prefix_set: account_prefix_set.freeze(),
            storage_prefix_sets,
            destroyed_accounts,
        };
        let (new_state_root, trie_updates) = StateRoot::from_tx(&self.tx)
            .with_prefix_sets(prefix_sets)
            .root_with_updates()
            .map_err(reth_db_api::DatabaseError::from)?;

        let parent_number = range.start().saturating_sub(1);
        let parent_state_root = self
            .header_by_number(parent_number)?
            .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
            .state_root();

        if new_state_root != parent_state_root {
            let parent_hash = self
                .block_hash(parent_number)?
                .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
            return Err(ProviderError::UnwindStateRootMismatch(Box::new(RootMismatch {
                root: GotExpected { got: new_state_root, expected: parent_state_root },
                block_number: parent_number,
                block_hash: parent_hash,
            })))
        }
        self.write_trie_updates(&trie_updates)?;

        Ok(())
    }

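    /// Removes receipts for all transactions starting at `from_tx`, from the database and/or the
    /// static files depending on `remove_from`.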
    fn remove_receipts_from(
        &self,
        from_tx: TxNumber,
        last_block: BlockNumber,
        remove_from: StorageLocation,
    ) -> ProviderResult<()> {
        if remove_from.database() {
            self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
        }

        if remove_from.static_files() && !self.prune_modes.has_receipts_pruning() {
            let static_file_receipt_num =
                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);

            let to_delete = static_file_receipt_num
                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
                .unwrap_or_default();

            self.static_file_provider
                .latest_writer(StaticFileSegment::Receipts)?
                .prune_receipts(to_delete, last_block)?;
        }

        Ok(())
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
    fn try_into_history_at_block(
        self,
        mut block_number: BlockNumber,
    ) -> ProviderResult<StateProviderBox> {
        if block_number == self.best_block_number().unwrap_or_default() {
            return Ok(Box::new(LatestStateProvider::new(self)))
        }

        block_number += 1;

        let account_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
        let storage_history_prune_checkpoint =
            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;

        let mut state_provider = HistoricalStateProvider::new(self, block_number);

        if let Some(prune_checkpoint_block_number) =
            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_account_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }
        if let Some(prune_checkpoint_block_number) =
            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
        {
            state_provider = state_provider.with_lowest_available_storage_history_block_number(
                prune_checkpoint_block_number + 1,
            );
        }

        Ok(Box::new(state_provider))
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> StateCommitmentProvider for DatabaseProvider<TX, N> {
    type StateCommitment = N::StateCommitment;
}

impl<
        Tx: DbTx + DbTxMut + 'static,
        N: NodeTypesForProvider<Primitives: NodePrimitives<BlockHeader = Header>>,
    > DatabaseProvider<Tx, N>
{
    pub fn insert_historical_block(
        &self,
        block: RecoveredBlock<<Self as BlockWriter>::Block>,
    ) -> ProviderResult<StoredBlockBodyIndices> {
        let ttd = if block.number() == 0 {
            block.header().difficulty()
        } else {
            let parent_block_number = block.number() - 1;
            let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
            parent_ttd + block.header().difficulty()
        };

        let mut writer = self.static_file_provider.latest_writer(StaticFileSegment::Headers)?;

        let segment_header = writer.user_header();
        if segment_header.block_end().is_none() && segment_header.expected_block_start() == 0 {
            for block_number in 0..block.number() {
                let mut prev = block.clone_header();
                prev.number = block_number;
                writer.append_header(&prev, U256::ZERO, &B256::ZERO)?;
            }
        }

        writer.append_header(block.header(), ttd, &block.hash())?;

        self.insert_block(block, StorageLocation::Database)
    }
}

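/// Unwinds history shards that belong to the same partial key, walking backwards from
/// `start_key` and deleting every shard that contains block numbers at or above `block_number`.
///
/// Returns the block numbers (strictly below `block_number`) from the last deleted shard that the
/// caller must re-insert as the new highest shard.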
fn unwind_history_shards<S, T, C>(
    cursor: &mut C,
    start_key: T::Key,
    block_number: BlockNumber,
    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
) -> ProviderResult<Vec<u64>>
where
    T: Table<Value = BlockNumberList>,
    T::Key: AsRef<ShardedKey<S>>,
    C: DbCursorRO<T> + DbCursorRW<T>,
{
    let mut item = cursor.seek_exact(start_key)?;
    while let Some((sharded_key, list)) = item {
        if !shard_belongs_to_key(&sharded_key) {
            break
        }
        cursor.delete_current()?;

        let first = list.iter().next().expect("List can't be empty");
        if first >= block_number {
            item = cursor.prev()?;
            continue
        } else if block_number <= sharded_key.as_ref().highest_block_number {
            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
        }
        return Ok(list.iter().collect::<Vec<_>>())
    }

    Ok(Vec::new())
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
    pub const fn new(
        tx: TX,
        chain_spec: Arc<N::ChainSpec>,
        static_file_provider: StaticFileProvider<N::Primitives>,
        prune_modes: PruneModes,
        storage: Arc<N::Storage>,
    ) -> Self {
        Self { tx, chain_spec, static_file_provider, prune_modes, storage }
    }

    pub fn into_tx(self) -> TX {
        self.tx
    }

    pub const fn tx_mut(&mut self) -> &mut TX {
        &mut self.tx
    }

    pub const fn tx_ref(&self) -> &TX {
        &self.tx
    }

    pub fn chain_spec(&self) -> &N::ChainSpec {
        &self.chain_spec
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
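    /// Fetches transactions for the given transaction-number range, reading from static files
    /// first and falling back to the database via the provided cursor.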
    fn transactions_by_tx_range_with_cursor<C>(
        &self,
        range: impl RangeBounds<TxNumber>,
        cursor: &mut C,
    ) -> ProviderResult<Vec<TxTy<N>>>
    where
        C: DbCursorRO<tables::Transactions<TxTy<N>>>,
    {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Transactions,
            to_range(range),
            |static_file, range, _| static_file.transactions_by_tx_range(range),
            |range, _| self.cursor_collect(cursor, range),
            |_| true,
        )
    }

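    /// Generic helper that assembles a single block (header, body, and senders) for the given
    /// hash or number, using the supplied header lookup and block constructor closures.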
    fn recovered_block<H, HF, B, BF>(
        &self,
        id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
        header_by_number: HF,
        construct_block: BF,
    ) -> ProviderResult<Option<B>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
    {
        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
        let Some(header) = header_by_number(block_number)? else { return Ok(None) };

        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };

        let tx_range = body.tx_num_range();

        let (transactions, senders) = if tx_range.is_empty() {
            (vec![], vec![])
        } else {
            (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
        };

        let body = self
            .storage
            .reader()
            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
            .pop()
            .ok_or(ProviderError::InvalidStorageOutput)?;

        construct_block(header, body, senders)
    }

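    /// Generic helper for fetching a range of blocks: reads headers via `headers_range`, loads
    /// transactions and block bodies for each block, and assembles results with `assemble_block`.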
    fn block_range<F, H, HF, R>(
        &self,
        range: RangeInclusive<BlockNumber>,
        headers_range: HF,
        mut assemble_block: F,
    ) -> ProviderResult<Vec<R>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
    {
        if range.is_empty() {
            return Ok(Vec::new())
        }

        let len = range.end().saturating_sub(*range.start()) as usize;
        let mut blocks = Vec::with_capacity(len);

        let headers = headers_range(range.clone())?;
        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;

        let present_headers = self
            .block_body_indices_range(range)?
            .into_iter()
            .map(|b| b.tx_num_range())
            .zip(headers)
            .collect::<Vec<_>>();

        let mut inputs = Vec::new();
        for (tx_range, header) in &present_headers {
            let transactions = if tx_range.is_empty() {
                Vec::new()
            } else {
                self.transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)?
            };

            inputs.push((header.as_ref(), transactions));
        }

        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;

        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
            blocks.push(assemble_block(header, body, tx_range)?);
        }

        Ok(blocks)
    }

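    /// Like `block_range`, but also looks up (or recovers) the sender of every transaction so the
    /// assembled blocks carry sender information.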
    fn block_with_senders_range<H, HF, B, BF>(
        &self,
        range: RangeInclusive<BlockNumber>,
        headers_range: HF,
        assemble_block: BF,
    ) -> ProviderResult<Vec<B>>
    where
        H: AsRef<HeaderTy<N>>,
        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
    {
        let mut senders_cursor = self.tx.cursor_read::<tables::TransactionSenders>()?;

        self.block_range(range, headers_range, |header, body, tx_range| {
            let senders = if tx_range.is_empty() {
                Vec::new()
            } else {
                let known_senders =
                    senders_cursor
                        .walk_range(tx_range.clone())?
                        .collect::<Result<HashMap<_, _>, _>>()?;

                let mut senders = Vec::with_capacity(body.transactions().len());
                for (tx_num, tx) in tx_range.zip(body.transactions()) {
                    match known_senders.get(&tx_num) {
                        None => {
                            let sender = tx.recover_signer_unchecked()?;
                            senders.push(sender);
                        }
                        Some(sender) => senders.push(*sender),
                    }
                }

                senders
            };

            assemble_block(header, body, senders)
        })
    }

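    /// Rebuilds a bundle state from account and storage changesets: for every changed account and
    /// slot it records the old (pre-change) value from the changeset and the current value from
    /// the plain state, and collects the per-block reverts.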
    fn populate_bundle_state<A, S>(
        &self,
        account_changeset: Vec<(u64, AccountBeforeTx)>,
        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
        plain_accounts_cursor: &mut A,
        plain_storage_cursor: &mut S,
    ) -> ProviderResult<(BundleStateInit, RevertsInit)>
    where
        A: DbCursorRO<PlainAccountState>,
        S: DbDupCursorRO<PlainStorageState>,
    {
        let mut state: BundleStateInit = HashMap::default();

        let mut reverts: RevertsInit = HashMap::default();

        for (block_number, account_before) in account_changeset.into_iter().rev() {
            let AccountBeforeTx { info: old_info, address } = account_before;
            match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((old_info, new_info, HashMap::default()));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().0 = old_info;
                }
            }
            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
        }

        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
            let BlockNumberAddress((block_number, address)) = block_and_address;
            let account_state = match state.entry(address) {
                hash_map::Entry::Vacant(entry) => {
                    let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
                    entry.insert((present_info, present_info, HashMap::default()))
                }
                hash_map::Entry::Occupied(entry) => entry.into_mut(),
            };

            match account_state.2.entry(old_storage.key) {
                hash_map::Entry::Vacant(entry) => {
                    let new_storage = plain_storage_cursor
                        .seek_by_key_subkey(address, old_storage.key)?
                        .filter(|storage| storage.key == old_storage.key)
                        .unwrap_or_default();
                    entry.insert((
                        (old_storage.value, old_storage.is_private),
                        (new_storage.value, new_storage.is_private),
                    ));
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().0 = (old_storage.value, old_storage.is_private);
                }
            };

            reverts
                .entry(block_number)
                .or_default()
                .entry(address)
                .or_default()
                .1
                .push(old_storage);
        }

        Ok((state, reverts))
    }
}

impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
    pub fn commit(self) -> ProviderResult<bool> {
        Ok(self.tx.commit()?)
    }

    fn take_shard<T>(
        &self,
        cursor: &mut <TX as DbTxMut>::CursorMut<T>,
        key: T::Key,
    ) -> ProviderResult<Vec<u64>>
    where
        T: Table<Value = BlockNumberList>,
    {
        if let Some((_, list)) = cursor.seek_exact(key)? {
            cursor.delete_current()?;
            let list = list.iter().collect::<Vec<_>>();
            return Ok(list)
        }
        Ok(Vec::new())
    }

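    /// Appends new block-number indices to the history index table `T`.
    ///
    /// For every `partial_key`, the current last shard (keyed with `u64::MAX`) is taken out,
    /// extended with the new indices, and re-chunked into shards of at most
    /// `sharded_key::NUM_OF_INDICES_IN_SHARD` entries. Every full chunk is keyed by its highest
    /// block number; the final (possibly partial) chunk is keyed with `u64::MAX` so it can be
    /// extended again later.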
    fn append_history_index<P, T>(
        &self,
        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
    ) -> ProviderResult<()>
    where
        P: Copy,
        T: Table<Value = BlockNumberList>,
    {
        let mut cursor = self.tx.cursor_write::<T>()?;
        for (partial_key, indices) in index_updates {
            let mut last_shard =
                self.take_shard::<T>(&mut cursor, sharded_key_factory(partial_key, u64::MAX))?;
            last_shard.extend(indices);
            let mut chunks = last_shard.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD).peekable();
            while let Some(list) = chunks.next() {
                let highest_block_number = if chunks.peek().is_some() {
                    *list.last().expect("`chunks` does not return empty list")
                } else {
                    u64::MAX
                };
                cursor.insert(
                    sharded_key_factory(partial_key, highest_block_number),
                    &BlockNumberList::new_pre_sorted(list.iter().copied()),
                )?;
            }
        }
        Ok(())
    }
}

impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
        Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
    }
}

impl<TX: DbTx, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
    fn changed_accounts_with_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<BTreeSet<Address>> {
        self.tx
            .cursor_read::<tables::AccountChangeSets>()?
            .walk_range(range)?
            .map(|entry| {
                entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
            })
            .collect()
    }

    fn basic_accounts(
        &self,
        iter: impl IntoIterator<Item = Address>,
    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
        let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
        Ok(iter
            .into_iter()
            .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
            .collect::<Result<Vec<_>, _>>()?)
    }

    fn changed_accounts_and_blocks_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
        let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;

        let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
            BTreeMap::new(),
            |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
                let (index, account) = entry?;
                accounts.entry(account.address).or_default().push(index);
                Ok(accounts)
            },
        )?;

        Ok(account_transitions)
    }
}

impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
    fn storage_changeset(
        &self,
        block_number: BlockNumber,
    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
        let range = block_number..=block_number;
        let storage_range = BlockNumberAddress::range(range);
        self.tx
            .cursor_dup_read::<tables::StorageChangeSets>()?
            .walk_range(storage_range)?
            .map(|result| -> ProviderResult<_> { Ok(result?) })
            .collect()
    }
}

impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
    fn account_block_changeset(
        &self,
        block_number: BlockNumber,
    ) -> ProviderResult<Vec<AccountBeforeTx>> {
        let range = block_number..=block_number;
        self.tx
            .cursor_read::<tables::AccountChangeSets>()?
            .walk_range(range)?
            .map(|result| -> ProviderResult<_> {
                let (_, account_before) = result?;
                Ok(account_before)
            })
            .collect()
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
    for DatabaseProvider<TX, N>
{
    type Header = HeaderTy<N>;

    fn local_tip_header(
        &self,
        highest_uninterrupted_block: BlockNumber,
    ) -> ProviderResult<SealedHeader<Self::Header>> {
        let static_file_provider = self.static_file_provider();

        let next_static_file_block_num = static_file_provider
            .get_highest_static_file_block(StaticFileSegment::Headers)
            .map(|id| id + 1)
            .unwrap_or_default();
        let next_block = highest_uninterrupted_block + 1;

        match next_static_file_block_num.cmp(&next_block) {
            Ordering::Greater => {
                let mut static_file_producer =
                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
                static_file_producer.commit()?
            }
            Ordering::Less => {
                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
            }
            Ordering::Equal => {}
        }

        let local_head = static_file_provider
            .sealed_header(highest_uninterrupted_block)?
            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;

        Ok(local_head)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
    type Header = HeaderTy<N>;

    fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
        if let Some(num) = self.block_number(*block_hash)? {
            Ok(self.header_by_number(num)?)
        } else {
            Ok(None)
        }
    }

    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Headers,
            num,
            |static_file| static_file.header_by_number(num),
            || Ok(self.tx.get::<tables::Headers<Self::Header>>(num)?),
        )
    }

    fn header_td(&self, block_hash: &BlockHash) -> ProviderResult<Option<U256>> {
        if let Some(num) = self.block_number(*block_hash)? {
            self.header_td_by_number(num)
        } else {
            Ok(None)
        }
    }

    fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
        if self.chain_spec.is_paris_active_at_block(number) {
            if let Some(td) = self.chain_spec.final_paris_total_difficulty() {
                return Ok(Some(td))
            }
        }

        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Headers,
            number,
            |static_file| static_file.header_td_by_number(number),
            || Ok(self.tx.get::<tables::HeaderTerminalDifficulties>(number)?.map(|td| td.0)),
        )
    }

    fn headers_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<Self::Header>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Headers,
            to_range(range),
            |static_file, range, _| static_file.headers_range(range),
            |range, _| self.cursor_read_collect::<tables::Headers<Self::Header>>(range),
            |_| true,
        )
    }

    fn sealed_header(
        &self,
        number: BlockNumber,
    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Headers,
            number,
            |static_file| static_file.sealed_header(number),
            || {
                if let Some(header) = self.header_by_number(number)? {
                    let hash = self
                        .block_hash(number)?
                        .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
                    Ok(Some(SealedHeader::new(header, hash)))
                } else {
                    Ok(None)
                }
            },
        )
    }

    fn sealed_headers_while(
        &self,
        range: impl RangeBounds<BlockNumber>,
        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Headers,
            to_range(range),
            |static_file, range, predicate| static_file.sealed_headers_while(range, predicate),
            |range, mut predicate| {
                let mut headers = vec![];
                for entry in
                    self.tx.cursor_read::<tables::Headers<Self::Header>>()?.walk_range(range)?
                {
                    let (number, header) = entry?;
                    let hash = self
                        .block_hash(number)?
                        .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
                    let sealed = SealedHeader::new(header, hash);
                    if !predicate(&sealed) {
                        break
                    }
                    headers.push(sealed);
                }
                Ok(headers)
            },
            predicate,
        )
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Headers,
            number,
            |static_file| static_file.block_hash(number),
            || Ok(self.tx.get::<tables::CanonicalHeaders>(number)?),
        )
    }

    fn canonical_hashes_range(
        &self,
        start: BlockNumber,
        end: BlockNumber,
    ) -> ProviderResult<Vec<B256>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Headers,
            start..end,
            |static_file, range, _| static_file.canonical_hashes_range(range.start, range.end),
            |range, _| self.cursor_read_collect::<tables::CanonicalHeaders>(range),
            |_| true,
        )
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
    fn chain_info(&self) -> ProviderResult<ChainInfo> {
        let best_number = self.best_block_number()?;
        let best_hash = self.block_hash(best_number)?.unwrap_or_default();
        Ok(ChainInfo { best_hash, best_number })
    }

    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
        Ok(self
            .get_stage_checkpoint(StageId::Finish)?
            .map(|checkpoint| checkpoint.block_number)
            .unwrap_or_default())
    }

    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
        Ok(self
            .tx
            .cursor_read::<tables::CanonicalHeaders>()?
            .last()?
            .map(|(num, _)| num)
            .max(
                self.static_file_provider.get_highest_static_file_block(StaticFileSegment::Headers),
            )
            .unwrap_or_default())
    }

    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
        Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
    type Block = BlockTy<N>;

    fn find_block_by_hash(
        &self,
        hash: B256,
        source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        if source.is_canonical() {
            self.block(hash.into())
        } else {
            Ok(None)
        }
    }

    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        if let Some(number) = self.convert_hash_or_number(id)? {
            if let Some(header) = self.header_by_number(number)? {
                let Some(transactions) = self.transactions_by_block(number.into())? else {
                    return Ok(None)
                };

                let body = self
                    .storage
                    .reader()
                    .read_block_bodies(self, vec![(&header, transactions)])?
                    .pop()
                    .ok_or(ProviderError::InvalidStorageOutput)?;

                return Ok(Some(Self::Block::new(header, body)))
            }
        }

        Ok(None)
    }

    fn pending_block(&self) -> ProviderResult<Option<SealedBlock<Self::Block>>> {
        Ok(None)
    }

    fn pending_block_with_senders(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Ok(None)
    }

    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(SealedBlock<Self::Block>, Vec<Self::Receipt>)>> {
        Ok(None)
    }

    fn recovered_block(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.header_by_number(block_number),
            |header, body, senders| {
                Self::Block::new(header, body)
                    .try_into_recovered_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    fn sealed_block_with_senders(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.sealed_header(block_number),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    .try_with_senders_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        self.block_range(
            range,
            |range| self.headers_range(range),
            |header, body, _| Ok(Self::Block::new(header, body)),
        )
    }

    fn block_with_senders_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.headers_range(range),
            |header, body, senders| {
                Self::Block::new(header, body)
                    .try_into_recovered_unchecked(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    fn recovered_block_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.sealed_headers_range(range),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    .try_with_senders(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
    for DatabaseProvider<TX, N>
{
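    /// Recovers transaction hashes for the given transaction-number range by walking the
    /// transactions table in parallel: the range is split into chunks, each chunk is RLP-encoded
    /// and hashed on a rayon worker, and the results are collected back over channels.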
    fn transaction_hashes_by_range(
        &self,
        tx_range: Range<TxNumber>,
    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Transactions,
            tx_range,
            |static_file, range, _| static_file.transaction_hashes_by_range(range),
            |tx_range, _| {
                let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
                let tx_range_size = tx_range.clone().count();
                let tx_walker = tx_cursor.walk_range(tx_range)?;

                let chunk_size = (tx_range_size / rayon::current_num_threads()).max(1);
                let mut channels = Vec::with_capacity(chunk_size);
                let mut transaction_count = 0;

                #[inline]
                fn calculate_hash<T>(
                    entry: Result<(TxNumber, T), DatabaseError>,
                    rlp_buf: &mut Vec<u8>,
                ) -> Result<(B256, TxNumber), Box<ProviderError>>
                where
                    T: Encodable2718,
                {
                    let (tx_id, tx) = entry.map_err(|e| Box::new(e.into()))?;
                    tx.encode_2718(rlp_buf);
                    Ok((keccak256(rlp_buf), tx_id))
                }

                for chunk in &tx_walker.chunks(chunk_size) {
                    let (tx, rx) = mpsc::channel();
                    channels.push(rx);

                    let chunk: Vec<_> = chunk.collect();
                    transaction_count += chunk.len();

                    rayon::spawn(move || {
                        let mut rlp_buf = Vec::with_capacity(128);
                        for entry in chunk {
                            rlp_buf.clear();
                            let _ = tx.send(calculate_hash(entry, &mut rlp_buf));
                        }
                    });
                }
                let mut tx_list = Vec::with_capacity(transaction_count);

                for channel in channels {
                    while let Ok(tx) = channel.recv() {
                        let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
                        tx_list.push((tx_hash, tx_id));
                    }
                }

                Ok(tx_list)
            },
            |_| true,
        )
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
    type Transaction = TxTy<N>;

    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
        Ok(self.tx.get::<tables::TransactionHashNumbers>(tx_hash)?)
    }

    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Transactions,
            id,
            |static_file| static_file.transaction_by_id(id),
            || Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
        )
    }

    fn transaction_by_id_unhashed(
        &self,
        id: TxNumber,
    ) -> ProviderResult<Option<Self::Transaction>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Transactions,
            id,
            |static_file| static_file.transaction_by_id_unhashed(id),
            || Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
        )
    }

    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
        if let Some(id) = self.transaction_id(hash)? {
            Ok(self.transaction_by_id_unhashed(id)?)
        } else {
            Ok(None)
        }
    }

    fn transaction_by_hash_with_meta(
        &self,
        tx_hash: TxHash,
    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
        let mut transaction_cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
        if let Some(transaction_id) = self.transaction_id(tx_hash)? {
            if let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? {
                if let Some(block_number) =
                    transaction_cursor.seek(transaction_id).map(|b| b.map(|(_, bn)| bn))?
                {
                    if let Some(sealed_header) = self.sealed_header(block_number)? {
                        let (header, block_hash) = sealed_header.split();
                        if let Some(block_body) = self.block_body_indices(block_number)? {
                            let index = transaction_id - block_body.first_tx_num();

                            let meta = TransactionMeta {
                                tx_hash,
                                index,
                                block_hash,
                                block_number,
                                base_fee: header.base_fee_per_gas(),
                                excess_blob_gas: header.excess_blob_gas(),
                                timestamp: header.timestamp(),
                            };

                            return Ok(Some((transaction, meta)))
                        }
                    }
                }
            }
        }

        Ok(None)
    }

    fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        let mut cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
        Ok(cursor.seek(id)?.map(|(_, bn)| bn))
    }

    fn transactions_by_block(
        &self,
        id: BlockHashOrNumber,
    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;

        if let Some(block_number) = self.convert_hash_or_number(id)? {
            if let Some(body) = self.block_body_indices(block_number)? {
                let tx_range = body.tx_num_range();
                return if tx_range.is_empty() {
                    Ok(Some(Vec::new()))
                } else {
                    Ok(Some(self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)?))
                }
            }
        }
        Ok(None)
    }

    fn transactions_by_block_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
        let range = to_range(range);
        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;

        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
            .into_iter()
            .map(|body| {
                let tx_num_range = body.tx_num_range();
                if tx_num_range.is_empty() {
                    Ok(Vec::new())
                } else {
                    Ok(self
                        .transactions_by_tx_range_with_cursor(tx_num_range, &mut tx_cursor)?
                        .into_iter()
                        .collect())
                }
            })
            .collect()
    }

    fn transactions_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Self::Transaction>> {
        self.transactions_by_tx_range_with_cursor(
            range,
            &mut self.tx.cursor_read::<tables::Transactions<_>>()?,
        )
    }

    fn senders_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Address>> {
        self.cursor_read_collect::<tables::TransactionSenders>(range)
    }

    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
        Ok(self.tx.get::<tables::TransactionSenders>(id)?)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
    type Receipt = ReceiptTy<N>;

    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::Receipts,
            id,
            |static_file| static_file.receipt(id),
            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
        )
    }

    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
        if let Some(id) = self.transaction_id(hash)? {
            self.receipt(id)
        } else {
            Ok(None)
        }
    }

    fn receipts_by_block(
        &self,
        block: BlockHashOrNumber,
    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
        if let Some(number) = self.convert_hash_or_number(block)? {
            if let Some(body) = self.block_body_indices(number)? {
                let tx_range = body.tx_num_range();
                return if tx_range.is_empty() {
                    Ok(Some(Vec::new()))
                } else {
                    self.receipts_by_tx_range(tx_range).map(Some)
                }
            }
        }
        Ok(None)
    }

    fn receipts_by_tx_range(
        &self,
        range: impl RangeBounds<TxNumber>,
    ) -> ProviderResult<Vec<Self::Receipt>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::Receipts,
            to_range(range),
            |static_file, range, _| static_file.receipts_by_tx_range(range),
            |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
            |_| true,
        )
    }
}

impl<TX: DbTx + 'static, N: NodeTypes<ChainSpec: EthereumHardforks>> WithdrawalsProvider
    for DatabaseProvider<TX, N>
{
    fn withdrawals_by_block(
        &self,
        id: BlockHashOrNumber,
        timestamp: u64,
    ) -> ProviderResult<Option<Withdrawals>> {
        if self.chain_spec.is_shanghai_active_at_timestamp(timestamp) {
            if let Some(number) = self.convert_hash_or_number(id)? {
                return self.static_file_provider.get_with_static_file_or_database(
                    StaticFileSegment::BlockMeta,
                    number,
                    |static_file| static_file.withdrawals_by_block(number.into(), timestamp),
                    || {
                        let withdrawals = self
                            .tx
                            .get::<tables::BlockWithdrawals>(number)
                            .map(|w| w.map(|w| w.withdrawals))?
                            .unwrap_or_default();
                        Ok(Some(withdrawals))
                    },
                )
            }
        }
        Ok(None)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> OmmersProvider for DatabaseProvider<TX, N> {
    fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<Self::Header>>> {
        if let Some(number) = self.convert_hash_or_number(id)? {
            if self.chain_spec.is_paris_active_at_block(number) {
                return Ok(Some(Vec::new()))
            }

            return self.static_file_provider.get_with_static_file_or_database(
                StaticFileSegment::BlockMeta,
                number,
                |static_file| static_file.ommers(id),
                || Ok(self.tx.get::<tables::BlockOmmers<Self::Header>>(number)?.map(|o| o.ommers)),
            )
        }

        Ok(None)
    }
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
    for DatabaseProvider<TX, N>
{
    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
        self.static_file_provider.get_with_static_file_or_database(
            StaticFileSegment::BlockMeta,
            num,
            |static_file| static_file.block_body_indices(num),
            || Ok(self.tx.get::<tables::BlockBodyIndices>(num)?),
        )
    }

    fn block_body_indices_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
        self.static_file_provider.get_range_with_static_file_or_database(
            StaticFileSegment::BlockMeta,
            *range.start()..*range.end() + 1,
            |static_file, range, _| {
                static_file.block_body_indices_range(range.start..=range.end.saturating_sub(1))
            },
            |range, _| self.cursor_read_collect::<tables::BlockBodyIndices>(range),
            |_| true,
        )
    }
}

impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
        Ok(self.tx.get::<tables::StageCheckpoints>(id.to_string())?)
    }

    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
        Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
    }

    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
        self.tx
            .cursor_read::<tables::StageCheckpoints>()?
            .walk(None)?
            .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
            .map_err(ProviderError::Database)
    }
}

impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
    fn save_stage_checkpoint(
        &self,
        id: StageId,
        checkpoint: StageCheckpoint,
    ) -> ProviderResult<()> {
        Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
    }

    fn save_stage_checkpoint_progress(
        &self,
        id: StageId,
        checkpoint: Vec<u8>,
    ) -> ProviderResult<()> {
        Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
    }

    fn update_pipeline_stages(
        &self,
        block_number: BlockNumber,
        drop_stage_checkpoint: bool,
    ) -> ProviderResult<()> {
        let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
        for stage_id in StageId::ALL {
            let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
            cursor.upsert(
                stage_id.to_string(),
                &StageCheckpoint {
                    block_number,
                    ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
                },
            )?;
        }

        Ok(())
    }
}

impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
    fn plain_state_storages(
        &self,
        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
        let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;

        addresses_with_keys
            .into_iter()
            .map(|(address, storage)| {
                storage
                    .into_iter()
                    .map(|key| -> ProviderResult<_> {
                        Ok(plain_storage
                            .seek_by_key_subkey(address, key)?
                            .filter(|v| v.key == key)
                            .unwrap_or_else(|| StorageEntry { key, ..Default::default() }))
                    })
                    .collect::<ProviderResult<Vec<_>>>()
                    .map(|storage| (address, storage))
            })
            .collect::<ProviderResult<Vec<(_, _)>>>()
    }

    fn changed_storages_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
        self.tx
            .cursor_read::<tables::StorageChangeSets>()?
            .walk_range(BlockNumberAddress::range(range))?
            .try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
                let (BlockNumberAddress((_, address)), storage_entry) = entry?;
                accounts.entry(address).or_default().insert(storage_entry.key);
                Ok(accounts)
            })
    }

    fn changed_storages_and_blocks_with_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
        let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;

        let storage_changeset_lists =
            changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
                BTreeMap::new(),
                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, entry| -> ProviderResult<_> {
                    let (index, storage) = entry?;
                    storages
                        .entry((index.address(), storage.key))
                        .or_default()
                        .push(index.block_number());
                    Ok(storages)
                },
            )?;

        Ok(storage_changeset_lists)
    }
}

1778impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
1779 for DatabaseProvider<TX, N>
1780{
1781 type Receipt = ReceiptTy<N>;
1782
1783 fn write_state(
1784 &self,
1785 execution_outcome: &ExecutionOutcome<Self::Receipt>,
1786 is_value_known: OriginalValuesKnown,
1787 write_receipts_to: StorageLocation,
1788 ) -> ProviderResult<()> {
1789 let first_block = execution_outcome.first_block();
1790 let block_count = execution_outcome.len() as u64;
1791 let last_block = execution_outcome.last_block();
1792 let block_range = first_block..=last_block;
1793
1794 let tip = self.last_block_number()?.max(last_block);
1795
1796 let (plain_state, reverts) =
1797 execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
1798
1799 self.write_state_reverts(reverts, first_block)?;
1800 self.write_state_changes(plain_state)?;
1801
1802 let block_indices: Vec<_> = self
1804 .block_body_indices_range(block_range)?
1805 .into_iter()
1806 .map(|b| b.first_tx_num)
1807 .collect();
1808
1809 if block_indices.len() < block_count as usize {
1811 let missing_blocks = block_count - block_indices.len() as u64;
1812 return Err(ProviderError::BlockBodyIndicesNotFound(
1813 last_block.saturating_sub(missing_blocks - 1),
1814 ));
1815 }
1816
1817 let has_receipts_pruning = self.prune_modes.has_receipts_pruning();
1818
1819 let mut receipts_cursor = (write_receipts_to.database() || has_receipts_pruning)
1824 .then(|| self.tx.cursor_write::<tables::Receipts<Self::Receipt>>())
1825 .transpose()?;
1826
1827 let mut receipts_static_writer = (write_receipts_to.static_files() &&
1831 !has_receipts_pruning)
1832 .then(|| self.static_file_provider.get_writer(first_block, StaticFileSegment::Receipts))
1833 .transpose()?;
1834
1835 let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
1836 let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
1837
1838 let prunable_receipts =
1841 PruneMode::Distance(MINIMUM_PRUNING_DISTANCE).should_prune(first_block, tip);
1842
1843 let mut allowed_addresses: HashSet<Address, _> = HashSet::new();
1845 for (_, addresses) in contract_log_pruner.range(..first_block) {
1846 allowed_addresses.extend(addresses.iter().copied());
1847 }
1848
1849 for (idx, (receipts, first_tx_index)) in
1850 execution_outcome.receipts.iter().zip(block_indices).enumerate()
1851 {
1852 let block_number = first_block + idx as u64;
1853
1854 if let Some(writer) = receipts_static_writer.as_mut() {
1856 writer.increment_block(block_number)?;
1857 }
1858
1859 if prunable_receipts &&
1861 self.prune_modes
1862 .receipts
1863 .is_some_and(|mode| mode.should_prune(block_number, tip))
1864 {
1865 continue
1866 }
1867
1868 if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
1870 allowed_addresses.extend(new_addresses.iter().copied());
1871 }
1872
1873 for (idx, receipt) in receipts.iter().enumerate() {
1874 let receipt_idx = first_tx_index + idx as u64;
1875 if prunable_receipts &&
1878 has_contract_log_filter &&
1879 !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
1880 {
1881 continue
1882 }
1883
1884 if let Some(writer) = &mut receipts_static_writer {
1885 writer.append_receipt(receipt_idx, receipt)?;
1886 }
1887
1888 if let Some(cursor) = &mut receipts_cursor {
1889 cursor.append(receipt_idx, receipt)?;
1890 }
1891 }
1892 }
1893
1894 Ok(())
1895 }
1896
1897 fn write_state_reverts(
1898 &self,
1899 reverts: PlainStateReverts,
1900 first_block: BlockNumber,
1901 ) -> ProviderResult<()> {
1902 tracing::trace!("Writing storage changes");
1904 let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1905 let mut storage_changeset_cursor =
1906 self.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
1907 for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
1908 let block_number = first_block + block_index as BlockNumber;
1909
1910 tracing::trace!(block_number, "Writing block change");
1911 storage_changes.par_sort_unstable_by_key(|a| a.address);
1913 for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
1914 let storage_id = BlockNumberAddress((block_number, address));
1915
1916 let mut storage = storage_revert
1917 .into_iter()
1918 .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
1919 .collect::<Vec<_>>();
1920 storage.par_sort_unstable_by_key(|a| a.0);
1922
1923 let mut wiped_storage = Vec::new();
1927 if wiped {
1928 tracing::trace!(?address, "Wiping storage");
1929 if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
1930 wiped_storage.push((entry.key, entry.into()));
1931 while let Some(entry) = storages_cursor.next_dup_val()? {
1932 wiped_storage.push((entry.key, entry.into()))
1933 }
1934 }
1935 }
1936
1937 tracing::trace!(?address, ?storage, "Writing storage reverts");
1938 for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
1939 storage_changeset_cursor.append_dup(
1940 storage_id,
1941 StorageEntry { key, value: value.value, is_private: value.is_private },
1942 )?;
1943 }
1944 }
1945 }
1946
1947 tracing::trace!("Writing account changes");
1949 let mut account_changeset_cursor =
1950 self.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?;
1951
1952 for (block_index, mut account_block_reverts) in reverts.accounts.into_iter().enumerate() {
1953 let block_number = first_block + block_index as BlockNumber;
1954 account_block_reverts.par_sort_by_key(|a| a.0);
1956
1957 for (address, info) in account_block_reverts {
1958 account_changeset_cursor.append_dup(
1959 block_number,
1960 AccountBeforeTx { address, info: info.map(Into::into) },
1961 )?;
1962 }
1963 }
1964
1965 Ok(())
1966 }
1967
1968 fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
1969 changes.accounts.par_sort_by_key(|a| a.0);
1972 changes.storage.par_sort_by_key(|a| a.address);
1973 changes.contracts.par_sort_by_key(|a| a.0);
1974
1975 tracing::trace!(len = changes.accounts.len(), "Writing new account state");
1977 let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
1978 for (address, account) in changes.accounts {
1980 if let Some(account) = account {
1981 tracing::trace!(?address, "Updating plain state account");
1982 accounts_cursor.upsert(address, &account.into())?;
1983 } else if accounts_cursor.seek_exact(address)?.is_some() {
1984 tracing::trace!(?address, "Deleting plain state account");
1985 accounts_cursor.delete_current()?;
1986 }
1987 }
1988
1989 tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
1991 let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
1992 for (hash, bytecode) in changes.contracts {
1993 bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
1994 }
1995
1996 tracing::trace!(len = changes.storage.len(), "Writing new storage state");
1998 let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1999 for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
2000 if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
2002 storages_cursor.delete_current_duplicates()?;
2003 }
2004 let mut storage = storage
2006 .into_iter()
2007 .map(|(k, value)| StorageEntry {
2008 key: k.into(),
2009 value: value.value,
2010 is_private: value.is_private,
2011 })
2012 .collect::<Vec<_>>();
2013 storage.par_sort_unstable_by_key(|a| a.key);
2015
2016 for entry in storage {
2017 tracing::trace!(?address, ?entry.key, "Updating plain state storage");
2018 if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? {
2019 if db_entry.key == entry.key {
2020 storages_cursor.delete_current()?;
2021 }
2022 }
2023
2024 if !entry.value.is_zero() {
2025 storages_cursor.upsert(address, &entry)?;
2026 }
2027 }
2028 }
2029
2030 Ok(())
2031 }
2032
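    /// Writes the sorted hashed post-state into the hashed tables.
    ///
    /// Accounts are upserted into (or removed from) [`tables::HashedAccounts`]. Storages
    /// are handled per hashed address: wiped storages have their existing entries
    /// deleted first, and each sorted slot is rewritten, with zero values deleted rather
    /// than stored.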
2033 fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
2034 let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
2036 for (hashed_address, account) in hashed_state.accounts().accounts_sorted() {
2037 if let Some(account) = account {
2038 hashed_accounts_cursor.upsert(hashed_address, &account)?;
2039 } else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
2040 hashed_accounts_cursor.delete_current()?;
2041 }
2042 }
2043
2044 let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
2046 let mut hashed_storage_cursor =
2047 self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2048 for (hashed_address, storage) in sorted_storages {
2049 if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
2050 hashed_storage_cursor.delete_current_duplicates()?;
2051 }
2052
2053 for (hashed_slot, value) in storage.storage_slots_sorted() {
2054 let entry = StorageEntry {
2055 key: hashed_slot,
2056 value: value.value,
2057 is_private: value.is_private,
2058 };
2059 if let Some(db_entry) =
2060 hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)?
2061 {
2062 if db_entry.key == entry.key {
2063 hashed_storage_cursor.delete_current()?;
2064 }
2065 }
2066
2067 if !entry.value.is_zero() {
2068 hashed_storage_cursor.upsert(*hashed_address, &entry)?;
2069 }
2070 }
2071 }
2072
2073 Ok(())
2074 }
2075
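    /// Unwinds the plain state for all blocks above `block`.
    ///
    /// Takes (reads and deletes) the storage and account changesets of the unwound
    /// range, writes the old values back into [`tables::PlainAccountState`] /
    /// [`tables::PlainStorageState`], and removes the receipts of the unwound
    /// transactions from the requested [`StorageLocation`].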
2076 fn remove_state_above(
2098 &self,
2099 block: BlockNumber,
2100 remove_receipts_from: StorageLocation,
2101 ) -> ProviderResult<()> {
2102 let range = block + 1..=self.last_block_number()?;
2103
2104 if range.is_empty() {
2105 return Ok(());
2106 }
2107
2108 let block_bodies = self.block_body_indices_range(range.clone())?;
2110
2111 let from_transaction_num =
2113 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2114
2115 let storage_range = BlockNumberAddress::range(range.clone());
2116
2117 let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2118 let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2119
2120 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2125 let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2126
2127 let (state, _) = self.populate_bundle_state(
2128 account_changeset,
2129 storage_changeset,
2130 &mut plain_accounts_cursor,
2131 &mut plain_storage_cursor,
2132 )?;
2133
2134 for (address, (old_account, new_account, storage)) in &state {
2136 if old_account != new_account {
2138 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2139 if let Some(account) = old_account {
2140 plain_accounts_cursor.upsert(*address, account)?;
2141 } else if existing_entry.is_some() {
2142 plain_accounts_cursor.delete_current()?;
2143 }
2144 }
2145
2146 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2148 let storage_entry = StorageEntry {
2149 key: *storage_key,
2150 value: old_storage_value.0,
2151 is_private: old_storage_value.1,
2152 };
2153 if plain_storage_cursor
2156 .seek_by_key_subkey(*address, *storage_key)?
2157 .filter(|s| s.key == *storage_key)
2158 .is_some()
2159 {
2160 plain_storage_cursor.delete_current()?
2161 }
2162
2163 if !FlaggedStorage::new(old_storage_value.0, old_storage_value.1).is_zero() {
2165 plain_storage_cursor.upsert(*address, &storage_entry)?;
2166 }
2167 }
2168 }
2169
2170 self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
2171
2172 Ok(())
2173 }
2174
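    /// Like `remove_state_above`, but also returns the unwound state.
    ///
    /// The old values from the consumed changesets are written back to the plain state
    /// tables, the receipts of the unwound transactions are collected from static files
    /// or the database (whichever holds them), and everything is bundled into an
    /// [`ExecutionOutcome`] that starts at the first unwound block. Returns
    /// `ExecutionOutcome::default()` if nothing exists above `block`.
    ///
    /// # Example
    ///
    /// A minimal sketch (not part of the original docs), assuming `provider` wraps a
    /// read-write transaction and block 100 exists:
    ///
    /// ```ignore
    /// // Unwind everything above block 100, keeping the removed state for later reuse.
    /// let outcome = provider.take_state_above(100, StorageLocation::Database)?;
    /// // `outcome` starts at block 101 and holds the unwound receipts per block.
    /// ```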
2175 fn take_state_above(
2197 &self,
2198 block: BlockNumber,
2199 remove_receipts_from: StorageLocation,
2200 ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2201 let range = block + 1..=self.last_block_number()?;
2202
2203 if range.is_empty() {
2204 return Ok(ExecutionOutcome::default())
2205 }
2206 let start_block_number = *range.start();
2207
2208 let block_bodies = self.block_body_indices_range(range.clone())?;
2210
2211 let from_transaction_num =
2213 block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2214 let to_transaction_num =
2215 block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2216
2217 let storage_range = BlockNumberAddress::range(range.clone());
2218
2219 let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2220 let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2221
2222 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2227 let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2228
2229 let (state, reverts) = self.populate_bundle_state(
2232 account_changeset,
2233 storage_changeset,
2234 &mut plain_accounts_cursor,
2235 &mut plain_storage_cursor,
2236 )?;
2237
2238 for (address, (old_account, new_account, storage)) in &state {
2240 if old_account != new_account {
2242 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2243 if let Some(account) = old_account {
2244 plain_accounts_cursor.upsert(*address, account)?;
2245 } else if existing_entry.is_some() {
2246 plain_accounts_cursor.delete_current()?;
2247 }
2248 }
2249
2250 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2252 let storage_entry = StorageEntry {
2253 key: *storage_key,
2254 value: old_storage_value.0,
2255 is_private: old_storage_value.1,
2256 };
2257 if plain_storage_cursor
2260 .seek_by_key_subkey(*address, *storage_key)?
2261 .filter(|s| s.key == *storage_key)
2262 .is_some()
2263 {
2264 plain_storage_cursor.delete_current()?
2265 }
2266
2267 if !FlaggedStorage::new(old_storage_value.0, old_storage_value.1).is_zero() {
2269 plain_storage_cursor.upsert(*address, &storage_entry)?;
2270 }
2271 }
2272 }
2273
2274 let mut receipts_iter = self
2276 .static_file_provider
2277 .get_range_with_static_file_or_database(
2278 StaticFileSegment::Receipts,
2279 from_transaction_num..to_transaction_num + 1,
2280 |static_file, range, _| {
2281 static_file
2282 .receipts_by_tx_range(range.clone())
2283 .map(|r| range.into_iter().zip(r).collect())
2284 },
2285 |range, _| {
2286 self.tx
2287 .cursor_read::<tables::Receipts<Self::Receipt>>()?
2288 .walk_range(range)?
2289 .map(|r| r.map_err(Into::into))
2290 .collect()
2291 },
2292 |_| true,
2293 )?
2294 .into_iter()
2295 .peekable();
2296
2297 let mut receipts = Vec::with_capacity(block_bodies.len());
2298 for block_body in block_bodies {
2300 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
2301 for num in block_body.tx_num_range() {
2302 if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
2303 block_receipts.push(receipts_iter.next().unwrap().1);
2304 }
2305 }
2306 receipts.push(block_receipts);
2307 }
2308
2309 self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
2310
2311 Ok(ExecutionOutcome::new_init(
2312 state,
2313 reverts,
2314 Vec::new(),
2315 receipts,
2316 start_block_number,
2317 Vec::new(),
2318 ))
2319 }
2320}
2321
2322impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
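    /// Writes the aggregated account trie updates to [`tables::AccountsTrie`].
    ///
    /// Removed nodes (that are not also re-added) and updated nodes are merged, sorted
    /// by nibbles, and then upserted or deleted accordingly; the empty-path root node is
    /// never stored. Returns the number of entries touched, including the storage trie
    /// entries written through `write_storage_trie_updates`.
    ///
    /// # Example
    ///
    /// A minimal sketch (not part of the original docs), assuming `provider` is a
    /// read-write provider:
    ///
    /// ```ignore
    /// use reth_trie::updates::TrieUpdates;
    ///
    /// // Empty updates short-circuit without touching the database.
    /// let written = provider.write_trie_updates(&TrieUpdates::default())?;
    /// assert_eq!(written, 0);
    /// ```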
2323 fn write_trie_updates(&self, trie_updates: &TrieUpdates) -> ProviderResult<usize> {
2325 if trie_updates.is_empty() {
2326 return Ok(0)
2327 }
2328
2329 let mut num_entries = 0;
2331
2332 let mut account_updates = trie_updates
2334 .removed_nodes_ref()
2335 .iter()
2336 .filter_map(|n| {
2337 (!trie_updates.account_nodes_ref().contains_key(n)).then_some((n, None))
2338 })
2339 .collect::<Vec<_>>();
2340 account_updates.extend(
2341 trie_updates.account_nodes_ref().iter().map(|(nibbles, node)| (nibbles, Some(node))),
2342 );
2343 account_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
2345
2346 let tx = self.tx_ref();
2347 let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
2348 for (key, updated_node) in account_updates {
2349 let nibbles = StoredNibbles(key.clone());
2350 match updated_node {
2351 Some(node) => {
2352 if !nibbles.0.is_empty() {
2353 num_entries += 1;
2354 account_trie_cursor.upsert(nibbles, node)?;
2355 }
2356 }
2357 None => {
2358 num_entries += 1;
2359 if account_trie_cursor.seek_exact(nibbles)?.is_some() {
2360 account_trie_cursor.delete_current()?;
2361 }
2362 }
2363 }
2364 }
2365
2366 num_entries += self.write_storage_trie_updates(trie_updates.storage_tries_ref())?;
2367
2368 Ok(num_entries)
2369 }
2370}
2371
2372impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
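    /// Writes storage trie updates for a set of hashed addresses.
    ///
    /// Addresses are sorted first so entries are written in key order, and a single dup
    /// cursor over [`tables::StoragesTrie`] is reused across all storage tries. Returns
    /// the total number of entries written or removed.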
2373 fn write_storage_trie_updates(
2376 &self,
2377 storage_tries: &B256Map<StorageTrieUpdates>,
2378 ) -> ProviderResult<usize> {
2379 let mut num_entries = 0;
2380 let mut storage_tries = Vec::from_iter(storage_tries);
2381 storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
2382 let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2383 for (hashed_address, storage_trie_updates) in storage_tries {
2384 let mut db_storage_trie_cursor =
2385 DatabaseStorageTrieCursor::new(cursor, *hashed_address);
2386 num_entries +=
2387 db_storage_trie_cursor.write_storage_trie_updates(storage_trie_updates)?;
2388 cursor = db_storage_trie_cursor.cursor;
2389 }
2390
2391 Ok(num_entries)
2392 }
2393
2394 fn write_individual_storage_trie_updates(
2395 &self,
2396 hashed_address: B256,
2397 updates: &StorageTrieUpdates,
2398 ) -> ProviderResult<usize> {
2399 if updates.is_empty() {
2400 return Ok(0)
2401 }
2402
2403 let cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2404 let mut trie_db_cursor = DatabaseStorageTrieCursor::new(cursor, hashed_address);
2405 Ok(trie_db_cursor.write_storage_trie_updates(updates)?)
2406 }
2407}
2408
2409impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
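    /// Unwinds [`tables::HashedAccounts`] from a stream of account changesets.
    ///
    /// Entries are keyed by hashed address; because the input is reversed before being
    /// collected into a map, the oldest changeset entry per address wins, i.e. the
    /// account state as it was before the unwound range. `None` deletes the hashed
    /// account. The applied map is returned so callers can derive trie prefix sets
    /// from it.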
2410 fn unwind_account_hashing<'a>(
2411 &self,
2412 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2413 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2414 let hashed_accounts = changesets
2418 .into_iter()
2419 .map(|(_, e)| (keccak256(e.address), e.info))
2420 .collect::<Vec<_>>()
2421 .into_iter()
2422 .rev()
2423 .collect::<BTreeMap<_, _>>();
2424
2425 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2427 for (hashed_address, account) in &hashed_accounts {
2428 if let Some(account) = account {
2429 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2430 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2431 hashed_accounts_cursor.delete_current()?;
2432 }
2433 }
2434
2435 Ok(hashed_accounts)
2436 }
2437
2438 fn unwind_account_hashing_range(
2439 &self,
2440 range: impl RangeBounds<BlockNumber>,
2441 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2442 let changesets = self
2443 .tx
2444 .cursor_read::<tables::AccountChangeSets>()?
2445 .walk_range(range)?
2446 .collect::<Result<Vec<_>, _>>()?;
2447 self.unwind_account_hashing(changesets.iter())
2448 }
2449
2450 fn insert_account_for_hashing(
2451 &self,
2452 changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
2453 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2454 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2455 let hashed_accounts =
2456 changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
2457 for (hashed_address, account) in &hashed_accounts {
2458 if let Some(account) = account {
2459 hashed_accounts_cursor.upsert(*hashed_address, account)?;
2460 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2461 hashed_accounts_cursor.delete_current()?;
2462 }
2463 }
2464 Ok(hashed_accounts)
2465 }
2466
2467 fn unwind_storage_hashing(
2468 &self,
2469 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2470 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2471 let mut hashed_storages = changesets
2473 .into_iter()
2474 .map(|(BlockNumberAddress((_, address)), storage_entry)| {
2475 (
2476 keccak256(address),
2477 keccak256(storage_entry.key),
2478 storage_entry.value,
2479 storage_entry.is_private,
2480 )
2481 })
2482 .collect::<Vec<_>>();
2483 hashed_storages.sort_by_key(|(ha, hk, _, _)| (*ha, *hk));
2484
2485 let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
2487 HashMap::with_capacity_and_hasher(hashed_storages.len(), Default::default());
2488 let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2489 for (hashed_address, key, value, is_private) in hashed_storages.into_iter().rev() {
2490 hashed_storage_keys.entry(hashed_address).or_default().insert(key);
2491
2492 if hashed_storage
2493 .seek_by_key_subkey(hashed_address, key)?
2494 .filter(|entry| entry.key == key)
2495 .is_some()
2496 {
2497 hashed_storage.delete_current()?;
2498 }
2499
2500 if !value.is_zero() {
2501 hashed_storage.upsert(hashed_address, &StorageEntry { key, value, is_private })?;
2502 }
2503 }
2504 Ok(hashed_storage_keys)
2505 }
2506
2507 fn unwind_storage_hashing_range(
2508 &self,
2509 range: impl RangeBounds<BlockNumberAddress>,
2510 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2511 let changesets = self
2512 .tx
2513 .cursor_read::<tables::StorageChangeSets>()?
2514 .walk_range(range)?
2515 .collect::<Result<Vec<_>, _>>()?;
2516 self.unwind_storage_hashing(changesets.into_iter())
2517 }
2518
2519 fn insert_storage_for_hashing(
2520 &self,
2521 storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
2522 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2523 let hashed_storages =
2525 storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
2526 let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
2527 map.insert(keccak256(entry.key), entry.value);
2528 map
2529 });
2530 map.insert(keccak256(address), storage);
2531 map
2532 });
2533
2534 let hashed_storage_keys =
2535 HashMap::from_iter(hashed_storages.iter().map(|(hashed_address, entries)| {
2536 (*hashed_address, BTreeSet::from_iter(entries.keys().copied()))
2537 }));
2538
2539 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2540 hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
2543 storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
2544 if hashed_storage_cursor
2545 .seek_by_key_subkey(hashed_address, key)?
2546 .filter(|entry| entry.key == key)
2547 .is_some()
2548 {
2549 hashed_storage_cursor.delete_current()?;
2550 }
2551
2552 if !value.is_zero() {
2553 hashed_storage_cursor.upsert(
2554 hashed_address,
2555 &StorageEntry { key, value, ..Default::default() },
2556 )?;
2557 }
2558 Ok(())
2559 })
2560 })?;
2561
2562 Ok(hashed_storage_keys)
2563 }
2564
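    /// Hashes the accounts and storages changed in `range` and incrementally recomputes
    /// the state root.
    ///
    /// The hashed addresses and slots are collected into [`TriePrefixSets`] so only the
    /// affected parts of the trie are revisited. If the recomputed root differs from
    /// `expected_state_root`, a [`ProviderError::StateRootMismatch`] referencing
    /// `end_block_hash` is returned before any trie updates are written; otherwise the
    /// resulting trie updates are persisted.
    ///
    /// # Example
    ///
    /// A minimal sketch (not part of the original docs), assuming `end_hash` and
    /// `expected_root` come from the sealed header at the top of the range:
    ///
    /// ```ignore
    /// provider.insert_hashes(10..=20, end_hash, expected_root)?;
    /// ```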
2565 fn insert_hashes(
2566 &self,
2567 range: RangeInclusive<BlockNumber>,
2568 end_block_hash: B256,
2569 expected_state_root: B256,
2570 ) -> ProviderResult<()> {
2571 let mut account_prefix_set = PrefixSetMut::default();
2573 let mut storage_prefix_sets: HashMap<B256, PrefixSetMut> = HashMap::default();
2574 let mut destroyed_accounts = HashSet::default();
2575
2576 let mut durations_recorder = metrics::DurationsRecorder::default();
2577
2578 {
2580 let lists = self.changed_storages_with_range(range.clone())?;
2581 let storages = self.plain_state_storages(lists)?;
2582 let storage_entries = self.insert_storage_for_hashing(storages)?;
2583 for (hashed_address, hashed_slots) in storage_entries {
2584 account_prefix_set.insert(Nibbles::unpack(hashed_address));
2585 for slot in hashed_slots {
2586 storage_prefix_sets
2587 .entry(hashed_address)
2588 .or_default()
2589 .insert(Nibbles::unpack(slot));
2590 }
2591 }
2592 }
2593 durations_recorder.record_relative(metrics::Action::InsertStorageHashing);
2594
2595 {
2597 let lists = self.changed_accounts_with_range(range.clone())?;
2598 let accounts = self.basic_accounts(lists)?;
2599 let hashed_addresses = self.insert_account_for_hashing(accounts)?;
2600 for (hashed_address, account) in hashed_addresses {
2601 account_prefix_set.insert(Nibbles::unpack(hashed_address));
2602 if account.is_none() {
2603 destroyed_accounts.insert(hashed_address);
2604 }
2605 }
2606 }
2607 durations_recorder.record_relative(metrics::Action::InsertAccountHashing);
2608
2609 {
2611 let prefix_sets = TriePrefixSets {
2614 account_prefix_set: account_prefix_set.freeze(),
2615 storage_prefix_sets: storage_prefix_sets
2616 .into_iter()
2617 .map(|(k, v)| (k, v.freeze()))
2618 .collect(),
2619 destroyed_accounts,
2620 };
2621 let (state_root, trie_updates) = StateRoot::from_tx(&self.tx)
2622 .with_prefix_sets(prefix_sets)
2623 .root_with_updates()
2624 .map_err(reth_db_api::DatabaseError::from)?;
2625 if state_root != expected_state_root {
2626 return Err(ProviderError::StateRootMismatch(Box::new(RootMismatch {
2627 root: GotExpected { got: state_root, expected: expected_state_root },
2628 block_number: *range.end(),
2629 block_hash: end_block_hash,
2630 })))
2631 }
2632 self.write_trie_updates(&trie_updates)?;
2633 }
2634 durations_recorder.record_relative(metrics::Action::InsertMerkleTree);
2635
2636 debug!(target: "providers::db", ?range, actions = ?durations_recorder.actions, "Inserted hashes");
2637
2638 Ok(())
2639 }
2640}
2641
2642impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
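    /// Unwinds [`tables::AccountsHistory`] for the given account changesets.
    ///
    /// For every touched address the history shards are rewound so they no longer
    /// reference the unwound blocks, and the surviving prefix of the last shard is
    /// reinserted under [`ShardedKey::last`]. Returns the number of changeset entries
    /// processed.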
2643 fn unwind_account_history_indices<'a>(
2644 &self,
2645 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2646 ) -> ProviderResult<usize> {
2647 let mut last_indices = changesets
2648 .into_iter()
2649 .map(|(index, account)| (account.address, *index))
2650 .collect::<Vec<_>>();
2651 last_indices.sort_by_key(|(a, _)| *a);
2652
2653 let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
2655 for &(address, rem_index) in &last_indices {
2656 let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
2657 &mut cursor,
2658 ShardedKey::last(address),
2659 rem_index,
2660 |sharded_key| sharded_key.key == address,
2661 )?;
2662
2663 if !partial_shard.is_empty() {
2666 cursor.insert(
2667 ShardedKey::last(address),
2668 &BlockNumberList::new_pre_sorted(partial_shard),
2669 )?;
2670 }
2671 }
2672
2673 Ok(last_indices.len())
2675 }
2676
2677 fn unwind_account_history_indices_range(
2678 &self,
2679 range: impl RangeBounds<BlockNumber>,
2680 ) -> ProviderResult<usize> {
2681 let changesets = self
2682 .tx
2683 .cursor_read::<tables::AccountChangeSets>()?
2684 .walk_range(range)?
2685 .collect::<Result<Vec<_>, _>>()?;
2686 self.unwind_account_history_indices(changesets.iter())
2687 }
2688
2689 fn insert_account_history_index(
2690 &self,
2691 account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
2692 ) -> ProviderResult<()> {
2693 self.append_history_index::<_, tables::AccountsHistory>(
2694 account_transitions,
2695 ShardedKey::new,
2696 )
2697 }
2698
2699 fn unwind_storage_history_indices(
2700 &self,
2701 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2702 ) -> ProviderResult<usize> {
2703 let mut storage_changesets = changesets
2704 .into_iter()
2705 .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
2706 .collect::<Vec<_>>();
2707 storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
2708
2709 let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
2710 for &(address, storage_key, rem_index) in &storage_changesets {
2711 let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
2712 &mut cursor,
2713 StorageShardedKey::last(address, storage_key),
2714 rem_index,
2715 |storage_sharded_key| {
2716 storage_sharded_key.address == address &&
2717 storage_sharded_key.sharded_key.key == storage_key
2718 },
2719 )?;
2720
2721 if !partial_shard.is_empty() {
2724 cursor.insert(
2725 StorageShardedKey::last(address, storage_key),
2726 &BlockNumberList::new_pre_sorted(partial_shard),
2727 )?;
2728 }
2729 }
2730
2731 Ok(storage_changesets.len())
2733 }
2734
2735 fn unwind_storage_history_indices_range(
2736 &self,
2737 range: impl RangeBounds<BlockNumberAddress>,
2738 ) -> ProviderResult<usize> {
2739 let changesets = self
2740 .tx
2741 .cursor_read::<tables::StorageChangeSets>()?
2742 .walk_range(range)?
2743 .collect::<Result<Vec<_>, _>>()?;
2744 self.unwind_storage_history_indices(changesets.into_iter())
2745 }
2746
2747 fn insert_storage_history_index(
2748 &self,
2749 storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
2750 ) -> ProviderResult<()> {
2751 self.append_history_index::<_, tables::StoragesHistory>(
2752 storage_transitions,
2753 |(address, storage_key), highest_block_number| {
2754 StorageShardedKey::new(address, storage_key, highest_block_number)
2755 },
2756 )
2757 }
2758
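    /// Rebuilds the account and storage history indices for every block in `range` by
    /// reading the changesets of that range and appending the block numbers to the
    /// corresponding history shards.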
2759 fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
2760 {
2762 let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
2763 self.insert_account_history_index(indices)?;
2764 }
2765
2766 {
2768 let indices = self.changed_storages_and_blocks_with_range(range)?;
2769 self.insert_storage_history_index(indices)?;
2770 }
2771
2772 Ok(())
2773 }
2774}
2775
2776impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
2777 for DatabaseProvider<TX, N>
2778{
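    /// Unwinds everything above `block` and returns it as a [`Chain`].
    ///
    /// Trie state is unwound first, then the execution state (including receipts) is
    /// taken, the recovered blocks are collected, the block data is removed from the
    /// requested [`StorageLocation`], and the pipeline stage checkpoints are rewound to
    /// `block`.
    ///
    /// # Example
    ///
    /// A minimal sketch (not part of the original docs), assuming `provider` is a
    /// read-write provider and `new_tip` is the highest block number to keep:
    ///
    /// ```ignore
    /// // Detach everything above the new canonical tip, e.g. while handling a reorg.
    /// let old_chain =
    ///     provider.take_block_and_execution_above(new_tip, StorageLocation::Database)?;
    /// ```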
2779 fn take_block_and_execution_above(
2780 &self,
2781 block: BlockNumber,
2782 remove_from: StorageLocation,
2783 ) -> ProviderResult<Chain<Self::Primitives>> {
2784 let range = block + 1..=self.last_block_number()?;
2785
2786 self.unwind_trie_state_range(range.clone())?;
2787
2788 let execution_state = self.take_state_above(block, remove_from)?;
2790
2791 let blocks = self.recovered_block_range(range)?;
2792
2793 self.remove_blocks_above(block, remove_from)?;
2796
2797 self.update_pipeline_stages(block, true)?;
2799
2800 Ok(Chain::new(blocks, execution_state, None))
2801 }
2802
2803 fn remove_block_and_execution_above(
2804 &self,
2805 block: BlockNumber,
2806 remove_from: StorageLocation,
2807 ) -> ProviderResult<()> {
2808 let range = block + 1..=self.last_block_number()?;
2809
2810 self.unwind_trie_state_range(range)?;
2811
2812 self.remove_state_above(block, remove_from)?;
2814
2815 self.remove_blocks_above(block, remove_from)?;
2818
2819 self.update_pipeline_stages(block, true)?;
2821
2822 Ok(())
2823 }
2824}
2825
2826impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
2827 for DatabaseProvider<TX, N>
2828{
2829 type Block = BlockTy<N>;
2830 type Receipt = ReceiptTy<N>;
2831
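    /// Inserts a single recovered block without any state or trie data.
    ///
    /// Computes the total difficulty from the parent, writes the canonical hash, header
    /// and TD to the database and/or static files as requested by `write_to`, records
    /// transaction senders and hash lookups (unless fully pruned), and appends the block
    /// body. Returns the [`StoredBlockBodyIndices`] describing where the block's
    /// transactions were stored.
    ///
    /// # Example
    ///
    /// A minimal sketch (not part of the original docs), assuming `recovered_block` is a
    /// `RecoveredBlock` whose parent is already stored:
    ///
    /// ```ignore
    /// // Write to the database tables only; static files are left untouched.
    /// let indices = provider.insert_block(recovered_block, StorageLocation::Database)?;
    /// // `indices.first_tx_num` / `indices.tx_count` locate the block's transactions.
    /// ```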
2832 fn insert_block(
2853 &self,
2854 block: RecoveredBlock<Self::Block>,
2855 write_to: StorageLocation,
2856 ) -> ProviderResult<StoredBlockBodyIndices> {
2857 let block_number = block.number();
2858
2859 let mut durations_recorder = metrics::DurationsRecorder::default();
2860
2861 let ttd = if block_number == 0 {
2863 block.header().difficulty()
2864 } else {
2865 let parent_block_number = block_number - 1;
2866 let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
2867 durations_recorder.record_relative(metrics::Action::GetParentTD);
2868 parent_ttd + block.header().difficulty()
2869 };
2870
2871 if write_to.database() {
2872 self.tx.put::<tables::CanonicalHeaders>(block_number, block.hash())?;
2873 durations_recorder.record_relative(metrics::Action::InsertCanonicalHeaders);
2874
2875 self.tx.put::<tables::Headers<HeaderTy<N>>>(block_number, block.header().clone())?;
2877 durations_recorder.record_relative(metrics::Action::InsertHeaders);
2878
2879 self.tx.put::<tables::HeaderTerminalDifficulties>(block_number, ttd.into())?;
2880 durations_recorder.record_relative(metrics::Action::InsertHeaderTerminalDifficulties);
2881 }
2882
2883 if write_to.static_files() {
2884 let mut writer =
2885 self.static_file_provider.get_writer(block_number, StaticFileSegment::Headers)?;
2886 writer.append_header(block.header(), ttd, &block.hash())?;
2887 }
2888
2889 self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
2890 durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
2891
2892 let mut next_tx_num = self
2893 .tx
2894 .cursor_read::<tables::TransactionBlocks>()?
2895 .last()?
2896 .map(|(n, _)| n + 1)
2897 .unwrap_or_default();
2898 durations_recorder.record_relative(metrics::Action::GetNextTxNum);
2899 let first_tx_num = next_tx_num;
2900
2901 let tx_count = block.body().transaction_count() as u64;
2902
2903 for (transaction, sender) in block.body().transactions_iter().zip(block.senders_iter()) {
2905 let hash = transaction.tx_hash();
2906
2907 if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
2908 self.tx.put::<tables::TransactionSenders>(next_tx_num, *sender)?;
2909 }
2910
2911 if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
2912 self.tx.put::<tables::TransactionHashNumbers>(*hash, next_tx_num)?;
2913 }
2914 next_tx_num += 1;
2915 }
2916
2917 self.append_block_bodies(vec![(block_number, Some(block.into_body()))], write_to)?;
2918
2919 debug!(
2920 target: "providers::db",
2921 ?block_number,
2922 actions = ?durations_recorder.actions,
2923 "Inserted block"
2924 );
2925
2926 Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
2927 }
2928
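    /// Appends block bodies for consecutive blocks starting at the first entry's number.
    ///
    /// For every block this writes the [`StoredBlockBodyIndices`], the
    /// transaction-to-block mapping and the transactions themselves to the database
    /// and/or static files, then delegates any chain-specific body tables to the
    /// configured block body writer.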
2929 fn append_block_bodies(
2930 &self,
2931 bodies: Vec<(BlockNumber, Option<BodyTy<N>>)>,
2932 write_to: StorageLocation,
2933 ) -> ProviderResult<()> {
2934 let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
2935
2936 let mut tx_static_writer = write_to
2938 .static_files()
2939 .then(|| {
2940 self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)
2941 })
2942 .transpose()?;
2943
2944 let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
2945 let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
2946
2947 let mut tx_cursor = write_to
2949 .database()
2950 .then(|| self.tx.cursor_write::<tables::Transactions<TxTy<N>>>())
2951 .transpose()?;
2952
2953 let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
2955
2956 for (block_number, body) in &bodies {
2957 if let Some(writer) = tx_static_writer.as_mut() {
2959 writer.increment_block(*block_number)?;
2960 }
2961
2962 let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
2963 let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
2964
2965 let mut durations_recorder = metrics::DurationsRecorder::default();
2966
2967 block_indices_cursor.append(*block_number, &block_indices)?;
2969
2970 durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
2971
2972 let Some(body) = body else { continue };
2973
2974 if !body.transactions().is_empty() {
2976 tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
2977 durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
2978 }
2979
2980 for transaction in body.transactions() {
2982 if let Some(writer) = tx_static_writer.as_mut() {
2983 writer.append_transaction(next_tx_num, transaction)?;
2984 }
2985 if let Some(cursor) = tx_cursor.as_mut() {
2986 cursor.append(next_tx_num, transaction)?;
2987 }
2988
2989 next_tx_num += 1;
2991 }
2992
2993 debug!(
2994 target: "providers::db",
2995 ?block_number,
2996 actions = ?durations_recorder.actions,
2997 "Inserted block body"
2998 );
2999 }
3000
3001 self.storage.writer().write_block_bodies(self, bodies, write_to)?;
3002
3003 Ok(())
3004 }
3005
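    /// Removes all block data above `block`: header numbers, canonical hashes, headers,
    /// terminal difficulties, transaction hash lookups and senders, and finally the
    /// block bodies from the requested [`StorageLocation`].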
3006 fn remove_blocks_above(
3007 &self,
3008 block: BlockNumber,
3009 remove_from: StorageLocation,
3010 ) -> ProviderResult<()> {
3011 for hash in self.canonical_hashes_range(block + 1, self.last_block_number()? + 1)? {
3012 self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
3013 }
3014
3015 self.remove::<tables::CanonicalHeaders>(block + 1..)?;
3018 self.remove::<tables::Headers<HeaderTy<N>>>(block + 1..)?;
3019 self.remove::<tables::HeaderTerminalDifficulties>(block + 1..)?;
3020
3021 let unwind_tx_from = self
3023 .block_body_indices(block)?
3024 .map(|b| b.next_tx_num())
3025 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3026
3027 let unwind_tx_to = self
3029 .tx
3030 .cursor_read::<tables::BlockBodyIndices>()?
3031 .last()?
3032 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
3034 .1
3035 .last_tx_num();
3036
3037 if unwind_tx_from <= unwind_tx_to {
3038 for (hash, _) in self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))? {
3039 self.tx.delete::<tables::TransactionHashNumbers>(hash, None)?;
3040 }
3041 }
3042
3043 self.remove::<tables::TransactionSenders>(unwind_tx_from..)?;
3044
3045 self.remove_bodies_above(block, remove_from)?;
3046
3047 Ok(())
3048 }
3049
3050 fn remove_bodies_above(
3051 &self,
3052 block: BlockNumber,
3053 remove_from: StorageLocation,
3054 ) -> ProviderResult<()> {
3055 self.storage.writer().remove_block_bodies_above(self, block, remove_from)?;
3056
3057 let unwind_tx_from = self
3059 .block_body_indices(block)?
3060 .map(|b| b.next_tx_num())
3061 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3062
3063 self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3064 self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3065
3066 if remove_from.database() {
3067 self.remove::<tables::Transactions<TxTy<N>>>(unwind_tx_from..)?;
3068 }
3069
3070 if remove_from.static_files() {
3071 let static_file_tx_num = self
3072 .static_file_provider
3073 .get_highest_static_file_tx(StaticFileSegment::Transactions);
3074
3075 let to_delete = static_file_tx_num
3076 .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3077 .unwrap_or_default();
3078
3079 self.static_file_provider
3080 .latest_writer(StaticFileSegment::Transactions)?
3081 .prune_transactions(to_delete, block)?;
3082 }
3083
3084 Ok(())
3085 }
3086
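    /// Appends a batch of recovered blocks together with their execution results.
    ///
    /// Blocks are inserted into the database, then the execution outcome, hashed state
    /// and trie updates are written, history indices are updated for the whole range,
    /// and the pipeline stage checkpoints are advanced to the last block.
    ///
    /// # Example
    ///
    /// A minimal sketch (not part of the original docs); the inputs would come from
    /// block execution and state-root computation elsewhere:
    ///
    /// ```ignore
    /// provider.append_blocks_with_state(
    ///     vec![recovered_block],
    ///     &execution_outcome,
    ///     hashed_state_sorted,
    ///     trie_updates,
    /// )?;
    /// ```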
3087 fn append_blocks_with_state(
3089 &self,
3090 blocks: Vec<RecoveredBlock<Self::Block>>,
3091 execution_outcome: &ExecutionOutcome<Self::Receipt>,
3092 hashed_state: HashedPostStateSorted,
3093 trie_updates: TrieUpdates,
3094 ) -> ProviderResult<()> {
3095 if blocks.is_empty() {
3096 debug!(target: "providers::db", "Attempted to append empty block range");
3097 return Ok(())
3098 }
3099
3100 let first_number = blocks.first().unwrap().number();
3101
3102 let last = blocks.last().unwrap();
3103 let last_block_number = last.number();
3104
3105 let mut durations_recorder = metrics::DurationsRecorder::default();
3106
3107 for block in blocks {
3109 self.insert_block(block, StorageLocation::Database)?;
3110 durations_recorder.record_relative(metrics::Action::InsertBlock);
3111 }
3112
3113 self.write_state(execution_outcome, OriginalValuesKnown::No, StorageLocation::Database)?;
3114 durations_recorder.record_relative(metrics::Action::InsertState);
3115
3116 self.write_hashed_state(&hashed_state)?;
3118 self.write_trie_updates(&trie_updates)?;
3119 durations_recorder.record_relative(metrics::Action::InsertHashes);
3120
3121 self.update_history_indices(first_number..=last_block_number)?;
3122 durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3123
3124 self.update_pipeline_stages(last_block_number, false)?;
3126 durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3127
3128 debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3129
3130 Ok(())
3131 }
3132}
3133
3134impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3135 fn get_prune_checkpoint(
3136 &self,
3137 segment: PruneSegment,
3138 ) -> ProviderResult<Option<PruneCheckpoint>> {
3139 Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3140 }
3141
3142 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3143 Ok(self
3144 .tx
3145 .cursor_read::<tables::PruneCheckpoints>()?
3146 .walk(None)?
3147 .collect::<Result<_, _>>()?)
3148 }
3149}
3150
3151impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3152 fn save_prune_checkpoint(
3153 &self,
3154 segment: PruneSegment,
3155 checkpoint: PruneCheckpoint,
3156 ) -> ProviderResult<()> {
3157 Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3158 }
3159}
3160
3161impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3162 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3163 let db_entries = self.tx.entries::<T>()?;
3164 let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3165 Ok(entries) => entries,
3166 Err(ProviderError::UnsupportedProvider) => 0,
3167 Err(err) => return Err(err),
3168 };
3169
3170 Ok(db_entries + static_file_entries)
3171 }
3172}
3173
3174impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3175 fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3176 let mut finalized_blocks = self
3177 .tx
3178 .cursor_read::<tables::ChainState>()?
3179 .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3180 .take(1)
3181 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3182
3183 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3184 Ok(last_finalized_block_number)
3185 }
3186
3187 fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3188 let mut safe_blocks = self
3189 .tx
3190 .cursor_read::<tables::ChainState>()?
3191 .walk(Some(tables::ChainStateKey::LastSafeBlockBlock))?
3192 .take(1)
3193 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3194
3195 let last_safe_block_number = safe_blocks.pop_first().map(|pair| pair.1);
3196 Ok(last_safe_block_number)
3197 }
3198}
3199
3200impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3201 fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3202 Ok(self
3203 .tx
3204 .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3205 }
3206
3207 fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3208 Ok(self
3209 .tx
3210 .put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlockBlock, block_number)?)
3211 }
3212}
3213
3214impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3215 type Tx = TX;
3216
3217 fn tx_ref(&self) -> &Self::Tx {
3218 &self.tx
3219 }
3220
3221 fn tx_mut(&mut self) -> &mut Self::Tx {
3222 &mut self.tx
3223 }
3224
3225 fn into_tx(self) -> Self::Tx {
3226 self.tx
3227 }
3228
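    /// Returns the prune modes this provider was configured with.
    ///
    /// Note: the call in the body resolves to the inherent `prune_modes_ref` accessor on
    /// [`DatabaseProvider`] (inherent methods take precedence over trait methods), so it
    /// is not recursive.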
3229 fn prune_modes_ref(&self) -> &PruneModes {
3230 self.prune_modes_ref()
3231 }
3232}