reth_provider/providers/database/
provider.rs

1use crate::{
2    bundle_state::StorageRevertsIter,
3    providers::{
4        database::{chain::ChainStorage, metrics},
5        static_file::StaticFileWriter,
6        NodeTypesForProvider, StaticFileProvider,
7    },
8    to_range,
9    traits::{
10        AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
11    },
12    AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
13    BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
14    DBProvider, HashingWriter, HeaderProvider, HeaderSyncGapProvider, HistoricalStateProvider,
15    HistoricalStateProviderRef, HistoryWriter, LatestStateProvider, LatestStateProviderRef,
16    OriginalValuesKnown, ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RevertsInit,
17    StageCheckpointReader, StateCommitmentProvider, StateProviderBox, StateWriter,
18    StaticFileProviderFactory, StatsReader, StorageLocation, StorageReader, StorageTrieWriter,
19    TransactionVariant, TransactionsProvider, TransactionsProviderExt, TrieWriter,
20    WithdrawalsProvider,
21};
22use alloy_consensus::{
23    transaction::{SignerRecoverable, TransactionMeta},
24    BlockHeader, Header, TxReceipt,
25};
26use alloy_eips::{eip2718::Encodable2718, eip4895::Withdrawals, BlockHashOrNumber};
27use alloy_primitives::{
28    keccak256,
29    map::{hash_map, B256Map, HashMap, HashSet},
30    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
31};
32use itertools::Itertools;
33use rayon::slice::ParallelSliceMut;
34use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, EthereumHardforks};
35use reth_db_api::{
36    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
37    database::Database,
38    models::{
39        sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
40        ShardedKey, StoredBlockBodyIndices,
41    },
42    table::Table,
43    tables,
44    transaction::{DbTx, DbTxMut},
45    BlockNumberList, DatabaseError, PlainAccountState, PlainStorageState,
46};
47use reth_execution_types::{Chain, ExecutionOutcome};
48use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
49use reth_primitives_traits::{
50    Account, Block as _, BlockBody as _, Bytecode, GotExpected, NodePrimitives, RecoveredBlock,
51    SealedBlock, SealedHeader, SignedTransaction, StorageEntry,
52};
53use reth_prune_types::{
54    PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE,
55};
56use reth_stages_types::{StageCheckpoint, StageId};
57use reth_static_file_types::StaticFileSegment;
58use reth_storage_api::{
59    BlockBodyIndicesProvider, BlockBodyReader, NodePrimitivesProvider, OmmersProvider,
60    StateProvider, StorageChangeSetReader, TryIntoHistoricalStateProvider,
61};
62use reth_storage_errors::provider::{ProviderResult, RootMismatch};
63use reth_trie::{
64    prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets},
65    updates::{StorageTrieUpdates, TrieUpdates},
66    HashedPostStateSorted, Nibbles, StateRoot, StoredNibbles,
67};
68use reth_trie_db::{DatabaseStateRoot, DatabaseStorageTrieCursor};
69use revm_database::states::{
70    PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
71};
72use std::{
73    cmp::Ordering,
74    collections::{BTreeMap, BTreeSet},
75    fmt::Debug,
76    ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
77    sync::{mpsc, Arc},
78};
79use tracing::{debug, trace};
80
/// A [`DatabaseProvider`] that holds a read-only database transaction.
///
/// This is [`DatabaseProvider`] instantiated with the database's `TX` transaction type
/// (`<DB as Database>::TX`).
pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
83
/// A [`DatabaseProvider`] that holds a read-write database transaction.
///
/// Ideally this would be a type alias. However, a compiler issue
/// (<https://github.com/rust-lang/rust/issues/102211>) forces us to wrap the provider in a
/// struct instead. Once that issue is resolved, this can probably be turned back into a type
/// alias.
///
/// The wrapped provider is the single public tuple field.
#[derive(Debug)]
pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
    pub DatabaseProvider<<DB as Database>::TXMut, N>,
);
92
93impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
94    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
95
96    fn deref(&self) -> &Self::Target {
97        &self.0
98    }
99}
100
101impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
102    fn deref_mut(&mut self) -> &mut Self::Target {
103        &mut self.0
104    }
105}
106
107impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
108    for DatabaseProviderRW<DB, N>
109{
110    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
111        &self.0
112    }
113}
114
115impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
116    /// Commit database transaction and static file if it exists.
117    pub fn commit(self) -> ProviderResult<bool> {
118        self.0.commit()
119    }
120
121    /// Consume `DbTx` or `DbTxMut`.
122    pub fn into_tx(self) -> <DB as Database>::TXMut {
123        self.0.into_tx()
124    }
125}
126
127impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
128    for DatabaseProvider<<DB as Database>::TXMut, N>
129{
130    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
131        provider.0
132    }
133}
134
/// A provider struct that fetches data from the database.
/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
///
/// `TX` is the type of the held database transaction; `N` supplies the chain spec, primitives
/// and storage types used by the remaining fields.
#[derive(Debug)]
pub struct DatabaseProvider<TX, N: NodeTypes> {
    /// Database transaction.
    tx: TX,
    /// Chain spec, shared behind an [`Arc`].
    chain_spec: Arc<N::ChainSpec>,
    /// Static file provider for the node's primitive types.
    static_file_provider: StaticFileProvider<N::Primitives>,
    /// Pruning configuration.
    prune_modes: PruneModes,
    /// Node storage handler, shared behind an [`Arc`].
    storage: Arc<N::Storage>,
}
150
151impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
152    /// Returns reference to prune modes.
153    pub const fn prune_modes_ref(&self) -> &PruneModes {
154        &self.prune_modes
155    }
156}
157
158impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
159    /// State provider for latest state
160    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
161        trace!(target: "providers::db", "Returning latest state provider");
162        Box::new(LatestStateProviderRef::new(self))
163    }
164
165    /// Storage provider for state at that given block hash
166    pub fn history_by_block_hash<'a>(
167        &'a self,
168        block_hash: BlockHash,
169    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
170        let mut block_number =
171            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
172        if block_number == self.best_block_number().unwrap_or_default() &&
173            block_number == self.last_block_number().unwrap_or_default()
174        {
175            return Ok(Box::new(LatestStateProviderRef::new(self)))
176        }
177
178        // +1 as the changeset that we want is the one that was applied after this block.
179        block_number += 1;
180
181        let account_history_prune_checkpoint =
182            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
183        let storage_history_prune_checkpoint =
184            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
185
186        let mut state_provider = HistoricalStateProviderRef::new(self, block_number);
187
188        // If we pruned account or storage history, we can't return state on every historical block.
189        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
190        if let Some(prune_checkpoint_block_number) =
191            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
192        {
193            state_provider = state_provider.with_lowest_available_account_history_block_number(
194                prune_checkpoint_block_number + 1,
195            );
196        }
197        if let Some(prune_checkpoint_block_number) =
198            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
199        {
200            state_provider = state_provider.with_lowest_available_storage_history_block_number(
201                prune_checkpoint_block_number + 1,
202            );
203        }
204
205        Ok(Box::new(state_provider))
206    }
207
208    #[cfg(feature = "test-utils")]
209    /// Sets the prune modes for provider.
210    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
211        self.prune_modes = prune_modes;
212    }
213}
214
/// Uses the node's primitive types (`N::Primitives`) as the provider's primitives.
impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
    type Primitives = N::Primitives;
}
218
219impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
220    /// Returns a static file provider
221    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
222        self.static_file_provider.clone()
223    }
224}
225
226impl<TX: Debug + Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
227    for DatabaseProvider<TX, N>
228{
229    type ChainSpec = N::ChainSpec;
230
231    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
232        self.chain_spec.clone()
233    }
234}
235
236impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
237    /// Creates a provider with an inner read-write transaction.
238    pub const fn new_rw(
239        tx: TX,
240        chain_spec: Arc<N::ChainSpec>,
241        static_file_provider: StaticFileProvider<N::Primitives>,
242        prune_modes: PruneModes,
243        storage: Arc<N::Storage>,
244    ) -> Self {
245        Self { tx, chain_spec, static_file_provider, prune_modes, storage }
246    }
247}
248
/// Identity conversion: a provider can be borrowed as a reference to itself.
impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
    fn as_ref(&self) -> &Self {
        self
    }
}
254
255impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
256    /// Unwinds trie state for the given range.
257    ///
258    /// This includes calculating the resulted state root and comparing it with the parent block
259    /// state root.
260    pub fn unwind_trie_state_range(
261        &self,
262        range: RangeInclusive<BlockNumber>,
263    ) -> ProviderResult<()> {
264        let changed_accounts = self
265            .tx
266            .cursor_read::<tables::AccountChangeSets>()?
267            .walk_range(range.clone())?
268            .collect::<Result<Vec<_>, _>>()?;
269
270        // Unwind account hashes. Add changed accounts to account prefix set.
271        let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
272        let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
273        let mut destroyed_accounts = HashSet::default();
274        for (hashed_address, account) in hashed_addresses {
275            account_prefix_set.insert(Nibbles::unpack(hashed_address));
276            if account.is_none() {
277                destroyed_accounts.insert(hashed_address);
278            }
279        }
280
281        // Unwind account history indices.
282        self.unwind_account_history_indices(changed_accounts.iter())?;
283        let storage_range = BlockNumberAddress::range(range.clone());
284
285        let changed_storages = self
286            .tx
287            .cursor_read::<tables::StorageChangeSets>()?
288            .walk_range(storage_range)?
289            .collect::<Result<Vec<_>, _>>()?;
290
291        // Unwind storage hashes. Add changed account and storage keys to corresponding prefix
292        // sets.
293        let mut storage_prefix_sets = B256Map::<PrefixSet>::default();
294        let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
295        for (hashed_address, hashed_slots) in storage_entries {
296            account_prefix_set.insert(Nibbles::unpack(hashed_address));
297            let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
298            for slot in hashed_slots {
299                storage_prefix_set.insert(Nibbles::unpack(slot));
300            }
301            storage_prefix_sets.insert(hashed_address, storage_prefix_set.freeze());
302        }
303
304        // Unwind storage history indices.
305        self.unwind_storage_history_indices(changed_storages.iter().copied())?;
306
307        // Calculate the reverted merkle root.
308        // This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
309        // are pre-loaded.
310        let prefix_sets = TriePrefixSets {
311            account_prefix_set: account_prefix_set.freeze(),
312            storage_prefix_sets,
313            destroyed_accounts,
314        };
315        let (new_state_root, trie_updates) = StateRoot::from_tx(&self.tx)
316            .with_prefix_sets(prefix_sets)
317            .root_with_updates()
318            .map_err(reth_db_api::DatabaseError::from)?;
319
320        let parent_number = range.start().saturating_sub(1);
321        let parent_state_root = self
322            .header_by_number(parent_number)?
323            .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
324            .state_root();
325
326        // state root should be always correct as we are reverting state.
327        // but for sake of double verification we will check it again.
328        if new_state_root != parent_state_root {
329            let parent_hash = self
330                .block_hash(parent_number)?
331                .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
332            return Err(ProviderError::UnwindStateRootMismatch(Box::new(RootMismatch {
333                root: GotExpected { got: new_state_root, expected: parent_state_root },
334                block_number: parent_number,
335                block_hash: parent_hash,
336            })))
337        }
338        self.write_trie_updates(&trie_updates)?;
339
340        Ok(())
341    }
342
343    /// Removes receipts from all transactions starting with provided number (inclusive).
344    fn remove_receipts_from(
345        &self,
346        from_tx: TxNumber,
347        last_block: BlockNumber,
348        remove_from: StorageLocation,
349    ) -> ProviderResult<()> {
350        if remove_from.database() {
351            // iterate over block body and remove receipts
352            self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
353        }
354
355        if remove_from.static_files() && !self.prune_modes.has_receipts_pruning() {
356            let static_file_receipt_num =
357                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
358
359            let to_delete = static_file_receipt_num
360                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
361                .unwrap_or_default();
362
363            self.static_file_provider
364                .latest_writer(StaticFileSegment::Receipts)?
365                .prune_receipts(to_delete, last_block)?;
366        }
367
368        Ok(())
369    }
370}
371
372impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
373    fn try_into_history_at_block(
374        self,
375        mut block_number: BlockNumber,
376    ) -> ProviderResult<StateProviderBox> {
377        // if the block number is the same as the currently best block number on disk we can use the
378        // latest state provider here
379        if block_number == self.best_block_number().unwrap_or_default() {
380            return Ok(Box::new(LatestStateProvider::new(self)))
381        }
382
383        // +1 as the changeset that we want is the one that was applied after this block.
384        block_number += 1;
385
386        let account_history_prune_checkpoint =
387            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
388        let storage_history_prune_checkpoint =
389            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
390
391        let mut state_provider = HistoricalStateProvider::new(self, block_number);
392
393        // If we pruned account or storage history, we can't return state on every historical block.
394        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
395        if let Some(prune_checkpoint_block_number) =
396            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
397        {
398            state_provider = state_provider.with_lowest_available_account_history_block_number(
399                prune_checkpoint_block_number + 1,
400            );
401        }
402        if let Some(prune_checkpoint_block_number) =
403            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
404        {
405            state_provider = state_provider.with_lowest_available_storage_history_block_number(
406                prune_checkpoint_block_number + 1,
407            );
408        }
409
410        Ok(Box::new(state_provider))
411    }
412}
413
/// Uses the node's state commitment type (`N::StateCommitment`) for this provider.
impl<TX: DbTx + 'static, N: NodeTypes> StateCommitmentProvider for DatabaseProvider<TX, N> {
    type StateCommitment = N::StateCommitment;
}
417
418impl<
419        Tx: DbTx + DbTxMut + 'static,
420        N: NodeTypesForProvider<Primitives: NodePrimitives<BlockHeader = Header>>,
421    > DatabaseProvider<Tx, N>
422{
423    // TODO: uncomment below, once `reth debug_cmd` has been feature gated with dev.
424    // #[cfg(any(test, feature = "test-utils"))]
425    /// Inserts an historical block. **Used for setting up test environments**
426    pub fn insert_historical_block(
427        &self,
428        block: RecoveredBlock<<Self as BlockWriter>::Block>,
429    ) -> ProviderResult<StoredBlockBodyIndices> {
430        let ttd = if block.number() == 0 {
431            block.header().difficulty()
432        } else {
433            let parent_block_number = block.number() - 1;
434            let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
435            parent_ttd + block.header().difficulty()
436        };
437
438        let mut writer = self.static_file_provider.latest_writer(StaticFileSegment::Headers)?;
439
440        // Backfill: some tests start at a forward block number, but static files require no gaps.
441        let segment_header = writer.user_header();
442        if segment_header.block_end().is_none() && segment_header.expected_block_start() == 0 {
443            for block_number in 0..block.number() {
444                let mut prev = block.clone_header();
445                prev.number = block_number;
446                writer.append_header(&prev, U256::ZERO, &B256::ZERO)?;
447            }
448        }
449
450        writer.append_header(block.header(), ttd, &block.hash())?;
451
452        self.insert_block(block, StorageLocation::Database)
453    }
454}
455
456/// For a given key, unwind all history shards that are below the given block number.
457///
458/// S - Sharded key subtype.
459/// T - Table to walk over.
460/// C - Cursor implementation.
461///
462/// This function walks the entries from the given start key and deletes all shards that belong to
463/// the key and are below the given block number.
464///
465/// The boundary shard (the shard is split by the block number) is removed from the database. Any
466/// indices that are above the block number are filtered out. The boundary shard is returned for
467/// reinsertion (if it's not empty).
468fn unwind_history_shards<S, T, C>(
469    cursor: &mut C,
470    start_key: T::Key,
471    block_number: BlockNumber,
472    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
473) -> ProviderResult<Vec<u64>>
474where
475    T: Table<Value = BlockNumberList>,
476    T::Key: AsRef<ShardedKey<S>>,
477    C: DbCursorRO<T> + DbCursorRW<T>,
478{
479    let mut item = cursor.seek_exact(start_key)?;
480    while let Some((sharded_key, list)) = item {
481        // If the shard does not belong to the key, break.
482        if !shard_belongs_to_key(&sharded_key) {
483            break
484        }
485        cursor.delete_current()?;
486
487        // Check the first item.
488        // If it is greater or eq to the block number, delete it.
489        let first = list.iter().next().expect("List can't be empty");
490        if first >= block_number {
491            item = cursor.prev()?;
492            continue
493        } else if block_number <= sharded_key.as_ref().highest_block_number {
494            // Filter out all elements greater than block number.
495            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
496        }
497        return Ok(list.iter().collect::<Vec<_>>())
498    }
499
500    Ok(Vec::new())
501}
502
503impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
504    /// Creates a provider with an inner read-only transaction.
505    pub const fn new(
506        tx: TX,
507        chain_spec: Arc<N::ChainSpec>,
508        static_file_provider: StaticFileProvider<N::Primitives>,
509        prune_modes: PruneModes,
510        storage: Arc<N::Storage>,
511    ) -> Self {
512        Self { tx, chain_spec, static_file_provider, prune_modes, storage }
513    }
514
515    /// Consume `DbTx` or `DbTxMut`.
516    pub fn into_tx(self) -> TX {
517        self.tx
518    }
519
520    /// Pass `DbTx` or `DbTxMut` mutable reference.
521    pub const fn tx_mut(&mut self) -> &mut TX {
522        &mut self.tx
523    }
524
525    /// Pass `DbTx` or `DbTxMut` immutable reference.
526    pub const fn tx_ref(&self) -> &TX {
527        &self.tx
528    }
529
530    /// Returns a reference to the chain specification.
531    pub fn chain_spec(&self) -> &N::ChainSpec {
532        &self.chain_spec
533    }
534}
535
536impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
537    fn transactions_by_tx_range_with_cursor<C>(
538        &self,
539        range: impl RangeBounds<TxNumber>,
540        cursor: &mut C,
541    ) -> ProviderResult<Vec<TxTy<N>>>
542    where
543        C: DbCursorRO<tables::Transactions<TxTy<N>>>,
544    {
545        self.static_file_provider.get_range_with_static_file_or_database(
546            StaticFileSegment::Transactions,
547            to_range(range),
548            |static_file, range, _| static_file.transactions_by_tx_range(range),
549            |range, _| self.cursor_collect(cursor, range),
550            |_| true,
551        )
552    }
553
554    fn recovered_block<H, HF, B, BF>(
555        &self,
556        id: BlockHashOrNumber,
557        _transaction_kind: TransactionVariant,
558        header_by_number: HF,
559        construct_block: BF,
560    ) -> ProviderResult<Option<B>>
561    where
562        H: AsRef<HeaderTy<N>>,
563        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
564        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
565    {
566        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
567        let Some(header) = header_by_number(block_number)? else { return Ok(None) };
568
569        // Get the block body
570        //
571        // If the body indices are not found, this means that the transactions either do not exist
572        // in the database yet, or they do exit but are not indexed. If they exist but are not
573        // indexed, we don't have enough information to return the block anyways, so we return
574        // `None`.
575        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
576
577        let tx_range = body.tx_num_range();
578
579        let (transactions, senders) = if tx_range.is_empty() {
580            (vec![], vec![])
581        } else {
582            (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
583        };
584
585        let body = self
586            .storage
587            .reader()
588            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
589            .pop()
590            .ok_or(ProviderError::InvalidStorageOutput)?;
591
592        construct_block(header, body, senders)
593    }
594
595    /// Returns a range of blocks from the database.
596    ///
597    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
598    /// construct blocks from the following inputs:
599    ///     – Header
600    ///     - Range of transaction numbers
601    ///     – Ommers
602    ///     – Withdrawals
603    ///     – Senders
604    fn block_range<F, H, HF, R>(
605        &self,
606        range: RangeInclusive<BlockNumber>,
607        headers_range: HF,
608        mut assemble_block: F,
609    ) -> ProviderResult<Vec<R>>
610    where
611        H: AsRef<HeaderTy<N>>,
612        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
613        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
614    {
615        if range.is_empty() {
616            return Ok(Vec::new())
617        }
618
619        let len = range.end().saturating_sub(*range.start()) as usize;
620        let mut blocks = Vec::with_capacity(len);
621
622        let headers = headers_range(range.clone())?;
623        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
624
625        // If the body indices are not found, this means that the transactions either do
626        // not exist in the database yet, or they do exit but are
627        // not indexed. If they exist but are not indexed, we don't
628        // have enough information to return the block anyways, so
629        // we skip the block.
630        let present_headers = self
631            .block_body_indices_range(range)?
632            .into_iter()
633            .map(|b| b.tx_num_range())
634            .zip(headers)
635            .collect::<Vec<_>>();
636
637        let mut inputs = Vec::new();
638        for (tx_range, header) in &present_headers {
639            let transactions = if tx_range.is_empty() {
640                Vec::new()
641            } else {
642                self.transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)?
643            };
644
645            inputs.push((header.as_ref(), transactions));
646        }
647
648        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
649
650        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
651            blocks.push(assemble_block(header, body, tx_range)?);
652        }
653
654        Ok(blocks)
655    }
656
657    /// Returns a range of blocks from the database, along with the senders of each
658    /// transaction in the blocks.
659    ///
660    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
661    /// construct blocks from the following inputs:
662    ///     – Header
663    ///     - Transactions
664    ///     – Ommers
665    ///     – Withdrawals
666    ///     – Senders
667    fn block_with_senders_range<H, HF, B, BF>(
668        &self,
669        range: RangeInclusive<BlockNumber>,
670        headers_range: HF,
671        assemble_block: BF,
672    ) -> ProviderResult<Vec<B>>
673    where
674        H: AsRef<HeaderTy<N>>,
675        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
676        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
677    {
678        let mut senders_cursor = self.tx.cursor_read::<tables::TransactionSenders>()?;
679
680        self.block_range(range, headers_range, |header, body, tx_range| {
681            let senders = if tx_range.is_empty() {
682                Vec::new()
683            } else {
684                // fetch senders from the senders table
685                let known_senders =
686                    senders_cursor
687                        .walk_range(tx_range.clone())?
688                        .collect::<Result<HashMap<_, _>, _>>()?;
689
690                let mut senders = Vec::with_capacity(body.transactions().len());
691                for (tx_num, tx) in tx_range.zip(body.transactions()) {
692                    match known_senders.get(&tx_num) {
693                        None => {
694                            // recover the sender from the transaction if not found
695                            let sender = tx.recover_signer_unchecked()?;
696                            senders.push(sender);
697                        }
698                        Some(sender) => senders.push(*sender),
699                    }
700                }
701
702                senders
703            };
704
705            assemble_block(header, body, senders)
706        })
707    }
708
709    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
710    /// [`PlainAccountState`] and [`PlainStorageState`] tables, based on the given storage and
711    /// account changesets.
712    fn populate_bundle_state<A, S>(
713        &self,
714        account_changeset: Vec<(u64, AccountBeforeTx)>,
715        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
716        plain_accounts_cursor: &mut A,
717        plain_storage_cursor: &mut S,
718    ) -> ProviderResult<(BundleStateInit, RevertsInit)>
719    where
720        A: DbCursorRO<PlainAccountState>,
721        S: DbDupCursorRO<PlainStorageState>,
722    {
723        // iterate previous value and get plain state value to create changeset
724        // Double option around Account represent if Account state is know (first option) and
725        // account is removed (Second Option)
726        let mut state: BundleStateInit = HashMap::default();
727
728        // This is not working for blocks that are not at tip. as plain state is not the last
729        // state of end range. We should rename the functions or add support to access
730        // History state. Accessing history state can be tricky but we are not gaining
731        // anything.
732
733        let mut reverts: RevertsInit = HashMap::default();
734
735        // add account changeset changes
736        for (block_number, account_before) in account_changeset.into_iter().rev() {
737            let AccountBeforeTx { info: old_info, address } = account_before;
738            match state.entry(address) {
739                hash_map::Entry::Vacant(entry) => {
740                    let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
741                    entry.insert((old_info, new_info, HashMap::default()));
742                }
743                hash_map::Entry::Occupied(mut entry) => {
744                    // overwrite old account state.
745                    entry.get_mut().0 = old_info;
746                }
747            }
748            // insert old info into reverts.
749            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
750        }
751
752        // add storage changeset changes
753        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
754            let BlockNumberAddress((block_number, address)) = block_and_address;
755            // get account state or insert from plain state.
756            let account_state = match state.entry(address) {
757                hash_map::Entry::Vacant(entry) => {
758                    let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
759                    entry.insert((present_info, present_info, HashMap::default()))
760                }
761                hash_map::Entry::Occupied(entry) => entry.into_mut(),
762            };
763
764            // match storage.
765            match account_state.2.entry(old_storage.key) {
766                hash_map::Entry::Vacant(entry) => {
767                    let new_storage = plain_storage_cursor
768                        .seek_by_key_subkey(address, old_storage.key)?
769                        .filter(|storage| storage.key == old_storage.key)
770                        .unwrap_or_default();
771                    entry.insert((old_storage.value, new_storage.value));
772                }
773                hash_map::Entry::Occupied(mut entry) => {
774                    entry.get_mut().0 = old_storage.value;
775                }
776            };
777
778            reverts
779                .entry(block_number)
780                .or_default()
781                .entry(address)
782                .or_default()
783                .1
784                .push(old_storage);
785        }
786
787        Ok((state, reverts))
788    }
789}
790
791impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
792    /// Commit database transaction.
793    pub fn commit(self) -> ProviderResult<bool> {
794        Ok(self.tx.commit()?)
795    }
796
797    /// Load shard and remove it. If list is empty, last shard was full or
798    /// there are no shards at all.
799    fn take_shard<T>(
800        &self,
801        cursor: &mut <TX as DbTxMut>::CursorMut<T>,
802        key: T::Key,
803    ) -> ProviderResult<Vec<u64>>
804    where
805        T: Table<Value = BlockNumberList>,
806    {
807        if let Some((_, list)) = cursor.seek_exact(key)? {
808            // delete old shard so new one can be inserted.
809            cursor.delete_current()?;
810            let list = list.iter().collect::<Vec<_>>();
811            return Ok(list)
812        }
813        Ok(Vec::new())
814    }
815
816    /// Insert history index to the database.
817    ///
818    /// For each updated partial key, this function removes the last shard from
819    /// the database (if any), appends the new indices to it, chunks the resulting integer list and
820    /// inserts the new shards back into the database.
821    ///
822    /// This function is used by history indexing stages.
823    fn append_history_index<P, T>(
824        &self,
825        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
826        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
827    ) -> ProviderResult<()>
828    where
829        P: Copy,
830        T: Table<Value = BlockNumberList>,
831    {
832        let mut cursor = self.tx.cursor_write::<T>()?;
833        for (partial_key, indices) in index_updates {
834            let mut last_shard =
835                self.take_shard::<T>(&mut cursor, sharded_key_factory(partial_key, u64::MAX))?;
836            last_shard.extend(indices);
837            // Chunk indices and insert them in shards of N size.
838            let mut chunks = last_shard.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD).peekable();
839            while let Some(list) = chunks.next() {
840                let highest_block_number = if chunks.peek().is_some() {
841                    *list.last().expect("`chunks` does not return empty list")
842                } else {
843                    // Insert last list with `u64::MAX`.
844                    u64::MAX
845                };
846                cursor.insert(
847                    sharded_key_factory(partial_key, highest_block_number),
848                    &BlockNumberList::new_pre_sorted(list.iter().copied()),
849                )?;
850            }
851        }
852        Ok(())
853    }
854}
855
856impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
857    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
858        Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
859    }
860}
861
862impl<TX: DbTx, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
863    fn changed_accounts_with_range(
864        &self,
865        range: impl RangeBounds<BlockNumber>,
866    ) -> ProviderResult<BTreeSet<Address>> {
867        self.tx
868            .cursor_read::<tables::AccountChangeSets>()?
869            .walk_range(range)?
870            .map(|entry| {
871                entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
872            })
873            .collect()
874    }
875
876    fn basic_accounts(
877        &self,
878        iter: impl IntoIterator<Item = Address>,
879    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
880        let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
881        Ok(iter
882            .into_iter()
883            .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
884            .collect::<Result<Vec<_>, _>>()?)
885    }
886
887    fn changed_accounts_and_blocks_with_range(
888        &self,
889        range: RangeInclusive<BlockNumber>,
890    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
891        let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
892
893        let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
894            BTreeMap::new(),
895            |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
896                let (index, account) = entry?;
897                accounts.entry(account.address).or_default().push(index);
898                Ok(accounts)
899            },
900        )?;
901
902        Ok(account_transitions)
903    }
904}
905
906impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
907    fn storage_changeset(
908        &self,
909        block_number: BlockNumber,
910    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
911        let range = block_number..=block_number;
912        let storage_range = BlockNumberAddress::range(range);
913        self.tx
914            .cursor_dup_read::<tables::StorageChangeSets>()?
915            .walk_range(storage_range)?
916            .map(|result| -> ProviderResult<_> { Ok(result?) })
917            .collect()
918    }
919}
920
921impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
922    fn account_block_changeset(
923        &self,
924        block_number: BlockNumber,
925    ) -> ProviderResult<Vec<AccountBeforeTx>> {
926        let range = block_number..=block_number;
927        self.tx
928            .cursor_read::<tables::AccountChangeSets>()?
929            .walk_range(range)?
930            .map(|result| -> ProviderResult<_> {
931                let (_, account_before) = result?;
932                Ok(account_before)
933            })
934            .collect()
935    }
936}
937
938impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
939    for DatabaseProvider<TX, N>
940{
941    type Header = HeaderTy<N>;
942
943    fn local_tip_header(
944        &self,
945        highest_uninterrupted_block: BlockNumber,
946    ) -> ProviderResult<SealedHeader<Self::Header>> {
947        let static_file_provider = self.static_file_provider();
948
949        // Make sure Headers static file is at the same height. If it's further, this
950        // input execution was interrupted previously and we need to unwind the static file.
951        let next_static_file_block_num = static_file_provider
952            .get_highest_static_file_block(StaticFileSegment::Headers)
953            .map(|id| id + 1)
954            .unwrap_or_default();
955        let next_block = highest_uninterrupted_block + 1;
956
957        match next_static_file_block_num.cmp(&next_block) {
958            // The node shutdown between an executed static file commit and before the database
959            // commit, so we need to unwind the static files.
960            Ordering::Greater => {
961                let mut static_file_producer =
962                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
963                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
964                // Since this is a database <-> static file inconsistency, we commit the change
965                // straight away.
966                static_file_producer.commit()?
967            }
968            Ordering::Less => {
969                // There's either missing or corrupted files.
970                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
971            }
972            Ordering::Equal => {}
973        }
974
975        let local_head = static_file_provider
976            .sealed_header(highest_uninterrupted_block)?
977            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
978
979        Ok(local_head)
980    }
981}
982
983impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
984    type Header = HeaderTy<N>;
985
986    fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
987        if let Some(num) = self.block_number(*block_hash)? {
988            Ok(self.header_by_number(num)?)
989        } else {
990            Ok(None)
991        }
992    }
993
994    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
995        self.static_file_provider.get_with_static_file_or_database(
996            StaticFileSegment::Headers,
997            num,
998            |static_file| static_file.header_by_number(num),
999            || Ok(self.tx.get::<tables::Headers<Self::Header>>(num)?),
1000        )
1001    }
1002
1003    fn header_td(&self, block_hash: &BlockHash) -> ProviderResult<Option<U256>> {
1004        if let Some(num) = self.block_number(*block_hash)? {
1005            self.header_td_by_number(num)
1006        } else {
1007            Ok(None)
1008        }
1009    }
1010
1011    fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
1012        if self.chain_spec.is_paris_active_at_block(number) {
1013            if let Some(td) = self.chain_spec.final_paris_total_difficulty() {
1014                // if this block is higher than the final paris(merge) block, return the final paris
1015                // difficulty
1016                return Ok(Some(td))
1017            }
1018        }
1019
1020        self.static_file_provider.get_with_static_file_or_database(
1021            StaticFileSegment::Headers,
1022            number,
1023            |static_file| static_file.header_td_by_number(number),
1024            || Ok(self.tx.get::<tables::HeaderTerminalDifficulties>(number)?.map(|td| td.0)),
1025        )
1026    }
1027
1028    fn headers_range(
1029        &self,
1030        range: impl RangeBounds<BlockNumber>,
1031    ) -> ProviderResult<Vec<Self::Header>> {
1032        self.static_file_provider.get_range_with_static_file_or_database(
1033            StaticFileSegment::Headers,
1034            to_range(range),
1035            |static_file, range, _| static_file.headers_range(range),
1036            |range, _| self.cursor_read_collect::<tables::Headers<Self::Header>>(range),
1037            |_| true,
1038        )
1039    }
1040
1041    fn sealed_header(
1042        &self,
1043        number: BlockNumber,
1044    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1045        self.static_file_provider.get_with_static_file_or_database(
1046            StaticFileSegment::Headers,
1047            number,
1048            |static_file| static_file.sealed_header(number),
1049            || {
1050                if let Some(header) = self.header_by_number(number)? {
1051                    let hash = self
1052                        .block_hash(number)?
1053                        .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
1054                    Ok(Some(SealedHeader::new(header, hash)))
1055                } else {
1056                    Ok(None)
1057                }
1058            },
1059        )
1060    }
1061
1062    fn sealed_headers_while(
1063        &self,
1064        range: impl RangeBounds<BlockNumber>,
1065        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1066    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1067        self.static_file_provider.get_range_with_static_file_or_database(
1068            StaticFileSegment::Headers,
1069            to_range(range),
1070            |static_file, range, predicate| static_file.sealed_headers_while(range, predicate),
1071            |range, mut predicate| {
1072                let mut headers = vec![];
1073                for entry in
1074                    self.tx.cursor_read::<tables::Headers<Self::Header>>()?.walk_range(range)?
1075                {
1076                    let (number, header) = entry?;
1077                    let hash = self
1078                        .block_hash(number)?
1079                        .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
1080                    let sealed = SealedHeader::new(header, hash);
1081                    if !predicate(&sealed) {
1082                        break
1083                    }
1084                    headers.push(sealed);
1085                }
1086                Ok(headers)
1087            },
1088            predicate,
1089        )
1090    }
1091}
1092
1093impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1094    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1095        self.static_file_provider.get_with_static_file_or_database(
1096            StaticFileSegment::Headers,
1097            number,
1098            |static_file| static_file.block_hash(number),
1099            || Ok(self.tx.get::<tables::CanonicalHeaders>(number)?),
1100        )
1101    }
1102
1103    fn canonical_hashes_range(
1104        &self,
1105        start: BlockNumber,
1106        end: BlockNumber,
1107    ) -> ProviderResult<Vec<B256>> {
1108        self.static_file_provider.get_range_with_static_file_or_database(
1109            StaticFileSegment::Headers,
1110            start..end,
1111            |static_file, range, _| static_file.canonical_hashes_range(range.start, range.end),
1112            |range, _| self.cursor_read_collect::<tables::CanonicalHeaders>(range),
1113            |_| true,
1114        )
1115    }
1116}
1117
1118impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1119    fn chain_info(&self) -> ProviderResult<ChainInfo> {
1120        let best_number = self.best_block_number()?;
1121        let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1122        Ok(ChainInfo { best_hash, best_number })
1123    }
1124
1125    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1126        // The best block number is tracked via the finished stage which gets updated in the same tx
1127        // when new blocks committed
1128        Ok(self
1129            .get_stage_checkpoint(StageId::Finish)?
1130            .map(|checkpoint| checkpoint.block_number)
1131            .unwrap_or_default())
1132    }
1133
1134    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1135        Ok(self
1136            .tx
1137            .cursor_read::<tables::CanonicalHeaders>()?
1138            .last()?
1139            .map(|(num, _)| num)
1140            .max(
1141                self.static_file_provider.get_highest_static_file_block(StaticFileSegment::Headers),
1142            )
1143            .unwrap_or_default())
1144    }
1145
1146    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1147        Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1148    }
1149}
1150
impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
    type Block = BlockTy<N>;

    /// Looks the block up by hash via [`Self::block`] when `source.is_canonical()` holds.
    ///
    /// For any other source this provider returns `None` without touching storage.
    fn find_block_by_hash(
        &self,
        hash: B256,
        source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        if source.is_canonical() {
            self.block(hash.into())
        } else {
            Ok(None)
        }
    }

    /// Returns the block with matching number from database.
    ///
    /// If the header for this block is not found, this returns `None`.
    /// If the header is found, but the transactions either do not exist, or are not indexed, this
    /// will return None.
    ///
    /// # Errors
    ///
    /// Returns `ProviderError::InvalidStorageOutput` if the storage reader yields no body for
    /// the single header/transactions pair passed to it.
    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        if let Some(number) = self.convert_hash_or_number(id)? {
            if let Some(header) = self.header_by_number(number)? {
                // If the body indices are not found, this means that the transactions either do not
                // exist in the database yet, or they do exist but are not indexed.
                // If they exist but are not indexed, we don't have enough
                // information to return the block anyways, so we return `None`.
                let Some(transactions) = self.transactions_by_block(number.into())? else {
                    return Ok(None)
                };

                // Exactly one input is handed to the reader, so exactly one body is popped.
                let body = self
                    .storage
                    .reader()
                    .read_block_bodies(self, vec![(&header, transactions)])?
                    .pop()
                    .ok_or(ProviderError::InvalidStorageOutput)?;

                return Ok(Some(Self::Block::new(header, body)))
            }
        }

        Ok(None)
    }

    /// Always returns `None`: this provider does not supply a pending block.
    fn pending_block(&self) -> ProviderResult<Option<SealedBlock<Self::Block>>> {
        Ok(None)
    }

    /// Always returns `None`: this provider does not supply a pending block.
    fn pending_block_with_senders(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Ok(None)
    }

    /// Always returns `None`: this provider does not supply a pending block or its receipts.
    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(SealedBlock<Self::Block>, Vec<Self::Receipt>)>> {
        Ok(None)
    }

    /// Returns the block with senders with matching number or hash from database.
    ///
    /// **NOTE: The transactions have invalid hashes, since they would need to be calculated on the
    /// spot, and we want fast querying.**
    ///
    /// If the header for this block is not found, this returns `None`.
    /// If the header is found, but the transactions either do not exist, or are not indexed, this
    /// will return None.
    ///
    /// A failure to attach the senders is reported as `ProviderError::SenderRecoveryError`.
    fn recovered_block(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        // NOTE(review): this four-argument call presumably resolves to an inherent helper of the
        // same name defined elsewhere in this file — confirm.
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.header_by_number(block_number),
            |header, body, senders| {
                Self::Block::new(header, body)
                    // Note: we're using unchecked here because we know the block contains valid txs
                    // wrt to its height and can ignore the s value check so pre
                    // EIP-2 txs are allowed
                    .try_into_recovered_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Same lookup as [`Self::recovered_block`], but the block is assembled from the sealed
    /// header via `new_sealed`.
    ///
    /// A failure to attach the senders is reported as `ProviderError::SenderRecoveryError`.
    fn sealed_block_with_senders(
        &self,
        id: BlockHashOrNumber,
        transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        self.recovered_block(
            id,
            transaction_kind,
            |block_number| self.sealed_header(block_number),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    // Note: we're using unchecked here because we know the block contains valid txs
                    // wrt to its height and can ignore the s value check so pre
                    // EIP-2 txs are allowed
                    .try_with_senders_unchecked(senders)
                    .map(Some)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Returns the blocks in `range`, built from `headers_range` and assembled with
    /// `Self::Block::new`; the senders passed to the assembly closure are ignored.
    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        // NOTE(review): this three-argument call presumably resolves to an inherent helper of the
        // same name defined elsewhere in this file — confirm.
        self.block_range(
            range,
            |range| self.headers_range(range),
            |header, body, _| Ok(Self::Block::new(header, body)),
        )
    }

    /// Returns the blocks in `range` together with their senders, built from `headers_range`.
    ///
    /// Senders are attached with `try_into_recovered_unchecked`; a failure there is reported as
    /// `ProviderError::SenderRecoveryError`.
    fn block_with_senders_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.headers_range(range),
            |header, body, senders| {
                Self::Block::new(header, body)
                    .try_into_recovered_unchecked(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }

    /// Returns the blocks in `range` together with their senders, built from
    /// `sealed_headers_range`.
    ///
    /// Unlike [`Self::block_with_senders_range`], senders are attached with `try_with_senders`
    /// (not the `_unchecked` variant); a failure there is reported as
    /// `ProviderError::SenderRecoveryError`.
    fn recovered_block_range(
        &self,
        range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        self.block_with_senders_range(
            range,
            |range| self.sealed_headers_range(range),
            |header, body, senders| {
                Self::Block::new_sealed(header, body)
                    .try_with_senders(senders)
                    .map_err(|_| ProviderError::SenderRecoveryError)
            },
        )
    }
}
1298
1299impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1300    for DatabaseProvider<TX, N>
1301{
1302    /// Recovers transaction hashes by walking through `Transactions` table and
1303    /// calculating them in a parallel manner. Returned unsorted.
1304    fn transaction_hashes_by_range(
1305        &self,
1306        tx_range: Range<TxNumber>,
1307    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1308        self.static_file_provider.get_range_with_static_file_or_database(
1309            StaticFileSegment::Transactions,
1310            tx_range,
1311            |static_file, range, _| static_file.transaction_hashes_by_range(range),
1312            |tx_range, _| {
1313                let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
1314                let tx_range_size = tx_range.clone().count();
1315                let tx_walker = tx_cursor.walk_range(tx_range)?;
1316
1317                let chunk_size = (tx_range_size / rayon::current_num_threads()).max(1);
1318                let mut channels = Vec::with_capacity(chunk_size);
1319                let mut transaction_count = 0;
1320
1321                #[inline]
1322                fn calculate_hash<T>(
1323                    entry: Result<(TxNumber, T), DatabaseError>,
1324                    rlp_buf: &mut Vec<u8>,
1325                ) -> Result<(B256, TxNumber), Box<ProviderError>>
1326                where
1327                    T: Encodable2718,
1328                {
1329                    let (tx_id, tx) = entry.map_err(|e| Box::new(e.into()))?;
1330                    tx.encode_2718(rlp_buf);
1331                    Ok((keccak256(rlp_buf), tx_id))
1332                }
1333
1334                for chunk in &tx_walker.chunks(chunk_size) {
1335                    let (tx, rx) = mpsc::channel();
1336                    channels.push(rx);
1337
1338                    // Note: Unfortunate side-effect of how chunk is designed in itertools (it is
1339                    // not Send)
1340                    let chunk: Vec<_> = chunk.collect();
1341                    transaction_count += chunk.len();
1342
1343                    // Spawn the task onto the global rayon pool
1344                    // This task will send the results through the channel after it has calculated
1345                    // the hash.
1346                    rayon::spawn(move || {
1347                        let mut rlp_buf = Vec::with_capacity(128);
1348                        for entry in chunk {
1349                            rlp_buf.clear();
1350                            let _ = tx.send(calculate_hash(entry, &mut rlp_buf));
1351                        }
1352                    });
1353                }
1354                let mut tx_list = Vec::with_capacity(transaction_count);
1355
1356                // Iterate over channels and append the tx hashes unsorted
1357                for channel in channels {
1358                    while let Ok(tx) = channel.recv() {
1359                        let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
1360                        tx_list.push((tx_hash, tx_id));
1361                    }
1362                }
1363
1364                Ok(tx_list)
1365            },
1366            |_| true,
1367        )
1368    }
1369}
1370
1371// Calculates the hash of the given transaction
1372impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1373    type Transaction = TxTy<N>;
1374
1375    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1376        Ok(self.tx.get::<tables::TransactionHashNumbers>(tx_hash)?)
1377    }
1378
1379    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1380        self.static_file_provider.get_with_static_file_or_database(
1381            StaticFileSegment::Transactions,
1382            id,
1383            |static_file| static_file.transaction_by_id(id),
1384            || Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
1385        )
1386    }
1387
1388    fn transaction_by_id_unhashed(
1389        &self,
1390        id: TxNumber,
1391    ) -> ProviderResult<Option<Self::Transaction>> {
1392        self.static_file_provider.get_with_static_file_or_database(
1393            StaticFileSegment::Transactions,
1394            id,
1395            |static_file| static_file.transaction_by_id_unhashed(id),
1396            || Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
1397        )
1398    }
1399
1400    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1401        if let Some(id) = self.transaction_id(hash)? {
1402            Ok(self.transaction_by_id_unhashed(id)?)
1403        } else {
1404            Ok(None)
1405        }
1406    }
1407
1408    fn transaction_by_hash_with_meta(
1409        &self,
1410        tx_hash: TxHash,
1411    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1412        let mut transaction_cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
1413        if let Some(transaction_id) = self.transaction_id(tx_hash)? {
1414            if let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? {
1415                if let Some(block_number) =
1416                    transaction_cursor.seek(transaction_id).map(|b| b.map(|(_, bn)| bn))?
1417                {
1418                    if let Some(sealed_header) = self.sealed_header(block_number)? {
1419                        let (header, block_hash) = sealed_header.split();
1420                        if let Some(block_body) = self.block_body_indices(block_number)? {
1421                            // the index of the tx in the block is the offset:
1422                            // len([start..tx_id])
1423                            // NOTE: `transaction_id` is always `>=` the block's first
1424                            // index
1425                            let index = transaction_id - block_body.first_tx_num();
1426
1427                            let meta = TransactionMeta {
1428                                tx_hash,
1429                                index,
1430                                block_hash,
1431                                block_number,
1432                                base_fee: header.base_fee_per_gas(),
1433                                excess_blob_gas: header.excess_blob_gas(),
1434                                timestamp: header.timestamp(),
1435                            };
1436
1437                            return Ok(Some((transaction, meta)))
1438                        }
1439                    }
1440                }
1441            }
1442        }
1443
1444        Ok(None)
1445    }
1446
1447    fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1448        let mut cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
1449        Ok(cursor.seek(id)?.map(|(_, bn)| bn))
1450    }
1451
1452    fn transactions_by_block(
1453        &self,
1454        id: BlockHashOrNumber,
1455    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1456        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;
1457
1458        if let Some(block_number) = self.convert_hash_or_number(id)? {
1459            if let Some(body) = self.block_body_indices(block_number)? {
1460                let tx_range = body.tx_num_range();
1461                return if tx_range.is_empty() {
1462                    Ok(Some(Vec::new()))
1463                } else {
1464                    Ok(Some(self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)?))
1465                }
1466            }
1467        }
1468        Ok(None)
1469    }
1470
1471    fn transactions_by_block_range(
1472        &self,
1473        range: impl RangeBounds<BlockNumber>,
1474    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1475        let range = to_range(range);
1476        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;
1477
1478        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
1479            .into_iter()
1480            .map(|body| {
1481                let tx_num_range = body.tx_num_range();
1482                if tx_num_range.is_empty() {
1483                    Ok(Vec::new())
1484                } else {
1485                    Ok(self
1486                        .transactions_by_tx_range_with_cursor(tx_num_range, &mut tx_cursor)?
1487                        .into_iter()
1488                        .collect())
1489                }
1490            })
1491            .collect()
1492    }
1493
1494    fn transactions_by_tx_range(
1495        &self,
1496        range: impl RangeBounds<TxNumber>,
1497    ) -> ProviderResult<Vec<Self::Transaction>> {
1498        self.transactions_by_tx_range_with_cursor(
1499            range,
1500            &mut self.tx.cursor_read::<tables::Transactions<_>>()?,
1501        )
1502    }
1503
1504    fn senders_by_tx_range(
1505        &self,
1506        range: impl RangeBounds<TxNumber>,
1507    ) -> ProviderResult<Vec<Address>> {
1508        self.cursor_read_collect::<tables::TransactionSenders>(range)
1509    }
1510
1511    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1512        Ok(self.tx.get::<tables::TransactionSenders>(id)?)
1513    }
1514}
1515
1516impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
1517    type Receipt = ReceiptTy<N>;
1518
1519    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1520        self.static_file_provider.get_with_static_file_or_database(
1521            StaticFileSegment::Receipts,
1522            id,
1523            |static_file| static_file.receipt(id),
1524            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
1525        )
1526    }
1527
1528    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1529        if let Some(id) = self.transaction_id(hash)? {
1530            self.receipt(id)
1531        } else {
1532            Ok(None)
1533        }
1534    }
1535
1536    fn receipts_by_block(
1537        &self,
1538        block: BlockHashOrNumber,
1539    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1540        if let Some(number) = self.convert_hash_or_number(block)? {
1541            if let Some(body) = self.block_body_indices(number)? {
1542                let tx_range = body.tx_num_range();
1543                return if tx_range.is_empty() {
1544                    Ok(Some(Vec::new()))
1545                } else {
1546                    self.receipts_by_tx_range(tx_range).map(Some)
1547                }
1548            }
1549        }
1550        Ok(None)
1551    }
1552
1553    fn receipts_by_tx_range(
1554        &self,
1555        range: impl RangeBounds<TxNumber>,
1556    ) -> ProviderResult<Vec<Self::Receipt>> {
1557        self.static_file_provider.get_range_with_static_file_or_database(
1558            StaticFileSegment::Receipts,
1559            to_range(range),
1560            |static_file, range, _| static_file.receipts_by_tx_range(range),
1561            |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
1562            |_| true,
1563        )
1564    }
1565}
1566
1567impl<TX: DbTx + 'static, N: NodeTypes<ChainSpec: EthereumHardforks>> WithdrawalsProvider
1568    for DatabaseProvider<TX, N>
1569{
1570    fn withdrawals_by_block(
1571        &self,
1572        id: BlockHashOrNumber,
1573        timestamp: u64,
1574    ) -> ProviderResult<Option<Withdrawals>> {
1575        if self.chain_spec.is_shanghai_active_at_timestamp(timestamp) {
1576            if let Some(number) = self.convert_hash_or_number(id)? {
1577                return self.static_file_provider.get_with_static_file_or_database(
1578                    StaticFileSegment::BlockMeta,
1579                    number,
1580                    |static_file| static_file.withdrawals_by_block(number.into(), timestamp),
1581                    || {
1582                        // If we are past shanghai, then all blocks should have a withdrawal list,
1583                        // even if empty
1584                        let withdrawals = self
1585                            .tx
1586                            .get::<tables::BlockWithdrawals>(number)
1587                            .map(|w| w.map(|w| w.withdrawals))?
1588                            .unwrap_or_default();
1589                        Ok(Some(withdrawals))
1590                    },
1591                )
1592            }
1593        }
1594        Ok(None)
1595    }
1596}
1597
1598impl<TX: DbTx + 'static, N: NodeTypesForProvider> OmmersProvider for DatabaseProvider<TX, N> {
1599    /// Returns the ommers for the block with matching id from the database.
1600    ///
1601    /// If the block is not found, this returns `None`.
1602    /// If the block exists, but doesn't contain ommers, this returns `None`.
1603    fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<Self::Header>>> {
1604        if let Some(number) = self.convert_hash_or_number(id)? {
1605            // If the Paris (Merge) hardfork block is known and block is after it, return empty
1606            // ommers.
1607            if self.chain_spec.is_paris_active_at_block(number) {
1608                return Ok(Some(Vec::new()))
1609            }
1610
1611            return self.static_file_provider.get_with_static_file_or_database(
1612                StaticFileSegment::BlockMeta,
1613                number,
1614                |static_file| static_file.ommers(id),
1615                || Ok(self.tx.get::<tables::BlockOmmers<Self::Header>>(number)?.map(|o| o.ommers)),
1616            )
1617        }
1618
1619        Ok(None)
1620    }
1621}
1622
1623impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
1624    for DatabaseProvider<TX, N>
1625{
1626    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1627        self.static_file_provider.get_with_static_file_or_database(
1628            StaticFileSegment::BlockMeta,
1629            num,
1630            |static_file| static_file.block_body_indices(num),
1631            || Ok(self.tx.get::<tables::BlockBodyIndices>(num)?),
1632        )
1633    }
1634
1635    fn block_body_indices_range(
1636        &self,
1637        range: RangeInclusive<BlockNumber>,
1638    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1639        self.static_file_provider.get_range_with_static_file_or_database(
1640            StaticFileSegment::BlockMeta,
1641            *range.start()..*range.end() + 1,
1642            |static_file, range, _| {
1643                static_file.block_body_indices_range(range.start..=range.end.saturating_sub(1))
1644            },
1645            |range, _| self.cursor_read_collect::<tables::BlockBodyIndices>(range),
1646            |_| true,
1647        )
1648    }
1649}
1650
1651impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
1652    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1653        Ok(self.tx.get::<tables::StageCheckpoints>(id.to_string())?)
1654    }
1655
1656    /// Get stage checkpoint progress.
1657    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1658        Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
1659    }
1660
1661    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1662        self.tx
1663            .cursor_read::<tables::StageCheckpoints>()?
1664            .walk(None)?
1665            .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
1666            .map_err(ProviderError::Database)
1667    }
1668}
1669
1670impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
1671    /// Save stage checkpoint.
1672    fn save_stage_checkpoint(
1673        &self,
1674        id: StageId,
1675        checkpoint: StageCheckpoint,
1676    ) -> ProviderResult<()> {
1677        Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
1678    }
1679
1680    /// Save stage checkpoint progress.
1681    fn save_stage_checkpoint_progress(
1682        &self,
1683        id: StageId,
1684        checkpoint: Vec<u8>,
1685    ) -> ProviderResult<()> {
1686        Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
1687    }
1688
1689    fn update_pipeline_stages(
1690        &self,
1691        block_number: BlockNumber,
1692        drop_stage_checkpoint: bool,
1693    ) -> ProviderResult<()> {
1694        // iterate over all existing stages in the table and update its progress.
1695        let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
1696        for stage_id in StageId::ALL {
1697            let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
1698            cursor.upsert(
1699                stage_id.to_string(),
1700                &StageCheckpoint {
1701                    block_number,
1702                    ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
1703                },
1704            )?;
1705        }
1706
1707        Ok(())
1708    }
1709}
1710
1711impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
1712    fn plain_state_storages(
1713        &self,
1714        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
1715    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
1716        let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
1717
1718        addresses_with_keys
1719            .into_iter()
1720            .map(|(address, storage)| {
1721                storage
1722                    .into_iter()
1723                    .map(|key| -> ProviderResult<_> {
1724                        Ok(plain_storage
1725                            .seek_by_key_subkey(address, key)?
1726                            .filter(|v| v.key == key)
1727                            .unwrap_or_else(|| StorageEntry { key, ..Default::default() }))
1728                    })
1729                    .collect::<ProviderResult<Vec<_>>>()
1730                    .map(|storage| (address, storage))
1731            })
1732            .collect::<ProviderResult<Vec<(_, _)>>>()
1733    }
1734
1735    fn changed_storages_with_range(
1736        &self,
1737        range: RangeInclusive<BlockNumber>,
1738    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
1739        self.tx
1740            .cursor_read::<tables::StorageChangeSets>()?
1741            .walk_range(BlockNumberAddress::range(range))?
1742            // fold all storages and save its old state so we can remove it from HashedStorage
1743            // it is needed as it is dup table.
1744            .try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
1745                let (BlockNumberAddress((_, address)), storage_entry) = entry?;
1746                accounts.entry(address).or_default().insert(storage_entry.key);
1747                Ok(accounts)
1748            })
1749    }
1750
1751    fn changed_storages_and_blocks_with_range(
1752        &self,
1753        range: RangeInclusive<BlockNumber>,
1754    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
1755        let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
1756
1757        let storage_changeset_lists =
1758            changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
1759                BTreeMap::new(),
1760                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, entry| -> ProviderResult<_> {
1761                    let (index, storage) = entry?;
1762                    storages
1763                        .entry((index.address(), storage.key))
1764                        .or_default()
1765                        .push(index.block_number());
1766                    Ok(storages)
1767                },
1768            )?;
1769
1770        Ok(storage_changeset_lists)
1771    }
1772}
1773
1774impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
1775    for DatabaseProvider<TX, N>
1776{
1777    type Receipt = ReceiptTy<N>;
1778
1779    fn write_state(
1780        &self,
1781        execution_outcome: &ExecutionOutcome<Self::Receipt>,
1782        is_value_known: OriginalValuesKnown,
1783        write_receipts_to: StorageLocation,
1784    ) -> ProviderResult<()> {
1785        let first_block = execution_outcome.first_block();
1786        let block_count = execution_outcome.len() as u64;
1787        let last_block = execution_outcome.last_block();
1788        let block_range = first_block..=last_block;
1789
1790        let tip = self.last_block_number()?.max(last_block);
1791
1792        let (plain_state, reverts) =
1793            execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
1794
1795        self.write_state_reverts(reverts, first_block)?;
1796        self.write_state_changes(plain_state)?;
1797
1798        // Fetch the first transaction number for each block in the range
1799        let block_indices: Vec<_> = self
1800            .block_body_indices_range(block_range)?
1801            .into_iter()
1802            .map(|b| b.first_tx_num)
1803            .collect();
1804
1805        // Ensure all expected blocks are present.
1806        if block_indices.len() < block_count as usize {
1807            let missing_blocks = block_count - block_indices.len() as u64;
1808            return Err(ProviderError::BlockBodyIndicesNotFound(
1809                last_block.saturating_sub(missing_blocks - 1),
1810            ));
1811        }
1812
1813        let has_receipts_pruning = self.prune_modes.has_receipts_pruning();
1814
1815        // Prepare receipts cursor if we are going to write receipts to the database
1816        //
1817        // We are writing to database if requested or if there's any kind of receipt pruning
1818        // configured
1819        let mut receipts_cursor = (write_receipts_to.database() || has_receipts_pruning)
1820            .then(|| self.tx.cursor_write::<tables::Receipts<Self::Receipt>>())
1821            .transpose()?;
1822
1823        // Prepare receipts static writer if we are going to write receipts to static files
1824        //
1825        // We are writing to static files if requested and if there's no receipt pruning configured
1826        let mut receipts_static_writer = (write_receipts_to.static_files() &&
1827            !has_receipts_pruning)
1828            .then(|| self.static_file_provider.get_writer(first_block, StaticFileSegment::Receipts))
1829            .transpose()?;
1830
1831        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
1832        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
1833
1834        // All receipts from the last 128 blocks are required for blockchain tree, even with
1835        // [`PruneSegment::ContractLogs`].
1836        let prunable_receipts =
1837            PruneMode::Distance(MINIMUM_PRUNING_DISTANCE).should_prune(first_block, tip);
1838
1839        // Prepare set of addresses which logs should not be pruned.
1840        let mut allowed_addresses: HashSet<Address, _> = HashSet::new();
1841        for (_, addresses) in contract_log_pruner.range(..first_block) {
1842            allowed_addresses.extend(addresses.iter().copied());
1843        }
1844
1845        for (idx, (receipts, first_tx_index)) in
1846            execution_outcome.receipts.iter().zip(block_indices).enumerate()
1847        {
1848            let block_number = first_block + idx as u64;
1849
1850            // Increment block number for receipts static file writer
1851            if let Some(writer) = receipts_static_writer.as_mut() {
1852                writer.increment_block(block_number)?;
1853            }
1854
1855            // Skip writing receipts if pruning configuration requires us to.
1856            if prunable_receipts &&
1857                self.prune_modes
1858                    .receipts
1859                    .is_some_and(|mode| mode.should_prune(block_number, tip))
1860            {
1861                continue
1862            }
1863
1864            // If there are new addresses to retain after this block number, track them
1865            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
1866                allowed_addresses.extend(new_addresses.iter().copied());
1867            }
1868
1869            for (idx, receipt) in receipts.iter().enumerate() {
1870                let receipt_idx = first_tx_index + idx as u64;
1871                // Skip writing receipt if log filter is active and it does not have any logs to
1872                // retain
1873                if prunable_receipts &&
1874                    has_contract_log_filter &&
1875                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
1876                {
1877                    continue
1878                }
1879
1880                if let Some(writer) = &mut receipts_static_writer {
1881                    writer.append_receipt(receipt_idx, receipt)?;
1882                }
1883
1884                if let Some(cursor) = &mut receipts_cursor {
1885                    cursor.append(receipt_idx, receipt)?;
1886                }
1887            }
1888        }
1889
1890        Ok(())
1891    }
1892
1893    fn write_state_reverts(
1894        &self,
1895        reverts: PlainStateReverts,
1896        first_block: BlockNumber,
1897    ) -> ProviderResult<()> {
1898        // Write storage changes
1899        tracing::trace!("Writing storage changes");
1900        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1901        let mut storage_changeset_cursor =
1902            self.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
1903        for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
1904            let block_number = first_block + block_index as BlockNumber;
1905
1906            tracing::trace!(block_number, "Writing block change");
1907            // sort changes by address.
1908            storage_changes.par_sort_unstable_by_key(|a| a.address);
1909            for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
1910                let storage_id = BlockNumberAddress((block_number, address));
1911
1912                let mut storage = storage_revert
1913                    .into_iter()
1914                    .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
1915                    .collect::<Vec<_>>();
1916                // sort storage slots by key.
1917                storage.par_sort_unstable_by_key(|a| a.0);
1918
1919                // If we are writing the primary storage wipe transition, the pre-existing plain
1920                // storage state has to be taken from the database and written to storage history.
1921                // See [StorageWipe::Primary] for more details.
1922                let mut wiped_storage = Vec::new();
1923                if wiped {
1924                    tracing::trace!(?address, "Wiping storage");
1925                    if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
1926                        wiped_storage.push((entry.key, entry.into()));
1927                        while let Some(entry) = storages_cursor.next_dup_val()? {
1928                            wiped_storage.push((entry.key, entry.into()))
1929                        }
1930                    }
1931                }
1932
1933                tracing::trace!(?address, ?storage, "Writing storage reverts");
1934                for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
1935                    storage_changeset_cursor.append_dup(storage_id, StorageEntry { key, value })?;
1936                }
1937            }
1938        }
1939
1940        // Write account changes
1941        tracing::trace!("Writing account changes");
1942        let mut account_changeset_cursor =
1943            self.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?;
1944
1945        for (block_index, mut account_block_reverts) in reverts.accounts.into_iter().enumerate() {
1946            let block_number = first_block + block_index as BlockNumber;
1947            // Sort accounts by address.
1948            account_block_reverts.par_sort_by_key(|a| a.0);
1949
1950            for (address, info) in account_block_reverts {
1951                account_changeset_cursor.append_dup(
1952                    block_number,
1953                    AccountBeforeTx { address, info: info.map(Into::into) },
1954                )?;
1955            }
1956        }
1957
1958        Ok(())
1959    }
1960
1961    fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
1962        // sort all entries so they can be written to database in more performant way.
1963        // and take smaller memory footprint.
1964        changes.accounts.par_sort_by_key(|a| a.0);
1965        changes.storage.par_sort_by_key(|a| a.address);
1966        changes.contracts.par_sort_by_key(|a| a.0);
1967
1968        // Write new account state
1969        tracing::trace!(len = changes.accounts.len(), "Writing new account state");
1970        let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
1971        // write account to database.
1972        for (address, account) in changes.accounts {
1973            if let Some(account) = account {
1974                tracing::trace!(?address, "Updating plain state account");
1975                accounts_cursor.upsert(address, &account.into())?;
1976            } else if accounts_cursor.seek_exact(address)?.is_some() {
1977                tracing::trace!(?address, "Deleting plain state account");
1978                accounts_cursor.delete_current()?;
1979            }
1980        }
1981
1982        // Write bytecode
1983        tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
1984        let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
1985        for (hash, bytecode) in changes.contracts {
1986            bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
1987        }
1988
1989        // Write new storage state and wipe storage if needed.
1990        tracing::trace!(len = changes.storage.len(), "Writing new storage state");
1991        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1992        for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
1993            // Wiping of storage.
1994            if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
1995                storages_cursor.delete_current_duplicates()?;
1996            }
1997            // cast storages to B256.
1998            let mut storage = storage
1999                .into_iter()
2000                .map(|(k, value)| StorageEntry { key: k.into(), value })
2001                .collect::<Vec<_>>();
2002            // sort storage slots by key.
2003            storage.par_sort_unstable_by_key(|a| a.key);
2004
2005            for entry in storage {
2006                tracing::trace!(?address, ?entry.key, "Updating plain state storage");
2007                if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? {
2008                    if db_entry.key == entry.key {
2009                        storages_cursor.delete_current()?;
2010                    }
2011                }
2012
2013                if !entry.to_flagged_storage().is_zero() {
2014                    storages_cursor.upsert(address, &entry)?;
2015                }
2016            }
2017        }
2018
2019        Ok(())
2020    }
2021
2022    fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
2023        // Write hashed account updates.
2024        let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
2025        for (hashed_address, account) in hashed_state.accounts().accounts_sorted() {
2026            if let Some(account) = account {
2027                hashed_accounts_cursor.upsert(hashed_address, &account)?;
2028            } else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
2029                hashed_accounts_cursor.delete_current()?;
2030            }
2031        }
2032
2033        // Write hashed storage changes.
2034        let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
2035        let mut hashed_storage_cursor =
2036            self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2037        for (hashed_address, storage) in sorted_storages {
2038            if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
2039                hashed_storage_cursor.delete_current_duplicates()?;
2040            }
2041
2042            for (hashed_slot, value) in storage.storage_slots_sorted() {
2043                let entry = StorageEntry { key: hashed_slot, value };
2044                if let Some(db_entry) =
2045                    hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)?
2046                {
2047                    if db_entry.key == entry.key {
2048                        hashed_storage_cursor.delete_current()?;
2049                    }
2050                }
2051
2052                if !entry.value.is_zero() {
2053                    hashed_storage_cursor.upsert(*hashed_address, &entry)?;
2054                }
2055            }
2056        }
2057
2058        Ok(())
2059    }
2060
2061    /// Remove the last N blocks of state.
2062    ///
2063    /// The latest state will be unwound
2064    ///
2065    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2066    ///    transaction ids.
2067    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2068    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2069    ///    the changesets.
2070    ///    - In order to have both the old and new values in the changesets, we also access the
2071    ///      plain state tables.
2072    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2073    ///    we:
2074    ///     1. Take the old value from the changeset
2075    ///     2. Take the new value from the plain state
2076    ///     3. Save the old value to the local state
2077    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2078    ///    have seen before we:
2079    ///     1. Take the old value from the changeset
2080    ///     2. Take the new value from the local state
2081    ///     3. Set the local state to the value in the changeset
2082    fn remove_state_above(
2083        &self,
2084        block: BlockNumber,
2085        remove_receipts_from: StorageLocation,
2086    ) -> ProviderResult<()> {
2087        let range = block + 1..=self.last_block_number()?;
2088
2089        if range.is_empty() {
2090            return Ok(());
2091        }
2092
2093        // We are not removing block meta as it is used to get block changesets.
2094        let block_bodies = self.block_body_indices_range(range.clone())?;
2095
2096        // get transaction receipts
2097        let from_transaction_num =
2098            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2099
2100        let storage_range = BlockNumberAddress::range(range.clone());
2101
2102        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2103        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2104
2105        // This is not working for blocks that are not at tip. as plain state is not the last
2106        // state of end range. We should rename the functions or add support to access
2107        // History state. Accessing history state can be tricky but we are not gaining
2108        // anything.
2109        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2110        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2111
2112        let (state, _) = self.populate_bundle_state(
2113            account_changeset,
2114            storage_changeset,
2115            &mut plain_accounts_cursor,
2116            &mut plain_storage_cursor,
2117        )?;
2118
2119        // iterate over local plain state remove all account and all storages.
2120        for (address, (old_account, new_account, storage)) in &state {
2121            // revert account if needed.
2122            if old_account != new_account {
2123                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2124                if let Some(account) = old_account {
2125                    plain_accounts_cursor.upsert(*address, account)?;
2126                } else if existing_entry.is_some() {
2127                    plain_accounts_cursor.delete_current()?;
2128                }
2129            }
2130
2131            // revert storages
2132            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2133                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2134                // delete previous value
2135                // TODO: This does not use dupsort features
2136                if plain_storage_cursor
2137                    .seek_by_key_subkey(*address, *storage_key)?
2138                    .filter(|s| s.key == *storage_key)
2139                    .is_some()
2140                {
2141                    plain_storage_cursor.delete_current()?
2142                }
2143
2144                // insert value if needed
2145                if !old_storage_value.is_zero() {
2146                    plain_storage_cursor.upsert(*address, &storage_entry)?;
2147                }
2148            }
2149        }
2150
2151        self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
2152
2153        Ok(())
2154    }
2155
2156    /// Take the last N blocks of state, recreating the [`ExecutionOutcome`].
2157    ///
2158    /// The latest state will be unwound and returned back with all the blocks
2159    ///
2160    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2161    ///    transaction ids.
2162    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2163    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2164    ///    the changesets.
2165    ///    - In order to have both the old and new values in the changesets, we also access the
2166    ///      plain state tables.
2167    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2168    ///    we:
2169    ///     1. Take the old value from the changeset
2170    ///     2. Take the new value from the plain state
2171    ///     3. Save the old value to the local state
2172    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2173    ///    have seen before we:
2174    ///     1. Take the old value from the changeset
2175    ///     2. Take the new value from the local state
2176    ///     3. Set the local state to the value in the changeset
2177    fn take_state_above(
2178        &self,
2179        block: BlockNumber,
2180        remove_receipts_from: StorageLocation,
2181    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2182        let range = block + 1..=self.last_block_number()?;
2183
2184        if range.is_empty() {
2185            return Ok(ExecutionOutcome::default())
2186        }
2187        let start_block_number = *range.start();
2188
2189        // We are not removing block meta as it is used to get block changesets.
2190        let block_bodies = self.block_body_indices_range(range.clone())?;
2191
2192        // get transaction receipts
2193        let from_transaction_num =
2194            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2195        let to_transaction_num =
2196            block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2197
2198        let storage_range = BlockNumberAddress::range(range.clone());
2199
2200        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2201        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2202
2203        // This is not working for blocks that are not at tip. as plain state is not the last
2204        // state of end range. We should rename the functions or add support to access
2205        // History state. Accessing history state can be tricky but we are not gaining
2206        // anything.
2207        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2208        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2209
2210        // populate bundle state and reverts from changesets / state cursors, to iterate over,
2211        // remove, and return later
2212        let (state, reverts) = self.populate_bundle_state(
2213            account_changeset,
2214            storage_changeset,
2215            &mut plain_accounts_cursor,
2216            &mut plain_storage_cursor,
2217        )?;
2218
2219        // iterate over local plain state remove all account and all storages.
2220        for (address, (old_account, new_account, storage)) in &state {
2221            // revert account if needed.
2222            if old_account != new_account {
2223                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2224                if let Some(account) = old_account {
2225                    plain_accounts_cursor.upsert(*address, account)?;
2226                } else if existing_entry.is_some() {
2227                    plain_accounts_cursor.delete_current()?;
2228                }
2229            }
2230
2231            // revert storages
2232            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2233                let storage_entry = StorageEntry { key: *storage_key, value: *old_storage_value };
2234                // delete previous value
2235                // TODO: This does not use dupsort features
2236                if plain_storage_cursor
2237                    .seek_by_key_subkey(*address, *storage_key)?
2238                    .filter(|s| s.key == *storage_key)
2239                    .is_some()
2240                {
2241                    plain_storage_cursor.delete_current()?
2242                }
2243
2244                // insert value if needed
2245                if !old_storage_value.is_zero() {
2246                    plain_storage_cursor.upsert(*address, &storage_entry)?;
2247                }
2248            }
2249        }
2250
2251        // Collect receipts into tuples (tx_num, receipt) to correctly handle pruned receipts
2252        let mut receipts_iter = self
2253            .static_file_provider
2254            .get_range_with_static_file_or_database(
2255                StaticFileSegment::Receipts,
2256                from_transaction_num..to_transaction_num + 1,
2257                |static_file, range, _| {
2258                    static_file
2259                        .receipts_by_tx_range(range.clone())
2260                        .map(|r| range.into_iter().zip(r).collect())
2261                },
2262                |range, _| {
2263                    self.tx
2264                        .cursor_read::<tables::Receipts<Self::Receipt>>()?
2265                        .walk_range(range)?
2266                        .map(|r| r.map_err(Into::into))
2267                        .collect()
2268                },
2269                |_| true,
2270            )?
2271            .into_iter()
2272            .peekable();
2273
2274        let mut receipts = Vec::with_capacity(block_bodies.len());
2275        // loop break if we are at the end of the blocks.
2276        for block_body in block_bodies {
2277            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
2278            for num in block_body.tx_num_range() {
2279                if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
2280                    block_receipts.push(receipts_iter.next().unwrap().1);
2281                }
2282            }
2283            receipts.push(block_receipts);
2284        }
2285
2286        self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
2287
2288        Ok(ExecutionOutcome::new_init(
2289            state,
2290            reverts,
2291            Vec::new(),
2292            receipts,
2293            start_block_number,
2294            Vec::new(),
2295        ))
2296    }
2297}
2298
2299impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
2300    /// Writes trie updates. Returns the number of entries modified.
2301    fn write_trie_updates(&self, trie_updates: &TrieUpdates) -> ProviderResult<usize> {
2302        if trie_updates.is_empty() {
2303            return Ok(0)
2304        }
2305
2306        // Track the number of inserted entries.
2307        let mut num_entries = 0;
2308
2309        // Merge updated and removed nodes. Updated nodes must take precedence.
2310        let mut account_updates = trie_updates
2311            .removed_nodes_ref()
2312            .iter()
2313            .filter_map(|n| {
2314                (!trie_updates.account_nodes_ref().contains_key(n)).then_some((n, None))
2315            })
2316            .collect::<Vec<_>>();
2317        account_updates.extend(
2318            trie_updates.account_nodes_ref().iter().map(|(nibbles, node)| (nibbles, Some(node))),
2319        );
2320        // Sort trie node updates.
2321        account_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
2322
2323        let tx = self.tx_ref();
2324        let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
2325        for (key, updated_node) in account_updates {
2326            let nibbles = StoredNibbles(key.clone());
2327            match updated_node {
2328                Some(node) => {
2329                    if !nibbles.0.is_empty() {
2330                        num_entries += 1;
2331                        account_trie_cursor.upsert(nibbles, node)?;
2332                    }
2333                }
2334                None => {
2335                    num_entries += 1;
2336                    if account_trie_cursor.seek_exact(nibbles)?.is_some() {
2337                        account_trie_cursor.delete_current()?;
2338                    }
2339                }
2340            }
2341        }
2342
2343        num_entries += self.write_storage_trie_updates(trie_updates.storage_tries_ref())?;
2344
2345        Ok(num_entries)
2346    }
2347}
2348
2349impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
2350    /// Writes storage trie updates from the given storage trie map. First sorts the storage trie
2351    /// updates by the hashed address, writing in sorted order.
2352    fn write_storage_trie_updates(
2353        &self,
2354        storage_tries: &B256Map<StorageTrieUpdates>,
2355    ) -> ProviderResult<usize> {
2356        let mut num_entries = 0;
2357        let mut storage_tries = Vec::from_iter(storage_tries);
2358        storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
2359        let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2360        for (hashed_address, storage_trie_updates) in storage_tries {
2361            let mut db_storage_trie_cursor =
2362                DatabaseStorageTrieCursor::new(cursor, *hashed_address);
2363            num_entries +=
2364                db_storage_trie_cursor.write_storage_trie_updates(storage_trie_updates)?;
2365            cursor = db_storage_trie_cursor.cursor;
2366        }
2367
2368        Ok(num_entries)
2369    }
2370
2371    fn write_individual_storage_trie_updates(
2372        &self,
2373        hashed_address: B256,
2374        updates: &StorageTrieUpdates,
2375    ) -> ProviderResult<usize> {
2376        if updates.is_empty() {
2377            return Ok(0)
2378        }
2379
2380        let cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2381        let mut trie_db_cursor = DatabaseStorageTrieCursor::new(cursor, hashed_address);
2382        Ok(trie_db_cursor.write_storage_trie_updates(updates)?)
2383    }
2384}
2385
2386impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
2387    fn unwind_account_hashing<'a>(
2388        &self,
2389        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2390    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2391        // Aggregate all block changesets and make a list of accounts that have been changed.
2392        // Note that collecting and then reversing the order is necessary to ensure that the
2393        // changes are applied in the correct order.
2394        let hashed_accounts = changesets
2395            .into_iter()
2396            .map(|(_, e)| (keccak256(e.address), e.info))
2397            .collect::<Vec<_>>()
2398            .into_iter()
2399            .rev()
2400            .collect::<BTreeMap<_, _>>();
2401
2402        // Apply values to HashedState, and remove the account if it's None.
2403        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2404        for (hashed_address, account) in &hashed_accounts {
2405            if let Some(account) = account {
2406                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2407            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2408                hashed_accounts_cursor.delete_current()?;
2409            }
2410        }
2411
2412        Ok(hashed_accounts)
2413    }
2414
2415    fn unwind_account_hashing_range(
2416        &self,
2417        range: impl RangeBounds<BlockNumber>,
2418    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2419        let changesets = self
2420            .tx
2421            .cursor_read::<tables::AccountChangeSets>()?
2422            .walk_range(range)?
2423            .collect::<Result<Vec<_>, _>>()?;
2424        self.unwind_account_hashing(changesets.iter())
2425    }
2426
2427    fn insert_account_for_hashing(
2428        &self,
2429        changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
2430    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2431        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2432        let hashed_accounts =
2433            changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
2434        for (hashed_address, account) in &hashed_accounts {
2435            if let Some(account) = account {
2436                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2437            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2438                hashed_accounts_cursor.delete_current()?;
2439            }
2440        }
2441        Ok(hashed_accounts)
2442    }
2443
2444    fn unwind_storage_hashing(
2445        &self,
2446        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2447    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2448        // Aggregate all block changesets and make list of accounts that have been changed.
2449        let mut hashed_storages = changesets
2450            .into_iter()
2451            .map(|(BlockNumberAddress((_, address)), storage_entry)| {
2452                (keccak256(address), keccak256(storage_entry.key), storage_entry.value)
2453            })
2454            .collect::<Vec<_>>();
2455        hashed_storages.sort_by_key(|(ha, hk, _)| (*ha, *hk));
2456
2457        // Apply values to HashedState, and remove the account if it's None.
2458        let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
2459            HashMap::with_capacity_and_hasher(hashed_storages.len(), Default::default());
2460        let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2461        for (hashed_address, key, value) in hashed_storages.into_iter().rev() {
2462            hashed_storage_keys.entry(hashed_address).or_default().insert(key);
2463
2464            if hashed_storage
2465                .seek_by_key_subkey(hashed_address, key)?
2466                .filter(|entry| entry.key == key)
2467                .is_some()
2468            {
2469                hashed_storage.delete_current()?;
2470            }
2471
2472            if !value.is_zero() {
2473                hashed_storage.upsert(hashed_address, &StorageEntry { key, value })?;
2474            }
2475        }
2476        Ok(hashed_storage_keys)
2477    }
2478
2479    fn unwind_storage_hashing_range(
2480        &self,
2481        range: impl RangeBounds<BlockNumberAddress>,
2482    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2483        let changesets = self
2484            .tx
2485            .cursor_read::<tables::StorageChangeSets>()?
2486            .walk_range(range)?
2487            .collect::<Result<Vec<_>, _>>()?;
2488        self.unwind_storage_hashing(changesets.into_iter())
2489    }
2490
2491    fn insert_storage_for_hashing(
2492        &self,
2493        storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
2494    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2495        // hash values
2496        let hashed_storages =
2497            storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
2498                let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
2499                    map.insert(keccak256(entry.key), entry.value);
2500                    map
2501                });
2502                map.insert(keccak256(address), storage);
2503                map
2504            });
2505
2506        let hashed_storage_keys =
2507            HashMap::from_iter(hashed_storages.iter().map(|(hashed_address, entries)| {
2508                (*hashed_address, BTreeSet::from_iter(entries.keys().copied()))
2509            }));
2510
2511        let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2512        // Hash the address and key and apply them to HashedStorage (if Storage is None
2513        // just remove it);
2514        hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
2515            storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
2516                if hashed_storage_cursor
2517                    .seek_by_key_subkey(hashed_address, key)?
2518                    .filter(|entry| entry.key == key)
2519                    .is_some()
2520                {
2521                    hashed_storage_cursor.delete_current()?;
2522                }
2523
2524                if !value.is_zero() {
2525                    hashed_storage_cursor.upsert(
2526                        hashed_address,
2527                        &StorageEntry { key, value, ..Default::default() },
2528                    )?;
2529                }
2530                Ok(())
2531            })
2532        })?;
2533
2534        Ok(hashed_storage_keys)
2535    }
2536
2537    fn insert_hashes(
2538        &self,
2539        range: RangeInclusive<BlockNumber>,
2540        end_block_hash: B256,
2541        expected_state_root: B256,
2542    ) -> ProviderResult<()> {
2543        // Initialize prefix sets.
2544        let mut account_prefix_set = PrefixSetMut::default();
2545        let mut storage_prefix_sets: HashMap<B256, PrefixSetMut> = HashMap::default();
2546        let mut destroyed_accounts = HashSet::default();
2547
2548        let mut durations_recorder = metrics::DurationsRecorder::default();
2549
2550        // storage hashing stage
2551        {
2552            let lists = self.changed_storages_with_range(range.clone())?;
2553            let storages = self.plain_state_storages(lists)?;
2554            let storage_entries = self.insert_storage_for_hashing(storages)?;
2555            for (hashed_address, hashed_slots) in storage_entries {
2556                account_prefix_set.insert(Nibbles::unpack(hashed_address));
2557                for slot in hashed_slots {
2558                    storage_prefix_sets
2559                        .entry(hashed_address)
2560                        .or_default()
2561                        .insert(Nibbles::unpack(slot));
2562                }
2563            }
2564        }
2565        durations_recorder.record_relative(metrics::Action::InsertStorageHashing);
2566
2567        // account hashing stage
2568        {
2569            let lists = self.changed_accounts_with_range(range.clone())?;
2570            let accounts = self.basic_accounts(lists)?;
2571            let hashed_addresses = self.insert_account_for_hashing(accounts)?;
2572            for (hashed_address, account) in hashed_addresses {
2573                account_prefix_set.insert(Nibbles::unpack(hashed_address));
2574                if account.is_none() {
2575                    destroyed_accounts.insert(hashed_address);
2576                }
2577            }
2578        }
2579        durations_recorder.record_relative(metrics::Action::InsertAccountHashing);
2580
2581        // merkle tree
2582        {
2583            // This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
2584            // are pre-loaded.
2585            let prefix_sets = TriePrefixSets {
2586                account_prefix_set: account_prefix_set.freeze(),
2587                storage_prefix_sets: storage_prefix_sets
2588                    .into_iter()
2589                    .map(|(k, v)| (k, v.freeze()))
2590                    .collect(),
2591                destroyed_accounts,
2592            };
2593            let (state_root, trie_updates) = StateRoot::from_tx(&self.tx)
2594                .with_prefix_sets(prefix_sets)
2595                .root_with_updates()
2596                .map_err(reth_db_api::DatabaseError::from)?;
2597            if state_root != expected_state_root {
2598                return Err(ProviderError::StateRootMismatch(Box::new(RootMismatch {
2599                    root: GotExpected { got: state_root, expected: expected_state_root },
2600                    block_number: *range.end(),
2601                    block_hash: end_block_hash,
2602                })))
2603            }
2604            self.write_trie_updates(&trie_updates)?;
2605        }
2606        durations_recorder.record_relative(metrics::Action::InsertMerkleTree);
2607
2608        debug!(target: "providers::db", ?range, actions = ?durations_recorder.actions, "Inserted hashes");
2609
2610        Ok(())
2611    }
2612}
2613
2614impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
2615    fn unwind_account_history_indices<'a>(
2616        &self,
2617        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2618    ) -> ProviderResult<usize> {
2619        let mut last_indices = changesets
2620            .into_iter()
2621            .map(|(index, account)| (account.address, *index))
2622            .collect::<Vec<_>>();
2623        last_indices.sort_by_key(|(a, _)| *a);
2624
2625        // Unwind the account history index.
2626        let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
2627        for &(address, rem_index) in &last_indices {
2628            let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
2629                &mut cursor,
2630                ShardedKey::last(address),
2631                rem_index,
2632                |sharded_key| sharded_key.key == address,
2633            )?;
2634
2635            // Check the last returned partial shard.
2636            // If it's not empty, the shard needs to be reinserted.
2637            if !partial_shard.is_empty() {
2638                cursor.insert(
2639                    ShardedKey::last(address),
2640                    &BlockNumberList::new_pre_sorted(partial_shard),
2641                )?;
2642            }
2643        }
2644
2645        let changesets = last_indices.len();
2646        Ok(changesets)
2647    }
2648
2649    fn unwind_account_history_indices_range(
2650        &self,
2651        range: impl RangeBounds<BlockNumber>,
2652    ) -> ProviderResult<usize> {
2653        let changesets = self
2654            .tx
2655            .cursor_read::<tables::AccountChangeSets>()?
2656            .walk_range(range)?
2657            .collect::<Result<Vec<_>, _>>()?;
2658        self.unwind_account_history_indices(changesets.iter())
2659    }
2660
2661    fn insert_account_history_index(
2662        &self,
2663        account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
2664    ) -> ProviderResult<()> {
2665        self.append_history_index::<_, tables::AccountsHistory>(
2666            account_transitions,
2667            ShardedKey::new,
2668        )
2669    }
2670
2671    fn unwind_storage_history_indices(
2672        &self,
2673        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2674    ) -> ProviderResult<usize> {
2675        let mut storage_changesets = changesets
2676            .into_iter()
2677            .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
2678            .collect::<Vec<_>>();
2679        storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
2680
2681        let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
2682        for &(address, storage_key, rem_index) in &storage_changesets {
2683            let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
2684                &mut cursor,
2685                StorageShardedKey::last(address, storage_key),
2686                rem_index,
2687                |storage_sharded_key| {
2688                    storage_sharded_key.address == address &&
2689                        storage_sharded_key.sharded_key.key == storage_key
2690                },
2691            )?;
2692
2693            // Check the last returned partial shard.
2694            // If it's not empty, the shard needs to be reinserted.
2695            if !partial_shard.is_empty() {
2696                cursor.insert(
2697                    StorageShardedKey::last(address, storage_key),
2698                    &BlockNumberList::new_pre_sorted(partial_shard),
2699                )?;
2700            }
2701        }
2702
2703        let changesets = storage_changesets.len();
2704        Ok(changesets)
2705    }
2706
2707    fn unwind_storage_history_indices_range(
2708        &self,
2709        range: impl RangeBounds<BlockNumberAddress>,
2710    ) -> ProviderResult<usize> {
2711        let changesets = self
2712            .tx
2713            .cursor_read::<tables::StorageChangeSets>()?
2714            .walk_range(range)?
2715            .collect::<Result<Vec<_>, _>>()?;
2716        self.unwind_storage_history_indices(changesets.into_iter())
2717    }
2718
2719    fn insert_storage_history_index(
2720        &self,
2721        storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
2722    ) -> ProviderResult<()> {
2723        self.append_history_index::<_, tables::StoragesHistory>(
2724            storage_transitions,
2725            |(address, storage_key), highest_block_number| {
2726                StorageShardedKey::new(address, storage_key, highest_block_number)
2727            },
2728        )
2729    }
2730
2731    fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
2732        // account history stage
2733        {
2734            let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
2735            self.insert_account_history_index(indices)?;
2736        }
2737
2738        // storage history stage
2739        {
2740            let indices = self.changed_storages_and_blocks_with_range(range)?;
2741            self.insert_storage_history_index(indices)?;
2742        }
2743
2744        Ok(())
2745    }
2746}
2747
2748impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
2749    for DatabaseProvider<TX, N>
2750{
2751    fn take_block_and_execution_above(
2752        &self,
2753        block: BlockNumber,
2754        remove_from: StorageLocation,
2755    ) -> ProviderResult<Chain<Self::Primitives>> {
2756        let range = block + 1..=self.last_block_number()?;
2757
2758        self.unwind_trie_state_range(range.clone())?;
2759
2760        // get execution res
2761        let execution_state = self.take_state_above(block, remove_from)?;
2762
2763        let blocks = self.recovered_block_range(range)?;
2764
2765        // remove block bodies it is needed for both get block range and get block execution results
2766        // that is why it is deleted afterwards.
2767        self.remove_blocks_above(block, remove_from)?;
2768
2769        // Update pipeline progress
2770        self.update_pipeline_stages(block, true)?;
2771
2772        Ok(Chain::new(blocks, execution_state, None))
2773    }
2774
2775    fn remove_block_and_execution_above(
2776        &self,
2777        block: BlockNumber,
2778        remove_from: StorageLocation,
2779    ) -> ProviderResult<()> {
2780        let range = block + 1..=self.last_block_number()?;
2781
2782        self.unwind_trie_state_range(range)?;
2783
2784        // remove execution res
2785        self.remove_state_above(block, remove_from)?;
2786
2787        // remove block bodies it is needed for both get block range and get block execution results
2788        // that is why it is deleted afterwards.
2789        self.remove_blocks_above(block, remove_from)?;
2790
2791        // Update pipeline progress
2792        self.update_pipeline_stages(block, true)?;
2793
2794        Ok(())
2795    }
2796}
2797
2798impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
2799    for DatabaseProvider<TX, N>
2800{
2801    type Block = BlockTy<N>;
2802    type Receipt = ReceiptTy<N>;
2803
2804    /// Inserts the block into the database, always modifying the following tables:
2805    /// * [`CanonicalHeaders`](tables::CanonicalHeaders)
2806    /// * [`Headers`](tables::Headers)
2807    /// * [`HeaderNumbers`](tables::HeaderNumbers)
2808    /// * [`HeaderTerminalDifficulties`](tables::HeaderTerminalDifficulties)
2809    /// * [`BlockBodyIndices`](tables::BlockBodyIndices)
2810    ///
2811    /// If there are transactions in the block, the following tables will be modified:
2812    /// * [`Transactions`](tables::Transactions)
2813    /// * [`TransactionBlocks`](tables::TransactionBlocks)
2814    ///
2815    /// If ommers are not empty, this will modify [`BlockOmmers`](tables::BlockOmmers).
2816    /// If withdrawals are not empty, this will modify
2817    /// [`BlockWithdrawals`](tables::BlockWithdrawals).
2818    ///
2819    /// If the provider has __not__ configured full sender pruning, this will modify
2820    /// [`TransactionSenders`](tables::TransactionSenders).
2821    ///
2822    /// If the provider has __not__ configured full transaction lookup pruning, this will modify
2823    /// [`TransactionHashNumbers`](tables::TransactionHashNumbers).
2824    fn insert_block(
2825        &self,
2826        block: RecoveredBlock<Self::Block>,
2827        write_to: StorageLocation,
2828    ) -> ProviderResult<StoredBlockBodyIndices> {
2829        let block_number = block.number();
2830
2831        let mut durations_recorder = metrics::DurationsRecorder::default();
2832
2833        // total difficulty
2834        let ttd = if block_number == 0 {
2835            block.header().difficulty()
2836        } else {
2837            let parent_block_number = block_number - 1;
2838            let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
2839            durations_recorder.record_relative(metrics::Action::GetParentTD);
2840            parent_ttd + block.header().difficulty()
2841        };
2842
2843        if write_to.database() {
2844            self.tx.put::<tables::CanonicalHeaders>(block_number, block.hash())?;
2845            durations_recorder.record_relative(metrics::Action::InsertCanonicalHeaders);
2846
2847            // Put header with canonical hashes.
2848            self.tx.put::<tables::Headers<HeaderTy<N>>>(block_number, block.header().clone())?;
2849            durations_recorder.record_relative(metrics::Action::InsertHeaders);
2850
2851            self.tx.put::<tables::HeaderTerminalDifficulties>(block_number, ttd.into())?;
2852            durations_recorder.record_relative(metrics::Action::InsertHeaderTerminalDifficulties);
2853        }
2854
2855        if write_to.static_files() {
2856            let mut writer =
2857                self.static_file_provider.get_writer(block_number, StaticFileSegment::Headers)?;
2858            writer.append_header(block.header(), ttd, &block.hash())?;
2859        }
2860
2861        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
2862        durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
2863
2864        let mut next_tx_num = self
2865            .tx
2866            .cursor_read::<tables::TransactionBlocks>()?
2867            .last()?
2868            .map(|(n, _)| n + 1)
2869            .unwrap_or_default();
2870        durations_recorder.record_relative(metrics::Action::GetNextTxNum);
2871        let first_tx_num = next_tx_num;
2872
2873        let tx_count = block.body().transaction_count() as u64;
2874
2875        // Ensures we have all the senders for the block's transactions.
2876        for (transaction, sender) in block.body().transactions_iter().zip(block.senders_iter()) {
2877            let hash = transaction.tx_hash();
2878
2879            if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
2880                self.tx.put::<tables::TransactionSenders>(next_tx_num, *sender)?;
2881            }
2882
2883            if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
2884                self.tx.put::<tables::TransactionHashNumbers>(*hash, next_tx_num)?;
2885            }
2886            next_tx_num += 1;
2887        }
2888
2889        self.append_block_bodies(vec![(block_number, Some(block.into_body()))], write_to)?;
2890
2891        debug!(
2892            target: "providers::db",
2893            ?block_number,
2894            actions = ?durations_recorder.actions,
2895            "Inserted block"
2896        );
2897
2898        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
2899    }
2900
2901    fn append_block_bodies(
2902        &self,
2903        bodies: Vec<(BlockNumber, Option<BodyTy<N>>)>,
2904        write_to: StorageLocation,
2905    ) -> ProviderResult<()> {
2906        let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
2907
2908        // Initialize writer if we will be writing transactions to staticfiles
2909        let mut tx_static_writer = write_to
2910            .static_files()
2911            .then(|| {
2912                self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)
2913            })
2914            .transpose()?;
2915
2916        let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
2917        let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
2918
2919        // Initialize cursor if we will be writing transactions to database
2920        let mut tx_cursor = write_to
2921            .database()
2922            .then(|| self.tx.cursor_write::<tables::Transactions<TxTy<N>>>())
2923            .transpose()?;
2924
2925        // Get id for the next tx_num or zero if there are no transactions.
2926        let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
2927
2928        for (block_number, body) in &bodies {
2929            // Increment block on static file header.
2930            if let Some(writer) = tx_static_writer.as_mut() {
2931                writer.increment_block(*block_number)?;
2932            }
2933
2934            let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
2935            let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
2936
2937            let mut durations_recorder = metrics::DurationsRecorder::default();
2938
2939            // insert block meta
2940            block_indices_cursor.append(*block_number, &block_indices)?;
2941
2942            durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
2943
2944            let Some(body) = body else { continue };
2945
2946            // write transaction block index
2947            if !body.transactions().is_empty() {
2948                tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
2949                durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
2950            }
2951
2952            // write transactions
2953            for transaction in body.transactions() {
2954                if let Some(writer) = tx_static_writer.as_mut() {
2955                    writer.append_transaction(next_tx_num, transaction)?;
2956                }
2957                if let Some(cursor) = tx_cursor.as_mut() {
2958                    cursor.append(next_tx_num, transaction)?;
2959                }
2960
2961                // Increment transaction id for each transaction.
2962                next_tx_num += 1;
2963            }
2964
2965            debug!(
2966                target: "providers::db",
2967                ?block_number,
2968                actions = ?durations_recorder.actions,
2969                "Inserted block body"
2970            );
2971        }
2972
2973        self.storage.writer().write_block_bodies(self, bodies, write_to)?;
2974
2975        Ok(())
2976    }
2977
2978    fn remove_blocks_above(
2979        &self,
2980        block: BlockNumber,
2981        remove_from: StorageLocation,
2982    ) -> ProviderResult<()> {
2983        for hash in self.canonical_hashes_range(block + 1, self.last_block_number()? + 1)? {
2984            self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
2985        }
2986
2987        // Only prune canonical headers after we've removed the block hashes as we rely on data from
2988        // this table in `canonical_hashes_range`.
2989        self.remove::<tables::CanonicalHeaders>(block + 1..)?;
2990        self.remove::<tables::Headers<HeaderTy<N>>>(block + 1..)?;
2991        self.remove::<tables::HeaderTerminalDifficulties>(block + 1..)?;
2992
2993        // First transaction to be removed
2994        let unwind_tx_from = self
2995            .block_body_indices(block)?
2996            .map(|b| b.next_tx_num())
2997            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
2998
2999        // Last transaction to be removed
3000        let unwind_tx_to = self
3001            .tx
3002            .cursor_read::<tables::BlockBodyIndices>()?
3003            .last()?
3004            // shouldn't happen because this was OK above
3005            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
3006            .1
3007            .last_tx_num();
3008
3009        if unwind_tx_from <= unwind_tx_to {
3010            for (hash, _) in self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))? {
3011                self.tx.delete::<tables::TransactionHashNumbers>(hash, None)?;
3012            }
3013        }
3014
3015        self.remove::<tables::TransactionSenders>(unwind_tx_from..)?;
3016
3017        self.remove_bodies_above(block, remove_from)?;
3018
3019        Ok(())
3020    }
3021
3022    fn remove_bodies_above(
3023        &self,
3024        block: BlockNumber,
3025        remove_from: StorageLocation,
3026    ) -> ProviderResult<()> {
3027        self.storage.writer().remove_block_bodies_above(self, block, remove_from)?;
3028
3029        // First transaction to be removed
3030        let unwind_tx_from = self
3031            .block_body_indices(block)?
3032            .map(|b| b.next_tx_num())
3033            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3034
3035        self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3036        self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3037
3038        if remove_from.database() {
3039            self.remove::<tables::Transactions<TxTy<N>>>(unwind_tx_from..)?;
3040        }
3041
3042        if remove_from.static_files() {
3043            let static_file_tx_num = self
3044                .static_file_provider
3045                .get_highest_static_file_tx(StaticFileSegment::Transactions);
3046
3047            let to_delete = static_file_tx_num
3048                .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3049                .unwrap_or_default();
3050
3051            self.static_file_provider
3052                .latest_writer(StaticFileSegment::Transactions)?
3053                .prune_transactions(to_delete, block)?;
3054        }
3055
3056        Ok(())
3057    }
3058
3059    /// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
3060    fn append_blocks_with_state(
3061        &self,
3062        blocks: Vec<RecoveredBlock<Self::Block>>,
3063        execution_outcome: &ExecutionOutcome<Self::Receipt>,
3064        hashed_state: HashedPostStateSorted,
3065        trie_updates: TrieUpdates,
3066    ) -> ProviderResult<()> {
3067        if blocks.is_empty() {
3068            debug!(target: "providers::db", "Attempted to append empty block range");
3069            return Ok(())
3070        }
3071
3072        let first_number = blocks.first().unwrap().number();
3073
3074        let last = blocks.last().unwrap();
3075        let last_block_number = last.number();
3076
3077        let mut durations_recorder = metrics::DurationsRecorder::default();
3078
3079        // Insert the blocks
3080        for block in blocks {
3081            self.insert_block(block, StorageLocation::Database)?;
3082            durations_recorder.record_relative(metrics::Action::InsertBlock);
3083        }
3084
3085        self.write_state(execution_outcome, OriginalValuesKnown::No, StorageLocation::Database)?;
3086        durations_recorder.record_relative(metrics::Action::InsertState);
3087
3088        // insert hashes and intermediate merkle nodes
3089        self.write_hashed_state(&hashed_state)?;
3090        self.write_trie_updates(&trie_updates)?;
3091        durations_recorder.record_relative(metrics::Action::InsertHashes);
3092
3093        self.update_history_indices(first_number..=last_block_number)?;
3094        durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3095
3096        // Update pipeline progress
3097        self.update_pipeline_stages(last_block_number, false)?;
3098        durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3099
3100        debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3101
3102        Ok(())
3103    }
3104}
3105
3106impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3107    fn get_prune_checkpoint(
3108        &self,
3109        segment: PruneSegment,
3110    ) -> ProviderResult<Option<PruneCheckpoint>> {
3111        Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3112    }
3113
3114    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3115        Ok(self
3116            .tx
3117            .cursor_read::<tables::PruneCheckpoints>()?
3118            .walk(None)?
3119            .collect::<Result<_, _>>()?)
3120    }
3121}
3122
3123impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3124    fn save_prune_checkpoint(
3125        &self,
3126        segment: PruneSegment,
3127        checkpoint: PruneCheckpoint,
3128    ) -> ProviderResult<()> {
3129        Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3130    }
3131}
3132
3133impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3134    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3135        let db_entries = self.tx.entries::<T>()?;
3136        let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3137            Ok(entries) => entries,
3138            Err(ProviderError::UnsupportedProvider) => 0,
3139            Err(err) => return Err(err),
3140        };
3141
3142        Ok(db_entries + static_file_entries)
3143    }
3144}
3145
3146impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3147    fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3148        let mut finalized_blocks = self
3149            .tx
3150            .cursor_read::<tables::ChainState>()?
3151            .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3152            .take(1)
3153            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3154
3155        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3156        Ok(last_finalized_block_number)
3157    }
3158
3159    fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3160        let mut finalized_blocks = self
3161            .tx
3162            .cursor_read::<tables::ChainState>()?
3163            .walk(Some(tables::ChainStateKey::LastSafeBlockBlock))?
3164            .take(1)
3165            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3166
3167        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3168        Ok(last_finalized_block_number)
3169    }
3170}
3171
3172impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3173    fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3174        Ok(self
3175            .tx
3176            .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3177    }
3178
3179    fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3180        Ok(self
3181            .tx
3182            .put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlockBlock, block_number)?)
3183    }
3184}
3185
3186impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3187    type Tx = TX;
3188
3189    fn tx_ref(&self) -> &Self::Tx {
3190        &self.tx
3191    }
3192
3193    fn tx_mut(&mut self) -> &mut Self::Tx {
3194        &mut self.tx
3195    }
3196
3197    fn into_tx(self) -> Self::Tx {
3198        self.tx
3199    }
3200
3201    fn prune_modes_ref(&self) -> &PruneModes {
3202        self.prune_modes_ref()
3203    }
3204}