1use crate::{
2 bundle_state::StorageRevertsIter,
3 providers::{
4 database::{chain::ChainStorage, metrics},
5 static_file::StaticFileWriter,
6 NodeTypesForProvider, StaticFileProvider,
7 },
8 to_range,
9 traits::{
10 AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
11 },
12 AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
13 BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
14 DBProvider, EvmEnvProvider, HashingWriter, HeaderProvider, HeaderSyncGap,
15 HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
16 LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
17 PruneCheckpointReader, PruneCheckpointWriter, RevertsInit, StageCheckpointReader,
18 StateCommitmentProvider, StateProviderBox, StateWriter, StaticFileProviderFactory, StatsReader,
19 StorageLocation, StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider,
20 TransactionsProviderExt, TrieWriter, WithdrawalsProvider,
21};
22use alloy_consensus::{BlockHeader, Header};
23use alloy_eips::{
24 eip2718::Encodable2718,
25 eip4895::{Withdrawal, Withdrawals},
26 BlockHashOrNumber,
27};
28use alloy_primitives::{
29 keccak256,
30 map::{hash_map, B256HashMap, HashMap, HashSet},
31 Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
32};
33use itertools::Itertools;
34use rayon::slice::ParallelSliceMut;
35use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, EthereumHardforks};
36use reth_db::{
37 cursor::DbDupCursorRW, tables, BlockNumberList, PlainAccountState, PlainStorageState,
38};
39use reth_db_api::{
40 cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
41 database::Database,
42 models::{
43 sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
44 ShardedKey, StoredBlockBodyIndices,
45 },
46 table::Table,
47 transaction::{DbTx, DbTxMut},
48 DatabaseError,
49};
50use reth_evm::ConfigureEvmEnv;
51use reth_execution_types::{Chain, ExecutionOutcome};
52use reth_network_p2p::headers::downloader::SyncTarget;
53use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
54use reth_primitives::{
55 Account, BlockExt, BlockWithSenders, Bytecode, GotExpected, NodePrimitives, SealedBlock,
56 SealedBlockFor, SealedBlockWithSenders, SealedHeader, StaticFileSegment, StorageEntry,
57 TransactionMeta,
58};
59use reth_primitives_traits::{Block as _, BlockBody as _, SignedTransaction};
60use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment};
61use reth_stages_types::{StageCheckpoint, StageId};
62use reth_storage_api::{
63 BlockBodyIndicesProvider, BlockBodyReader, NodePrimitivesProvider, OmmersProvider,
64 StateProvider, StorageChangeSetReader, TryIntoHistoricalStateProvider,
65};
66use reth_storage_errors::provider::{ProviderResult, RootMismatch};
67use reth_trie::{
68 prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets},
69 updates::{StorageTrieUpdates, TrieUpdates},
70 HashedPostStateSorted, Nibbles, StateRoot, StoredNibbles,
71};
72use reth_trie_db::{DatabaseStateRoot, DatabaseStorageTrieCursor};
73use revm::{
74 db::states::{PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset},
75 primitives::{BlockEnv, CfgEnvWithHandlerCfg, FlaggedStorage},
76};
77use std::{
78 cmp::Ordering,
79 collections::{BTreeMap, BTreeSet},
80 fmt::Debug,
81 ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
82 sync::{mpsc, Arc},
83};
84use tokio::sync::watch;
85use tracing::{debug, trace};
86
87pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
89
90#[derive(Debug)]
95pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
96 pub DatabaseProvider<<DB as Database>::TXMut, N>,
97);
98
99impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
100 type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
101
102 fn deref(&self) -> &Self::Target {
103 &self.0
104 }
105}
106
107impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
108 fn deref_mut(&mut self) -> &mut Self::Target {
109 &mut self.0
110 }
111}
112
113impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
114 for DatabaseProviderRW<DB, N>
115{
116 fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
117 &self.0
118 }
119}
120
121impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
122 pub fn commit(self) -> ProviderResult<bool> {
124 self.0.commit()
125 }
126
127 pub fn into_tx(self) -> <DB as Database>::TXMut {
129 self.0.into_tx()
130 }
131}
132
133impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
134 for DatabaseProvider<<DB as Database>::TXMut, N>
135{
136 fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
137 provider.0
138 }
139}
140
141#[derive(Debug)]
144pub struct DatabaseProvider<TX, N: NodeTypes> {
145 tx: TX,
147 chain_spec: Arc<N::ChainSpec>,
149 static_file_provider: StaticFileProvider<N::Primitives>,
151 prune_modes: PruneModes,
153 storage: Arc<N::Storage>,
155}
156
157impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
158 pub const fn prune_modes_ref(&self) -> &PruneModes {
160 &self.prune_modes
161 }
162}
163
164impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
165 pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
167 trace!(target: "providers::db", "Returning latest state provider");
168 Box::new(LatestStateProviderRef::new(self))
169 }
170
171 pub fn history_by_block_hash<'a>(
173 &'a self,
174 block_hash: BlockHash,
175 ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
176 let mut block_number =
177 self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
178 if block_number == self.best_block_number().unwrap_or_default() &&
179 block_number == self.last_block_number().unwrap_or_default()
180 {
181 return Ok(Box::new(LatestStateProviderRef::new(self)))
182 }
183
184 block_number += 1;
186
187 let account_history_prune_checkpoint =
188 self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
189 let storage_history_prune_checkpoint =
190 self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
191
192 let mut state_provider = HistoricalStateProviderRef::new(self, block_number);
193
194 if let Some(prune_checkpoint_block_number) =
197 account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
198 {
199 state_provider = state_provider.with_lowest_available_account_history_block_number(
200 prune_checkpoint_block_number + 1,
201 );
202 }
203 if let Some(prune_checkpoint_block_number) =
204 storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
205 {
206 state_provider = state_provider.with_lowest_available_storage_history_block_number(
207 prune_checkpoint_block_number + 1,
208 );
209 }
210
211 Ok(Box::new(state_provider))
212 }
213
214 #[cfg(feature = "test-utils")]
215 pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
217 self.prune_modes = prune_modes;
218 }
219}
220
221impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
222 type Primitives = N::Primitives;
223}
224
225impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
226 fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
228 self.static_file_provider.clone()
229 }
230}
231
232impl<TX: Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
233 for DatabaseProvider<TX, N>
234{
235 type ChainSpec = N::ChainSpec;
236
237 fn chain_spec(&self) -> Arc<Self::ChainSpec> {
238 self.chain_spec.clone()
239 }
240}
241
242impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
243 pub const fn new_rw(
245 tx: TX,
246 chain_spec: Arc<N::ChainSpec>,
247 static_file_provider: StaticFileProvider<N::Primitives>,
248 prune_modes: PruneModes,
249 storage: Arc<N::Storage>,
250 ) -> Self {
251 Self { tx, chain_spec, static_file_provider, prune_modes, storage }
252 }
253}
254
255impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
256 fn as_ref(&self) -> &Self {
257 self
258 }
259}
260
261impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
262 pub fn unwind_trie_state_range(
267 &self,
268 range: RangeInclusive<BlockNumber>,
269 ) -> ProviderResult<()> {
270 let changed_accounts = self
271 .tx
272 .cursor_read::<tables::AccountChangeSets>()?
273 .walk_range(range.clone())?
274 .collect::<Result<Vec<_>, _>>()?;
275
276 let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
278 let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
279 let mut destroyed_accounts = HashSet::default();
280 for (hashed_address, account) in hashed_addresses {
281 account_prefix_set.insert(Nibbles::unpack(hashed_address));
282 if account.is_none() {
283 destroyed_accounts.insert(hashed_address);
284 }
285 }
286
287 self.unwind_account_history_indices(changed_accounts.iter())?;
289 let storage_range = BlockNumberAddress::range(range.clone());
290
291 let changed_storages = self
292 .tx
293 .cursor_read::<tables::StorageChangeSets>()?
294 .walk_range(storage_range)?
295 .collect::<Result<Vec<_>, _>>()?;
296
297 let mut storage_prefix_sets = B256HashMap::<PrefixSet>::default();
300 let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
301 for (hashed_address, hashed_slots) in storage_entries {
302 account_prefix_set.insert(Nibbles::unpack(hashed_address));
303 let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
304 for slot in hashed_slots {
305 storage_prefix_set.insert(Nibbles::unpack(slot));
306 }
307 storage_prefix_sets.insert(hashed_address, storage_prefix_set.freeze());
308 }
309
310 self.unwind_storage_history_indices(changed_storages.iter().copied())?;
312
313 let prefix_sets = TriePrefixSets {
317 account_prefix_set: account_prefix_set.freeze(),
318 storage_prefix_sets,
319 destroyed_accounts,
320 };
321 let (new_state_root, trie_updates) = StateRoot::from_tx(&self.tx)
322 .with_prefix_sets(prefix_sets)
323 .root_with_updates()
324 .map_err(reth_db::DatabaseError::from)?;
325
326 let parent_number = range.start().saturating_sub(1);
327 let parent_state_root = self
328 .header_by_number(parent_number)?
329 .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
330 .state_root();
331
332 if new_state_root != parent_state_root {
335 let parent_hash = self
336 .block_hash(parent_number)?
337 .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
338 return Err(ProviderError::UnwindStateRootMismatch(Box::new(RootMismatch {
339 root: GotExpected { got: new_state_root, expected: parent_state_root },
340 block_number: parent_number,
341 block_hash: parent_hash,
342 })))
343 }
344 self.write_trie_updates(&trie_updates)?;
345
346 Ok(())
347 }
348
349 fn remove_receipts_from(
351 &self,
352 from_tx: TxNumber,
353 last_block: BlockNumber,
354 remove_from: StorageLocation,
355 ) -> ProviderResult<()> {
356 if remove_from.database() {
357 self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
359 }
360
361 if remove_from.static_files() && !self.prune_modes.has_receipts_pruning() {
362 let static_file_receipt_num =
363 self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
364
365 let to_delete = static_file_receipt_num
366 .map(|static_num| (static_num + 1).saturating_sub(from_tx))
367 .unwrap_or_default();
368
369 self.static_file_provider
370 .latest_writer(StaticFileSegment::Receipts)?
371 .prune_receipts(to_delete, last_block)?;
372 }
373
374 Ok(())
375 }
376}
377
378impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
379 fn try_into_history_at_block(
380 self,
381 mut block_number: BlockNumber,
382 ) -> ProviderResult<StateProviderBox> {
383 if block_number == self.best_block_number().unwrap_or_default() &&
384 block_number == self.last_block_number().unwrap_or_default()
385 {
386 return Ok(Box::new(LatestStateProvider::new(self)))
387 }
388
389 block_number += 1;
391
392 let account_history_prune_checkpoint =
393 self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
394 let storage_history_prune_checkpoint =
395 self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
396
397 let mut state_provider = HistoricalStateProvider::new(self, block_number);
398
399 if let Some(prune_checkpoint_block_number) =
402 account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
403 {
404 state_provider = state_provider.with_lowest_available_account_history_block_number(
405 prune_checkpoint_block_number + 1,
406 );
407 }
408 if let Some(prune_checkpoint_block_number) =
409 storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
410 {
411 state_provider = state_provider.with_lowest_available_storage_history_block_number(
412 prune_checkpoint_block_number + 1,
413 );
414 }
415
416 Ok(Box::new(state_provider))
417 }
418}
419
420impl<TX: DbTx + 'static, N: NodeTypes> StateCommitmentProvider for DatabaseProvider<TX, N> {
421 type StateCommitment = N::StateCommitment;
422}
423
424impl<
425 Tx: DbTx + DbTxMut + 'static,
426 N: NodeTypesForProvider<Primitives: NodePrimitives<BlockHeader = Header>>,
427 > DatabaseProvider<Tx, N>
428{
429 pub fn insert_historical_block(
433 &self,
434 block: SealedBlockWithSenders<<Self as BlockWriter>::Block>,
435 ) -> ProviderResult<StoredBlockBodyIndices> {
436 let ttd = if block.number == 0 {
437 block.difficulty
438 } else {
439 let parent_block_number = block.number - 1;
440 let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
441 parent_ttd + block.difficulty
442 };
443
444 let mut writer = self.static_file_provider.latest_writer(StaticFileSegment::Headers)?;
445
446 let segment_header = writer.user_header();
448 if segment_header.block_end().is_none() && segment_header.expected_block_start() == 0 {
449 for block_number in 0..block.number {
450 let mut prev = block.header.clone().unseal();
451 prev.number = block_number;
452 writer.append_header(&prev, U256::ZERO, &B256::ZERO)?;
453 }
454 }
455
456 writer.append_header(block.header.as_ref(), ttd, &block.hash())?;
457
458 self.insert_block(block, StorageLocation::Database)
459 }
460}
461
462fn unwind_history_shards<S, T, C>(
475 cursor: &mut C,
476 start_key: T::Key,
477 block_number: BlockNumber,
478 mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
479) -> ProviderResult<Vec<u64>>
480where
481 T: Table<Value = BlockNumberList>,
482 T::Key: AsRef<ShardedKey<S>>,
483 C: DbCursorRO<T> + DbCursorRW<T>,
484{
485 let mut item = cursor.seek_exact(start_key)?;
486 while let Some((sharded_key, list)) = item {
487 if !shard_belongs_to_key(&sharded_key) {
489 break
490 }
491 cursor.delete_current()?;
492
493 let first = list.iter().next().expect("List can't be empty");
496 if first >= block_number {
497 item = cursor.prev()?;
498 continue
499 } else if block_number <= sharded_key.as_ref().highest_block_number {
500 return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
502 }
503 return Ok(list.iter().collect::<Vec<_>>())
504 }
505
506 Ok(Vec::new())
507}
508
509impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
510 pub const fn new(
512 tx: TX,
513 chain_spec: Arc<N::ChainSpec>,
514 static_file_provider: StaticFileProvider<N::Primitives>,
515 prune_modes: PruneModes,
516 storage: Arc<N::Storage>,
517 ) -> Self {
518 Self { tx, chain_spec, static_file_provider, prune_modes, storage }
519 }
520
521 pub fn into_tx(self) -> TX {
523 self.tx
524 }
525
526 pub fn tx_mut(&mut self) -> &mut TX {
528 &mut self.tx
529 }
530
531 pub const fn tx_ref(&self) -> &TX {
533 &self.tx
534 }
535
536 pub fn chain_spec(&self) -> &N::ChainSpec {
538 &self.chain_spec
539 }
540}
541
542impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
543 fn transactions_by_tx_range_with_cursor<C>(
544 &self,
545 range: impl RangeBounds<TxNumber>,
546 cursor: &mut C,
547 ) -> ProviderResult<Vec<TxTy<N>>>
548 where
549 C: DbCursorRO<tables::Transactions<TxTy<N>>>,
550 {
551 self.static_file_provider.get_range_with_static_file_or_database(
552 StaticFileSegment::Transactions,
553 to_range(range),
554 |static_file, range, _| static_file.transactions_by_tx_range(range),
555 |range, _| self.cursor_collect(cursor, range),
556 |_| true,
557 )
558 }
559
560 fn block_with_senders<H, HF, B, BF>(
561 &self,
562 id: BlockHashOrNumber,
563 _transaction_kind: TransactionVariant,
564 header_by_number: HF,
565 construct_block: BF,
566 ) -> ProviderResult<Option<B>>
567 where
568 H: AsRef<HeaderTy<N>>,
569 HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
570 BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
571 {
572 let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
573 let Some(header) = header_by_number(block_number)? else { return Ok(None) };
574
575 let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
582
583 let tx_range = body.tx_num_range();
584
585 let (transactions, senders) = if tx_range.is_empty() {
586 (vec![], vec![])
587 } else {
588 (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
589 };
590
591 let body = self
592 .storage
593 .reader()
594 .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
595 .pop()
596 .ok_or(ProviderError::InvalidStorageOutput)?;
597
598 construct_block(header, body, senders)
599 }
600
601 fn block_range<F, H, HF, R>(
611 &self,
612 range: RangeInclusive<BlockNumber>,
613 headers_range: HF,
614 mut assemble_block: F,
615 ) -> ProviderResult<Vec<R>>
616 where
617 H: AsRef<HeaderTy<N>>,
618 HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
619 F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
620 {
621 if range.is_empty() {
622 return Ok(Vec::new())
623 }
624
625 let len = range.end().saturating_sub(*range.start()) as usize;
626 let mut blocks = Vec::with_capacity(len);
627
628 let headers = headers_range(range)?;
629 let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
630 let mut block_body_cursor = self.tx.cursor_read::<tables::BlockBodyIndices>()?;
631
632 let mut present_headers = Vec::new();
633 for header in headers {
634 if let Some((_, block_body_indices)) =
640 block_body_cursor.seek_exact(header.as_ref().number())?
641 {
642 let tx_range = block_body_indices.tx_num_range();
643 present_headers.push((header, tx_range));
644 }
645 }
646
647 let mut inputs = Vec::new();
648 for (header, tx_range) in &present_headers {
649 let transactions = if tx_range.is_empty() {
650 Vec::new()
651 } else {
652 self.transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)?
653 };
654
655 inputs.push((header.as_ref(), transactions));
656 }
657
658 let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
659
660 for ((header, tx_range), body) in present_headers.into_iter().zip(bodies) {
661 blocks.push(assemble_block(header, body, tx_range)?);
662 }
663
664 Ok(blocks)
665 }
666
667 fn block_with_senders_range<H, HF, B, BF>(
678 &self,
679 range: RangeInclusive<BlockNumber>,
680 headers_range: HF,
681 assemble_block: BF,
682 ) -> ProviderResult<Vec<B>>
683 where
684 H: AsRef<HeaderTy<N>>,
685 HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
686 BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
687 {
688 let mut senders_cursor = self.tx.cursor_read::<tables::TransactionSenders>()?;
689
690 self.block_range(range, headers_range, |header, body, tx_range| {
691 let senders = if tx_range.is_empty() {
692 Vec::new()
693 } else {
694 let known_senders =
696 senders_cursor
697 .walk_range(tx_range.clone())?
698 .collect::<Result<HashMap<_, _>, _>>()?;
699
700 let mut senders = Vec::with_capacity(body.transactions().len());
701 for (tx_num, tx) in tx_range.zip(body.transactions()) {
702 match known_senders.get(&tx_num) {
703 None => {
704 let sender = tx
706 .recover_signer_unchecked()
707 .ok_or(ProviderError::SenderRecoveryError)?;
708 senders.push(sender);
709 }
710 Some(sender) => senders.push(*sender),
711 }
712 }
713
714 senders
715 };
716
717 assemble_block(header, body, senders)
718 })
719 }
720
721 fn populate_bundle_state<A, S>(
725 &self,
726 account_changeset: Vec<(u64, AccountBeforeTx)>,
727 storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
728 plain_accounts_cursor: &mut A,
729 plain_storage_cursor: &mut S,
730 ) -> ProviderResult<(BundleStateInit, RevertsInit)>
731 where
732 A: DbCursorRO<PlainAccountState>,
733 S: DbDupCursorRO<PlainStorageState>,
734 {
735 let mut state: BundleStateInit = HashMap::default();
739
740 let mut reverts: RevertsInit = HashMap::default();
746
747 for (block_number, account_before) in account_changeset.into_iter().rev() {
749 let AccountBeforeTx { info: old_info, address } = account_before;
750 match state.entry(address) {
751 hash_map::Entry::Vacant(entry) => {
752 let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
753 entry.insert((old_info, new_info, HashMap::default()));
754 }
755 hash_map::Entry::Occupied(mut entry) => {
756 entry.get_mut().0 = old_info;
758 }
759 }
760 reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
762 }
763
764 for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
766 let BlockNumberAddress((block_number, address)) = block_and_address;
767 let account_state = match state.entry(address) {
769 hash_map::Entry::Vacant(entry) => {
770 let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
771 entry.insert((present_info, present_info, HashMap::default()))
772 }
773 hash_map::Entry::Occupied(entry) => entry.into_mut(),
774 };
775
776 match account_state.2.entry(old_storage.key) {
778 hash_map::Entry::Vacant(entry) => {
779 let new_storage = plain_storage_cursor
780 .seek_by_key_subkey(address, old_storage.key)?
781 .filter(|storage| storage.key == old_storage.key)
782 .unwrap_or_default();
783 entry.insert((
784 (old_storage.value, old_storage.is_private),
785 (new_storage.value, new_storage.is_private),
786 ));
787 }
788 hash_map::Entry::Occupied(mut entry) => {
789 entry.get_mut().0 = (old_storage.value, old_storage.is_private);
790 }
791 };
792
793 reverts
794 .entry(block_number)
795 .or_default()
796 .entry(address)
797 .or_default()
798 .1
799 .push(old_storage);
800 }
801
802 Ok((state, reverts))
803 }
804}
805
806impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
807 pub fn commit(self) -> ProviderResult<bool> {
809 Ok(self.tx.commit()?)
810 }
811
812 fn take_shard<T>(&self, key: T::Key) -> ProviderResult<Vec<u64>>
815 where
816 T: Table<Value = BlockNumberList>,
817 {
818 let mut cursor = self.tx.cursor_read::<T>()?;
819 let shard = cursor.seek_exact(key)?;
820 if let Some((shard_key, list)) = shard {
821 self.tx.delete::<T>(shard_key, None)?;
823 let list = list.iter().collect::<Vec<_>>();
824 return Ok(list)
825 }
826 Ok(Vec::new())
827 }
828
829 fn append_history_index<P, T>(
837 &self,
838 index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
839 mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
840 ) -> ProviderResult<()>
841 where
842 P: Copy,
843 T: Table<Value = BlockNumberList>,
844 {
845 for (partial_key, indices) in index_updates {
846 let mut last_shard =
847 self.take_shard::<T>(sharded_key_factory(partial_key, u64::MAX))?;
848 last_shard.extend(indices);
849 let indices = last_shard;
851 let mut chunks = indices.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD).peekable();
852 while let Some(list) = chunks.next() {
853 let highest_block_number = if chunks.peek().is_some() {
854 *list.last().expect("`chunks` does not return empty list")
855 } else {
856 u64::MAX
858 };
859 self.tx.put::<T>(
860 sharded_key_factory(partial_key, highest_block_number),
861 BlockNumberList::new_pre_sorted(list.iter().copied()),
862 )?;
863 }
864 }
865 Ok(())
866 }
867}
868
869impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
870 fn basic_account(&self, address: Address) -> ProviderResult<Option<Account>> {
871 Ok(self.tx.get::<tables::PlainAccountState>(address)?)
872 }
873}
874
875impl<TX: DbTx, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
876 fn changed_accounts_with_range(
877 &self,
878 range: impl RangeBounds<BlockNumber>,
879 ) -> ProviderResult<BTreeSet<Address>> {
880 self.tx
881 .cursor_read::<tables::AccountChangeSets>()?
882 .walk_range(range)?
883 .map(|entry| {
884 entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
885 })
886 .collect()
887 }
888
889 fn basic_accounts(
890 &self,
891 iter: impl IntoIterator<Item = Address>,
892 ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
893 let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
894 Ok(iter
895 .into_iter()
896 .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
897 .collect::<Result<Vec<_>, _>>()?)
898 }
899
900 fn changed_accounts_and_blocks_with_range(
901 &self,
902 range: RangeInclusive<BlockNumber>,
903 ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
904 let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
905
906 let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
907 BTreeMap::new(),
908 |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
909 let (index, account) = entry?;
910 accounts.entry(account.address).or_default().push(index);
911 Ok(accounts)
912 },
913 )?;
914
915 Ok(account_transitions)
916 }
917}
918
919impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
920 fn storage_changeset(
921 &self,
922 block_number: BlockNumber,
923 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
924 let range = block_number..=block_number;
925 let storage_range = BlockNumberAddress::range(range);
926 self.tx
927 .cursor_dup_read::<tables::StorageChangeSets>()?
928 .walk_range(storage_range)?
929 .map(|result| -> ProviderResult<_> { Ok(result?) })
930 .collect()
931 }
932}
933
934impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
935 fn account_block_changeset(
936 &self,
937 block_number: BlockNumber,
938 ) -> ProviderResult<Vec<AccountBeforeTx>> {
939 let range = block_number..=block_number;
940 self.tx
941 .cursor_read::<tables::AccountChangeSets>()?
942 .walk_range(range)?
943 .map(|result| -> ProviderResult<_> {
944 let (_, account_before) = result?;
945 Ok(account_before)
946 })
947 .collect()
948 }
949}
950
951impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
952 for DatabaseProvider<TX, N>
953{
954 type Header = HeaderTy<N>;
955
956 fn sync_gap(
957 &self,
958 tip: watch::Receiver<B256>,
959 highest_uninterrupted_block: BlockNumber,
960 ) -> ProviderResult<HeaderSyncGap<Self::Header>> {
961 let static_file_provider = self.static_file_provider();
962
963 let next_static_file_block_num = static_file_provider
966 .get_highest_static_file_block(StaticFileSegment::Headers)
967 .map(|id| id + 1)
968 .unwrap_or_default();
969 let next_block = highest_uninterrupted_block + 1;
970
971 match next_static_file_block_num.cmp(&next_block) {
972 Ordering::Greater => {
975 let mut static_file_producer =
976 static_file_provider.latest_writer(StaticFileSegment::Headers)?;
977 static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
978 static_file_producer.commit()?
981 }
982 Ordering::Less => {
983 return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
985 }
986 Ordering::Equal => {}
987 }
988
989 let local_head = static_file_provider
990 .sealed_header(highest_uninterrupted_block)?
991 .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
992
993 let target = SyncTarget::Tip(*tip.borrow());
994
995 Ok(HeaderSyncGap { local_head, target })
996 }
997}
998
999impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
1000 type Header = HeaderTy<N>;
1001
1002 fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
1003 if let Some(num) = self.block_number(*block_hash)? {
1004 Ok(self.header_by_number(num)?)
1005 } else {
1006 Ok(None)
1007 }
1008 }
1009
1010 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1011 self.static_file_provider.get_with_static_file_or_database(
1012 StaticFileSegment::Headers,
1013 num,
1014 |static_file| static_file.header_by_number(num),
1015 || Ok(self.tx.get::<tables::Headers<Self::Header>>(num)?),
1016 )
1017 }
1018
1019 fn header_td(&self, block_hash: &BlockHash) -> ProviderResult<Option<U256>> {
1020 if let Some(num) = self.block_number(*block_hash)? {
1021 self.header_td_by_number(num)
1022 } else {
1023 Ok(None)
1024 }
1025 }
1026
1027 fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
1028 if let Some(td) = self.chain_spec.final_paris_total_difficulty(number) {
1029 return Ok(Some(td))
1032 }
1033
1034 self.static_file_provider.get_with_static_file_or_database(
1035 StaticFileSegment::Headers,
1036 number,
1037 |static_file| static_file.header_td_by_number(number),
1038 || Ok(self.tx.get::<tables::HeaderTerminalDifficulties>(number)?.map(|td| td.0)),
1039 )
1040 }
1041
1042 fn headers_range(
1043 &self,
1044 range: impl RangeBounds<BlockNumber>,
1045 ) -> ProviderResult<Vec<Self::Header>> {
1046 self.static_file_provider.get_range_with_static_file_or_database(
1047 StaticFileSegment::Headers,
1048 to_range(range),
1049 |static_file, range, _| static_file.headers_range(range),
1050 |range, _| {
1051 self.cursor_read_collect::<tables::Headers<Self::Header>>(range).map_err(Into::into)
1052 },
1053 |_| true,
1054 )
1055 }
1056
1057 fn sealed_header(
1058 &self,
1059 number: BlockNumber,
1060 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1061 self.static_file_provider.get_with_static_file_or_database(
1062 StaticFileSegment::Headers,
1063 number,
1064 |static_file| static_file.sealed_header(number),
1065 || {
1066 if let Some(header) = self.header_by_number(number)? {
1067 let hash = self
1068 .block_hash(number)?
1069 .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
1070 Ok(Some(SealedHeader::new(header, hash)))
1071 } else {
1072 Ok(None)
1073 }
1074 },
1075 )
1076 }
1077
1078 fn sealed_headers_while(
1079 &self,
1080 range: impl RangeBounds<BlockNumber>,
1081 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1082 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1083 self.static_file_provider.get_range_with_static_file_or_database(
1084 StaticFileSegment::Headers,
1085 to_range(range),
1086 |static_file, range, predicate| static_file.sealed_headers_while(range, predicate),
1087 |range, mut predicate| {
1088 let mut headers = vec![];
1089 for entry in
1090 self.tx.cursor_read::<tables::Headers<Self::Header>>()?.walk_range(range)?
1091 {
1092 let (number, header) = entry?;
1093 let hash = self
1094 .block_hash(number)?
1095 .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
1096 let sealed = SealedHeader::new(header, hash);
1097 if !predicate(&sealed) {
1098 break
1099 }
1100 headers.push(sealed);
1101 }
1102 Ok(headers)
1103 },
1104 predicate,
1105 )
1106 }
1107}
1108
1109impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1110 fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1111 self.static_file_provider.get_with_static_file_or_database(
1112 StaticFileSegment::Headers,
1113 number,
1114 |static_file| static_file.block_hash(number),
1115 || Ok(self.tx.get::<tables::CanonicalHeaders>(number)?),
1116 )
1117 }
1118
1119 fn canonical_hashes_range(
1120 &self,
1121 start: BlockNumber,
1122 end: BlockNumber,
1123 ) -> ProviderResult<Vec<B256>> {
1124 self.static_file_provider.get_range_with_static_file_or_database(
1125 StaticFileSegment::Headers,
1126 start..end,
1127 |static_file, range, _| static_file.canonical_hashes_range(range.start, range.end),
1128 |range, _| {
1129 self.cursor_read_collect::<tables::CanonicalHeaders>(range).map_err(Into::into)
1130 },
1131 |_| true,
1132 )
1133 }
1134}
1135
1136impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1137 fn chain_info(&self) -> ProviderResult<ChainInfo> {
1138 let best_number = self.best_block_number()?;
1139 let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1140 Ok(ChainInfo { best_hash, best_number })
1141 }
1142
1143 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1144 Ok(self
1145 .get_stage_checkpoint(StageId::Finish)?
1146 .map(|checkpoint| checkpoint.block_number)
1147 .unwrap_or_default())
1148 }
1149
1150 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1151 Ok(self
1152 .tx
1153 .cursor_read::<tables::CanonicalHeaders>()?
1154 .last()?
1155 .map(|(num, _)| num)
1156 .max(
1157 self.static_file_provider.get_highest_static_file_block(StaticFileSegment::Headers),
1158 )
1159 .unwrap_or_default())
1160 }
1161
1162 fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1163 Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1164 }
1165}
1166
1167impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1168 type Block = BlockTy<N>;
1169
1170 fn find_block_by_hash(
1171 &self,
1172 hash: B256,
1173 source: BlockSource,
1174 ) -> ProviderResult<Option<Self::Block>> {
1175 if source.is_canonical() {
1176 self.block(hash.into())
1177 } else {
1178 Ok(None)
1179 }
1180 }
1181
1182 fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1188 if let Some(number) = self.convert_hash_or_number(id)? {
1189 if let Some(header) = self.header_by_number(number)? {
1190 let Some(transactions) = self.transactions_by_block(number.into())? else {
1195 return Ok(None)
1196 };
1197
1198 let body = self
1199 .storage
1200 .reader()
1201 .read_block_bodies(self, vec![(&header, transactions)])?
1202 .pop()
1203 .ok_or(ProviderError::InvalidStorageOutput)?;
1204
1205 return Ok(Some(Self::Block::new(header, body)))
1206 }
1207 }
1208
1209 Ok(None)
1210 }
1211
1212 fn pending_block(&self) -> ProviderResult<Option<SealedBlockFor<Self::Block>>> {
1213 Ok(None)
1214 }
1215
1216 fn pending_block_with_senders(
1217 &self,
1218 ) -> ProviderResult<Option<SealedBlockWithSenders<Self::Block>>> {
1219 Ok(None)
1220 }
1221
1222 fn pending_block_and_receipts(
1223 &self,
1224 ) -> ProviderResult<Option<(SealedBlockFor<Self::Block>, Vec<Self::Receipt>)>> {
1225 Ok(None)
1226 }
1227
1228 fn block_with_senders(
1237 &self,
1238 id: BlockHashOrNumber,
1239 transaction_kind: TransactionVariant,
1240 ) -> ProviderResult<Option<BlockWithSenders<Self::Block>>> {
1241 self.block_with_senders(
1242 id,
1243 transaction_kind,
1244 |block_number| self.header_by_number(block_number),
1245 |header, body, senders| {
1246 Self::Block::new(header, body)
1247 .try_with_senders_unchecked(senders)
1251 .map(Some)
1252 .map_err(|_| ProviderError::SenderRecoveryError)
1253 },
1254 )
1255 }
1256
1257 fn sealed_block_with_senders(
1258 &self,
1259 id: BlockHashOrNumber,
1260 transaction_kind: TransactionVariant,
1261 ) -> ProviderResult<Option<SealedBlockWithSenders<Self::Block>>> {
1262 self.block_with_senders(
1263 id,
1264 transaction_kind,
1265 |block_number| self.sealed_header(block_number),
1266 |header, body, senders| {
1267 SealedBlock { header, body }
1268 .try_with_senders_unchecked(senders)
1272 .map(Some)
1273 .map_err(|_| ProviderError::SenderRecoveryError)
1274 },
1275 )
1276 }
1277
1278 fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1279 self.block_range(
1280 range,
1281 |range| self.headers_range(range),
1282 |header, body, _| Ok(Self::Block::new(header, body)),
1283 )
1284 }
1285
1286 fn block_with_senders_range(
1287 &self,
1288 range: RangeInclusive<BlockNumber>,
1289 ) -> ProviderResult<Vec<BlockWithSenders<Self::Block>>> {
1290 self.block_with_senders_range(
1291 range,
1292 |range| self.headers_range(range),
1293 |header, body, senders| {
1294 Self::Block::new(header, body)
1295 .try_with_senders_unchecked(senders)
1296 .map_err(|_| ProviderError::SenderRecoveryError)
1297 },
1298 )
1299 }
1300
1301 fn sealed_block_with_senders_range(
1302 &self,
1303 range: RangeInclusive<BlockNumber>,
1304 ) -> ProviderResult<Vec<SealedBlockWithSenders<Self::Block>>> {
1305 self.block_with_senders_range(
1306 range,
1307 |range| self.sealed_headers_range(range),
1308 |header, body, senders| {
1309 SealedBlockWithSenders::new(SealedBlock { header, body }, senders)
1310 .ok_or(ProviderError::SenderRecoveryError)
1311 },
1312 )
1313 }
1314}
1315
1316impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1317 for DatabaseProvider<TX, N>
1318{
1319 fn transaction_hashes_by_range(
1322 &self,
1323 tx_range: Range<TxNumber>,
1324 ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1325 self.static_file_provider.get_range_with_static_file_or_database(
1326 StaticFileSegment::Transactions,
1327 tx_range,
1328 |static_file, range, _| static_file.transaction_hashes_by_range(range),
1329 |tx_range, _| {
1330 let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
1331 let tx_range_size = tx_range.clone().count();
1332 let tx_walker = tx_cursor.walk_range(tx_range)?;
1333
1334 let chunk_size = (tx_range_size / rayon::current_num_threads()).max(1);
1335 let mut channels = Vec::with_capacity(chunk_size);
1336 let mut transaction_count = 0;
1337
1338 #[inline]
1339 fn calculate_hash<T>(
1340 entry: Result<(TxNumber, T), DatabaseError>,
1341 rlp_buf: &mut Vec<u8>,
1342 ) -> Result<(B256, TxNumber), Box<ProviderError>>
1343 where
1344 T: Encodable2718,
1345 {
1346 let (tx_id, tx) = entry.map_err(|e| Box::new(e.into()))?;
1347 tx.encode_2718(rlp_buf);
1348 Ok((keccak256(rlp_buf), tx_id))
1349 }
1350
1351 for chunk in &tx_walker.chunks(chunk_size) {
1352 let (tx, rx) = mpsc::channel();
1353 channels.push(rx);
1354
1355 let chunk: Vec<_> = chunk.collect();
1358 transaction_count += chunk.len();
1359
1360 rayon::spawn(move || {
1364 let mut rlp_buf = Vec::with_capacity(128);
1365 for entry in chunk {
1366 rlp_buf.clear();
1367 let _ = tx.send(calculate_hash(entry, &mut rlp_buf));
1368 }
1369 });
1370 }
1371 let mut tx_list = Vec::with_capacity(transaction_count);
1372
1373 for channel in channels {
1375 while let Ok(tx) = channel.recv() {
1376 let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
1377 tx_list.push((tx_hash, tx_id));
1378 }
1379 }
1380
1381 Ok(tx_list)
1382 },
1383 |_| true,
1384 )
1385 }
1386}
1387
1388impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1390 type Transaction = TxTy<N>;
1391
1392 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1393 Ok(self.tx.get::<tables::TransactionHashNumbers>(tx_hash)?)
1394 }
1395
1396 fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1397 self.static_file_provider.get_with_static_file_or_database(
1398 StaticFileSegment::Transactions,
1399 id,
1400 |static_file| static_file.transaction_by_id(id),
1401 || Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
1402 )
1403 }
1404
1405 fn transaction_by_id_unhashed(
1406 &self,
1407 id: TxNumber,
1408 ) -> ProviderResult<Option<Self::Transaction>> {
1409 self.static_file_provider.get_with_static_file_or_database(
1410 StaticFileSegment::Transactions,
1411 id,
1412 |static_file| static_file.transaction_by_id_unhashed(id),
1413 || Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
1414 )
1415 }
1416
1417 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1418 if let Some(id) = self.transaction_id(hash)? {
1419 Ok(self.transaction_by_id_unhashed(id)?)
1420 } else {
1421 Ok(None)
1422 }
1423 }
1424
1425 fn transaction_by_hash_with_meta(
1426 &self,
1427 tx_hash: TxHash,
1428 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1429 let mut transaction_cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
1430 if let Some(transaction_id) = self.transaction_id(tx_hash)? {
1431 if let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? {
1432 if let Some(block_number) =
1433 transaction_cursor.seek(transaction_id).map(|b| b.map(|(_, bn)| bn))?
1434 {
1435 if let Some(sealed_header) = self.sealed_header(block_number)? {
1436 let (header, block_hash) = sealed_header.split();
1437 if let Some(block_body) = self.block_body_indices(block_number)? {
1438 let index = transaction_id - block_body.first_tx_num();
1443
1444 let meta = TransactionMeta {
1445 tx_hash,
1446 index,
1447 block_hash,
1448 block_number,
1449 base_fee: header.base_fee_per_gas(),
1450 excess_blob_gas: header.excess_blob_gas(),
1451 timestamp: header.timestamp(),
1452 };
1453
1454 return Ok(Some((transaction, meta)))
1455 }
1456 }
1457 }
1458 }
1459 }
1460
1461 Ok(None)
1462 }
1463
1464 fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1465 let mut cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
1466 Ok(cursor.seek(id)?.map(|(_, bn)| bn))
1467 }
1468
1469 fn transactions_by_block(
1470 &self,
1471 id: BlockHashOrNumber,
1472 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1473 let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;
1474
1475 if let Some(block_number) = self.convert_hash_or_number(id)? {
1476 if let Some(body) = self.block_body_indices(block_number)? {
1477 let tx_range = body.tx_num_range();
1478 return if tx_range.is_empty() {
1479 Ok(Some(Vec::new()))
1480 } else {
1481 Ok(Some(self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)?))
1482 }
1483 }
1484 }
1485 Ok(None)
1486 }
1487
1488 fn transactions_by_block_range(
1489 &self,
1490 range: impl RangeBounds<BlockNumber>,
1491 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1492 let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;
1493 let mut results = Vec::new();
1494 let mut body_cursor = self.tx.cursor_read::<tables::BlockBodyIndices>()?;
1495 for entry in body_cursor.walk_range(range)? {
1496 let (_, body) = entry?;
1497 let tx_num_range = body.tx_num_range();
1498 if tx_num_range.is_empty() {
1499 results.push(Vec::new());
1500 } else {
1501 results.push(
1502 self.transactions_by_tx_range_with_cursor(tx_num_range, &mut tx_cursor)?
1503 .into_iter()
1504 .collect(),
1505 );
1506 }
1507 }
1508 Ok(results)
1509 }
1510
1511 fn transactions_by_tx_range(
1512 &self,
1513 range: impl RangeBounds<TxNumber>,
1514 ) -> ProviderResult<Vec<Self::Transaction>> {
1515 self.transactions_by_tx_range_with_cursor(
1516 range,
1517 &mut self.tx.cursor_read::<tables::Transactions<_>>()?,
1518 )
1519 }
1520
1521 fn senders_by_tx_range(
1522 &self,
1523 range: impl RangeBounds<TxNumber>,
1524 ) -> ProviderResult<Vec<Address>> {
1525 self.cursor_read_collect::<tables::TransactionSenders>(range).map_err(Into::into)
1526 }
1527
1528 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1529 Ok(self.tx.get::<tables::TransactionSenders>(id)?)
1530 }
1531}
1532
1533impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
1534 type Receipt = ReceiptTy<N>;
1535
1536 fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1537 self.static_file_provider.get_with_static_file_or_database(
1538 StaticFileSegment::Receipts,
1539 id,
1540 |static_file| static_file.receipt(id),
1541 || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
1542 )
1543 }
1544
1545 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1546 if let Some(id) = self.transaction_id(hash)? {
1547 self.receipt(id)
1548 } else {
1549 Ok(None)
1550 }
1551 }
1552
1553 fn receipts_by_block(
1554 &self,
1555 block: BlockHashOrNumber,
1556 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1557 if let Some(number) = self.convert_hash_or_number(block)? {
1558 if let Some(body) = self.block_body_indices(number)? {
1559 let tx_range = body.tx_num_range();
1560 return if tx_range.is_empty() {
1561 Ok(Some(Vec::new()))
1562 } else {
1563 self.receipts_by_tx_range(tx_range).map(Some)
1564 }
1565 }
1566 }
1567 Ok(None)
1568 }
1569
1570 fn receipts_by_tx_range(
1571 &self,
1572 range: impl RangeBounds<TxNumber>,
1573 ) -> ProviderResult<Vec<Self::Receipt>> {
1574 self.static_file_provider.get_range_with_static_file_or_database(
1575 StaticFileSegment::Receipts,
1576 to_range(range),
1577 |static_file, range, _| static_file.receipts_by_tx_range(range),
1578 |range, _| {
1579 self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range)
1580 .map_err(Into::into)
1581 },
1582 |_| true,
1583 )
1584 }
1585}
1586
1587impl<TX: DbTx + 'static, N: NodeTypes<ChainSpec: EthereumHardforks>> WithdrawalsProvider
1588 for DatabaseProvider<TX, N>
1589{
1590 fn withdrawals_by_block(
1591 &self,
1592 id: BlockHashOrNumber,
1593 timestamp: u64,
1594 ) -> ProviderResult<Option<Withdrawals>> {
1595 if self.chain_spec.is_shanghai_active_at_timestamp(timestamp) {
1596 if let Some(number) = self.convert_hash_or_number(id)? {
1597 let withdrawals = self
1600 .tx
1601 .get::<tables::BlockWithdrawals>(number)
1602 .map(|w| w.map(|w| w.withdrawals))?
1603 .unwrap_or_default();
1604 return Ok(Some(withdrawals))
1605 }
1606 }
1607 Ok(None)
1608 }
1609
1610 fn latest_withdrawal(&self) -> ProviderResult<Option<Withdrawal>> {
1611 let latest_block_withdrawal = self.tx.cursor_read::<tables::BlockWithdrawals>()?.last()?;
1612 Ok(latest_block_withdrawal
1613 .and_then(|(_, mut block_withdrawal)| block_withdrawal.withdrawals.pop()))
1614 }
1615}
1616
1617impl<TX: DbTx + 'static, N: NodeTypesForProvider> OmmersProvider for DatabaseProvider<TX, N> {
1618 fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<Self::Header>>> {
1623 if let Some(number) = self.convert_hash_or_number(id)? {
1624 if self.chain_spec.final_paris_total_difficulty(number).is_some() {
1627 return Ok(Some(Vec::new()))
1628 }
1629
1630 let ommers =
1631 self.tx.get::<tables::BlockOmmers<Self::Header>>(number)?.map(|o| o.ommers);
1632 return Ok(ommers)
1633 }
1634
1635 Ok(None)
1636 }
1637}
1638
1639impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
1640 for DatabaseProvider<TX, N>
1641{
1642 fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1643 Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
1644 }
1645}
1646
1647impl<TX: DbTx + 'static, N: NodeTypesForProvider> EvmEnvProvider<HeaderTy<N>>
1648 for DatabaseProvider<TX, N>
1649{
1650 fn env_with_header<EvmConfig>(
1651 &self,
1652 header: &HeaderTy<N>,
1653 evm_config: EvmConfig,
1654 ) -> ProviderResult<(CfgEnvWithHandlerCfg, BlockEnv)>
1655 where
1656 EvmConfig: ConfigureEvmEnv<Header = HeaderTy<N>>,
1657 {
1658 let total_difficulty = self
1659 .header_td_by_number(header.number())?
1660 .ok_or_else(|| ProviderError::HeaderNotFound(header.number().into()))?;
1661 Ok(evm_config.cfg_and_block_env(header, total_difficulty))
1662 }
1663}
1664
1665impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
1666 fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1667 Ok(self.tx.get::<tables::StageCheckpoints>(id.to_string())?)
1668 }
1669
1670 fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1672 Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
1673 }
1674
1675 fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1676 self.tx
1677 .cursor_read::<tables::StageCheckpoints>()?
1678 .walk(None)?
1679 .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
1680 .map_err(ProviderError::Database)
1681 }
1682}
1683
1684impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
1685 fn save_stage_checkpoint(
1687 &self,
1688 id: StageId,
1689 checkpoint: StageCheckpoint,
1690 ) -> ProviderResult<()> {
1691 Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
1692 }
1693
1694 fn save_stage_checkpoint_progress(
1696 &self,
1697 id: StageId,
1698 checkpoint: Vec<u8>,
1699 ) -> ProviderResult<()> {
1700 Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
1701 }
1702
1703 fn update_pipeline_stages(
1704 &self,
1705 block_number: BlockNumber,
1706 drop_stage_checkpoint: bool,
1707 ) -> ProviderResult<()> {
1708 let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
1710 for stage_id in StageId::ALL {
1711 let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
1712 cursor.upsert(
1713 stage_id.to_string(),
1714 StageCheckpoint {
1715 block_number,
1716 ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
1717 },
1718 )?;
1719 }
1720
1721 Ok(())
1722 }
1723}
1724
1725impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
1726 fn plain_state_storages(
1727 &self,
1728 addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
1729 ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
1730 let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
1731
1732 addresses_with_keys
1733 .into_iter()
1734 .map(|(address, storage)| {
1735 storage
1736 .into_iter()
1737 .map(|key| -> ProviderResult<_> {
1738 Ok(plain_storage
1739 .seek_by_key_subkey(address, key)?
1740 .filter(|v| v.key == key)
1741 .unwrap_or_else(|| StorageEntry { key, ..Default::default() }))
1742 })
1743 .collect::<ProviderResult<Vec<_>>>()
1744 .map(|storage| (address, storage))
1745 })
1746 .collect::<ProviderResult<Vec<(_, _)>>>()
1747 }
1748
1749 fn changed_storages_with_range(
1750 &self,
1751 range: RangeInclusive<BlockNumber>,
1752 ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
1753 self.tx
1754 .cursor_read::<tables::StorageChangeSets>()?
1755 .walk_range(BlockNumberAddress::range(range))?
1756 .try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
1759 let (BlockNumberAddress((_, address)), storage_entry) = entry?;
1760 accounts.entry(address).or_default().insert(storage_entry.key);
1761 Ok(accounts)
1762 })
1763 }
1764
1765 fn changed_storages_and_blocks_with_range(
1766 &self,
1767 range: RangeInclusive<BlockNumber>,
1768 ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
1769 let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
1770
1771 let storage_changeset_lists =
1772 changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
1773 BTreeMap::new(),
1774 |mut storages: BTreeMap<(Address, B256), Vec<u64>>, entry| -> ProviderResult<_> {
1775 let (index, storage) = entry?;
1776 storages
1777 .entry((index.address(), storage.key))
1778 .or_default()
1779 .push(index.block_number());
1780 Ok(storages)
1781 },
1782 )?;
1783
1784 Ok(storage_changeset_lists)
1785 }
1786}
1787
1788impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
1789 for DatabaseProvider<TX, N>
1790{
1791 type Receipt = ReceiptTy<N>;
1792
1793 fn write_state(
1794 &self,
1795 execution_outcome: ExecutionOutcome<Self::Receipt>,
1796 is_value_known: OriginalValuesKnown,
1797 write_receipts_to: StorageLocation,
1798 ) -> ProviderResult<()> {
1799 let (plain_state, reverts) =
1800 execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
1801
1802 self.write_state_reverts(reverts, execution_outcome.first_block)?;
1803 self.write_state_changes(plain_state)?;
1804
1805 let mut bodies_cursor = self.tx.cursor_read::<tables::BlockBodyIndices>()?;
1806
1807 let has_receipts_pruning = self.prune_modes.has_receipts_pruning() ||
1808 execution_outcome.receipts.iter().flatten().any(|receipt| receipt.is_none());
1809
1810 let mut receipts_cursor = (write_receipts_to.database() || has_receipts_pruning)
1815 .then(|| self.tx.cursor_write::<tables::Receipts<Self::Receipt>>())
1816 .transpose()?;
1817
1818 let mut receipts_static_writer = (write_receipts_to.static_files() &&
1822 !has_receipts_pruning)
1823 .then(|| {
1824 self.static_file_provider
1825 .get_writer(execution_outcome.first_block, StaticFileSegment::Receipts)
1826 })
1827 .transpose()?;
1828
1829 for (idx, receipts) in execution_outcome.receipts.into_iter().enumerate() {
1830 let block_number = execution_outcome.first_block + idx as u64;
1831
1832 if let Some(writer) = receipts_static_writer.as_mut() {
1834 writer.increment_block(block_number)?;
1835 }
1836
1837 let first_tx_index = bodies_cursor
1838 .seek_exact(block_number)?
1839 .map(|(_, indices)| indices.first_tx_num())
1840 .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))?;
1841
1842 for (idx, receipt) in receipts.into_iter().enumerate() {
1843 let receipt_idx = first_tx_index + idx as u64;
1844 if let Some(receipt) = receipt {
1845 if let Some(writer) = &mut receipts_static_writer {
1846 writer.append_receipt(receipt_idx, &receipt)?;
1847 }
1848
1849 if let Some(cursor) = &mut receipts_cursor {
1850 cursor.append(receipt_idx, receipt)?;
1851 }
1852 }
1853 }
1854 }
1855
1856 Ok(())
1857 }
1858
1859 fn write_state_reverts(
1860 &self,
1861 reverts: PlainStateReverts,
1862 first_block: BlockNumber,
1863 ) -> ProviderResult<()> {
1864 tracing::trace!("Writing storage changes");
1866 let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1867 let mut storage_changeset_cursor =
1868 self.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
1869 for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
1870 let block_number = first_block + block_index as BlockNumber;
1871
1872 tracing::trace!(block_number, "Writing block change");
1873 storage_changes.par_sort_unstable_by_key(|a| a.address);
1875 for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
1876 let storage_id = BlockNumberAddress((block_number, address));
1877
1878 let mut storage = storage_revert
1879 .into_iter()
1880 .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
1881 .collect::<Vec<_>>();
1882 storage.par_sort_unstable_by_key(|a| a.0);
1884
1885 let mut wiped_storage = Vec::new();
1889 if wiped {
1890 tracing::trace!(?address, "Wiping storage");
1891 if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
1892 wiped_storage.push((entry.key, entry.into()));
1893 while let Some(entry) = storages_cursor.next_dup_val()? {
1894 wiped_storage.push((entry.key, entry.into()))
1895 }
1896 }
1897 }
1898
1899 tracing::trace!(?address, ?storage, "Writing storage reverts");
1900 for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
1901 storage_changeset_cursor.append_dup(
1902 storage_id,
1903 StorageEntry { key, value: value.value, is_private: value.is_private },
1904 )?;
1905 }
1906 }
1907 }
1908
1909 tracing::trace!("Writing account changes");
1911 let mut account_changeset_cursor =
1912 self.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?;
1913
1914 for (block_index, mut account_block_reverts) in reverts.accounts.into_iter().enumerate() {
1915 let block_number = first_block + block_index as BlockNumber;
1916 account_block_reverts.par_sort_by_key(|a| a.0);
1918
1919 for (address, info) in account_block_reverts {
1920 account_changeset_cursor.append_dup(
1921 block_number,
1922 AccountBeforeTx { address, info: info.map(Into::into) },
1923 )?;
1924 }
1925 }
1926
1927 Ok(())
1928 }
1929
1930 fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
1931 changes.accounts.par_sort_by_key(|a| a.0);
1934 changes.storage.par_sort_by_key(|a| a.address);
1935 changes.contracts.par_sort_by_key(|a| a.0);
1936
1937 tracing::trace!(len = changes.accounts.len(), "Writing new account state");
1939 let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
1940 for (address, account) in changes.accounts {
1942 if let Some(account) = account {
1943 tracing::trace!(?address, "Updating plain state account");
1944 accounts_cursor.upsert(address, account.into())?;
1945 } else if accounts_cursor.seek_exact(address)?.is_some() {
1946 tracing::trace!(?address, "Deleting plain state account");
1947 accounts_cursor.delete_current()?;
1948 }
1949 }
1950
1951 tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
1953 let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
1954 for (hash, bytecode) in changes.contracts {
1955 bytecodes_cursor.upsert(hash, Bytecode(bytecode))?;
1956 }
1957
1958 tracing::trace!(len = changes.storage.len(), "Writing new storage state");
1960 let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1961 for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
1962 if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
1964 storages_cursor.delete_current_duplicates()?;
1965 }
1966 let mut storage = storage
1968 .into_iter()
1969 .map(|(k, value)| StorageEntry {
1970 key: k.into(),
1971 value: value.value,
1972 is_private: value.is_private,
1973 })
1974 .collect::<Vec<_>>();
1975 storage.par_sort_unstable_by_key(|a| a.key);
1977
1978 for entry in storage {
1979 tracing::trace!(?address, ?entry.key, "Updating plain state storage");
1980 if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? {
1981 if db_entry.key == entry.key {
1982 storages_cursor.delete_current()?;
1983 }
1984 }
1985
1986 if !entry.value.is_zero() {
1987 storages_cursor.upsert(address, entry)?;
1988 }
1989 }
1990 }
1991
1992 Ok(())
1993 }
1994
1995 fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
1996 let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
1998 for (hashed_address, account) in hashed_state.accounts().accounts_sorted() {
1999 if let Some(account) = account {
2000 hashed_accounts_cursor.upsert(hashed_address, account)?;
2001 } else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
2002 hashed_accounts_cursor.delete_current()?;
2003 }
2004 }
2005
2006 let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
2008 let mut hashed_storage_cursor =
2009 self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2010 for (hashed_address, storage) in sorted_storages {
2011 if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
2012 hashed_storage_cursor.delete_current_duplicates()?;
2013 }
2014
2015 for (hashed_slot, value) in storage.storage_slots_sorted() {
2016 let entry = StorageEntry {
2017 key: hashed_slot,
2018 value: value.value,
2019 is_private: value.is_private,
2020 };
2021 if let Some(db_entry) =
2022 hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)?
2023 {
2024 if db_entry.key == entry.key {
2025 hashed_storage_cursor.delete_current()?;
2026 }
2027 }
2028
2029 if !entry.value.is_zero() {
2030 hashed_storage_cursor.upsert(*hashed_address, entry)?;
2031 }
2032 }
2033 }
2034
2035 Ok(())
2036 }
2037
2038 fn remove_state_above(
2060 &self,
2061 block: BlockNumber,
2062 remove_receipts_from: StorageLocation,
2063 ) -> ProviderResult<()> {
2064 let range = block + 1..=self.last_block_number()?;
2065
2066 if range.is_empty() {
2067 return Ok(());
2068 }
2069
2070 let block_bodies = self.get::<tables::BlockBodyIndices>(range.clone())?;
2072
2073 let from_transaction_num =
2075 block_bodies.first().expect("already checked if there are blocks").1.first_tx_num();
2076
2077 let storage_range = BlockNumberAddress::range(range.clone());
2078
2079 let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2080 let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2081
2082 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2087 let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2088
2089 let (state, _) = self.populate_bundle_state(
2090 account_changeset,
2091 storage_changeset,
2092 &mut plain_accounts_cursor,
2093 &mut plain_storage_cursor,
2094 )?;
2095
2096 for (address, (old_account, new_account, storage)) in &state {
2098 if old_account != new_account {
2100 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2101 if let Some(account) = old_account {
2102 plain_accounts_cursor.upsert(*address, *account)?;
2103 } else if existing_entry.is_some() {
2104 plain_accounts_cursor.delete_current()?;
2105 }
2106 }
2107
2108 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2110 let storage_entry = StorageEntry {
2111 key: *storage_key,
2112 value: old_storage_value.0,
2113 is_private: old_storage_value.1,
2114 };
2115 if plain_storage_cursor
2118 .seek_by_key_subkey(*address, *storage_key)?
2119 .filter(|s| s.key == *storage_key)
2120 .is_some()
2121 {
2122 plain_storage_cursor.delete_current()?
2123 }
2124
2125 if !FlaggedStorage::new(old_storage_value.0, old_storage_value.1).is_zero() {
2127 plain_storage_cursor.upsert(*address, storage_entry)?;
2128 }
2129 }
2130 }
2131
2132 self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
2133
2134 Ok(())
2135 }
2136
2137 fn take_state_above(
2159 &self,
2160 block: BlockNumber,
2161 remove_receipts_from: StorageLocation,
2162 ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2163 let range = block + 1..=self.last_block_number()?;
2164
2165 if range.is_empty() {
2166 return Ok(ExecutionOutcome::default())
2167 }
2168 let start_block_number = *range.start();
2169
2170 let block_bodies = self.get::<tables::BlockBodyIndices>(range.clone())?;
2172
2173 let from_transaction_num =
2175 block_bodies.first().expect("already checked if there are blocks").1.first_tx_num();
2176 let to_transaction_num =
2177 block_bodies.last().expect("already checked if there are blocks").1.last_tx_num();
2178
2179 let storage_range = BlockNumberAddress::range(range.clone());
2180
2181 let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2182 let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2183
2184 let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2189 let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2190
2191 let (state, reverts) = self.populate_bundle_state(
2194 account_changeset,
2195 storage_changeset,
2196 &mut plain_accounts_cursor,
2197 &mut plain_storage_cursor,
2198 )?;
2199
2200 for (address, (old_account, new_account, storage)) in &state {
2202 if old_account != new_account {
2204 let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2205 if let Some(account) = old_account {
2206 plain_accounts_cursor.upsert(*address, *account)?;
2207 } else if existing_entry.is_some() {
2208 plain_accounts_cursor.delete_current()?;
2209 }
2210 }
2211
2212 for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2214 let storage_entry = StorageEntry {
2215 key: *storage_key,
2216 value: old_storage_value.0,
2217 is_private: old_storage_value.1,
2218 };
2219 if plain_storage_cursor
2222 .seek_by_key_subkey(*address, *storage_key)?
2223 .filter(|s| s.key == *storage_key)
2224 .is_some()
2225 {
2226 plain_storage_cursor.delete_current()?
2227 }
2228
2229 if !FlaggedStorage::new(old_storage_value.0, old_storage_value.1).is_zero() {
2231 plain_storage_cursor.upsert(*address, storage_entry)?;
2232 }
2233 }
2234 }
2235
2236 let mut receipts_iter = self
2238 .static_file_provider
2239 .get_range_with_static_file_or_database(
2240 StaticFileSegment::Receipts,
2241 from_transaction_num..to_transaction_num + 1,
2242 |static_file, range, _| {
2243 static_file
2244 .receipts_by_tx_range(range.clone())
2245 .map(|r| range.into_iter().zip(r).collect())
2246 },
2247 |range, _| {
2248 self.tx
2249 .cursor_read::<tables::Receipts<Self::Receipt>>()?
2250 .walk_range(range)?
2251 .map(|r| r.map_err(Into::into))
2252 .collect()
2253 },
2254 |_| true,
2255 )?
2256 .into_iter()
2257 .peekable();
2258
2259 let mut receipts = Vec::with_capacity(block_bodies.len());
2260 for (_, block_body) in block_bodies {
2262 let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
2263 for num in block_body.tx_num_range() {
2264 if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
2265 block_receipts.push(receipts_iter.next().map(|(_, r)| r));
2266 } else {
2267 block_receipts.push(None);
2268 }
2269 }
2270 receipts.push(block_receipts);
2271 }
2272
2273 self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
2274
2275 Ok(ExecutionOutcome::new_init(
2276 state,
2277 reverts,
2278 Vec::new(),
2279 receipts.into(),
2280 start_block_number,
2281 Vec::new(),
2282 ))
2283 }
2284}
2285
2286impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
2287 fn write_trie_updates(&self, trie_updates: &TrieUpdates) -> ProviderResult<usize> {
2289 if trie_updates.is_empty() {
2290 return Ok(0)
2291 }
2292
2293 let mut num_entries = 0;
2295
2296 let mut account_updates = trie_updates
2298 .removed_nodes_ref()
2299 .iter()
2300 .filter_map(|n| {
2301 (!trie_updates.account_nodes_ref().contains_key(n)).then_some((n, None))
2302 })
2303 .collect::<Vec<_>>();
2304 account_updates.extend(
2305 trie_updates.account_nodes_ref().iter().map(|(nibbles, node)| (nibbles, Some(node))),
2306 );
2307 account_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
2309
2310 let tx = self.tx_ref();
2311 let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
2312 for (key, updated_node) in account_updates {
2313 let nibbles = StoredNibbles(key.clone());
2314 match updated_node {
2315 Some(node) => {
2316 if !nibbles.0.is_empty() {
2317 num_entries += 1;
2318 account_trie_cursor.upsert(nibbles, node.clone())?;
2319 }
2320 }
2321 None => {
2322 num_entries += 1;
2323 if account_trie_cursor.seek_exact(nibbles)?.is_some() {
2324 account_trie_cursor.delete_current()?;
2325 }
2326 }
2327 }
2328 }
2329
2330 num_entries += self.write_storage_trie_updates(trie_updates.storage_tries_ref())?;
2331
2332 Ok(num_entries)
2333 }
2334}
2335
2336impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
2337 fn write_storage_trie_updates(
2340 &self,
2341 storage_tries: &B256HashMap<StorageTrieUpdates>,
2342 ) -> ProviderResult<usize> {
2343 let mut num_entries = 0;
2344 let mut storage_tries = Vec::from_iter(storage_tries);
2345 storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
2346 let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2347 for (hashed_address, storage_trie_updates) in storage_tries {
2348 let mut db_storage_trie_cursor =
2349 DatabaseStorageTrieCursor::new(cursor, *hashed_address);
2350 num_entries +=
2351 db_storage_trie_cursor.write_storage_trie_updates(storage_trie_updates)?;
2352 cursor = db_storage_trie_cursor.cursor;
2353 }
2354
2355 Ok(num_entries)
2356 }
2357
2358 fn write_individual_storage_trie_updates(
2359 &self,
2360 hashed_address: B256,
2361 updates: &StorageTrieUpdates,
2362 ) -> ProviderResult<usize> {
2363 if updates.is_empty() {
2364 return Ok(0)
2365 }
2366
2367 let cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2368 let mut trie_db_cursor = DatabaseStorageTrieCursor::new(cursor, hashed_address);
2369 Ok(trie_db_cursor.write_storage_trie_updates(updates)?)
2370 }
2371}
2372
2373impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
2374 fn unwind_account_hashing<'a>(
2375 &self,
2376 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2377 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2378 let hashed_accounts = changesets
2382 .into_iter()
2383 .map(|(_, e)| (keccak256(e.address), e.info))
2384 .collect::<Vec<_>>()
2385 .into_iter()
2386 .rev()
2387 .collect::<BTreeMap<_, _>>();
2388
2389 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2391 for (hashed_address, account) in &hashed_accounts {
2392 if let Some(account) = account {
2393 hashed_accounts_cursor.upsert(*hashed_address, *account)?;
2394 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2395 hashed_accounts_cursor.delete_current()?;
2396 }
2397 }
2398
2399 Ok(hashed_accounts)
2400 }
2401
2402 fn unwind_account_hashing_range(
2403 &self,
2404 range: impl RangeBounds<BlockNumber>,
2405 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2406 let changesets = self
2407 .tx
2408 .cursor_read::<tables::AccountChangeSets>()?
2409 .walk_range(range)?
2410 .collect::<Result<Vec<_>, _>>()?;
2411 self.unwind_account_hashing(changesets.iter())
2412 }
2413
2414 fn insert_account_for_hashing(
2415 &self,
2416 changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
2417 ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2418 let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2419 let hashed_accounts =
2420 changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
2421 for (hashed_address, account) in &hashed_accounts {
2422 if let Some(account) = account {
2423 hashed_accounts_cursor.upsert(*hashed_address, *account)?;
2424 } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2425 hashed_accounts_cursor.delete_current()?;
2426 }
2427 }
2428 Ok(hashed_accounts)
2429 }
2430
2431 fn unwind_storage_hashing(
2432 &self,
2433 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2434 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2435 let mut hashed_storages = changesets
2437 .into_iter()
2438 .map(|(BlockNumberAddress((_, address)), storage_entry)| {
2439 (
2440 keccak256(address),
2441 keccak256(storage_entry.key),
2442 storage_entry.value,
2443 storage_entry.is_private,
2444 )
2445 })
2446 .collect::<Vec<_>>();
2447 hashed_storages.sort_by_key(|(ha, hk, _, _)| (*ha, *hk));
2448
2449 let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
2451 HashMap::with_capacity_and_hasher(hashed_storages.len(), Default::default());
2452 let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2453 for (hashed_address, key, value, is_private) in hashed_storages.into_iter().rev() {
2454 hashed_storage_keys.entry(hashed_address).or_default().insert(key);
2455
2456 if hashed_storage
2457 .seek_by_key_subkey(hashed_address, key)?
2458 .filter(|entry| entry.key == key)
2459 .is_some()
2460 {
2461 hashed_storage.delete_current()?;
2462 }
2463
2464 if !value.is_zero() {
2465 hashed_storage.upsert(hashed_address, StorageEntry { key, value, is_private })?;
2466 }
2467 }
2468 Ok(hashed_storage_keys)
2469 }
2470
2471 fn unwind_storage_hashing_range(
2472 &self,
2473 range: impl RangeBounds<BlockNumberAddress>,
2474 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2475 let changesets = self
2476 .tx
2477 .cursor_read::<tables::StorageChangeSets>()?
2478 .walk_range(range)?
2479 .collect::<Result<Vec<_>, _>>()?;
2480 self.unwind_storage_hashing(changesets.into_iter())
2481 }
2482
2483 fn insert_storage_for_hashing(
2484 &self,
2485 storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
2486 ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2487 let hashed_storages =
2489 storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
2490 let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
2491 map.insert(keccak256(entry.key), entry.value);
2492 map
2493 });
2494 map.insert(keccak256(address), storage);
2495 map
2496 });
2497
2498 let hashed_storage_keys =
2499 HashMap::from_iter(hashed_storages.iter().map(|(hashed_address, entries)| {
2500 (*hashed_address, BTreeSet::from_iter(entries.keys().copied()))
2501 }));
2502
2503 let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2504 hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
2507 storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
2508 if hashed_storage_cursor
2509 .seek_by_key_subkey(hashed_address, key)?
2510 .filter(|entry| entry.key == key)
2511 .is_some()
2512 {
2513 hashed_storage_cursor.delete_current()?;
2514 }
2515
2516 if !value.is_zero() {
2517 hashed_storage_cursor.upsert(
2518 hashed_address,
2519 StorageEntry { key, value, ..Default::default() },
2520 )?;
2521 }
2522 Ok(())
2523 })
2524 })?;
2525
2526 Ok(hashed_storage_keys)
2527 }
2528
2529 fn insert_hashes(
2530 &self,
2531 range: RangeInclusive<BlockNumber>,
2532 end_block_hash: B256,
2533 expected_state_root: B256,
2534 ) -> ProviderResult<()> {
2535 let mut account_prefix_set = PrefixSetMut::default();
2537 let mut storage_prefix_sets: HashMap<B256, PrefixSetMut> = HashMap::default();
2538 let mut destroyed_accounts = HashSet::default();
2539
2540 let mut durations_recorder = metrics::DurationsRecorder::default();
2541
2542 {
2544 let lists = self.changed_storages_with_range(range.clone())?;
2545 let storages = self.plain_state_storages(lists)?;
2546 let storage_entries = self.insert_storage_for_hashing(storages)?;
2547 for (hashed_address, hashed_slots) in storage_entries {
2548 account_prefix_set.insert(Nibbles::unpack(hashed_address));
2549 for slot in hashed_slots {
2550 storage_prefix_sets
2551 .entry(hashed_address)
2552 .or_default()
2553 .insert(Nibbles::unpack(slot));
2554 }
2555 }
2556 }
2557 durations_recorder.record_relative(metrics::Action::InsertStorageHashing);
2558
2559 {
2561 let lists = self.changed_accounts_with_range(range.clone())?;
2562 let accounts = self.basic_accounts(lists)?;
2563 let hashed_addresses = self.insert_account_for_hashing(accounts)?;
2564 for (hashed_address, account) in hashed_addresses {
2565 account_prefix_set.insert(Nibbles::unpack(hashed_address));
2566 if account.is_none() {
2567 destroyed_accounts.insert(hashed_address);
2568 }
2569 }
2570 }
2571 durations_recorder.record_relative(metrics::Action::InsertAccountHashing);
2572
2573 {
2575 let prefix_sets = TriePrefixSets {
2578 account_prefix_set: account_prefix_set.freeze(),
2579 storage_prefix_sets: storage_prefix_sets
2580 .into_iter()
2581 .map(|(k, v)| (k, v.freeze()))
2582 .collect(),
2583 destroyed_accounts,
2584 };
2585 let (state_root, trie_updates) = StateRoot::from_tx(&self.tx)
2586 .with_prefix_sets(prefix_sets)
2587 .root_with_updates()
2588 .map_err(reth_db::DatabaseError::from)?;
2589 if state_root != expected_state_root {
2590 return Err(ProviderError::StateRootMismatch(Box::new(RootMismatch {
2591 root: GotExpected { got: state_root, expected: expected_state_root },
2592 block_number: *range.end(),
2593 block_hash: end_block_hash,
2594 })))
2595 }
2596 self.write_trie_updates(&trie_updates)?;
2597 }
2598 durations_recorder.record_relative(metrics::Action::InsertMerkleTree);
2599
2600 debug!(target: "providers::db", ?range, actions = ?durations_recorder.actions, "Inserted hashes");
2601
2602 Ok(())
2603 }
2604}
2605
2606impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
2607 fn unwind_account_history_indices<'a>(
2608 &self,
2609 changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2610 ) -> ProviderResult<usize> {
2611 let mut last_indices = changesets
2612 .into_iter()
2613 .map(|(index, account)| (account.address, *index))
2614 .collect::<Vec<_>>();
2615 last_indices.sort_by_key(|(a, _)| *a);
2616
2617 let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
2619 for &(address, rem_index) in &last_indices {
2620 let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
2621 &mut cursor,
2622 ShardedKey::last(address),
2623 rem_index,
2624 |sharded_key| sharded_key.key == address,
2625 )?;
2626
2627 if !partial_shard.is_empty() {
2630 cursor.insert(
2631 ShardedKey::last(address),
2632 BlockNumberList::new_pre_sorted(partial_shard),
2633 )?;
2634 }
2635 }
2636
2637 let changesets = last_indices.len();
2638 Ok(changesets)
2639 }
2640
2641 fn unwind_account_history_indices_range(
2642 &self,
2643 range: impl RangeBounds<BlockNumber>,
2644 ) -> ProviderResult<usize> {
2645 let changesets = self
2646 .tx
2647 .cursor_read::<tables::AccountChangeSets>()?
2648 .walk_range(range)?
2649 .collect::<Result<Vec<_>, _>>()?;
2650 self.unwind_account_history_indices(changesets.iter())
2651 }
2652
2653 fn insert_account_history_index(
2654 &self,
2655 account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
2656 ) -> ProviderResult<()> {
2657 self.append_history_index::<_, tables::AccountsHistory>(
2658 account_transitions,
2659 ShardedKey::new,
2660 )
2661 }
2662
2663 fn unwind_storage_history_indices(
2664 &self,
2665 changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2666 ) -> ProviderResult<usize> {
2667 let mut storage_changesets = changesets
2668 .into_iter()
2669 .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
2670 .collect::<Vec<_>>();
2671 storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
2672
2673 let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
2674 for &(address, storage_key, rem_index) in &storage_changesets {
2675 let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
2676 &mut cursor,
2677 StorageShardedKey::last(address, storage_key),
2678 rem_index,
2679 |storage_sharded_key| {
2680 storage_sharded_key.address == address &&
2681 storage_sharded_key.sharded_key.key == storage_key
2682 },
2683 )?;
2684
2685 if !partial_shard.is_empty() {
2688 cursor.insert(
2689 StorageShardedKey::last(address, storage_key),
2690 BlockNumberList::new_pre_sorted(partial_shard),
2691 )?;
2692 }
2693 }
2694
2695 let changesets = storage_changesets.len();
2696 Ok(changesets)
2697 }
2698
2699 fn unwind_storage_history_indices_range(
2700 &self,
2701 range: impl RangeBounds<BlockNumberAddress>,
2702 ) -> ProviderResult<usize> {
2703 let changesets = self
2704 .tx
2705 .cursor_read::<tables::StorageChangeSets>()?
2706 .walk_range(range)?
2707 .collect::<Result<Vec<_>, _>>()?;
2708 self.unwind_storage_history_indices(changesets.into_iter())
2709 }
2710
2711 fn insert_storage_history_index(
2712 &self,
2713 storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
2714 ) -> ProviderResult<()> {
2715 self.append_history_index::<_, tables::StoragesHistory>(
2716 storage_transitions,
2717 |(address, storage_key), highest_block_number| {
2718 StorageShardedKey::new(address, storage_key, highest_block_number)
2719 },
2720 )
2721 }
2722
2723 fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
2724 {
2726 let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
2727 self.insert_account_history_index(indices)?;
2728 }
2729
2730 {
2732 let indices = self.changed_storages_and_blocks_with_range(range)?;
2733 self.insert_storage_history_index(indices)?;
2734 }
2735
2736 Ok(())
2737 }
2738}
2739
2740impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
2741 for DatabaseProvider<TX, N>
2742{
2743 fn take_block_and_execution_above(
2744 &self,
2745 block: BlockNumber,
2746 remove_from: StorageLocation,
2747 ) -> ProviderResult<Chain<Self::Primitives>> {
2748 let range = block + 1..=self.last_block_number()?;
2749
2750 self.unwind_trie_state_range(range.clone())?;
2751
2752 let execution_state = self.take_state_above(block, remove_from)?;
2754
2755 let blocks = self.sealed_block_with_senders_range(range)?;
2756
2757 self.remove_blocks_above(block, remove_from)?;
2760
2761 self.update_pipeline_stages(block, true)?;
2763
2764 Ok(Chain::new(blocks, execution_state, None))
2765 }
2766
2767 fn remove_block_and_execution_above(
2768 &self,
2769 block: BlockNumber,
2770 remove_from: StorageLocation,
2771 ) -> ProviderResult<()> {
2772 let range = block + 1..=self.last_block_number()?;
2773
2774 self.unwind_trie_state_range(range)?;
2775
2776 self.remove_state_above(block, remove_from)?;
2778
2779 self.remove_blocks_above(block, remove_from)?;
2782
2783 self.update_pipeline_stages(block, true)?;
2785
2786 Ok(())
2787 }
2788}
2789
2790impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
2791 for DatabaseProvider<TX, N>
2792{
2793 type Block = BlockTy<N>;
2794 type Receipt = ReceiptTy<N>;
2795
2796 fn insert_block(
2817 &self,
2818 block: SealedBlockWithSenders<Self::Block>,
2819 write_to: StorageLocation,
2820 ) -> ProviderResult<StoredBlockBodyIndices> {
2821 let block_number = block.number();
2822
2823 let mut durations_recorder = metrics::DurationsRecorder::default();
2824
2825 let ttd = if block_number == 0 {
2827 block.difficulty()
2828 } else {
2829 let parent_block_number = block_number - 1;
2830 let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
2831 durations_recorder.record_relative(metrics::Action::GetParentTD);
2832 parent_ttd + block.difficulty()
2833 };
2834
2835 if write_to.database() {
2836 self.tx.put::<tables::CanonicalHeaders>(block_number, block.hash())?;
2837 durations_recorder.record_relative(metrics::Action::InsertCanonicalHeaders);
2838
2839 self.tx
2841 .put::<tables::Headers<HeaderTy<N>>>(block_number, block.header.as_ref().clone())?;
2842 durations_recorder.record_relative(metrics::Action::InsertHeaders);
2843
2844 self.tx.put::<tables::HeaderTerminalDifficulties>(block_number, ttd.into())?;
2845 durations_recorder.record_relative(metrics::Action::InsertHeaderTerminalDifficulties);
2846 }
2847
2848 if write_to.static_files() {
2849 let mut writer =
2850 self.static_file_provider.get_writer(block_number, StaticFileSegment::Headers)?;
2851 writer.append_header(&block.header, ttd, &block.hash())?;
2852 }
2853
2854 self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
2855 durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
2856
2857 let mut next_tx_num = self
2858 .tx
2859 .cursor_read::<tables::TransactionBlocks>()?
2860 .last()?
2861 .map(|(n, _)| n + 1)
2862 .unwrap_or_default();
2863 durations_recorder.record_relative(metrics::Action::GetNextTxNum);
2864 let first_tx_num = next_tx_num;
2865
2866 let tx_count = block.block.body.transactions().len() as u64;
2867
2868 for (transaction, sender) in
2870 block.block.body.transactions().iter().zip(block.senders.iter())
2871 {
2872 let hash = transaction.tx_hash();
2873
2874 if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
2875 self.tx.put::<tables::TransactionSenders>(next_tx_num, *sender)?;
2876 }
2877
2878 if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
2879 self.tx.put::<tables::TransactionHashNumbers>(*hash, next_tx_num)?;
2880 }
2881 next_tx_num += 1;
2882 }
2883
2884 self.append_block_bodies(vec![(block_number, Some(block.block.body))], write_to)?;
2885
2886 debug!(
2887 target: "providers::db",
2888 ?block_number,
2889 actions = ?durations_recorder.actions,
2890 "Inserted block"
2891 );
2892
2893 Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
2894 }
2895
2896 fn append_block_bodies(
2897 &self,
2898 bodies: Vec<(BlockNumber, Option<BodyTy<N>>)>,
2899 write_to: StorageLocation,
2900 ) -> ProviderResult<()> {
2901 let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
2902
2903 let mut tx_static_writer = write_to
2905 .static_files()
2906 .then(|| {
2907 self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)
2908 })
2909 .transpose()?;
2910
2911 let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
2912 let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
2913
2914 let mut tx_cursor = write_to
2916 .database()
2917 .then(|| self.tx.cursor_write::<tables::Transactions<TxTy<N>>>())
2918 .transpose()?;
2919
2920 let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
2922
2923 for (block_number, body) in &bodies {
2924 if let Some(writer) = tx_static_writer.as_mut() {
2926 writer.increment_block(*block_number)?;
2927 }
2928
2929 let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
2930 let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
2931
2932 let mut durations_recorder = metrics::DurationsRecorder::default();
2933
2934 block_indices_cursor.append(*block_number, block_indices)?;
2936
2937 durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
2938
2939 let Some(body) = body else { continue };
2940
2941 if !body.transactions().is_empty() {
2943 tx_block_cursor.append(block_indices.last_tx_num(), *block_number)?;
2944 durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
2945 }
2946
2947 for transaction in body.transactions() {
2949 if let Some(writer) = tx_static_writer.as_mut() {
2950 writer.append_transaction(next_tx_num, transaction)?;
2951 }
2952 if let Some(cursor) = tx_cursor.as_mut() {
2953 cursor.append(next_tx_num, transaction.clone())?;
2954 }
2955
2956 next_tx_num += 1;
2958 }
2959
2960 debug!(
2961 target: "providers::db",
2962 ?block_number,
2963 actions = ?durations_recorder.actions,
2964 "Inserted block body"
2965 );
2966 }
2967
2968 self.storage.writer().write_block_bodies(self, bodies, write_to)?;
2969
2970 Ok(())
2971 }
2972
2973 fn remove_blocks_above(
2974 &self,
2975 block: BlockNumber,
2976 remove_from: StorageLocation,
2977 ) -> ProviderResult<()> {
2978 let mut canonical_headers_cursor = self.tx.cursor_write::<tables::CanonicalHeaders>()?;
2979 let mut rev_headers = canonical_headers_cursor.walk_back(None)?;
2980
2981 while let Some(Ok((number, hash))) = rev_headers.next() {
2982 if number <= block {
2983 break
2984 }
2985 self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
2986 rev_headers.delete_current()?;
2987 }
2988 self.remove::<tables::Headers<HeaderTy<N>>>(block + 1..)?;
2989 self.remove::<tables::HeaderTerminalDifficulties>(block + 1..)?;
2990
2991 let unwind_tx_from = self
2993 .tx
2994 .get::<tables::BlockBodyIndices>(block)?
2995 .map(|b| b.next_tx_num())
2996 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
2997
2998 let unwind_tx_to = self
3000 .tx
3001 .cursor_read::<tables::BlockBodyIndices>()?
3002 .last()?
3003 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
3005 .1
3006 .last_tx_num();
3007
3008 if unwind_tx_from <= unwind_tx_to {
3009 for (hash, _) in self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))? {
3010 self.tx.delete::<tables::TransactionHashNumbers>(hash, None)?;
3011 }
3012 }
3013
3014 self.remove::<tables::TransactionSenders>(unwind_tx_from..)?;
3015
3016 self.remove_bodies_above(block, remove_from)?;
3017
3018 Ok(())
3019 }
3020
3021 fn remove_bodies_above(
3022 &self,
3023 block: BlockNumber,
3024 remove_from: StorageLocation,
3025 ) -> ProviderResult<()> {
3026 self.storage.writer().remove_block_bodies_above(self, block, remove_from)?;
3027
3028 let unwind_tx_from = self
3030 .tx
3031 .get::<tables::BlockBodyIndices>(block)?
3032 .map(|b| b.next_tx_num())
3033 .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3034
3035 self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3036 self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3037
3038 if remove_from.database() {
3039 self.remove::<tables::Transactions<TxTy<N>>>(unwind_tx_from..)?;
3040 }
3041
3042 if remove_from.static_files() {
3043 let static_file_tx_num = self
3044 .static_file_provider
3045 .get_highest_static_file_tx(StaticFileSegment::Transactions);
3046
3047 let to_delete = static_file_tx_num
3048 .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3049 .unwrap_or_default();
3050
3051 self.static_file_provider
3052 .latest_writer(StaticFileSegment::Transactions)?
3053 .prune_transactions(to_delete, block)?;
3054 }
3055
3056 Ok(())
3057 }
3058
3059 fn append_blocks_with_state(
3061 &self,
3062 blocks: Vec<SealedBlockWithSenders<Self::Block>>,
3063 execution_outcome: ExecutionOutcome<Self::Receipt>,
3064 hashed_state: HashedPostStateSorted,
3065 trie_updates: TrieUpdates,
3066 ) -> ProviderResult<()> {
3067 if blocks.is_empty() {
3068 debug!(target: "providers::db", "Attempted to append empty block range");
3069 return Ok(())
3070 }
3071
3072 let first_number = blocks.first().unwrap().number();
3073
3074 let last = blocks.last().unwrap();
3075 let last_block_number = last.number();
3076
3077 let mut durations_recorder = metrics::DurationsRecorder::default();
3078
3079 for block in blocks {
3081 self.insert_block(block, StorageLocation::Database)?;
3082 durations_recorder.record_relative(metrics::Action::InsertBlock);
3083 }
3084
3085 self.write_state(execution_outcome, OriginalValuesKnown::No, StorageLocation::Database)?;
3086 durations_recorder.record_relative(metrics::Action::InsertState);
3087
3088 self.write_hashed_state(&hashed_state)?;
3090 self.write_trie_updates(&trie_updates)?;
3091 durations_recorder.record_relative(metrics::Action::InsertHashes);
3092
3093 self.update_history_indices(first_number..=last_block_number)?;
3094 durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3095
3096 self.update_pipeline_stages(last_block_number, false)?;
3098 durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3099
3100 debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3101
3102 Ok(())
3103 }
3104}
3105
3106impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3107 fn get_prune_checkpoint(
3108 &self,
3109 segment: PruneSegment,
3110 ) -> ProviderResult<Option<PruneCheckpoint>> {
3111 Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3112 }
3113
3114 fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3115 Ok(self
3116 .tx
3117 .cursor_read::<tables::PruneCheckpoints>()?
3118 .walk(None)?
3119 .collect::<Result<_, _>>()?)
3120 }
3121}
3122
3123impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3124 fn save_prune_checkpoint(
3125 &self,
3126 segment: PruneSegment,
3127 checkpoint: PruneCheckpoint,
3128 ) -> ProviderResult<()> {
3129 Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3130 }
3131}
3132
3133impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3134 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3135 let db_entries = self.tx.entries::<T>()?;
3136 let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3137 Ok(entries) => entries,
3138 Err(ProviderError::UnsupportedProvider) => 0,
3139 Err(err) => return Err(err),
3140 };
3141
3142 Ok(db_entries + static_file_entries)
3143 }
3144}
3145
3146impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3147 fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3148 let mut finalized_blocks = self
3149 .tx
3150 .cursor_read::<tables::ChainState>()?
3151 .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3152 .take(1)
3153 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3154
3155 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3156 Ok(last_finalized_block_number)
3157 }
3158
3159 fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3160 let mut finalized_blocks = self
3161 .tx
3162 .cursor_read::<tables::ChainState>()?
3163 .walk(Some(tables::ChainStateKey::LastSafeBlockBlock))?
3164 .take(1)
3165 .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3166
3167 let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3168 Ok(last_finalized_block_number)
3169 }
3170}
3171
3172impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3173 fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3174 Ok(self
3175 .tx
3176 .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3177 }
3178
3179 fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3180 Ok(self
3181 .tx
3182 .put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlockBlock, block_number)?)
3183 }
3184}
3185
3186impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3187 type Tx = TX;
3188
3189 fn tx_ref(&self) -> &Self::Tx {
3190 &self.tx
3191 }
3192
3193 fn tx_mut(&mut self) -> &mut Self::Tx {
3194 &mut self.tx
3195 }
3196
3197 fn into_tx(self) -> Self::Tx {
3198 self.tx
3199 }
3200
3201 fn prune_modes_ref(&self) -> &PruneModes {
3202 self.prune_modes_ref()
3203 }
3204}