reth_provider/providers/database/provider.rs

1use crate::{
2    bundle_state::StorageRevertsIter,
3    providers::{
4        database::{chain::ChainStorage, metrics},
5        static_file::StaticFileWriter,
6        NodeTypesForProvider, StaticFileProvider,
7    },
8    to_range,
9    traits::{
10        AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
11    },
12    AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
13    BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
14    DBProvider, EvmEnvProvider, HashingWriter, HeaderProvider, HeaderSyncGap,
15    HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
16    LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
17    PruneCheckpointReader, PruneCheckpointWriter, RevertsInit, StageCheckpointReader,
18    StateCommitmentProvider, StateProviderBox, StateWriter, StaticFileProviderFactory, StatsReader,
19    StorageLocation, StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider,
20    TransactionsProviderExt, TrieWriter, WithdrawalsProvider,
21};
22use alloy_consensus::{BlockHeader, Header};
23use alloy_eips::{
24    eip2718::Encodable2718,
25    eip4895::{Withdrawal, Withdrawals},
26    BlockHashOrNumber,
27};
28use alloy_primitives::{
29    keccak256,
30    map::{hash_map, B256HashMap, HashMap, HashSet},
31    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
32};
33use itertools::Itertools;
34use rayon::slice::ParallelSliceMut;
35use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, EthereumHardforks};
36use reth_db::{
37    cursor::DbDupCursorRW, tables, BlockNumberList, PlainAccountState, PlainStorageState,
38};
39use reth_db_api::{
40    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
41    database::Database,
42    models::{
43        sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
44        ShardedKey, StoredBlockBodyIndices,
45    },
46    table::Table,
47    transaction::{DbTx, DbTxMut},
48    DatabaseError,
49};
50use reth_evm::ConfigureEvmEnv;
51use reth_execution_types::{Chain, ExecutionOutcome};
52use reth_network_p2p::headers::downloader::SyncTarget;
53use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
54use reth_primitives::{
55    Account, BlockExt, BlockWithSenders, Bytecode, GotExpected, NodePrimitives, SealedBlock,
56    SealedBlockFor, SealedBlockWithSenders, SealedHeader, StaticFileSegment, StorageEntry,
57    TransactionMeta,
58};
59use reth_primitives_traits::{Block as _, BlockBody as _, SignedTransaction};
60use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment};
61use reth_stages_types::{StageCheckpoint, StageId};
62use reth_storage_api::{
63    BlockBodyIndicesProvider, BlockBodyReader, NodePrimitivesProvider, OmmersProvider,
64    StateProvider, StorageChangeSetReader, TryIntoHistoricalStateProvider,
65};
66use reth_storage_errors::provider::{ProviderResult, RootMismatch};
67use reth_trie::{
68    prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets},
69    updates::{StorageTrieUpdates, TrieUpdates},
70    HashedPostStateSorted, Nibbles, StateRoot, StoredNibbles,
71};
72use reth_trie_db::{DatabaseStateRoot, DatabaseStorageTrieCursor};
73use revm::{
74    db::states::{PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset},
75    primitives::{BlockEnv, CfgEnvWithHandlerCfg, FlaggedStorage},
76};
77use std::{
78    cmp::Ordering,
79    collections::{BTreeMap, BTreeSet},
80    fmt::Debug,
81    ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
82    sync::{mpsc, Arc},
83};
84use tokio::sync::watch;
85use tracing::{debug, trace};
86
87/// A [`DatabaseProvider`] that holds a read-only database transaction.
88pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
89
90/// A [`DatabaseProvider`] that holds a read-write database transaction.
91///
92/// Ideally this would be an alias type. However, a compiler error (<https://github.com/rust-lang/rust/issues/102211>) forces us to wrap this in a struct instead.
93/// Once that issue is solved, we can revert to an alias type.
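///
/// A minimal usage sketch (hedged; assumes `factory` is a configured `ProviderFactory` whose
/// `provider_rw()` yields a [`DatabaseProviderRW`]):
///
/// ```ignore
/// let provider_rw = factory.provider_rw()?;
/// // Read methods are available through `Deref` to the inner `DatabaseProvider`.
/// let tip = provider_rw.last_block_number()?;
/// // Commit the database transaction (and the static file writers, if any).
/// provider_rw.commit()?;
/// ```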
94#[derive(Debug)]
95pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
96    pub DatabaseProvider<<DB as Database>::TXMut, N>,
97);
98
99impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
100    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
101
102    fn deref(&self) -> &Self::Target {
103        &self.0
104    }
105}
106
107impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
108    fn deref_mut(&mut self) -> &mut Self::Target {
109        &mut self.0
110    }
111}
112
113impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
114    for DatabaseProviderRW<DB, N>
115{
116    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
117        &self.0
118    }
119}
120
121impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
122    /// Commit database transaction and static file if it exists.
123    pub fn commit(self) -> ProviderResult<bool> {
124        self.0.commit()
125    }
126
127    /// Consume `DbTx` or `DbTxMut`.
128    pub fn into_tx(self) -> <DB as Database>::TXMut {
129        self.0.into_tx()
130    }
131}
132
133impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
134    for DatabaseProvider<<DB as Database>::TXMut, N>
135{
136    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
137        provider.0
138    }
139}
140
141/// A provider struct that fetches data from the database.
142/// Wraps a [`DbTx`] or [`DbTxMut`] and implements provider traits such as [`HeaderProvider`] and [`BlockHashReader`].
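///
/// # Example
///
/// A minimal read-path sketch (hypothetical setup; a provider is normally obtained from a
/// `ProviderFactory` rather than constructed directly):
///
/// ```ignore
/// // Assumes `factory` is a configured `ProviderFactory<N>`.
/// let provider = factory.provider()?;            // read-only `DatabaseProvider`
/// let tip = provider.last_block_number()?;       // highest block number in the database
/// let header = provider.header_by_number(tip)?;  // `Option` of the tip header
/// ```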
143#[derive(Debug)]
144pub struct DatabaseProvider<TX, N: NodeTypes> {
145    /// Database transaction.
146    tx: TX,
147    /// Chain spec
148    chain_spec: Arc<N::ChainSpec>,
149    /// Static File provider
150    static_file_provider: StaticFileProvider<N::Primitives>,
151    /// Pruning configuration
152    prune_modes: PruneModes,
153    /// Node storage handler.
154    storage: Arc<N::Storage>,
155}
156
157impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
158    /// Returns reference to prune modes.
159    pub const fn prune_modes_ref(&self) -> &PruneModes {
160        &self.prune_modes
161    }
162}
163
164impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
165    /// State provider for latest state
166    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
167        trace!(target: "providers::db", "Returning latest state provider");
168        Box::new(LatestStateProviderRef::new(self))
169    }
170
171    /// State provider for the state at the given block hash
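    ///
    /// A minimal sketch (assumes `provider` is a [`DatabaseProvider`], `hash` is a canonical
    /// block hash, and `address` is an account of interest):
    ///
    /// ```ignore
    /// let state = provider.history_by_block_hash(hash)?;
    /// let account = state.basic_account(address)?; // account state as of `hash`
    /// ```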
172    pub fn history_by_block_hash<'a>(
173        &'a self,
174        block_hash: BlockHash,
175    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
176        let mut block_number =
177            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
178        if block_number == self.best_block_number().unwrap_or_default() &&
179            block_number == self.last_block_number().unwrap_or_default()
180        {
181            return Ok(Box::new(LatestStateProviderRef::new(self)))
182        }
183
184        // +1 as the changeset that we want is the one that was applied after this block.
185        block_number += 1;
186
187        let account_history_prune_checkpoint =
188            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
189        let storage_history_prune_checkpoint =
190            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
191
192        let mut state_provider = HistoricalStateProviderRef::new(self, block_number);
193
194        // If we pruned account or storage history, we can't return state on every historical block.
195        // Instead, we should cap it at the latest prune checkpoint for the corresponding prune segment.
196        if let Some(prune_checkpoint_block_number) =
197            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
198        {
199            state_provider = state_provider.with_lowest_available_account_history_block_number(
200                prune_checkpoint_block_number + 1,
201            );
202        }
203        if let Some(prune_checkpoint_block_number) =
204            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
205        {
206            state_provider = state_provider.with_lowest_available_storage_history_block_number(
207                prune_checkpoint_block_number + 1,
208            );
209        }
210
211        Ok(Box::new(state_provider))
212    }
213
214    #[cfg(feature = "test-utils")]
215    /// Sets the prune modes for the provider.
216    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
217        self.prune_modes = prune_modes;
218    }
219}
220
221impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
222    type Primitives = N::Primitives;
223}
224
225impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
226    /// Returns a static file provider
227    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
228        self.static_file_provider.clone()
229    }
230}
231
232impl<TX: Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
233    for DatabaseProvider<TX, N>
234{
235    type ChainSpec = N::ChainSpec;
236
237    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
238        self.chain_spec.clone()
239    }
240}
241
242impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
243    /// Creates a provider with an inner read-write transaction.
244    pub const fn new_rw(
245        tx: TX,
246        chain_spec: Arc<N::ChainSpec>,
247        static_file_provider: StaticFileProvider<N::Primitives>,
248        prune_modes: PruneModes,
249        storage: Arc<N::Storage>,
250    ) -> Self {
251        Self { tx, chain_spec, static_file_provider, prune_modes, storage }
252    }
253}
254
255impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
256    fn as_ref(&self) -> &Self {
257        self
258    }
259}
260
261impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
262    /// Unwinds trie state for the given range.
263    ///
264    /// This includes calculating the resulting state root and comparing it with the parent
265    /// block's state root.
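    ///
    /// A minimal sketch (assumes `provider` is a read-write [`DatabaseProvider`], e.g. obtained
    /// via `factory.provider_rw()?`):
    ///
    /// ```ignore
    /// // Revert the trie to the state of block 99 by unwinding blocks 100..=120.
    /// provider.unwind_trie_state_range(100..=120)?;
    /// ```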
266    pub fn unwind_trie_state_range(
267        &self,
268        range: RangeInclusive<BlockNumber>,
269    ) -> ProviderResult<()> {
270        let changed_accounts = self
271            .tx
272            .cursor_read::<tables::AccountChangeSets>()?
273            .walk_range(range.clone())?
274            .collect::<Result<Vec<_>, _>>()?;
275
276        // Unwind account hashes. Add changed accounts to account prefix set.
277        let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
278        let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
279        let mut destroyed_accounts = HashSet::default();
280        for (hashed_address, account) in hashed_addresses {
281            account_prefix_set.insert(Nibbles::unpack(hashed_address));
282            if account.is_none() {
283                destroyed_accounts.insert(hashed_address);
284            }
285        }
286
287        // Unwind account history indices.
288        self.unwind_account_history_indices(changed_accounts.iter())?;
289        let storage_range = BlockNumberAddress::range(range.clone());
290
291        let changed_storages = self
292            .tx
293            .cursor_read::<tables::StorageChangeSets>()?
294            .walk_range(storage_range)?
295            .collect::<Result<Vec<_>, _>>()?;
296
297        // Unwind storage hashes. Add changed account and storage keys to corresponding prefix
298        // sets.
299        let mut storage_prefix_sets = B256HashMap::<PrefixSet>::default();
300        let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
301        for (hashed_address, hashed_slots) in storage_entries {
302            account_prefix_set.insert(Nibbles::unpack(hashed_address));
303            let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
304            for slot in hashed_slots {
305                storage_prefix_set.insert(Nibbles::unpack(slot));
306            }
307            storage_prefix_sets.insert(hashed_address, storage_prefix_set.freeze());
308        }
309
310        // Unwind storage history indices.
311        self.unwind_storage_history_indices(changed_storages.iter().copied())?;
312
313        // Calculate the reverted merkle root.
314        // This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
315        // are pre-loaded.
316        let prefix_sets = TriePrefixSets {
317            account_prefix_set: account_prefix_set.freeze(),
318            storage_prefix_sets,
319            destroyed_accounts,
320        };
321        let (new_state_root, trie_updates) = StateRoot::from_tx(&self.tx)
322            .with_prefix_sets(prefix_sets)
323            .root_with_updates()
324            .map_err(reth_db::DatabaseError::from)?;
325
326        let parent_number = range.start().saturating_sub(1);
327        let parent_state_root = self
328            .header_by_number(parent_number)?
329            .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
330            .state_root();
331
332        // The state root should always be correct as we are reverting state,
333        // but we check it again for the sake of double verification.
334        if new_state_root != parent_state_root {
335            let parent_hash = self
336                .block_hash(parent_number)?
337                .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
338            return Err(ProviderError::UnwindStateRootMismatch(Box::new(RootMismatch {
339                root: GotExpected { got: new_state_root, expected: parent_state_root },
340                block_number: parent_number,
341                block_hash: parent_hash,
342            })))
343        }
344        self.write_trie_updates(&trie_updates)?;
345
346        Ok(())
347    }
348
349    /// Removes the receipts of all transactions starting with the provided transaction number (inclusive).
350    fn remove_receipts_from(
351        &self,
352        from_tx: TxNumber,
353        last_block: BlockNumber,
354        remove_from: StorageLocation,
355    ) -> ProviderResult<()> {
356        if remove_from.database() {
357            // remove database receipts for all transactions from `from_tx` onwards
358            self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
359        }
360
361        if remove_from.static_files() && !self.prune_modes.has_receipts_pruning() {
362            let static_file_receipt_num =
363                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
364
365            let to_delete = static_file_receipt_num
366                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
367                .unwrap_or_default();
368
369            self.static_file_provider
370                .latest_writer(StaticFileSegment::Receipts)?
371                .prune_receipts(to_delete, last_block)?;
372        }
373
374        Ok(())
375    }
376}
377
378impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
379    fn try_into_history_at_block(
380        self,
381        mut block_number: BlockNumber,
382    ) -> ProviderResult<StateProviderBox> {
383        if block_number == self.best_block_number().unwrap_or_default() &&
384            block_number == self.last_block_number().unwrap_or_default()
385        {
386            return Ok(Box::new(LatestStateProvider::new(self)))
387        }
388
389        // +1 as the changeset that we want is the one that was applied after this block.
390        block_number += 1;
391
392        let account_history_prune_checkpoint =
393            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
394        let storage_history_prune_checkpoint =
395            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
396
397        let mut state_provider = HistoricalStateProvider::new(self, block_number);
398
399        // If we pruned account or storage history, we can't return state on every historical block.
400        // Instead, we should cap it at the latest prune checkpoint for the corresponding prune segment.
401        if let Some(prune_checkpoint_block_number) =
402            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
403        {
404            state_provider = state_provider.with_lowest_available_account_history_block_number(
405                prune_checkpoint_block_number + 1,
406            );
407        }
408        if let Some(prune_checkpoint_block_number) =
409            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
410        {
411            state_provider = state_provider.with_lowest_available_storage_history_block_number(
412                prune_checkpoint_block_number + 1,
413            );
414        }
415
416        Ok(Box::new(state_provider))
417    }
418}
419
420impl<TX: DbTx + 'static, N: NodeTypes> StateCommitmentProvider for DatabaseProvider<TX, N> {
421    type StateCommitment = N::StateCommitment;
422}
423
424impl<
425        Tx: DbTx + DbTxMut + 'static,
426        N: NodeTypesForProvider<Primitives: NodePrimitives<BlockHeader = Header>>,
427    > DatabaseProvider<Tx, N>
428{
429    // TODO: uncomment below, once `reth debug_cmd` has been feature gated with dev.
430    // #[cfg(any(test, feature = "test-utils"))]
431    /// Inserts a historical block. **Used for setting up test environments.**
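    ///
    /// A minimal sketch for a test setup (assumes `provider` is a read-write provider and `block`
    /// is a `SealedBlockWithSenders` whose parent, if any, has already been inserted):
    ///
    /// ```ignore
    /// let indices = provider.insert_historical_block(block)?;
    /// ```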
432    pub fn insert_historical_block(
433        &self,
434        block: SealedBlockWithSenders<<Self as BlockWriter>::Block>,
435    ) -> ProviderResult<StoredBlockBodyIndices> {
436        let ttd = if block.number == 0 {
437            block.difficulty
438        } else {
439            let parent_block_number = block.number - 1;
440            let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
441            parent_ttd + block.difficulty
442        };
443
444        let mut writer = self.static_file_provider.latest_writer(StaticFileSegment::Headers)?;
445
446        // Backfill: some tests start at a forward block number, but static files require no gaps.
447        let segment_header = writer.user_header();
448        if segment_header.block_end().is_none() && segment_header.expected_block_start() == 0 {
449            for block_number in 0..block.number {
450                let mut prev = block.header.clone().unseal();
451                prev.number = block_number;
452                writer.append_header(&prev, U256::ZERO, &B256::ZERO)?;
453            }
454        }
455
456        writer.append_header(block.header.as_ref(), ttd, &block.hash())?;
457
458        self.insert_block(block, StorageLocation::Database)
459    }
460}
461
462/// For a given key, unwind all history shards down to the given block number.
463///
464/// `S` - Sharded key subtype.
465/// `T` - Table to walk over.
466/// `C` - Cursor implementation.
467///
468/// This function walks the entries backwards from the given start key and deletes all shards that
469/// belong to the key and contain only indices at or above the given block number.
470///
471/// The boundary shard (the shard that is split by the block number) is also removed from the
472/// database. Any indices that are at or above the block number are filtered out, and the remaining
473/// indices are returned for reinsertion (if not empty).
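///
/// For example (illustrative): assume a key has shards with highest block numbers `100`, `200` and
/// `u64::MAX`, and we unwind to block `150` (all indices of the last shard being `>= 150`). The
/// `u64::MAX` shard is deleted, the `200` shard is deleted and its indices `< 150` are returned
/// for reinsertion, and the `100` shard is left untouched.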
474fn unwind_history_shards<S, T, C>(
475    cursor: &mut C,
476    start_key: T::Key,
477    block_number: BlockNumber,
478    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
479) -> ProviderResult<Vec<u64>>
480where
481    T: Table<Value = BlockNumberList>,
482    T::Key: AsRef<ShardedKey<S>>,
483    C: DbCursorRO<T> + DbCursorRW<T>,
484{
485    let mut item = cursor.seek_exact(start_key)?;
486    while let Some((sharded_key, list)) = item {
487        // If the shard does not belong to the key, break.
488        if !shard_belongs_to_key(&sharded_key) {
489            break
490        }
491        cursor.delete_current()?;
492
493        // Check the first item.
494        // If it is greater than or equal to the block number, the whole shard stays deleted and we move to the previous one.
495        let first = list.iter().next().expect("List can't be empty");
496        if first >= block_number {
497            item = cursor.prev()?;
498            continue
499        } else if block_number <= sharded_key.as_ref().highest_block_number {
500            // Filter out all elements that are at or above the block number.
501            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
502        }
503        return Ok(list.iter().collect::<Vec<_>>())
504    }
505
506    Ok(Vec::new())
507}
508
509impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
510    /// Creates a provider with an inner read-only transaction.
511    pub const fn new(
512        tx: TX,
513        chain_spec: Arc<N::ChainSpec>,
514        static_file_provider: StaticFileProvider<N::Primitives>,
515        prune_modes: PruneModes,
516        storage: Arc<N::Storage>,
517    ) -> Self {
518        Self { tx, chain_spec, static_file_provider, prune_modes, storage }
519    }
520
521    /// Consume `DbTx` or `DbTxMut`.
522    pub fn into_tx(self) -> TX {
523        self.tx
524    }
525
526    /// Returns a mutable reference to the inner `DbTx` or `DbTxMut`.
527    pub fn tx_mut(&mut self) -> &mut TX {
528        &mut self.tx
529    }
530
531    /// Returns an immutable reference to the inner `DbTx` or `DbTxMut`.
532    pub const fn tx_ref(&self) -> &TX {
533        &self.tx
534    }
535
536    /// Returns a reference to the chain specification.
537    pub fn chain_spec(&self) -> &N::ChainSpec {
538        &self.chain_spec
539    }
540}
541
542impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
543    fn transactions_by_tx_range_with_cursor<C>(
544        &self,
545        range: impl RangeBounds<TxNumber>,
546        cursor: &mut C,
547    ) -> ProviderResult<Vec<TxTy<N>>>
548    where
549        C: DbCursorRO<tables::Transactions<TxTy<N>>>,
550    {
551        self.static_file_provider.get_range_with_static_file_or_database(
552            StaticFileSegment::Transactions,
553            to_range(range),
554            |static_file, range, _| static_file.transactions_by_tx_range(range),
555            |range, _| self.cursor_collect(cursor, range),
556            |_| true,
557        )
558    }
559
560    fn block_with_senders<H, HF, B, BF>(
561        &self,
562        id: BlockHashOrNumber,
563        _transaction_kind: TransactionVariant,
564        header_by_number: HF,
565        construct_block: BF,
566    ) -> ProviderResult<Option<B>>
567    where
568        H: AsRef<HeaderTy<N>>,
569        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
570        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
571    {
572        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
573        let Some(header) = header_by_number(block_number)? else { return Ok(None) };
574
575        // Get the block body
576        //
577        // If the body indices are not found, this means that the transactions either do not exist
578        // in the database yet, or they do exist but are not indexed. If they exist but are not
579        // indexed, we don't have enough information to return the block anyway, so we return
580        // `None`.
581        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
582
583        let tx_range = body.tx_num_range();
584
585        let (transactions, senders) = if tx_range.is_empty() {
586            (vec![], vec![])
587        } else {
588            (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
589        };
590
591        let body = self
592            .storage
593            .reader()
594            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
595            .pop()
596            .ok_or(ProviderError::InvalidStorageOutput)?;
597
598        construct_block(header, body, senders)
599    }
600
601    /// Returns a range of blocks from the database.
602    ///
603    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
604    /// construct blocks from the following inputs:
605    ///     - Header
606    ///     - Range of transaction numbers
607    ///     - Ommers
608    ///     - Withdrawals
609    ///     - Senders
610    fn block_range<F, H, HF, R>(
611        &self,
612        range: RangeInclusive<BlockNumber>,
613        headers_range: HF,
614        mut assemble_block: F,
615    ) -> ProviderResult<Vec<R>>
616    where
617        H: AsRef<HeaderTy<N>>,
618        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
619        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
620    {
621        if range.is_empty() {
622            return Ok(Vec::new())
623        }
624
625        let len = range.end().saturating_sub(*range.start()) as usize;
626        let mut blocks = Vec::with_capacity(len);
627
628        let headers = headers_range(range)?;
629        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
630        let mut block_body_cursor = self.tx.cursor_read::<tables::BlockBodyIndices>()?;
631
632        let mut present_headers = Vec::new();
633        for header in headers {
634            // If the body indices are not found, this means that the transactions either do
635            // not exist in the database yet, or they do exist but are
636            // not indexed. If they exist but are not indexed, we don't
637            // have enough information to return the block anyway, so
638            // we skip the block.
639            if let Some((_, block_body_indices)) =
640                block_body_cursor.seek_exact(header.as_ref().number())?
641            {
642                let tx_range = block_body_indices.tx_num_range();
643                present_headers.push((header, tx_range));
644            }
645        }
646
647        let mut inputs = Vec::new();
648        for (header, tx_range) in &present_headers {
649            let transactions = if tx_range.is_empty() {
650                Vec::new()
651            } else {
652                self.transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)?
653            };
654
655            inputs.push((header.as_ref(), transactions));
656        }
657
658        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
659
660        for ((header, tx_range), body) in present_headers.into_iter().zip(bodies) {
661            blocks.push(assemble_block(header, body, tx_range)?);
662        }
663
664        Ok(blocks)
665    }
666
667    /// Returns a range of blocks from the database, along with the senders of each
668    /// transaction in the blocks.
669    ///
670    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
671    /// construct blocks from the following inputs:
672    ///     - Header
673    ///     - Transactions
674    ///     - Ommers
675    ///     - Withdrawals
676    ///     - Senders
677    fn block_with_senders_range<H, HF, B, BF>(
678        &self,
679        range: RangeInclusive<BlockNumber>,
680        headers_range: HF,
681        assemble_block: BF,
682    ) -> ProviderResult<Vec<B>>
683    where
684        H: AsRef<HeaderTy<N>>,
685        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
686        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
687    {
688        let mut senders_cursor = self.tx.cursor_read::<tables::TransactionSenders>()?;
689
690        self.block_range(range, headers_range, |header, body, tx_range| {
691            let senders = if tx_range.is_empty() {
692                Vec::new()
693            } else {
694                // fetch senders from the senders table
695                let known_senders =
696                    senders_cursor
697                        .walk_range(tx_range.clone())?
698                        .collect::<Result<HashMap<_, _>, _>>()?;
699
700                let mut senders = Vec::with_capacity(body.transactions().len());
701                for (tx_num, tx) in tx_range.zip(body.transactions()) {
702                    match known_senders.get(&tx_num) {
703                        None => {
704                            // recover the sender from the transaction if not found
705                            let sender = tx
706                                .recover_signer_unchecked()
707                                .ok_or(ProviderError::SenderRecoveryError)?;
708                            senders.push(sender);
709                        }
710                        Some(sender) => senders.push(*sender),
711                    }
712                }
713
714                senders
715            };
716
717            assemble_block(header, body, senders)
718        })
719    }
720
721    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
722    /// [`PlainAccountState`] and [`PlainStorageState`] tables, based on the given storage and
723    /// account changesets.
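    ///
    /// Concretely (as built below), each [`BundleStateInit`] entry maps an address to
    /// `(old_account, new_account, storage)`, where `storage` maps a slot to its old and new
    /// values, and [`RevertsInit`] collects, per block number and address, the old account info
    /// and old storage entries needed to revert those changes.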
724    fn populate_bundle_state<A, S>(
725        &self,
726        account_changeset: Vec<(u64, AccountBeforeTx)>,
727        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
728        plain_accounts_cursor: &mut A,
729        plain_storage_cursor: &mut S,
730    ) -> ProviderResult<(BundleStateInit, RevertsInit)>
731    where
732        A: DbCursorRO<PlainAccountState>,
733        S: DbDupCursorRO<PlainStorageState>,
734    {
735        // Iterate over the previous values and fetch the plain state values to create the changeset.
736        // The double `Option` around `Account` represents whether the account state is known (first
737        // option) and whether the account was removed (second option).
738        let mut state: BundleStateInit = HashMap::default();
739
740        // This does not work for blocks that are not at the tip, as the plain state is not the
741        // last state of the end range. We should rename the functions or add support for accessing
742        // history state. Accessing history state can be tricky, but we are not gaining
743        // anything.
744
745        let mut reverts: RevertsInit = HashMap::default();
746
747        // add account changeset changes
748        for (block_number, account_before) in account_changeset.into_iter().rev() {
749            let AccountBeforeTx { info: old_info, address } = account_before;
750            match state.entry(address) {
751                hash_map::Entry::Vacant(entry) => {
752                    let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
753                    entry.insert((old_info, new_info, HashMap::default()));
754                }
755                hash_map::Entry::Occupied(mut entry) => {
756                    // overwrite old account state.
757                    entry.get_mut().0 = old_info;
758                }
759            }
760            // insert old info into reverts.
761            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
762        }
763
764        // add storage changeset changes
765        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
766            let BlockNumberAddress((block_number, address)) = block_and_address;
767            // get account state or insert from plain state.
768            let account_state = match state.entry(address) {
769                hash_map::Entry::Vacant(entry) => {
770                    let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
771                    entry.insert((present_info, present_info, HashMap::default()))
772                }
773                hash_map::Entry::Occupied(entry) => entry.into_mut(),
774            };
775
776            // match storage.
777            match account_state.2.entry(old_storage.key) {
778                hash_map::Entry::Vacant(entry) => {
779                    let new_storage = plain_storage_cursor
780                        .seek_by_key_subkey(address, old_storage.key)?
781                        .filter(|storage| storage.key == old_storage.key)
782                        .unwrap_or_default();
783                    entry.insert((
784                        (old_storage.value, old_storage.is_private),
785                        (new_storage.value, new_storage.is_private),
786                    ));
787                }
788                hash_map::Entry::Occupied(mut entry) => {
789                    entry.get_mut().0 = (old_storage.value, old_storage.is_private);
790                }
791            };
792
793            reverts
794                .entry(block_number)
795                .or_default()
796                .entry(address)
797                .or_default()
798                .1
799                .push(old_storage);
800        }
801
802        Ok((state, reverts))
803    }
804}
805
806impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
807    /// Commit database transaction.
808    pub fn commit(self) -> ProviderResult<bool> {
809        Ok(self.tx.commit()?)
810    }
811
812    /// Loads the shard and removes it from the database. If the returned list is empty, the last
813    /// shard was full or there are no shards at all.
814    fn take_shard<T>(&self, key: T::Key) -> ProviderResult<Vec<u64>>
815    where
816        T: Table<Value = BlockNumberList>,
817    {
818        let mut cursor = self.tx.cursor_read::<T>()?;
819        let shard = cursor.seek_exact(key)?;
820        if let Some((shard_key, list)) = shard {
821            // delete old shard so new one can be inserted.
822            self.tx.delete::<T>(shard_key, None)?;
823            let list = list.iter().collect::<Vec<_>>();
824            return Ok(list)
825        }
826        Ok(Vec::new())
827    }
828
829    /// Inserts history indices into the database.
830    ///
831    /// For each updated partial key, this function removes the last shard from
832    /// the database (if any), appends the new indices to it, chunks the resulting integer list and
833    /// inserts the new shards back into the database.
834    ///
835    /// This function is used by history indexing stages.
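    ///
    /// For example (illustrative): with a hypothetical shard size of `3` (the actual size is
    /// `sharded_key::NUM_OF_INDICES_IN_SHARD`) and indices `[1, 2, 3, 4]` for a key, two shards
    /// are written: one keyed by highest block number `3` holding `[1, 2, 3]`, and a final shard
    /// keyed by `u64::MAX` holding `[4]`.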
836    fn append_history_index<P, T>(
837        &self,
838        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
839        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
840    ) -> ProviderResult<()>
841    where
842        P: Copy,
843        T: Table<Value = BlockNumberList>,
844    {
845        for (partial_key, indices) in index_updates {
846            let mut last_shard =
847                self.take_shard::<T>(sharded_key_factory(partial_key, u64::MAX))?;
848            last_shard.extend(indices);
849            // Chunk indices and insert them in shards of N size.
850            let indices = last_shard;
851            let mut chunks = indices.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD).peekable();
852            while let Some(list) = chunks.next() {
853                let highest_block_number = if chunks.peek().is_some() {
854                    *list.last().expect("`chunks` does not return empty list")
855                } else {
856                    // Insert last list with `u64::MAX`.
857                    u64::MAX
858                };
859                self.tx.put::<T>(
860                    sharded_key_factory(partial_key, highest_block_number),
861                    BlockNumberList::new_pre_sorted(list.iter().copied()),
862                )?;
863            }
864        }
865        Ok(())
866    }
867}
868
869impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
870    fn basic_account(&self, address: Address) -> ProviderResult<Option<Account>> {
871        Ok(self.tx.get::<tables::PlainAccountState>(address)?)
872    }
873}
874
875impl<TX: DbTx, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
876    fn changed_accounts_with_range(
877        &self,
878        range: impl RangeBounds<BlockNumber>,
879    ) -> ProviderResult<BTreeSet<Address>> {
880        self.tx
881            .cursor_read::<tables::AccountChangeSets>()?
882            .walk_range(range)?
883            .map(|entry| {
884                entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
885            })
886            .collect()
887    }
888
889    fn basic_accounts(
890        &self,
891        iter: impl IntoIterator<Item = Address>,
892    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
893        let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
894        Ok(iter
895            .into_iter()
896            .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
897            .collect::<Result<Vec<_>, _>>()?)
898    }
899
900    fn changed_accounts_and_blocks_with_range(
901        &self,
902        range: RangeInclusive<BlockNumber>,
903    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
904        let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
905
906        let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
907            BTreeMap::new(),
908            |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
909                let (index, account) = entry?;
910                accounts.entry(account.address).or_default().push(index);
911                Ok(accounts)
912            },
913        )?;
914
915        Ok(account_transitions)
916    }
917}
918
919impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
920    fn storage_changeset(
921        &self,
922        block_number: BlockNumber,
923    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
924        let range = block_number..=block_number;
925        let storage_range = BlockNumberAddress::range(range);
926        self.tx
927            .cursor_dup_read::<tables::StorageChangeSets>()?
928            .walk_range(storage_range)?
929            .map(|result| -> ProviderResult<_> { Ok(result?) })
930            .collect()
931    }
932}
933
934impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
935    fn account_block_changeset(
936        &self,
937        block_number: BlockNumber,
938    ) -> ProviderResult<Vec<AccountBeforeTx>> {
939        let range = block_number..=block_number;
940        self.tx
941            .cursor_read::<tables::AccountChangeSets>()?
942            .walk_range(range)?
943            .map(|result| -> ProviderResult<_> {
944                let (_, account_before) = result?;
945                Ok(account_before)
946            })
947            .collect()
948    }
949}
950
951impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
952    for DatabaseProvider<TX, N>
953{
954    type Header = HeaderTy<N>;
955
956    fn sync_gap(
957        &self,
958        tip: watch::Receiver<B256>,
959        highest_uninterrupted_block: BlockNumber,
960    ) -> ProviderResult<HeaderSyncGap<Self::Header>> {
961        let static_file_provider = self.static_file_provider();
962
963        // Make sure the Headers static file is at the same height. If it's further ahead, a
964        // previous execution was interrupted and we need to unwind the static file.
965        let next_static_file_block_num = static_file_provider
966            .get_highest_static_file_block(StaticFileSegment::Headers)
967            .map(|id| id + 1)
968            .unwrap_or_default();
969        let next_block = highest_uninterrupted_block + 1;
970
971        match next_static_file_block_num.cmp(&next_block) {
972            // The node shut down after a static file commit but before the database commit, so
973            // we need to unwind the static files.
974            Ordering::Greater => {
975                let mut static_file_producer =
976                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
977                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
978                // Since this is a database <-> static file inconsistency, we commit the change
979                // straight away.
980                static_file_producer.commit()?
981            }
982            Ordering::Less => {
983                // There are either missing or corrupted files.
984                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
985            }
986            Ordering::Equal => {}
987        }
988
989        let local_head = static_file_provider
990            .sealed_header(highest_uninterrupted_block)?
991            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
992
993        let target = SyncTarget::Tip(*tip.borrow());
994
995        Ok(HeaderSyncGap { local_head, target })
996    }
997}
998
999impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
1000    type Header = HeaderTy<N>;
1001
1002    fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
1003        if let Some(num) = self.block_number(*block_hash)? {
1004            Ok(self.header_by_number(num)?)
1005        } else {
1006            Ok(None)
1007        }
1008    }
1009
1010    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1011        self.static_file_provider.get_with_static_file_or_database(
1012            StaticFileSegment::Headers,
1013            num,
1014            |static_file| static_file.header_by_number(num),
1015            || Ok(self.tx.get::<tables::Headers<Self::Header>>(num)?),
1016        )
1017    }
1018
1019    fn header_td(&self, block_hash: &BlockHash) -> ProviderResult<Option<U256>> {
1020        if let Some(num) = self.block_number(*block_hash)? {
1021            self.header_td_by_number(num)
1022        } else {
1023            Ok(None)
1024        }
1025    }
1026
1027    fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
1028        if let Some(td) = self.chain_spec.final_paris_total_difficulty(number) {
1029            // if this block is higher than the final Paris (Merge) block, return the final Paris
1030            // total difficulty
1031            return Ok(Some(td))
1032        }
1033
1034        self.static_file_provider.get_with_static_file_or_database(
1035            StaticFileSegment::Headers,
1036            number,
1037            |static_file| static_file.header_td_by_number(number),
1038            || Ok(self.tx.get::<tables::HeaderTerminalDifficulties>(number)?.map(|td| td.0)),
1039        )
1040    }
1041
1042    fn headers_range(
1043        &self,
1044        range: impl RangeBounds<BlockNumber>,
1045    ) -> ProviderResult<Vec<Self::Header>> {
1046        self.static_file_provider.get_range_with_static_file_or_database(
1047            StaticFileSegment::Headers,
1048            to_range(range),
1049            |static_file, range, _| static_file.headers_range(range),
1050            |range, _| {
1051                self.cursor_read_collect::<tables::Headers<Self::Header>>(range).map_err(Into::into)
1052            },
1053            |_| true,
1054        )
1055    }
1056
1057    fn sealed_header(
1058        &self,
1059        number: BlockNumber,
1060    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1061        self.static_file_provider.get_with_static_file_or_database(
1062            StaticFileSegment::Headers,
1063            number,
1064            |static_file| static_file.sealed_header(number),
1065            || {
1066                if let Some(header) = self.header_by_number(number)? {
1067                    let hash = self
1068                        .block_hash(number)?
1069                        .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
1070                    Ok(Some(SealedHeader::new(header, hash)))
1071                } else {
1072                    Ok(None)
1073                }
1074            },
1075        )
1076    }
1077
1078    fn sealed_headers_while(
1079        &self,
1080        range: impl RangeBounds<BlockNumber>,
1081        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1082    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1083        self.static_file_provider.get_range_with_static_file_or_database(
1084            StaticFileSegment::Headers,
1085            to_range(range),
1086            |static_file, range, predicate| static_file.sealed_headers_while(range, predicate),
1087            |range, mut predicate| {
1088                let mut headers = vec![];
1089                for entry in
1090                    self.tx.cursor_read::<tables::Headers<Self::Header>>()?.walk_range(range)?
1091                {
1092                    let (number, header) = entry?;
1093                    let hash = self
1094                        .block_hash(number)?
1095                        .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
1096                    let sealed = SealedHeader::new(header, hash);
1097                    if !predicate(&sealed) {
1098                        break
1099                    }
1100                    headers.push(sealed);
1101                }
1102                Ok(headers)
1103            },
1104            predicate,
1105        )
1106    }
1107}
1108
1109impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1110    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1111        self.static_file_provider.get_with_static_file_or_database(
1112            StaticFileSegment::Headers,
1113            number,
1114            |static_file| static_file.block_hash(number),
1115            || Ok(self.tx.get::<tables::CanonicalHeaders>(number)?),
1116        )
1117    }
1118
1119    fn canonical_hashes_range(
1120        &self,
1121        start: BlockNumber,
1122        end: BlockNumber,
1123    ) -> ProviderResult<Vec<B256>> {
1124        self.static_file_provider.get_range_with_static_file_or_database(
1125            StaticFileSegment::Headers,
1126            start..end,
1127            |static_file, range, _| static_file.canonical_hashes_range(range.start, range.end),
1128            |range, _| {
1129                self.cursor_read_collect::<tables::CanonicalHeaders>(range).map_err(Into::into)
1130            },
1131            |_| true,
1132        )
1133    }
1134}
1135
1136impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1137    fn chain_info(&self) -> ProviderResult<ChainInfo> {
1138        let best_number = self.best_block_number()?;
1139        let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1140        Ok(ChainInfo { best_hash, best_number })
1141    }
1142
1143    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1144        Ok(self
1145            .get_stage_checkpoint(StageId::Finish)?
1146            .map(|checkpoint| checkpoint.block_number)
1147            .unwrap_or_default())
1148    }
1149
1150    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1151        Ok(self
1152            .tx
1153            .cursor_read::<tables::CanonicalHeaders>()?
1154            .last()?
1155            .map(|(num, _)| num)
1156            .max(
1157                self.static_file_provider.get_highest_static_file_block(StaticFileSegment::Headers),
1158            )
1159            .unwrap_or_default())
1160    }
1161
1162    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1163        Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1164    }
1165}
1166
1167impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1168    type Block = BlockTy<N>;
1169
1170    fn find_block_by_hash(
1171        &self,
1172        hash: B256,
1173        source: BlockSource,
1174    ) -> ProviderResult<Option<Self::Block>> {
1175        if source.is_canonical() {
1176            self.block(hash.into())
1177        } else {
1178            Ok(None)
1179        }
1180    }
1181
1182    /// Returns the block with the matching number from the database.
1183    ///
1184    /// If the header for this block is not found, this returns `None`.
1185    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1186    /// will return `None`.
1187    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1188        if let Some(number) = self.convert_hash_or_number(id)? {
1189            if let Some(header) = self.header_by_number(number)? {
1190                // If the body indices are not found, this means that the transactions either do not
1191                // exist in the database yet, or they do exist but are not indexed.
1192                // If they exist but are not indexed, we don't have enough
1193                // information to return the block anyway, so we return `None`.
1194                let Some(transactions) = self.transactions_by_block(number.into())? else {
1195                    return Ok(None)
1196                };
1197
1198                let body = self
1199                    .storage
1200                    .reader()
1201                    .read_block_bodies(self, vec![(&header, transactions)])?
1202                    .pop()
1203                    .ok_or(ProviderError::InvalidStorageOutput)?;
1204
1205                return Ok(Some(Self::Block::new(header, body)))
1206            }
1207        }
1208
1209        Ok(None)
1210    }
1211
1212    fn pending_block(&self) -> ProviderResult<Option<SealedBlockFor<Self::Block>>> {
1213        Ok(None)
1214    }
1215
1216    fn pending_block_with_senders(
1217        &self,
1218    ) -> ProviderResult<Option<SealedBlockWithSenders<Self::Block>>> {
1219        Ok(None)
1220    }
1221
1222    fn pending_block_and_receipts(
1223        &self,
1224    ) -> ProviderResult<Option<(SealedBlockFor<Self::Block>, Vec<Self::Receipt>)>> {
1225        Ok(None)
1226    }
1227
1228    /// Returns the block with senders with the matching number or hash from the database.
1229    ///
1230    /// **NOTE: The transactions have invalid hashes, since they would need to be calculated on the
1231    /// spot, and we want fast querying.**
1232    ///
1233    /// If the header for this block is not found, this returns `None`.
1234    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1235    /// will return `None`.
1236    fn block_with_senders(
1237        &self,
1238        id: BlockHashOrNumber,
1239        transaction_kind: TransactionVariant,
1240    ) -> ProviderResult<Option<BlockWithSenders<Self::Block>>> {
1241        self.block_with_senders(
1242            id,
1243            transaction_kind,
1244            |block_number| self.header_by_number(block_number),
1245            |header, body, senders| {
1246                Self::Block::new(header, body)
1247                    // Note: we're using unchecked here because we know the block contains valid txs
1248                    // with respect to its height and can ignore the `s` value check, so pre
1249                    // EIP-2 txs are allowed
1250                    .try_with_senders_unchecked(senders)
1251                    .map(Some)
1252                    .map_err(|_| ProviderError::SenderRecoveryError)
1253            },
1254        )
1255    }
1256
1257    fn sealed_block_with_senders(
1258        &self,
1259        id: BlockHashOrNumber,
1260        transaction_kind: TransactionVariant,
1261    ) -> ProviderResult<Option<SealedBlockWithSenders<Self::Block>>> {
1262        self.block_with_senders(
1263            id,
1264            transaction_kind,
1265            |block_number| self.sealed_header(block_number),
1266            |header, body, senders| {
1267                SealedBlock { header, body }
1268                    // Note: we're using unchecked here because we know the block contains valid txs
1269                    // with respect to its height and can ignore the `s` value check, so pre
1270                    // EIP-2 txs are allowed
1271                    .try_with_senders_unchecked(senders)
1272                    .map(Some)
1273                    .map_err(|_| ProviderError::SenderRecoveryError)
1274            },
1275        )
1276    }
1277
1278    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1279        self.block_range(
1280            range,
1281            |range| self.headers_range(range),
1282            |header, body, _| Ok(Self::Block::new(header, body)),
1283        )
1284    }
1285
1286    fn block_with_senders_range(
1287        &self,
1288        range: RangeInclusive<BlockNumber>,
1289    ) -> ProviderResult<Vec<BlockWithSenders<Self::Block>>> {
1290        self.block_with_senders_range(
1291            range,
1292            |range| self.headers_range(range),
1293            |header, body, senders| {
1294                Self::Block::new(header, body)
1295                    .try_with_senders_unchecked(senders)
1296                    .map_err(|_| ProviderError::SenderRecoveryError)
1297            },
1298        )
1299    }
1300
1301    fn sealed_block_with_senders_range(
1302        &self,
1303        range: RangeInclusive<BlockNumber>,
1304    ) -> ProviderResult<Vec<SealedBlockWithSenders<Self::Block>>> {
1305        self.block_with_senders_range(
1306            range,
1307            |range| self.sealed_headers_range(range),
1308            |header, body, senders| {
1309                SealedBlockWithSenders::new(SealedBlock { header, body }, senders)
1310                    .ok_or(ProviderError::SenderRecoveryError)
1311            },
1312        )
1313    }
1314}
1315
1316impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1317    for DatabaseProvider<TX, N>
1318{
1319    /// Recovers transaction hashes by walking through the `Transactions` table and
1320    /// calculating them in parallel. The results are returned unsorted.
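    ///
    /// When reading from the database, the range is split into chunks, each chunk is hashed on the
    /// global rayon pool, and the results are collected over `mpsc` channels, which is why the
    /// returned order is not preserved.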
1321    fn transaction_hashes_by_range(
1322        &self,
1323        tx_range: Range<TxNumber>,
1324    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1325        self.static_file_provider.get_range_with_static_file_or_database(
1326            StaticFileSegment::Transactions,
1327            tx_range,
1328            |static_file, range, _| static_file.transaction_hashes_by_range(range),
1329            |tx_range, _| {
1330                let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
1331                let tx_range_size = tx_range.clone().count();
1332                let tx_walker = tx_cursor.walk_range(tx_range)?;
1333
1334                let chunk_size = (tx_range_size / rayon::current_num_threads()).max(1);
1335                let mut channels = Vec::with_capacity(chunk_size);
1336                let mut transaction_count = 0;
1337
1338                #[inline]
1339                fn calculate_hash<T>(
1340                    entry: Result<(TxNumber, T), DatabaseError>,
1341                    rlp_buf: &mut Vec<u8>,
1342                ) -> Result<(B256, TxNumber), Box<ProviderError>>
1343                where
1344                    T: Encodable2718,
1345                {
1346                    let (tx_id, tx) = entry.map_err(|e| Box::new(e.into()))?;
1347                    tx.encode_2718(rlp_buf);
1348                    Ok((keccak256(rlp_buf), tx_id))
1349                }
1350
1351                for chunk in &tx_walker.chunks(chunk_size) {
1352                    let (tx, rx) = mpsc::channel();
1353                    channels.push(rx);
1354
1355                    // Note: Unfortunate side-effect of how chunk is designed in itertools (it is
1356                    // not Send)
1357                    let chunk: Vec<_> = chunk.collect();
1358                    transaction_count += chunk.len();
1359
1360                    // Spawn the task onto the global rayon pool
1361                    // This task will send the results through the channel after it has calculated
1362                    // the hash.
1363                    rayon::spawn(move || {
1364                        let mut rlp_buf = Vec::with_capacity(128);
1365                        for entry in chunk {
1366                            rlp_buf.clear();
1367                            let _ = tx.send(calculate_hash(entry, &mut rlp_buf));
1368                        }
1369                    });
1370                }
1371                let mut tx_list = Vec::with_capacity(transaction_count);
1372
1373                // Iterate over channels and append the tx hashes unsorted
1374                for channel in channels {
1375                    while let Ok(tx) = channel.recv() {
1376                        let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
1377                        tx_list.push((tx_hash, tx_id));
1378                    }
1379                }
1380
1381                Ok(tx_list)
1382            },
1383            |_| true,
1384        )
1385    }
1386}
1387
1388
1389impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1390    type Transaction = TxTy<N>;
1391
1392    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1393        Ok(self.tx.get::<tables::TransactionHashNumbers>(tx_hash)?)
1394    }
1395
1396    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1397        self.static_file_provider.get_with_static_file_or_database(
1398            StaticFileSegment::Transactions,
1399            id,
1400            |static_file| static_file.transaction_by_id(id),
1401            || Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
1402        )
1403    }
1404
1405    fn transaction_by_id_unhashed(
1406        &self,
1407        id: TxNumber,
1408    ) -> ProviderResult<Option<Self::Transaction>> {
1409        self.static_file_provider.get_with_static_file_or_database(
1410            StaticFileSegment::Transactions,
1411            id,
1412            |static_file| static_file.transaction_by_id_unhashed(id),
1413            || Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
1414        )
1415    }
1416
1417    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1418        if let Some(id) = self.transaction_id(hash)? {
1419            Ok(self.transaction_by_id_unhashed(id)?)
1420        } else {
1421            Ok(None)
1422        }
1423    }
1424
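    /// Returns the transaction with the given hash together with its [`TransactionMeta`]
    /// (index in block, block hash and number, base fee, excess blob gas and timestamp).
    ///
    /// A minimal usage sketch (hypothetical `provider` and `hash`, for illustration only):
    ///
    /// ```ignore
    /// if let Some((tx, meta)) = provider.transaction_by_hash_with_meta(hash)? {
    ///     assert_eq!(meta.tx_hash, hash);
    ///     println!("tx sits at index {} of block {}", meta.index, meta.block_number);
    /// }
    /// ```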
1425    fn transaction_by_hash_with_meta(
1426        &self,
1427        tx_hash: TxHash,
1428    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1429        let mut transaction_cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
1430        if let Some(transaction_id) = self.transaction_id(tx_hash)? {
1431            if let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? {
1432                if let Some(block_number) =
1433                    transaction_cursor.seek(transaction_id).map(|b| b.map(|(_, bn)| bn))?
1434                {
1435                    if let Some(sealed_header) = self.sealed_header(block_number)? {
1436                        let (header, block_hash) = sealed_header.split();
1437                        if let Some(block_body) = self.block_body_indices(block_number)? {
1438                            // the index of the tx in the block is the offset:
1439                            // len([start..tx_id])
1440                            // NOTE: `transaction_id` is always `>=` the block's first
1441                            // index
1442                            let index = transaction_id - block_body.first_tx_num();
1443
1444                            let meta = TransactionMeta {
1445                                tx_hash,
1446                                index,
1447                                block_hash,
1448                                block_number,
1449                                base_fee: header.base_fee_per_gas(),
1450                                excess_blob_gas: header.excess_blob_gas(),
1451                                timestamp: header.timestamp(),
1452                            };
1453
1454                            return Ok(Some((transaction, meta)))
1455                        }
1456                    }
1457                }
1458            }
1459        }
1460
1461        Ok(None)
1462    }
1463
1464    fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1465        let mut cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
1466        Ok(cursor.seek(id)?.map(|(_, bn)| bn))
1467    }
1468
1469    fn transactions_by_block(
1470        &self,
1471        id: BlockHashOrNumber,
1472    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1473        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;
1474
1475        if let Some(block_number) = self.convert_hash_or_number(id)? {
1476            if let Some(body) = self.block_body_indices(block_number)? {
1477                let tx_range = body.tx_num_range();
1478                return if tx_range.is_empty() {
1479                    Ok(Some(Vec::new()))
1480                } else {
1481                    Ok(Some(self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)?))
1482                }
1483            }
1484        }
1485        Ok(None)
1486    }
1487
1488    fn transactions_by_block_range(
1489        &self,
1490        range: impl RangeBounds<BlockNumber>,
1491    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1492        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;
1493        let mut results = Vec::new();
1494        let mut body_cursor = self.tx.cursor_read::<tables::BlockBodyIndices>()?;
1495        for entry in body_cursor.walk_range(range)? {
1496            let (_, body) = entry?;
1497            let tx_num_range = body.tx_num_range();
1498            if tx_num_range.is_empty() {
1499                results.push(Vec::new());
1500            } else {
1501                results.push(
1502                    self.transactions_by_tx_range_with_cursor(tx_num_range, &mut tx_cursor)?
1503                        .into_iter()
1504                        .collect(),
1505                );
1506            }
1507        }
1508        Ok(results)
1509    }
1510
1511    fn transactions_by_tx_range(
1512        &self,
1513        range: impl RangeBounds<TxNumber>,
1514    ) -> ProviderResult<Vec<Self::Transaction>> {
1515        self.transactions_by_tx_range_with_cursor(
1516            range,
1517            &mut self.tx.cursor_read::<tables::Transactions<_>>()?,
1518        )
1519    }
1520
1521    fn senders_by_tx_range(
1522        &self,
1523        range: impl RangeBounds<TxNumber>,
1524    ) -> ProviderResult<Vec<Address>> {
1525        self.cursor_read_collect::<tables::TransactionSenders>(range).map_err(Into::into)
1526    }
1527
1528    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1529        Ok(self.tx.get::<tables::TransactionSenders>(id)?)
1530    }
1531}
1532
1533impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
1534    type Receipt = ReceiptTy<N>;
1535
1536    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1537        self.static_file_provider.get_with_static_file_or_database(
1538            StaticFileSegment::Receipts,
1539            id,
1540            |static_file| static_file.receipt(id),
1541            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
1542        )
1543    }
1544
1545    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1546        if let Some(id) = self.transaction_id(hash)? {
1547            self.receipt(id)
1548        } else {
1549            Ok(None)
1550        }
1551    }
1552
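    /// Returns all receipts for the given block, or `None` if the block is unknown.
    /// Known blocks without transactions yield `Some(Vec::new())`.
    ///
    /// A minimal usage sketch (hypothetical `provider`, for illustration only):
    ///
    /// ```ignore
    /// let receipts = provider.receipts_by_block(BlockHashOrNumber::Number(100))?;
    /// ```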
1553    fn receipts_by_block(
1554        &self,
1555        block: BlockHashOrNumber,
1556    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1557        if let Some(number) = self.convert_hash_or_number(block)? {
1558            if let Some(body) = self.block_body_indices(number)? {
1559                let tx_range = body.tx_num_range();
1560                return if tx_range.is_empty() {
1561                    Ok(Some(Vec::new()))
1562                } else {
1563                    self.receipts_by_tx_range(tx_range).map(Some)
1564                }
1565            }
1566        }
1567        Ok(None)
1568    }
1569
1570    fn receipts_by_tx_range(
1571        &self,
1572        range: impl RangeBounds<TxNumber>,
1573    ) -> ProviderResult<Vec<Self::Receipt>> {
1574        self.static_file_provider.get_range_with_static_file_or_database(
1575            StaticFileSegment::Receipts,
1576            to_range(range),
1577            |static_file, range, _| static_file.receipts_by_tx_range(range),
1578            |range, _| {
1579                self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range)
1580                    .map_err(Into::into)
1581            },
1582            |_| true,
1583        )
1584    }
1585}
1586
1587impl<TX: DbTx + 'static, N: NodeTypes<ChainSpec: EthereumHardforks>> WithdrawalsProvider
1588    for DatabaseProvider<TX, N>
1589{
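    /// Returns the withdrawals for the given block, gated on the Shanghai hardfork: before
    /// Shanghai this returns `None`, afterwards known blocks return a (possibly empty) list.
    ///
    /// A minimal usage sketch (hypothetical `provider` and `block_timestamp`, for illustration
    /// only):
    ///
    /// ```ignore
    /// let withdrawals =
    ///     provider.withdrawals_by_block(BlockHashOrNumber::Number(100), block_timestamp)?;
    /// ```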
1590    fn withdrawals_by_block(
1591        &self,
1592        id: BlockHashOrNumber,
1593        timestamp: u64,
1594    ) -> ProviderResult<Option<Withdrawals>> {
1595        if self.chain_spec.is_shanghai_active_at_timestamp(timestamp) {
1596            if let Some(number) = self.convert_hash_or_number(id)? {
1597                // If we are past Shanghai, then all blocks should have a withdrawal list, even
1598                // if empty.
1599                let withdrawals = self
1600                    .tx
1601                    .get::<tables::BlockWithdrawals>(number)
1602                    .map(|w| w.map(|w| w.withdrawals))?
1603                    .unwrap_or_default();
1604                return Ok(Some(withdrawals))
1605            }
1606        }
1607        Ok(None)
1608    }
1609
1610    fn latest_withdrawal(&self) -> ProviderResult<Option<Withdrawal>> {
1611        let latest_block_withdrawal = self.tx.cursor_read::<tables::BlockWithdrawals>()?.last()?;
1612        Ok(latest_block_withdrawal
1613            .and_then(|(_, mut block_withdrawal)| block_withdrawal.withdrawals.pop()))
1614    }
1615}
1616
1617impl<TX: DbTx + 'static, N: NodeTypesForProvider> OmmersProvider for DatabaseProvider<TX, N> {
1618    /// Returns the ommers for the block with matching id from the database.
1619    ///
1620    /// If the block is not found, this returns `None`.
1621    /// If the block exists, but doesn't contain ommers, this returns `None`.
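    /// Once the Paris (Merge) hardfork is known and the block is after it, this returns
    /// `Some(Vec::new())` without touching the database.
    ///
    /// A minimal usage sketch (hypothetical `provider`, for illustration only):
    ///
    /// ```ignore
    /// let ommers = provider.ommers(BlockHashOrNumber::Number(100))?;
    /// ```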
1622    fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<Self::Header>>> {
1623        if let Some(number) = self.convert_hash_or_number(id)? {
1624            // If the Paris (Merge) hardfork block is known and the block is after it, return
1625            // empty ommers.
1626            if self.chain_spec.final_paris_total_difficulty(number).is_some() {
1627                return Ok(Some(Vec::new()))
1628            }
1629
1630            let ommers =
1631                self.tx.get::<tables::BlockOmmers<Self::Header>>(number)?.map(|o| o.ommers);
1632            return Ok(ommers)
1633        }
1634
1635        Ok(None)
1636    }
1637}
1638
1639impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
1640    for DatabaseProvider<TX, N>
1641{
1642    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1643        Ok(self.tx.get::<tables::BlockBodyIndices>(num)?)
1644    }
1645}
1646
1647impl<TX: DbTx + 'static, N: NodeTypesForProvider> EvmEnvProvider<HeaderTy<N>>
1648    for DatabaseProvider<TX, N>
1649{
1650    fn env_with_header<EvmConfig>(
1651        &self,
1652        header: &HeaderTy<N>,
1653        evm_config: EvmConfig,
1654    ) -> ProviderResult<(CfgEnvWithHandlerCfg, BlockEnv)>
1655    where
1656        EvmConfig: ConfigureEvmEnv<Header = HeaderTy<N>>,
1657    {
1658        let total_difficulty = self
1659            .header_td_by_number(header.number())?
1660            .ok_or_else(|| ProviderError::HeaderNotFound(header.number().into()))?;
1661        Ok(evm_config.cfg_and_block_env(header, total_difficulty))
1662    }
1663}
1664
1665impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
1666    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1667        Ok(self.tx.get::<tables::StageCheckpoints>(id.to_string())?)
1668    }
1669
1670    /// Get stage checkpoint progress.
1671    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1672        Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
1673    }
1674
1675    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1676        self.tx
1677            .cursor_read::<tables::StageCheckpoints>()?
1678            .walk(None)?
1679            .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
1680            .map_err(ProviderError::Database)
1681    }
1682}
1683
1684impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
1685    /// Save stage checkpoint.
1686    fn save_stage_checkpoint(
1687        &self,
1688        id: StageId,
1689        checkpoint: StageCheckpoint,
1690    ) -> ProviderResult<()> {
1691        Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
1692    }
1693
1694    /// Save stage checkpoint progress.
1695    fn save_stage_checkpoint_progress(
1696        &self,
1697        id: StageId,
1698        checkpoint: Vec<u8>,
1699    ) -> ProviderResult<()> {
1700        Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
1701    }
1702
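    /// Sets the checkpoint of every pipeline stage in [`StageId::ALL`] to `block_number`,
    /// optionally dropping the rest of the existing checkpoint data first.
    ///
    /// A minimal usage sketch (hypothetical `provider`, for illustration only):
    ///
    /// ```ignore
    /// // Move all stages to block 1_000 while keeping their existing checkpoint details.
    /// provider.update_pipeline_stages(1_000, false)?;
    /// ```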
1703    fn update_pipeline_stages(
1704        &self,
1705        block_number: BlockNumber,
1706        drop_stage_checkpoint: bool,
1707    ) -> ProviderResult<()> {
1708        // Iterate over all existing stages in the table and update their progress.
1709        let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
1710        for stage_id in StageId::ALL {
1711            let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
1712            cursor.upsert(
1713                stage_id.to_string(),
1714                StageCheckpoint {
1715                    block_number,
1716                    ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
1717                },
1718            )?;
1719        }
1720
1721        Ok(())
1722    }
1723}
1724
1725impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
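    /// Returns the current plain-state storage value for every requested `(address, key)` pair,
    /// substituting a zero-valued [`StorageEntry`] for slots missing from the database.
    ///
    /// A minimal usage sketch (hypothetical `provider`, `address` and `slot`, for illustration
    /// only):
    ///
    /// ```ignore
    /// let storages = provider.plain_state_storages([(address, [slot])])?;
    /// ```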
1726    fn plain_state_storages(
1727        &self,
1728        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
1729    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
1730        let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
1731
1732        addresses_with_keys
1733            .into_iter()
1734            .map(|(address, storage)| {
1735                storage
1736                    .into_iter()
1737                    .map(|key| -> ProviderResult<_> {
1738                        Ok(plain_storage
1739                            .seek_by_key_subkey(address, key)?
1740                            .filter(|v| v.key == key)
1741                            .unwrap_or_else(|| StorageEntry { key, ..Default::default() }))
1742                    })
1743                    .collect::<ProviderResult<Vec<_>>>()
1744                    .map(|storage| (address, storage))
1745            })
1746            .collect::<ProviderResult<Vec<(_, _)>>>()
1747    }
1748
1749    fn changed_storages_with_range(
1750        &self,
1751        range: RangeInclusive<BlockNumber>,
1752    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
1753        self.tx
1754            .cursor_read::<tables::StorageChangeSets>()?
1755            .walk_range(BlockNumberAddress::range(range))?
1756            // Fold all storages and save their old state so we can remove them from HashedStorage;
1757            // this is needed because it is a dupsort table.
1758            .try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
1759                let (BlockNumberAddress((_, address)), storage_entry) = entry?;
1760                accounts.entry(address).or_default().insert(storage_entry.key);
1761                Ok(accounts)
1762            })
1763    }
1764
1765    fn changed_storages_and_blocks_with_range(
1766        &self,
1767        range: RangeInclusive<BlockNumber>,
1768    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
1769        let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
1770
1771        let storage_changeset_lists =
1772            changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
1773                BTreeMap::new(),
1774                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, entry| -> ProviderResult<_> {
1775                    let (index, storage) = entry?;
1776                    storages
1777                        .entry((index.address(), storage.key))
1778                        .or_default()
1779                        .push(index.block_number());
1780                    Ok(storages)
1781                },
1782            )?;
1783
1784        Ok(storage_changeset_lists)
1785    }
1786}
1787
1788impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
1789    for DatabaseProvider<TX, N>
1790{
1791    type Receipt = ReceiptTy<N>;
1792
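    /// Writes the given [`ExecutionOutcome`] to storage: state changes and reverts always go to
    /// the database, while receipts go to the database if requested or if any receipt pruning is
    /// configured, and to static files only if requested and no receipt pruning is configured.
    ///
    /// A minimal usage sketch (assuming `outcome`, `is_value_known` and `receipts_location` are
    /// in scope, for illustration only):
    ///
    /// ```ignore
    /// provider.write_state(outcome, is_value_known, receipts_location)?;
    /// ```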
1793    fn write_state(
1794        &self,
1795        execution_outcome: ExecutionOutcome<Self::Receipt>,
1796        is_value_known: OriginalValuesKnown,
1797        write_receipts_to: StorageLocation,
1798    ) -> ProviderResult<()> {
1799        let (plain_state, reverts) =
1800            execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
1801
1802        self.write_state_reverts(reverts, execution_outcome.first_block)?;
1803        self.write_state_changes(plain_state)?;
1804
1805        let mut bodies_cursor = self.tx.cursor_read::<tables::BlockBodyIndices>()?;
1806
1807        let has_receipts_pruning = self.prune_modes.has_receipts_pruning() ||
1808            execution_outcome.receipts.iter().flatten().any(|receipt| receipt.is_none());
1809
1810        // Prepare receipts cursor if we are going to write receipts to the database
1811        //
1812        // We are writing to database if requested or if there's any kind of receipt pruning
1813        // configured
1814        let mut receipts_cursor = (write_receipts_to.database() || has_receipts_pruning)
1815            .then(|| self.tx.cursor_write::<tables::Receipts<Self::Receipt>>())
1816            .transpose()?;
1817
1818        // Prepare receipts static writer if we are going to write receipts to static files
1819        //
1820        // We are writing to static files if requested and if there's no receipt pruning configured
1821        let mut receipts_static_writer = (write_receipts_to.static_files() &&
1822            !has_receipts_pruning)
1823            .then(|| {
1824                self.static_file_provider
1825                    .get_writer(execution_outcome.first_block, StaticFileSegment::Receipts)
1826            })
1827            .transpose()?;
1828
1829        for (idx, receipts) in execution_outcome.receipts.into_iter().enumerate() {
1830            let block_number = execution_outcome.first_block + idx as u64;
1831
1832            // Increment block number for receipts static file writer
1833            if let Some(writer) = receipts_static_writer.as_mut() {
1834                writer.increment_block(block_number)?;
1835            }
1836
1837            let first_tx_index = bodies_cursor
1838                .seek_exact(block_number)?
1839                .map(|(_, indices)| indices.first_tx_num())
1840                .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))?;
1841
1842            for (idx, receipt) in receipts.into_iter().enumerate() {
1843                let receipt_idx = first_tx_index + idx as u64;
1844                if let Some(receipt) = receipt {
1845                    if let Some(writer) = &mut receipts_static_writer {
1846                        writer.append_receipt(receipt_idx, &receipt)?;
1847                    }
1848
1849                    if let Some(cursor) = &mut receipts_cursor {
1850                        cursor.append(receipt_idx, receipt)?;
1851                    }
1852                }
1853            }
1854        }
1855
1856        Ok(())
1857    }
1858
1859    fn write_state_reverts(
1860        &self,
1861        reverts: PlainStateReverts,
1862        first_block: BlockNumber,
1863    ) -> ProviderResult<()> {
1864        // Write storage changes
1865        tracing::trace!("Writing storage changes");
1866        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1867        let mut storage_changeset_cursor =
1868            self.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
1869        for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
1870            let block_number = first_block + block_index as BlockNumber;
1871
1872            tracing::trace!(block_number, "Writing block change");
1873            // sort changes by address.
1874            storage_changes.par_sort_unstable_by_key(|a| a.address);
1875            for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
1876                let storage_id = BlockNumberAddress((block_number, address));
1877
1878                let mut storage = storage_revert
1879                    .into_iter()
1880                    .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
1881                    .collect::<Vec<_>>();
1882                // sort storage slots by key.
1883                storage.par_sort_unstable_by_key(|a| a.0);
1884
1885                // If we are writing the primary storage wipe transition, the pre-existing plain
1886                // storage state has to be taken from the database and written to storage history.
1887                // See [StorageWipe::Primary] for more details.
1888                let mut wiped_storage = Vec::new();
1889                if wiped {
1890                    tracing::trace!(?address, "Wiping storage");
1891                    if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
1892                        wiped_storage.push((entry.key, entry.into()));
1893                        while let Some(entry) = storages_cursor.next_dup_val()? {
1894                            wiped_storage.push((entry.key, entry.into()))
1895                        }
1896                    }
1897                }
1898
1899                tracing::trace!(?address, ?storage, "Writing storage reverts");
1900                for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
1901                    storage_changeset_cursor.append_dup(
1902                        storage_id,
1903                        StorageEntry { key, value: value.value, is_private: value.is_private },
1904                    )?;
1905                }
1906            }
1907        }
1908
1909        // Write account changes
1910        tracing::trace!("Writing account changes");
1911        let mut account_changeset_cursor =
1912            self.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?;
1913
1914        for (block_index, mut account_block_reverts) in reverts.accounts.into_iter().enumerate() {
1915            let block_number = first_block + block_index as BlockNumber;
1916            // Sort accounts by address.
1917            account_block_reverts.par_sort_by_key(|a| a.0);
1918
1919            for (address, info) in account_block_reverts {
1920                account_changeset_cursor.append_dup(
1921                    block_number,
1922                    AccountBeforeTx { address, info: info.map(Into::into) },
1923                )?;
1924            }
1925        }
1926
1927        Ok(())
1928    }
1929
1930    fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
1931        // Sort all entries so they can be written to the database in a more performant way
1932        // and take a smaller memory footprint.
1933        changes.accounts.par_sort_by_key(|a| a.0);
1934        changes.storage.par_sort_by_key(|a| a.address);
1935        changes.contracts.par_sort_by_key(|a| a.0);
1936
1937        // Write new account state
1938        tracing::trace!(len = changes.accounts.len(), "Writing new account state");
1939        let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
1940        // write account to database.
1941        for (address, account) in changes.accounts {
1942            if let Some(account) = account {
1943                tracing::trace!(?address, "Updating plain state account");
1944                accounts_cursor.upsert(address, account.into())?;
1945            } else if accounts_cursor.seek_exact(address)?.is_some() {
1946                tracing::trace!(?address, "Deleting plain state account");
1947                accounts_cursor.delete_current()?;
1948            }
1949        }
1950
1951        // Write bytecode
1952        tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
1953        let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
1954        for (hash, bytecode) in changes.contracts {
1955            bytecodes_cursor.upsert(hash, Bytecode(bytecode))?;
1956        }
1957
1958        // Write new storage state and wipe storage if needed.
1959        tracing::trace!(len = changes.storage.len(), "Writing new storage state");
1960        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1961        for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
1962            // Wipe the existing storage entries if requested.
1963            if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
1964                storages_cursor.delete_current_duplicates()?;
1965            }
1966            // Cast storage keys to B256 and build the plain-state storage entries.
1967            let mut storage = storage
1968                .into_iter()
1969                .map(|(k, value)| StorageEntry {
1970                    key: k.into(),
1971                    value: value.value,
1972                    is_private: value.is_private,
1973                })
1974                .collect::<Vec<_>>();
1975            // sort storage slots by key.
1976            storage.par_sort_unstable_by_key(|a| a.key);
1977
1978            for entry in storage {
1979                tracing::trace!(?address, ?entry.key, "Updating plain state storage");
1980                if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? {
1981                    if db_entry.key == entry.key {
1982                        storages_cursor.delete_current()?;
1983                    }
1984                }
1985
1986                if !entry.value.is_zero() {
1987                    storages_cursor.upsert(address, entry)?;
1988                }
1989            }
1990        }
1991
1992        Ok(())
1993    }
1994
1995    fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
1996        // Write hashed account updates.
1997        let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
1998        for (hashed_address, account) in hashed_state.accounts().accounts_sorted() {
1999            if let Some(account) = account {
2000                hashed_accounts_cursor.upsert(hashed_address, account)?;
2001            } else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
2002                hashed_accounts_cursor.delete_current()?;
2003            }
2004        }
2005
2006        // Write hashed storage changes.
2007        let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
2008        let mut hashed_storage_cursor =
2009            self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2010        for (hashed_address, storage) in sorted_storages {
2011            if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
2012                hashed_storage_cursor.delete_current_duplicates()?;
2013            }
2014
2015            for (hashed_slot, value) in storage.storage_slots_sorted() {
2016                let entry = StorageEntry {
2017                    key: hashed_slot,
2018                    value: value.value,
2019                    is_private: value.is_private,
2020                };
2021                if let Some(db_entry) =
2022                    hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)?
2023                {
2024                    if db_entry.key == entry.key {
2025                        hashed_storage_cursor.delete_current()?;
2026                    }
2027                }
2028
2029                if !entry.value.is_zero() {
2030                    hashed_storage_cursor.upsert(*hashed_address, entry)?;
2031                }
2032            }
2033        }
2034
2035        Ok(())
2036    }
2037
2038    /// Remove the last N blocks of state.
2039    ///
2040    /// The latest state will be unwound.
2041    ///
2042    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2043    ///    transaction ids.
2044    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2045    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2046    ///    the changesets.
2047    ///    - In order to have both the old and new values in the changesets, we also access the
2048    ///      plain state tables.
2049    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2050    ///    we:
2051    ///     1. Take the old value from the changeset
2052    ///     2. Take the new value from the plain state
2053    ///     3. Save the old value to the local state
2054    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2055    ///    have seen before we:
2056    ///     1. Take the old value from the changeset
2057    ///     2. Take the new value from the local state
2058    ///     3. Set the local state to the value in the changeset
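    ///
    /// A minimal usage sketch (assuming `provider` and `receipts_location` are in scope, for
    /// illustration only):
    ///
    /// ```ignore
    /// // Remove all state above block 1_000; receipts are removed from `receipts_location`.
    /// provider.remove_state_above(1_000, receipts_location)?;
    /// ```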
2059    fn remove_state_above(
2060        &self,
2061        block: BlockNumber,
2062        remove_receipts_from: StorageLocation,
2063    ) -> ProviderResult<()> {
2064        let range = block + 1..=self.last_block_number()?;
2065
2066        if range.is_empty() {
2067            return Ok(());
2068        }
2069
2070        // We are not removing block meta as it is used to get block changesets.
2071        let block_bodies = self.get::<tables::BlockBodyIndices>(range.clone())?;
2072
2073        // get transaction receipts
2074        let from_transaction_num =
2075            block_bodies.first().expect("already checked if there are blocks").1.first_tx_num();
2076
2077        let storage_range = BlockNumberAddress::range(range.clone());
2078
2079        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2080        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2081
2082        // This does not work for blocks that are not at the tip, as the plain state is not the
2083        // last state at the end of the range. We should rename these functions or add support
2084        // for accessing historical state. Accessing historical state can be tricky, but we are
2085        // not gaining anything from it.
2086        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2087        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2088
2089        let (state, _) = self.populate_bundle_state(
2090            account_changeset,
2091            storage_changeset,
2092            &mut plain_accounts_cursor,
2093            &mut plain_storage_cursor,
2094        )?;
2095
2096        // Iterate over the local plain state and revert all accounts and storages.
2097        for (address, (old_account, new_account, storage)) in &state {
2098            // revert account if needed.
2099            if old_account != new_account {
2100                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2101                if let Some(account) = old_account {
2102                    plain_accounts_cursor.upsert(*address, *account)?;
2103                } else if existing_entry.is_some() {
2104                    plain_accounts_cursor.delete_current()?;
2105                }
2106            }
2107
2108            // revert storages
2109            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2110                let storage_entry = StorageEntry {
2111                    key: *storage_key,
2112                    value: old_storage_value.0,
2113                    is_private: old_storage_value.1,
2114                };
2115                // delete previous value
2116                // TODO: This does not use dupsort features
2117                if plain_storage_cursor
2118                    .seek_by_key_subkey(*address, *storage_key)?
2119                    .filter(|s| s.key == *storage_key)
2120                    .is_some()
2121                {
2122                    plain_storage_cursor.delete_current()?
2123                }
2124
2125                // insert value if needed
2126                if !FlaggedStorage::new(old_storage_value.0, old_storage_value.1).is_zero() {
2127                    plain_storage_cursor.upsert(*address, storage_entry)?;
2128                }
2129            }
2130        }
2131
2132        self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
2133
2134        Ok(())
2135    }
2136
2137    /// Take the last N blocks of state, recreating the [`ExecutionOutcome`].
2138    ///
2139    /// The latest state will be unwound and returned together with all of the blocks.
2140    ///
2141    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2142    ///    transaction ids.
2143    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2144    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2145    ///    the changesets.
2146    ///    - In order to have both the old and new values in the changesets, we also access the
2147    ///      plain state tables.
2148    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2149    ///    we:
2150    ///     1. Take the old value from the changeset
2151    ///     2. Take the new value from the plain state
2152    ///     3. Save the old value to the local state
2153    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2154    ///    have seen before we:
2155    ///     1. Take the old value from the changeset
2156    ///     2. Take the new value from the local state
2157    ///     3. Set the local state to the value in the changeset
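    ///
    /// A minimal usage sketch (assuming `provider` and `receipts_location` are in scope, for
    /// illustration only):
    ///
    /// ```ignore
    /// // Unwind everything above block 1_000 and inspect the recreated outcome.
    /// let outcome = provider.take_state_above(1_000, receipts_location)?;
    /// // `outcome.first_block` is 1_001 whenever anything above block 1_000 existed.
    /// ```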
2158    fn take_state_above(
2159        &self,
2160        block: BlockNumber,
2161        remove_receipts_from: StorageLocation,
2162    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2163        let range = block + 1..=self.last_block_number()?;
2164
2165        if range.is_empty() {
2166            return Ok(ExecutionOutcome::default())
2167        }
2168        let start_block_number = *range.start();
2169
2170        // We are not removing block meta as it is used to get block changesets.
2171        let block_bodies = self.get::<tables::BlockBodyIndices>(range.clone())?;
2172
2173        // get transaction receipts
2174        let from_transaction_num =
2175            block_bodies.first().expect("already checked if there are blocks").1.first_tx_num();
2176        let to_transaction_num =
2177            block_bodies.last().expect("already checked if there are blocks").1.last_tx_num();
2178
2179        let storage_range = BlockNumberAddress::range(range.clone());
2180
2181        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2182        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2183
2184        // This does not work for blocks that are not at the tip, as the plain state is not the
2185        // last state at the end of the range. We should rename these functions or add support
2186        // for accessing historical state. Accessing historical state can be tricky, but we are
2187        // not gaining anything from it.
2188        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2189        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2190
2191        // populate bundle state and reverts from changesets / state cursors, to iterate over,
2192        // remove, and return later
2193        let (state, reverts) = self.populate_bundle_state(
2194            account_changeset,
2195            storage_changeset,
2196            &mut plain_accounts_cursor,
2197            &mut plain_storage_cursor,
2198        )?;
2199
2200        // Iterate over the local plain state and revert all accounts and storages.
2201        for (address, (old_account, new_account, storage)) in &state {
2202            // revert account if needed.
2203            if old_account != new_account {
2204                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2205                if let Some(account) = old_account {
2206                    plain_accounts_cursor.upsert(*address, *account)?;
2207                } else if existing_entry.is_some() {
2208                    plain_accounts_cursor.delete_current()?;
2209                }
2210            }
2211
2212            // revert storages
2213            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2214                let storage_entry = StorageEntry {
2215                    key: *storage_key,
2216                    value: old_storage_value.0,
2217                    is_private: old_storage_value.1,
2218                };
2219                // delete previous value
2220                // TODO: This does not use dupsort features
2221                if plain_storage_cursor
2222                    .seek_by_key_subkey(*address, *storage_key)?
2223                    .filter(|s| s.key == *storage_key)
2224                    .is_some()
2225                {
2226                    plain_storage_cursor.delete_current()?
2227                }
2228
2229                // insert value if needed
2230                if !FlaggedStorage::new(old_storage_value.0, old_storage_value.1).is_zero() {
2231                    plain_storage_cursor.upsert(*address, storage_entry)?;
2232                }
2233            }
2234        }
2235
2236        // Collect receipts into tuples (tx_num, receipt) to correctly handle pruned receipts
2237        let mut receipts_iter = self
2238            .static_file_provider
2239            .get_range_with_static_file_or_database(
2240                StaticFileSegment::Receipts,
2241                from_transaction_num..to_transaction_num + 1,
2242                |static_file, range, _| {
2243                    static_file
2244                        .receipts_by_tx_range(range.clone())
2245                        .map(|r| range.into_iter().zip(r).collect())
2246                },
2247                |range, _| {
2248                    self.tx
2249                        .cursor_read::<tables::Receipts<Self::Receipt>>()?
2250                        .walk_range(range)?
2251                        .map(|r| r.map_err(Into::into))
2252                        .collect()
2253                },
2254                |_| true,
2255            )?
2256            .into_iter()
2257            .peekable();
2258
2259        let mut receipts = Vec::with_capacity(block_bodies.len());
2260        // Group receipts per block, inserting `None` for transactions whose receipts were pruned.
2261        for (_, block_body) in block_bodies {
2262            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
2263            for num in block_body.tx_num_range() {
2264                if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
2265                    block_receipts.push(receipts_iter.next().map(|(_, r)| r));
2266                } else {
2267                    block_receipts.push(None);
2268                }
2269            }
2270            receipts.push(block_receipts);
2271        }
2272
2273        self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
2274
2275        Ok(ExecutionOutcome::new_init(
2276            state,
2277            reverts,
2278            Vec::new(),
2279            receipts.into(),
2280            start_block_number,
2281            Vec::new(),
2282        ))
2283    }
2284}
2285
2286impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
2287    /// Writes trie updates. Returns the number of entries modified.
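    ///
    /// Empty updates are a no-op (a minimal sketch, assuming `provider` is in scope):
    ///
    /// ```ignore
    /// assert_eq!(provider.write_trie_updates(&TrieUpdates::default())?, 0);
    /// ```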
2288    fn write_trie_updates(&self, trie_updates: &TrieUpdates) -> ProviderResult<usize> {
2289        if trie_updates.is_empty() {
2290            return Ok(0)
2291        }
2292
2293        // Track the number of inserted entries.
2294        let mut num_entries = 0;
2295
2296        // Merge updated and removed nodes. Updated nodes must take precedence.
2297        let mut account_updates = trie_updates
2298            .removed_nodes_ref()
2299            .iter()
2300            .filter_map(|n| {
2301                (!trie_updates.account_nodes_ref().contains_key(n)).then_some((n, None))
2302            })
2303            .collect::<Vec<_>>();
2304        account_updates.extend(
2305            trie_updates.account_nodes_ref().iter().map(|(nibbles, node)| (nibbles, Some(node))),
2306        );
2307        // Sort trie node updates.
2308        account_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
2309
2310        let tx = self.tx_ref();
2311        let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
2312        for (key, updated_node) in account_updates {
2313            let nibbles = StoredNibbles(key.clone());
2314            match updated_node {
2315                Some(node) => {
2316                    if !nibbles.0.is_empty() {
2317                        num_entries += 1;
2318                        account_trie_cursor.upsert(nibbles, node.clone())?;
2319                    }
2320                }
2321                None => {
2322                    num_entries += 1;
2323                    if account_trie_cursor.seek_exact(nibbles)?.is_some() {
2324                        account_trie_cursor.delete_current()?;
2325                    }
2326                }
2327            }
2328        }
2329
2330        num_entries += self.write_storage_trie_updates(trie_updates.storage_tries_ref())?;
2331
2332        Ok(num_entries)
2333    }
2334}
2335
2336impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
2337    /// Writes storage trie updates from the given storage trie map. First sorts the storage trie
2338    /// updates by the hashed address, writing in sorted order.
2339    fn write_storage_trie_updates(
2340        &self,
2341        storage_tries: &B256HashMap<StorageTrieUpdates>,
2342    ) -> ProviderResult<usize> {
2343        let mut num_entries = 0;
2344        let mut storage_tries = Vec::from_iter(storage_tries);
2345        storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
2346        let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2347        for (hashed_address, storage_trie_updates) in storage_tries {
2348            let mut db_storage_trie_cursor =
2349                DatabaseStorageTrieCursor::new(cursor, *hashed_address);
2350            num_entries +=
2351                db_storage_trie_cursor.write_storage_trie_updates(storage_trie_updates)?;
2352            cursor = db_storage_trie_cursor.cursor;
2353        }
2354
2355        Ok(num_entries)
2356    }
2357
2358    fn write_individual_storage_trie_updates(
2359        &self,
2360        hashed_address: B256,
2361        updates: &StorageTrieUpdates,
2362    ) -> ProviderResult<usize> {
2363        if updates.is_empty() {
2364            return Ok(0)
2365        }
2366
2367        let cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2368        let mut trie_db_cursor = DatabaseStorageTrieCursor::new(cursor, hashed_address);
2369        Ok(trie_db_cursor.write_storage_trie_updates(updates)?)
2370    }
2371}
2372
2373impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
2374    fn unwind_account_hashing<'a>(
2375        &self,
2376        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2377    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2378        // Aggregate all block changesets and make a list of accounts that have been changed.
2379        // Note that collecting and then reversing the order is necessary to ensure that the
2380        // changes are applied in the correct order.
2381        let hashed_accounts = changesets
2382            .into_iter()
2383            .map(|(_, e)| (keccak256(e.address), e.info))
2384            .collect::<Vec<_>>()
2385            .into_iter()
2386            .rev()
2387            .collect::<BTreeMap<_, _>>();
2388
2389        // Apply values to HashedAccounts, and remove the account if it's None.
2390        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2391        for (hashed_address, account) in &hashed_accounts {
2392            if let Some(account) = account {
2393                hashed_accounts_cursor.upsert(*hashed_address, *account)?;
2394            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2395                hashed_accounts_cursor.delete_current()?;
2396            }
2397        }
2398
2399        Ok(hashed_accounts)
2400    }
2401
2402    fn unwind_account_hashing_range(
2403        &self,
2404        range: impl RangeBounds<BlockNumber>,
2405    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2406        let changesets = self
2407            .tx
2408            .cursor_read::<tables::AccountChangeSets>()?
2409            .walk_range(range)?
2410            .collect::<Result<Vec<_>, _>>()?;
2411        self.unwind_account_hashing(changesets.iter())
2412    }
2413
2414    fn insert_account_for_hashing(
2415        &self,
2416        changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
2417    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2418        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2419        let hashed_accounts =
2420            changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
2421        for (hashed_address, account) in &hashed_accounts {
2422            if let Some(account) = account {
2423                hashed_accounts_cursor.upsert(*hashed_address, *account)?;
2424            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2425                hashed_accounts_cursor.delete_current()?;
2426            }
2427        }
2428        Ok(hashed_accounts)
2429    }
2430
2431    fn unwind_storage_hashing(
2432        &self,
2433        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2434    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2435        // Aggregate all block changesets and make a list of accounts that have been changed.
2436        let mut hashed_storages = changesets
2437            .into_iter()
2438            .map(|(BlockNumberAddress((_, address)), storage_entry)| {
2439                (
2440                    keccak256(address),
2441                    keccak256(storage_entry.key),
2442                    storage_entry.value,
2443                    storage_entry.is_private,
2444                )
2445            })
2446            .collect::<Vec<_>>();
2447        hashed_storages.sort_by_key(|(ha, hk, _, _)| (*ha, *hk));
2448
2449        // Apply values to HashedStorages, and remove the entry if the value is zero.
2450        let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
2451            HashMap::with_capacity_and_hasher(hashed_storages.len(), Default::default());
2452        let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2453        for (hashed_address, key, value, is_private) in hashed_storages.into_iter().rev() {
2454            hashed_storage_keys.entry(hashed_address).or_default().insert(key);
2455
2456            if hashed_storage
2457                .seek_by_key_subkey(hashed_address, key)?
2458                .filter(|entry| entry.key == key)
2459                .is_some()
2460            {
2461                hashed_storage.delete_current()?;
2462            }
2463
2464            if !value.is_zero() {
2465                hashed_storage.upsert(hashed_address, StorageEntry { key, value, is_private })?;
2466            }
2467        }
2468        Ok(hashed_storage_keys)
2469    }
2470
2471    fn unwind_storage_hashing_range(
2472        &self,
2473        range: impl RangeBounds<BlockNumberAddress>,
2474    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2475        let changesets = self
2476            .tx
2477            .cursor_read::<tables::StorageChangeSets>()?
2478            .walk_range(range)?
2479            .collect::<Result<Vec<_>, _>>()?;
2480        self.unwind_storage_hashing(changesets.into_iter())
2481    }
2482
2483    fn insert_storage_for_hashing(
2484        &self,
2485        storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
2486    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2487        // hash values
2488        let hashed_storages =
2489            storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
2490                let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
2491                    map.insert(keccak256(entry.key), entry.value);
2492                    map
2493                });
2494                map.insert(keccak256(address), storage);
2495                map
2496            });
2497
2498        let hashed_storage_keys =
2499            HashMap::from_iter(hashed_storages.iter().map(|(hashed_address, entries)| {
2500                (*hashed_address, BTreeSet::from_iter(entries.keys().copied()))
2501            }));
2502
2503        let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2504        // Hash the address and key and apply them to HashedStorages (if the value is zero,
2505        // just remove the entry).
2506        hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
2507            storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
2508                if hashed_storage_cursor
2509                    .seek_by_key_subkey(hashed_address, key)?
2510                    .filter(|entry| entry.key == key)
2511                    .is_some()
2512                {
2513                    hashed_storage_cursor.delete_current()?;
2514                }
2515
2516                if !value.is_zero() {
2517                    hashed_storage_cursor.upsert(
2518                        hashed_address,
2519                        StorageEntry { key, value, ..Default::default() },
2520                    )?;
2521                }
2522                Ok(())
2523            })
2524        })?;
2525
2526        Ok(hashed_storage_keys)
2527    }
2528
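    /// Hashes the accounts and storages changed in `range`, incrementally recomputes the state
    /// root with pre-loaded prefix sets, and persists the resulting trie updates. Returns a
    /// [`ProviderError::StateRootMismatch`] if the computed root differs from
    /// `expected_state_root`.
    ///
    /// A minimal usage sketch (assuming `provider`, `end_block_hash` and `expected_state_root`
    /// are in scope, for illustration only):
    ///
    /// ```ignore
    /// provider.insert_hashes(100..=110, end_block_hash, expected_state_root)?;
    /// ```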
2529    fn insert_hashes(
2530        &self,
2531        range: RangeInclusive<BlockNumber>,
2532        end_block_hash: B256,
2533        expected_state_root: B256,
2534    ) -> ProviderResult<()> {
2535        // Initialize prefix sets.
2536        let mut account_prefix_set = PrefixSetMut::default();
2537        let mut storage_prefix_sets: HashMap<B256, PrefixSetMut> = HashMap::default();
2538        let mut destroyed_accounts = HashSet::default();
2539
2540        let mut durations_recorder = metrics::DurationsRecorder::default();
2541
2542        // storage hashing stage
2543        {
2544            let lists = self.changed_storages_with_range(range.clone())?;
2545            let storages = self.plain_state_storages(lists)?;
2546            let storage_entries = self.insert_storage_for_hashing(storages)?;
2547            for (hashed_address, hashed_slots) in storage_entries {
2548                account_prefix_set.insert(Nibbles::unpack(hashed_address));
2549                for slot in hashed_slots {
2550                    storage_prefix_sets
2551                        .entry(hashed_address)
2552                        .or_default()
2553                        .insert(Nibbles::unpack(slot));
2554                }
2555            }
2556        }
2557        durations_recorder.record_relative(metrics::Action::InsertStorageHashing);
2558
2559        // account hashing stage
2560        {
2561            let lists = self.changed_accounts_with_range(range.clone())?;
2562            let accounts = self.basic_accounts(lists)?;
2563            let hashed_addresses = self.insert_account_for_hashing(accounts)?;
2564            for (hashed_address, account) in hashed_addresses {
2565                account_prefix_set.insert(Nibbles::unpack(hashed_address));
2566                if account.is_none() {
2567                    destroyed_accounts.insert(hashed_address);
2568                }
2569            }
2570        }
2571        durations_recorder.record_relative(metrics::Action::InsertAccountHashing);
2572
2573        // merkle tree
2574        {
2575            // This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
2576            // are pre-loaded.
2577            let prefix_sets = TriePrefixSets {
2578                account_prefix_set: account_prefix_set.freeze(),
2579                storage_prefix_sets: storage_prefix_sets
2580                    .into_iter()
2581                    .map(|(k, v)| (k, v.freeze()))
2582                    .collect(),
2583                destroyed_accounts,
2584            };
2585            let (state_root, trie_updates) = StateRoot::from_tx(&self.tx)
2586                .with_prefix_sets(prefix_sets)
2587                .root_with_updates()
2588                .map_err(reth_db::DatabaseError::from)?;
2589            if state_root != expected_state_root {
2590                return Err(ProviderError::StateRootMismatch(Box::new(RootMismatch {
2591                    root: GotExpected { got: state_root, expected: expected_state_root },
2592                    block_number: *range.end(),
2593                    block_hash: end_block_hash,
2594                })))
2595            }
2596            self.write_trie_updates(&trie_updates)?;
2597        }
2598        durations_recorder.record_relative(metrics::Action::InsertMerkleTree);
2599
2600        debug!(target: "providers::db", ?range, actions = ?durations_recorder.actions, "Inserted hashes");
2601
2602        Ok(())
2603    }
2604}
2605
2606impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
2607    fn unwind_account_history_indices<'a>(
2608        &self,
2609        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2610    ) -> ProviderResult<usize> {
2611        let mut last_indices = changesets
2612            .into_iter()
2613            .map(|(index, account)| (account.address, *index))
2614            .collect::<Vec<_>>();
2615        last_indices.sort_by_key(|(a, _)| *a);
2616
2617        // Unwind the account history index.
2618        let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
2619        for &(address, rem_index) in &last_indices {
2620            let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
2621                &mut cursor,
2622                ShardedKey::last(address),
2623                rem_index,
2624                |sharded_key| sharded_key.key == address,
2625            )?;
2626
2627            // Check the last returned partial shard.
2628            // If it's not empty, the shard needs to be reinserted.
2629            if !partial_shard.is_empty() {
2630                cursor.insert(
2631                    ShardedKey::last(address),
2632                    BlockNumberList::new_pre_sorted(partial_shard),
2633                )?;
2634            }
2635        }
2636
2637        let changesets = last_indices.len();
2638        Ok(changesets)
2639    }
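    // Shard layout note (illustrative): `AccountsHistory` stores, per account, the block numbers
    // at which the account changed, split into `BlockNumberList` shards; the most recent shard is
    // keyed by the sentinel `ShardedKey::last(address)` (highest block number == u64::MAX). The
    // unwind above therefore only pops shards from the tail and reinserts the surviving block
    // numbers under that sentinel key, roughly:
    //
    //     shard before unwind: [100, 120, 150, 180]
    //     unwinding changesets from block 150 upwards -> [100, 120] reinserted under
    //     ShardedKey::last(address)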
2640
2641    fn unwind_account_history_indices_range(
2642        &self,
2643        range: impl RangeBounds<BlockNumber>,
2644    ) -> ProviderResult<usize> {
2645        let changesets = self
2646            .tx
2647            .cursor_read::<tables::AccountChangeSets>()?
2648            .walk_range(range)?
2649            .collect::<Result<Vec<_>, _>>()?;
2650        self.unwind_account_history_indices(changesets.iter())
2651    }
2652
2653    fn insert_account_history_index(
2654        &self,
2655        account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
2656    ) -> ProviderResult<()> {
2657        self.append_history_index::<_, tables::AccountsHistory>(
2658            account_transitions,
2659            ShardedKey::new,
2660        )
2661    }
2662
2663    fn unwind_storage_history_indices(
2664        &self,
2665        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2666    ) -> ProviderResult<usize> {
2667        let mut storage_changesets = changesets
2668            .into_iter()
2669            .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
2670            .collect::<Vec<_>>();
2671        storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
2672
2673        let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
2674        for &(address, storage_key, rem_index) in &storage_changesets {
2675            let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
2676                &mut cursor,
2677                StorageShardedKey::last(address, storage_key),
2678                rem_index,
2679                |storage_sharded_key| {
2680                    storage_sharded_key.address == address &&
2681                        storage_sharded_key.sharded_key.key == storage_key
2682                },
2683            )?;
2684
2685            // Check the last returned partial shard.
2686            // If it's not empty, the shard needs to be reinserted.
2687            if !partial_shard.is_empty() {
2688                cursor.insert(
2689                    StorageShardedKey::last(address, storage_key),
2690                    BlockNumberList::new_pre_sorted(partial_shard),
2691                )?;
2692            }
2693        }
2694
2695        let changesets = storage_changesets.len();
2696        Ok(changesets)
2697    }
2698
2699    fn unwind_storage_history_indices_range(
2700        &self,
2701        range: impl RangeBounds<BlockNumberAddress>,
2702    ) -> ProviderResult<usize> {
2703        let changesets = self
2704            .tx
2705            .cursor_read::<tables::StorageChangeSets>()?
2706            .walk_range(range)?
2707            .collect::<Result<Vec<_>, _>>()?;
2708        self.unwind_storage_history_indices(changesets.into_iter())
2709    }
2710
2711    fn insert_storage_history_index(
2712        &self,
2713        storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
2714    ) -> ProviderResult<()> {
2715        self.append_history_index::<_, tables::StoragesHistory>(
2716            storage_transitions,
2717            |(address, storage_key), highest_block_number| {
2718                StorageShardedKey::new(address, storage_key, highest_block_number)
2719            },
2720        )
2721    }
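    // Illustrative mapping (sketch): `StoragesHistory` is keyed per (address, slot) pair, so a
    // shard like
    //
    //     StorageShardedKey::new(address, slot, u64::MAX) -> BlockNumberList[150, 180, 200]
    //
    // records that `slot` of `address` changed at blocks 150, 180 and 200, where the last
    // parameter is the highest block covered by the shard (u64::MAX for the open, most recent
    // shard).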
2722
2723    fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
2724        // account history stage
2725        {
2726            let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
2727            self.insert_account_history_index(indices)?;
2728        }
2729
2730        // storage history stage
2731        {
2732            let indices = self.changed_storages_and_blocks_with_range(range)?;
2733            self.insert_storage_history_index(indices)?;
2734        }
2735
2736        Ok(())
2737    }
2738}
2739
2740impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
2741    for DatabaseProvider<TX, N>
2742{
2743    fn take_block_and_execution_above(
2744        &self,
2745        block: BlockNumber,
2746        remove_from: StorageLocation,
2747    ) -> ProviderResult<Chain<Self::Primitives>> {
2748        let range = block + 1..=self.last_block_number()?;
2749
2750        self.unwind_trie_state_range(range.clone())?;
2751
2752        // Take the execution results for the blocks above `block`.
2753        let execution_state = self.take_state_above(block, remove_from)?;
2754
2755        let blocks = self.sealed_block_with_senders_range(range)?;
2756
2757        // Block bodies are needed both for reading the block range and for assembling the
2758        // block execution results, which is why they are removed only afterwards.
2759        self.remove_blocks_above(block, remove_from)?;
2760
2761        // Update pipeline progress
2762        self.update_pipeline_stages(block, true)?;
2763
2764        Ok(Chain::new(blocks, execution_state, None))
2765    }
2766
2767    fn remove_block_and_execution_above(
2768        &self,
2769        block: BlockNumber,
2770        remove_from: StorageLocation,
2771    ) -> ProviderResult<()> {
2772        let range = block + 1..=self.last_block_number()?;
2773
2774        self.unwind_trie_state_range(range)?;
2775
2776        // Remove the execution results for the blocks above `block`.
2777        self.remove_state_above(block, remove_from)?;
2778
2779        // Block body data is still needed while removing the execution state above,
2780        // which is why the bodies are deleted only afterwards.
2781        self.remove_blocks_above(block, remove_from)?;
2782
2783        // Update pipeline progress
2784        self.update_pipeline_stages(block, true)?;
2785
2786        Ok(())
2787    }
2788}
2789
2790impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
2791    for DatabaseProvider<TX, N>
2792{
2793    type Block = BlockTy<N>;
2794    type Receipt = ReceiptTy<N>;
2795
2796    /// Inserts the block, modifying the following tables (destination depends on `write_to`):
2797    /// * [`CanonicalHeaders`](tables::CanonicalHeaders)
2798    /// * [`Headers`](tables::Headers)
2799    /// * [`HeaderNumbers`](tables::HeaderNumbers)
2800    /// * [`HeaderTerminalDifficulties`](tables::HeaderTerminalDifficulties)
2801    /// * [`BlockBodyIndices`](tables::BlockBodyIndices)
2802    ///
2803    /// If there are transactions in the block, the following tables will be modified:
2804    /// * [`Transactions`](tables::Transactions)
2805    /// * [`TransactionBlocks`](tables::TransactionBlocks)
2806    ///
2807    /// If ommers are not empty, this will modify [`BlockOmmers`](tables::BlockOmmers).
2808    /// If withdrawals are not empty, this will modify
2809    /// [`BlockWithdrawals`](tables::BlockWithdrawals).
2810    ///
2811    /// If the provider has __not__ configured full sender pruning, this will modify
2812    /// [`TransactionSenders`](tables::TransactionSenders).
2813    ///
2814    /// If the provider has __not__ configured full transaction lookup pruning, this will modify
2815    /// [`TransactionHashNumbers`](tables::TransactionHashNumbers).
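    ///
    /// Minimal usage sketch (hedged: the provider factory, `provider_rw`, and how the sealed
    /// block is obtained are assumptions, not part of this function):
    ///
    /// ```ignore
    /// let expected_txs = block.block.body.transactions().len() as u64;
    /// let provider = factory.provider_rw()?;
    /// let indices = provider.insert_block(block, StorageLocation::Database)?;
    /// // The returned indices describe where the block's transactions were placed.
    /// assert_eq!(indices.tx_count, expected_txs);
    /// provider.commit()?;
    /// ```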
2816    fn insert_block(
2817        &self,
2818        block: SealedBlockWithSenders<Self::Block>,
2819        write_to: StorageLocation,
2820    ) -> ProviderResult<StoredBlockBodyIndices> {
2821        let block_number = block.number();
2822
2823        let mut durations_recorder = metrics::DurationsRecorder::default();
2824
2825        // total difficulty
2826        let ttd = if block_number == 0 {
2827            block.difficulty()
2828        } else {
2829            let parent_block_number = block_number - 1;
2830            let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
2831            durations_recorder.record_relative(metrics::Action::GetParentTD);
2832            parent_ttd + block.difficulty()
2833        };
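        // e.g. (sketch): a pre-merge block with difficulty 2 whose parent TTD is 10 stores a TTD
        // of 12; post-merge blocks have difficulty 0, so the TTD simply carries over.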
2834
2835        if write_to.database() {
2836            self.tx.put::<tables::CanonicalHeaders>(block_number, block.hash())?;
2837            durations_recorder.record_relative(metrics::Action::InsertCanonicalHeaders);
2838
2839            // Put header with canonical hashes.
2840            self.tx
2841                .put::<tables::Headers<HeaderTy<N>>>(block_number, block.header.as_ref().clone())?;
2842            durations_recorder.record_relative(metrics::Action::InsertHeaders);
2843
2844            self.tx.put::<tables::HeaderTerminalDifficulties>(block_number, ttd.into())?;
2845            durations_recorder.record_relative(metrics::Action::InsertHeaderTerminalDifficulties);
2846        }
2847
2848        if write_to.static_files() {
2849            let mut writer =
2850                self.static_file_provider.get_writer(block_number, StaticFileSegment::Headers)?;
2851            writer.append_header(&block.header, ttd, &block.hash())?;
2852        }
2853
2854        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
2855        durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
2856
2857        let mut next_tx_num = self
2858            .tx
2859            .cursor_read::<tables::TransactionBlocks>()?
2860            .last()?
2861            .map(|(n, _)| n + 1)
2862            .unwrap_or_default();
2863        durations_recorder.record_relative(metrics::Action::GetNextTxNum);
2864        let first_tx_num = next_tx_num;
2865
2866        let tx_count = block.block.body.transactions().len() as u64;
2867
2868        // Persist sender and hash lookup entries for each transaction, respecting prune modes.
2869        for (transaction, sender) in
2870            block.block.body.transactions().iter().zip(block.senders.iter())
2871        {
2872            let hash = transaction.tx_hash();
2873
2874            if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
2875                self.tx.put::<tables::TransactionSenders>(next_tx_num, *sender)?;
2876            }
2877
2878            if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
2879                self.tx.put::<tables::TransactionHashNumbers>(*hash, next_tx_num)?;
2880            }
2881            next_tx_num += 1;
2882        }
2883
2884        self.append_block_bodies(vec![(block_number, Some(block.block.body))], write_to)?;
2885
2886        debug!(
2887            target: "providers::db",
2888            ?block_number,
2889            actions = ?durations_recorder.actions,
2890            "Inserted block"
2891        );
2892
2893        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
2894    }
2895
2896    fn append_block_bodies(
2897        &self,
2898        bodies: Vec<(BlockNumber, Option<BodyTy<N>>)>,
2899        write_to: StorageLocation,
2900    ) -> ProviderResult<()> {
2901        let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
2902
2903        // Initialize the writer if we will be writing transactions to static files
2904        let mut tx_static_writer = write_to
2905            .static_files()
2906            .then(|| {
2907                self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)
2908            })
2909            .transpose()?;
2910
2911        let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
2912        let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
2913
2914        // Initialize the cursor if we will be writing transactions to the database
2915        let mut tx_cursor = write_to
2916            .database()
2917            .then(|| self.tx.cursor_write::<tables::Transactions<TxTy<N>>>())
2918            .transpose()?;
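        // Sketch of the gating above (assuming the usual `StorageLocation` variants): `StaticFiles`
        // leaves only `tx_static_writer` as `Some`, `Database` leaves only `tx_cursor` as `Some`,
        // and `Both` sets both, so each transaction below is written to every selected destination.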
2919
2920        // Get the id for the next tx_num, or zero if there are no transactions yet.
2921        let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
2922
2923        for (block_number, body) in &bodies {
2924            // Increment block on static file header.
2925            if let Some(writer) = tx_static_writer.as_mut() {
2926                writer.increment_block(*block_number)?;
2927            }
2928
2929            let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
2930            let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
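            // e.g. (sketch): a body with 3 transactions appended while next_tx_num == 10 yields
            // StoredBlockBodyIndices { first_tx_num: 10, tx_count: 3 }, i.e. last_tx_num() == 12,
            // and the next block's transactions start at 13.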
2931
2932            let mut durations_recorder = metrics::DurationsRecorder::default();
2933
2934            // insert block meta
2935            block_indices_cursor.append(*block_number, block_indices)?;
2936
2937            durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
2938
2939            let Some(body) = body else { continue };
2940
2941            // write transaction block index
2942            if !body.transactions().is_empty() {
2943                tx_block_cursor.append(block_indices.last_tx_num(), *block_number)?;
2944                durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
2945            }
2946
2947            // write transactions
2948            for transaction in body.transactions() {
2949                if let Some(writer) = tx_static_writer.as_mut() {
2950                    writer.append_transaction(next_tx_num, transaction)?;
2951                }
2952                if let Some(cursor) = tx_cursor.as_mut() {
2953                    cursor.append(next_tx_num, transaction.clone())?;
2954                }
2955
2956                // Increment transaction id for each transaction.
2957                next_tx_num += 1;
2958            }
2959
2960            debug!(
2961                target: "providers::db",
2962                ?block_number,
2963                actions = ?durations_recorder.actions,
2964                "Inserted block body"
2965            );
2966        }
2967
2968        self.storage.writer().write_block_bodies(self, bodies, write_to)?;
2969
2970        Ok(())
2971    }
2972
2973    fn remove_blocks_above(
2974        &self,
2975        block: BlockNumber,
2976        remove_from: StorageLocation,
2977    ) -> ProviderResult<()> {
2978        let mut canonical_headers_cursor = self.tx.cursor_write::<tables::CanonicalHeaders>()?;
2979        let mut rev_headers = canonical_headers_cursor.walk_back(None)?;
2980
2981        while let Some(Ok((number, hash))) = rev_headers.next() {
2982            if number <= block {
2983                break
2984            }
2985            self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
2986            rev_headers.delete_current()?;
2987        }
2988        self.remove::<tables::Headers<HeaderTy<N>>>(block + 1..)?;
2989        self.remove::<tables::HeaderTerminalDifficulties>(block + 1..)?;
2990
2991        // First transaction to be removed
2992        let unwind_tx_from = self
2993            .tx
2994            .get::<tables::BlockBodyIndices>(block)?
2995            .map(|b| b.next_tx_num())
2996            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
2997
2998        // Last transaction to be removed
2999        let unwind_tx_to = self
3000            .tx
3001            .cursor_read::<tables::BlockBodyIndices>()?
3002            .last()?
3003            // Shouldn't happen: the lookup above succeeded, so the table is non-empty.
3004            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
3005            .1
3006            .last_tx_num();
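        // e.g. (sketch): if the retained `block` ends at tx 499 and the current tip's last tx is
        // 520, then unwind_tx_from == 500 and unwind_tx_to == 520, so transactions 500..=520 are
        // unwound below.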
3007
3008        if unwind_tx_from <= unwind_tx_to {
3009            for (hash, _) in self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))? {
3010                self.tx.delete::<tables::TransactionHashNumbers>(hash, None)?;
3011            }
3012        }
3013
3014        self.remove::<tables::TransactionSenders>(unwind_tx_from..)?;
3015
3016        self.remove_bodies_above(block, remove_from)?;
3017
3018        Ok(())
3019    }
3020
3021    fn remove_bodies_above(
3022        &self,
3023        block: BlockNumber,
3024        remove_from: StorageLocation,
3025    ) -> ProviderResult<()> {
3026        self.storage.writer().remove_block_bodies_above(self, block, remove_from)?;
3027
3028        // First transaction to be removed
3029        let unwind_tx_from = self
3030            .tx
3031            .get::<tables::BlockBodyIndices>(block)?
3032            .map(|b| b.next_tx_num())
3033            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3034
3035        self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3036        self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3037
3038        if remove_from.database() {
3039            self.remove::<tables::Transactions<TxTy<N>>>(unwind_tx_from..)?;
3040        }
3041
3042        if remove_from.static_files() {
3043            let static_file_tx_num = self
3044                .static_file_provider
3045                .get_highest_static_file_tx(StaticFileSegment::Transactions);
3046
3047            let to_delete = static_file_tx_num
3048                .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3049                .unwrap_or_default();
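            // e.g. (sketch): highest static-file tx == 520 and unwind_tx_from == 500 gives
            // to_delete == 21, pruning transactions 500..=520 from the segment.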
3050
3051            self.static_file_provider
3052                .latest_writer(StaticFileSegment::Transactions)?
3053                .prune_transactions(to_delete, block)?;
3054        }
3055
3056        Ok(())
3057    }
3058
3059    /// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
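    ///
    /// Write path (as implemented below): insert each block, write the execution state, write the
    /// hashed state and trie updates, update the history indices, then bump the pipeline stage
    /// checkpoints.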
3060    fn append_blocks_with_state(
3061        &self,
3062        blocks: Vec<SealedBlockWithSenders<Self::Block>>,
3063        execution_outcome: ExecutionOutcome<Self::Receipt>,
3064        hashed_state: HashedPostStateSorted,
3065        trie_updates: TrieUpdates,
3066    ) -> ProviderResult<()> {
3067        if blocks.is_empty() {
3068            debug!(target: "providers::db", "Attempted to append empty block range");
3069            return Ok(())
3070        }
3071
3072        let first_number = blocks.first().unwrap().number();
3073
3074        let last = blocks.last().unwrap();
3075        let last_block_number = last.number();
3076
3077        let mut durations_recorder = metrics::DurationsRecorder::default();
3078
3079        // Insert the blocks
3080        for block in blocks {
3081            self.insert_block(block, StorageLocation::Database)?;
3082            durations_recorder.record_relative(metrics::Action::InsertBlock);
3083        }
3084
3085        self.write_state(execution_outcome, OriginalValuesKnown::No, StorageLocation::Database)?;
3086        durations_recorder.record_relative(metrics::Action::InsertState);
3087
3088        // insert hashes and intermediate merkle nodes
3089        self.write_hashed_state(&hashed_state)?;
3090        self.write_trie_updates(&trie_updates)?;
3091        durations_recorder.record_relative(metrics::Action::InsertHashes);
3092
3093        self.update_history_indices(first_number..=last_block_number)?;
3094        durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3095
3096        // Update pipeline progress
3097        self.update_pipeline_stages(last_block_number, false)?;
3098        durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3099
3100        debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3101
3102        Ok(())
3103    }
3104}
3105
3106impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3107    fn get_prune_checkpoint(
3108        &self,
3109        segment: PruneSegment,
3110    ) -> ProviderResult<Option<PruneCheckpoint>> {
3111        Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3112    }
3113
3114    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3115        Ok(self
3116            .tx
3117            .cursor_read::<tables::PruneCheckpoints>()?
3118            .walk(None)?
3119            .collect::<Result<_, _>>()?)
3120    }
3121}
3122
3123impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3124    fn save_prune_checkpoint(
3125        &self,
3126        segment: PruneSegment,
3127        checkpoint: PruneCheckpoint,
3128    ) -> ProviderResult<()> {
3129        Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3130    }
3131}
3132
3133impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3134    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3135        let db_entries = self.tx.entries::<T>()?;
3136        let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3137            Ok(entries) => entries,
3138            Err(ProviderError::UnsupportedProvider) => 0,
3139            Err(err) => return Err(err),
3140        };
3141
3142        Ok(db_entries + static_file_entries)
3143    }
3144}
3145
3146impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3147    fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3148        let mut finalized_blocks = self
3149            .tx
3150            .cursor_read::<tables::ChainState>()?
3151            .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3152            .take(1)
3153            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3154
3155        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3156        Ok(last_finalized_block_number)
3157    }
3158
3159    fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3160        let mut safe_blocks = self
3161            .tx
3162            .cursor_read::<tables::ChainState>()?
3163            .walk(Some(tables::ChainStateKey::LastSafeBlockBlock))?
3164            .take(1)
3165            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3166
3167        let last_safe_block_number = safe_blocks.pop_first().map(|pair| pair.1);
3168        Ok(last_safe_block_number)
3169    }
3170}
3171
3172impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3173    fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3174        Ok(self
3175            .tx
3176            .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3177    }
3178
3179    fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3180        Ok(self
3181            .tx
3182            .put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlockBlock, block_number)?)
3183    }
3184}
3185
3186impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3187    type Tx = TX;
3188
3189    fn tx_ref(&self) -> &Self::Tx {
3190        &self.tx
3191    }
3192
3193    fn tx_mut(&mut self) -> &mut Self::Tx {
3194        &mut self.tx
3195    }
3196
3197    fn into_tx(self) -> Self::Tx {
3198        self.tx
3199    }
3200
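    // Note: not self-recursion; Rust method resolution prefers an inherent `prune_modes_ref`
    // accessor on `DatabaseProvider` (assumed to be defined earlier in this file) over this trait
    // method of the same name.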
3201    fn prune_modes_ref(&self) -> &PruneModes {
3202        self.prune_modes_ref()
3203    }
3204}