reth_provider/providers/database/
provider.rs

1use crate::{
2    bundle_state::StorageRevertsIter,
3    providers::{
4        database::{chain::ChainStorage, metrics},
5        static_file::StaticFileWriter,
6        NodeTypesForProvider, StaticFileProvider,
7    },
8    to_range,
9    traits::{
10        AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
11    },
12    AccountReader, BlockBodyWriter, BlockExecutionWriter, BlockHashReader, BlockNumReader,
13    BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
14    DBProvider, HashingWriter, HeaderProvider, HeaderSyncGapProvider, HistoricalStateProvider,
15    HistoricalStateProviderRef, HistoryWriter, LatestStateProvider, LatestStateProviderRef,
16    OriginalValuesKnown, ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RevertsInit,
17    StageCheckpointReader, StateCommitmentProvider, StateProviderBox, StateWriter,
18    StaticFileProviderFactory, StatsReader, StorageLocation, StorageReader, StorageTrieWriter,
19    TransactionVariant, TransactionsProvider, TransactionsProviderExt, TrieWriter,
20    WithdrawalsProvider,
21};
22use alloy_consensus::{
23    transaction::{SignerRecoverable, TransactionMeta},
24    BlockHeader, Header, TxReceipt,
25};
26use alloy_eips::{eip2718::Encodable2718, eip4895::Withdrawals, BlockHashOrNumber};
27use alloy_primitives::{
28    keccak256,
29    map::{hash_map, B256Map, HashMap, HashSet},
30    Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
31};
32use itertools::Itertools;
33use rayon::slice::ParallelSliceMut;
34use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, EthereumHardforks};
35use reth_db_api::{
36    cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
37    database::Database,
38    models::{
39        sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
40        ShardedKey, StoredBlockBodyIndices,
41    },
42    table::Table,
43    tables,
44    transaction::{DbTx, DbTxMut},
45    BlockNumberList, DatabaseError, PlainAccountState, PlainStorageState,
46};
47use reth_execution_types::{Chain, ExecutionOutcome};
48use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy};
49use reth_primitives_traits::{
50    Account, Block as _, BlockBody as _, Bytecode, GotExpected, NodePrimitives, RecoveredBlock,
51    SealedBlock, SealedHeader, SignedTransaction, StorageEntry,
52};
53use reth_prune_types::{
54    PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE,
55};
56use reth_stages_types::{StageCheckpoint, StageId};
57use reth_static_file_types::StaticFileSegment;
58use reth_storage_api::{
59    BlockBodyIndicesProvider, BlockBodyReader, NodePrimitivesProvider, OmmersProvider,
60    StateProvider, StorageChangeSetReader, TryIntoHistoricalStateProvider,
61};
62use reth_storage_errors::provider::{ProviderResult, RootMismatch};
63use reth_trie::{
64    prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets},
65    updates::{StorageTrieUpdates, TrieUpdates},
66    HashedPostStateSorted, Nibbles, StateRoot, StoredNibbles,
67};
68use reth_trie_db::{DatabaseStateRoot, DatabaseStorageTrieCursor};
69use revm_database::states::{
70    PlainStateReverts, PlainStorageChangeset, PlainStorageRevert, StateChangeset,
71};
72use revm_state::FlaggedStorage;
73use std::{
74    cmp::Ordering,
75    collections::{BTreeMap, BTreeSet},
76    fmt::Debug,
77    ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
78    sync::{mpsc, Arc},
79};
80use tracing::{debug, trace};
81
82/// A [`DatabaseProvider`] that holds a read-only database transaction.
83pub type DatabaseProviderRO<DB, N> = DatabaseProvider<<DB as Database>::TX, N>;
84
85/// A [`DatabaseProvider`] that holds a read-write database transaction.
86///
87/// Ideally this would be an alias type. However, there's some weird compiler error (<https://github.com/rust-lang/rust/issues/102211>), that forces us to wrap this in a struct instead.
88/// Once that issue is solved, we can probably revert back to being an alias type.
89#[derive(Debug)]
90pub struct DatabaseProviderRW<DB: Database, N: NodeTypes>(
91    pub DatabaseProvider<<DB as Database>::TXMut, N>,
92);
93
94impl<DB: Database, N: NodeTypes> Deref for DatabaseProviderRW<DB, N> {
95    type Target = DatabaseProvider<<DB as Database>::TXMut, N>;
96
97    fn deref(&self) -> &Self::Target {
98        &self.0
99    }
100}
101
102impl<DB: Database, N: NodeTypes> DerefMut for DatabaseProviderRW<DB, N> {
103    fn deref_mut(&mut self) -> &mut Self::Target {
104        &mut self.0
105    }
106}
107
108impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut, N>>
109    for DatabaseProviderRW<DB, N>
110{
111    fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut, N> {
112        &self.0
113    }
114}
115
116impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
117    /// Commit database transaction and static file if it exists.
118    pub fn commit(self) -> ProviderResult<bool> {
119        self.0.commit()
120    }
121
122    /// Consume `DbTx` or `DbTxMut`.
123    pub fn into_tx(self) -> <DB as Database>::TXMut {
124        self.0.into_tx()
125    }
126}
127
128impl<DB: Database, N: NodeTypes> From<DatabaseProviderRW<DB, N>>
129    for DatabaseProvider<<DB as Database>::TXMut, N>
130{
131    fn from(provider: DatabaseProviderRW<DB, N>) -> Self {
132        provider.0
133    }
134}
135
136/// A provider struct that fetches data from the database.
137/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
138#[derive(Debug)]
139pub struct DatabaseProvider<TX, N: NodeTypes> {
140    /// Database transaction.
141    tx: TX,
142    /// Chain spec
143    chain_spec: Arc<N::ChainSpec>,
144    /// Static File provider
145    static_file_provider: StaticFileProvider<N::Primitives>,
146    /// Pruning configuration
147    prune_modes: PruneModes,
148    /// Node storage handler.
149    storage: Arc<N::Storage>,
150}
151
152impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
153    /// Returns reference to prune modes.
154    pub const fn prune_modes_ref(&self) -> &PruneModes {
155        &self.prune_modes
156    }
157}
158
159impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
160    /// State provider for latest state
161    pub fn latest<'a>(&'a self) -> Box<dyn StateProvider + 'a> {
162        trace!(target: "providers::db", "Returning latest state provider");
163        Box::new(LatestStateProviderRef::new(self))
164    }
165
166    /// Storage provider for state at that given block hash
167    pub fn history_by_block_hash<'a>(
168        &'a self,
169        block_hash: BlockHash,
170    ) -> ProviderResult<Box<dyn StateProvider + 'a>> {
171        let mut block_number =
172            self.block_number(block_hash)?.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
173        if block_number == self.best_block_number().unwrap_or_default() &&
174            block_number == self.last_block_number().unwrap_or_default()
175        {
176            return Ok(Box::new(LatestStateProviderRef::new(self)))
177        }
178
179        // +1 as the changeset that we want is the one that was applied after this block.
180        block_number += 1;
181
182        let account_history_prune_checkpoint =
183            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
184        let storage_history_prune_checkpoint =
185            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
186
187        let mut state_provider = HistoricalStateProviderRef::new(self, block_number);
188
189        // If we pruned account or storage history, we can't return state on every historical block.
190        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
191        if let Some(prune_checkpoint_block_number) =
192            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
193        {
194            state_provider = state_provider.with_lowest_available_account_history_block_number(
195                prune_checkpoint_block_number + 1,
196            );
197        }
198        if let Some(prune_checkpoint_block_number) =
199            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
200        {
201            state_provider = state_provider.with_lowest_available_storage_history_block_number(
202                prune_checkpoint_block_number + 1,
203            );
204        }
205
206        Ok(Box::new(state_provider))
207    }
208
209    #[cfg(feature = "test-utils")]
210    /// Sets the prune modes for provider.
211    pub fn set_prune_modes(&mut self, prune_modes: PruneModes) {
212        self.prune_modes = prune_modes;
213    }
214}
215
216impl<TX, N: NodeTypes> NodePrimitivesProvider for DatabaseProvider<TX, N> {
217    type Primitives = N::Primitives;
218}
219
220impl<TX, N: NodeTypes> StaticFileProviderFactory for DatabaseProvider<TX, N> {
221    /// Returns a static file provider
222    fn static_file_provider(&self) -> StaticFileProvider<Self::Primitives> {
223        self.static_file_provider.clone()
224    }
225}
226
227impl<TX: Debug + Send + Sync, N: NodeTypes<ChainSpec: EthChainSpec + 'static>> ChainSpecProvider
228    for DatabaseProvider<TX, N>
229{
230    type ChainSpec = N::ChainSpec;
231
232    fn chain_spec(&self) -> Arc<Self::ChainSpec> {
233        self.chain_spec.clone()
234    }
235}
236
237impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
238    /// Creates a provider with an inner read-write transaction.
239    pub const fn new_rw(
240        tx: TX,
241        chain_spec: Arc<N::ChainSpec>,
242        static_file_provider: StaticFileProvider<N::Primitives>,
243        prune_modes: PruneModes,
244        storage: Arc<N::Storage>,
245    ) -> Self {
246        Self { tx, chain_spec, static_file_provider, prune_modes, storage }
247    }
248}
249
250impl<TX, N: NodeTypes> AsRef<Self> for DatabaseProvider<TX, N> {
251    fn as_ref(&self) -> &Self {
252        self
253    }
254}
255
256impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
257    /// Unwinds trie state for the given range.
258    ///
259    /// This includes calculating the resulted state root and comparing it with the parent block
260    /// state root.
261    pub fn unwind_trie_state_range(
262        &self,
263        range: RangeInclusive<BlockNumber>,
264    ) -> ProviderResult<()> {
265        let changed_accounts = self
266            .tx
267            .cursor_read::<tables::AccountChangeSets>()?
268            .walk_range(range.clone())?
269            .collect::<Result<Vec<_>, _>>()?;
270
271        // Unwind account hashes. Add changed accounts to account prefix set.
272        let hashed_addresses = self.unwind_account_hashing(changed_accounts.iter())?;
273        let mut account_prefix_set = PrefixSetMut::with_capacity(hashed_addresses.len());
274        let mut destroyed_accounts = HashSet::default();
275        for (hashed_address, account) in hashed_addresses {
276            account_prefix_set.insert(Nibbles::unpack(hashed_address));
277            if account.is_none() {
278                destroyed_accounts.insert(hashed_address);
279            }
280        }
281
282        // Unwind account history indices.
283        self.unwind_account_history_indices(changed_accounts.iter())?;
284        let storage_range = BlockNumberAddress::range(range.clone());
285
286        let changed_storages = self
287            .tx
288            .cursor_read::<tables::StorageChangeSets>()?
289            .walk_range(storage_range)?
290            .collect::<Result<Vec<_>, _>>()?;
291
292        // Unwind storage hashes. Add changed account and storage keys to corresponding prefix
293        // sets.
294        let mut storage_prefix_sets = B256Map::<PrefixSet>::default();
295        let storage_entries = self.unwind_storage_hashing(changed_storages.iter().copied())?;
296        for (hashed_address, hashed_slots) in storage_entries {
297            account_prefix_set.insert(Nibbles::unpack(hashed_address));
298            let mut storage_prefix_set = PrefixSetMut::with_capacity(hashed_slots.len());
299            for slot in hashed_slots {
300                storage_prefix_set.insert(Nibbles::unpack(slot));
301            }
302            storage_prefix_sets.insert(hashed_address, storage_prefix_set.freeze());
303        }
304
305        // Unwind storage history indices.
306        self.unwind_storage_history_indices(changed_storages.iter().copied())?;
307
308        // Calculate the reverted merkle root.
309        // This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
310        // are pre-loaded.
311        let prefix_sets = TriePrefixSets {
312            account_prefix_set: account_prefix_set.freeze(),
313            storage_prefix_sets,
314            destroyed_accounts,
315        };
316        let (new_state_root, trie_updates) = StateRoot::from_tx(&self.tx)
317            .with_prefix_sets(prefix_sets)
318            .root_with_updates()
319            .map_err(reth_db_api::DatabaseError::from)?;
320
321        let parent_number = range.start().saturating_sub(1);
322        let parent_state_root = self
323            .header_by_number(parent_number)?
324            .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?
325            .state_root();
326
327        // state root should be always correct as we are reverting state.
328        // but for sake of double verification we will check it again.
329        if new_state_root != parent_state_root {
330            let parent_hash = self
331                .block_hash(parent_number)?
332                .ok_or_else(|| ProviderError::HeaderNotFound(parent_number.into()))?;
333            return Err(ProviderError::UnwindStateRootMismatch(Box::new(RootMismatch {
334                root: GotExpected { got: new_state_root, expected: parent_state_root },
335                block_number: parent_number,
336                block_hash: parent_hash,
337            })))
338        }
339        self.write_trie_updates(&trie_updates)?;
340
341        Ok(())
342    }
343
344    /// Removes receipts from all transactions starting with provided number (inclusive).
345    fn remove_receipts_from(
346        &self,
347        from_tx: TxNumber,
348        last_block: BlockNumber,
349        remove_from: StorageLocation,
350    ) -> ProviderResult<()> {
351        if remove_from.database() {
352            // iterate over block body and remove receipts
353            self.remove::<tables::Receipts<ReceiptTy<N>>>(from_tx..)?;
354        }
355
356        if remove_from.static_files() && !self.prune_modes.has_receipts_pruning() {
357            let static_file_receipt_num =
358                self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts);
359
360            let to_delete = static_file_receipt_num
361                .map(|static_num| (static_num + 1).saturating_sub(from_tx))
362                .unwrap_or_default();
363
364            self.static_file_provider
365                .latest_writer(StaticFileSegment::Receipts)?
366                .prune_receipts(to_delete, last_block)?;
367        }
368
369        Ok(())
370    }
371}
372
373impl<TX: DbTx + 'static, N: NodeTypes> TryIntoHistoricalStateProvider for DatabaseProvider<TX, N> {
374    fn try_into_history_at_block(
375        self,
376        mut block_number: BlockNumber,
377    ) -> ProviderResult<StateProviderBox> {
378        // if the block number is the same as the currently best block number on disk we can use the
379        // latest state provider here
380        if block_number == self.best_block_number().unwrap_or_default() {
381            return Ok(Box::new(LatestStateProvider::new(self)))
382        }
383
384        // +1 as the changeset that we want is the one that was applied after this block.
385        block_number += 1;
386
387        let account_history_prune_checkpoint =
388            self.get_prune_checkpoint(PruneSegment::AccountHistory)?;
389        let storage_history_prune_checkpoint =
390            self.get_prune_checkpoint(PruneSegment::StorageHistory)?;
391
392        let mut state_provider = HistoricalStateProvider::new(self, block_number);
393
394        // If we pruned account or storage history, we can't return state on every historical block.
395        // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.
396        if let Some(prune_checkpoint_block_number) =
397            account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
398        {
399            state_provider = state_provider.with_lowest_available_account_history_block_number(
400                prune_checkpoint_block_number + 1,
401            );
402        }
403        if let Some(prune_checkpoint_block_number) =
404            storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
405        {
406            state_provider = state_provider.with_lowest_available_storage_history_block_number(
407                prune_checkpoint_block_number + 1,
408            );
409        }
410
411        Ok(Box::new(state_provider))
412    }
413}
414
415impl<TX: DbTx + 'static, N: NodeTypes> StateCommitmentProvider for DatabaseProvider<TX, N> {
416    type StateCommitment = N::StateCommitment;
417}
418
419impl<
420        Tx: DbTx + DbTxMut + 'static,
421        N: NodeTypesForProvider<Primitives: NodePrimitives<BlockHeader = Header>>,
422    > DatabaseProvider<Tx, N>
423{
424    // TODO: uncomment below, once `reth debug_cmd` has been feature gated with dev.
425    // #[cfg(any(test, feature = "test-utils"))]
426    /// Inserts an historical block. **Used for setting up test environments**
427    pub fn insert_historical_block(
428        &self,
429        block: RecoveredBlock<<Self as BlockWriter>::Block>,
430    ) -> ProviderResult<StoredBlockBodyIndices> {
431        let ttd = if block.number() == 0 {
432            block.header().difficulty()
433        } else {
434            let parent_block_number = block.number() - 1;
435            let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
436            parent_ttd + block.header().difficulty()
437        };
438
439        let mut writer = self.static_file_provider.latest_writer(StaticFileSegment::Headers)?;
440
441        // Backfill: some tests start at a forward block number, but static files require no gaps.
442        let segment_header = writer.user_header();
443        if segment_header.block_end().is_none() && segment_header.expected_block_start() == 0 {
444            for block_number in 0..block.number() {
445                let mut prev = block.clone_header();
446                prev.number = block_number;
447                writer.append_header(&prev, U256::ZERO, &B256::ZERO)?;
448            }
449        }
450
451        writer.append_header(block.header(), ttd, &block.hash())?;
452
453        self.insert_block(block, StorageLocation::Database)
454    }
455}
456
457/// For a given key, unwind all history shards that are below the given block number.
458///
459/// S - Sharded key subtype.
460/// T - Table to walk over.
461/// C - Cursor implementation.
462///
463/// This function walks the entries from the given start key and deletes all shards that belong to
464/// the key and are below the given block number.
465///
466/// The boundary shard (the shard is split by the block number) is removed from the database. Any
467/// indices that are above the block number are filtered out. The boundary shard is returned for
468/// reinsertion (if it's not empty).
469fn unwind_history_shards<S, T, C>(
470    cursor: &mut C,
471    start_key: T::Key,
472    block_number: BlockNumber,
473    mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool,
474) -> ProviderResult<Vec<u64>>
475where
476    T: Table<Value = BlockNumberList>,
477    T::Key: AsRef<ShardedKey<S>>,
478    C: DbCursorRO<T> + DbCursorRW<T>,
479{
480    let mut item = cursor.seek_exact(start_key)?;
481    while let Some((sharded_key, list)) = item {
482        // If the shard does not belong to the key, break.
483        if !shard_belongs_to_key(&sharded_key) {
484            break
485        }
486        cursor.delete_current()?;
487
488        // Check the first item.
489        // If it is greater or eq to the block number, delete it.
490        let first = list.iter().next().expect("List can't be empty");
491        if first >= block_number {
492            item = cursor.prev()?;
493            continue
494        } else if block_number <= sharded_key.as_ref().highest_block_number {
495            // Filter out all elements greater than block number.
496            return Ok(list.iter().take_while(|i| *i < block_number).collect::<Vec<_>>())
497        }
498        return Ok(list.iter().collect::<Vec<_>>())
499    }
500
501    Ok(Vec::new())
502}
503
504impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
505    /// Creates a provider with an inner read-only transaction.
506    pub const fn new(
507        tx: TX,
508        chain_spec: Arc<N::ChainSpec>,
509        static_file_provider: StaticFileProvider<N::Primitives>,
510        prune_modes: PruneModes,
511        storage: Arc<N::Storage>,
512    ) -> Self {
513        Self { tx, chain_spec, static_file_provider, prune_modes, storage }
514    }
515
516    /// Consume `DbTx` or `DbTxMut`.
517    pub fn into_tx(self) -> TX {
518        self.tx
519    }
520
521    /// Pass `DbTx` or `DbTxMut` mutable reference.
522    pub const fn tx_mut(&mut self) -> &mut TX {
523        &mut self.tx
524    }
525
526    /// Pass `DbTx` or `DbTxMut` immutable reference.
527    pub const fn tx_ref(&self) -> &TX {
528        &self.tx
529    }
530
531    /// Returns a reference to the chain specification.
532    pub fn chain_spec(&self) -> &N::ChainSpec {
533        &self.chain_spec
534    }
535}
536
537impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
538    fn transactions_by_tx_range_with_cursor<C>(
539        &self,
540        range: impl RangeBounds<TxNumber>,
541        cursor: &mut C,
542    ) -> ProviderResult<Vec<TxTy<N>>>
543    where
544        C: DbCursorRO<tables::Transactions<TxTy<N>>>,
545    {
546        self.static_file_provider.get_range_with_static_file_or_database(
547            StaticFileSegment::Transactions,
548            to_range(range),
549            |static_file, range, _| static_file.transactions_by_tx_range(range),
550            |range, _| self.cursor_collect(cursor, range),
551            |_| true,
552        )
553    }
554
555    fn recovered_block<H, HF, B, BF>(
556        &self,
557        id: BlockHashOrNumber,
558        _transaction_kind: TransactionVariant,
559        header_by_number: HF,
560        construct_block: BF,
561    ) -> ProviderResult<Option<B>>
562    where
563        H: AsRef<HeaderTy<N>>,
564        HF: FnOnce(BlockNumber) -> ProviderResult<Option<H>>,
565        BF: FnOnce(H, BodyTy<N>, Vec<Address>) -> ProviderResult<Option<B>>,
566    {
567        let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
568        let Some(header) = header_by_number(block_number)? else { return Ok(None) };
569
570        // Get the block body
571        //
572        // If the body indices are not found, this means that the transactions either do not exist
573        // in the database yet, or they do exit but are not indexed. If they exist but are not
574        // indexed, we don't have enough information to return the block anyways, so we return
575        // `None`.
576        let Some(body) = self.block_body_indices(block_number)? else { return Ok(None) };
577
578        let tx_range = body.tx_num_range();
579
580        let (transactions, senders) = if tx_range.is_empty() {
581            (vec![], vec![])
582        } else {
583            (self.transactions_by_tx_range(tx_range.clone())?, self.senders_by_tx_range(tx_range)?)
584        };
585
586        let body = self
587            .storage
588            .reader()
589            .read_block_bodies(self, vec![(header.as_ref(), transactions)])?
590            .pop()
591            .ok_or(ProviderError::InvalidStorageOutput)?;
592
593        construct_block(header, body, senders)
594    }
595
596    /// Returns a range of blocks from the database.
597    ///
598    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
599    /// construct blocks from the following inputs:
600    ///     – Header
601    ///     - Range of transaction numbers
602    ///     – Ommers
603    ///     – Withdrawals
604    ///     – Senders
605    fn block_range<F, H, HF, R>(
606        &self,
607        range: RangeInclusive<BlockNumber>,
608        headers_range: HF,
609        mut assemble_block: F,
610    ) -> ProviderResult<Vec<R>>
611    where
612        H: AsRef<HeaderTy<N>>,
613        HF: FnOnce(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
614        F: FnMut(H, BodyTy<N>, Range<TxNumber>) -> ProviderResult<R>,
615    {
616        if range.is_empty() {
617            return Ok(Vec::new())
618        }
619
620        let len = range.end().saturating_sub(*range.start()) as usize;
621        let mut blocks = Vec::with_capacity(len);
622
623        let headers = headers_range(range.clone())?;
624        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
625
626        // If the body indices are not found, this means that the transactions either do
627        // not exist in the database yet, or they do exit but are
628        // not indexed. If they exist but are not indexed, we don't
629        // have enough information to return the block anyways, so
630        // we skip the block.
631        let present_headers = self
632            .block_body_indices_range(range)?
633            .into_iter()
634            .map(|b| b.tx_num_range())
635            .zip(headers)
636            .collect::<Vec<_>>();
637
638        let mut inputs = Vec::new();
639        for (tx_range, header) in &present_headers {
640            let transactions = if tx_range.is_empty() {
641                Vec::new()
642            } else {
643                self.transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)?
644            };
645
646            inputs.push((header.as_ref(), transactions));
647        }
648
649        let bodies = self.storage.reader().read_block_bodies(self, inputs)?;
650
651        for ((tx_range, header), body) in present_headers.into_iter().zip(bodies) {
652            blocks.push(assemble_block(header, body, tx_range)?);
653        }
654
655        Ok(blocks)
656    }
657
658    /// Returns a range of blocks from the database, along with the senders of each
659    /// transaction in the blocks.
660    ///
661    /// Uses the provided `headers_range` to get the headers for the range, and `assemble_block` to
662    /// construct blocks from the following inputs:
663    ///     – Header
664    ///     - Transactions
665    ///     – Ommers
666    ///     – Withdrawals
667    ///     – Senders
668    fn block_with_senders_range<H, HF, B, BF>(
669        &self,
670        range: RangeInclusive<BlockNumber>,
671        headers_range: HF,
672        assemble_block: BF,
673    ) -> ProviderResult<Vec<B>>
674    where
675        H: AsRef<HeaderTy<N>>,
676        HF: Fn(RangeInclusive<BlockNumber>) -> ProviderResult<Vec<H>>,
677        BF: Fn(H, BodyTy<N>, Vec<Address>) -> ProviderResult<B>,
678    {
679        let mut senders_cursor = self.tx.cursor_read::<tables::TransactionSenders>()?;
680
681        self.block_range(range, headers_range, |header, body, tx_range| {
682            let senders = if tx_range.is_empty() {
683                Vec::new()
684            } else {
685                // fetch senders from the senders table
686                let known_senders =
687                    senders_cursor
688                        .walk_range(tx_range.clone())?
689                        .collect::<Result<HashMap<_, _>, _>>()?;
690
691                let mut senders = Vec::with_capacity(body.transactions().len());
692                for (tx_num, tx) in tx_range.zip(body.transactions()) {
693                    match known_senders.get(&tx_num) {
694                        None => {
695                            // recover the sender from the transaction if not found
696                            let sender = tx.recover_signer_unchecked()?;
697                            senders.push(sender);
698                        }
699                        Some(sender) => senders.push(*sender),
700                    }
701                }
702
703                senders
704            };
705
706            assemble_block(header, body, senders)
707        })
708    }
709
710    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
711    /// [`PlainAccountState`] and [`PlainStorageState`] tables, based on the given storage and
712    /// account changesets.
713    fn populate_bundle_state<A, S>(
714        &self,
715        account_changeset: Vec<(u64, AccountBeforeTx)>,
716        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
717        plain_accounts_cursor: &mut A,
718        plain_storage_cursor: &mut S,
719    ) -> ProviderResult<(BundleStateInit, RevertsInit)>
720    where
721        A: DbCursorRO<PlainAccountState>,
722        S: DbDupCursorRO<PlainStorageState>,
723    {
724        // iterate previous value and get plain state value to create changeset
725        // Double option around Account represent if Account state is know (first option) and
726        // account is removed (Second Option)
727        let mut state: BundleStateInit = HashMap::default();
728
729        // This is not working for blocks that are not at tip. as plain state is not the last
730        // state of end range. We should rename the functions or add support to access
731        // History state. Accessing history state can be tricky but we are not gaining
732        // anything.
733
734        let mut reverts: RevertsInit = HashMap::default();
735
736        // add account changeset changes
737        for (block_number, account_before) in account_changeset.into_iter().rev() {
738            let AccountBeforeTx { info: old_info, address } = account_before;
739            match state.entry(address) {
740                hash_map::Entry::Vacant(entry) => {
741                    let new_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
742                    entry.insert((old_info, new_info, HashMap::default()));
743                }
744                hash_map::Entry::Occupied(mut entry) => {
745                    // overwrite old account state.
746                    entry.get_mut().0 = old_info;
747                }
748            }
749            // insert old info into reverts.
750            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
751        }
752
753        // add storage changeset changes
754        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
755            let BlockNumberAddress((block_number, address)) = block_and_address;
756            // get account state or insert from plain state.
757            let account_state = match state.entry(address) {
758                hash_map::Entry::Vacant(entry) => {
759                    let present_info = plain_accounts_cursor.seek_exact(address)?.map(|kv| kv.1);
760                    entry.insert((present_info, present_info, HashMap::default()))
761                }
762                hash_map::Entry::Occupied(entry) => entry.into_mut(),
763            };
764
765            // match storage.
766            match account_state.2.entry(old_storage.key) {
767                hash_map::Entry::Vacant(entry) => {
768                    let new_storage = plain_storage_cursor
769                        .seek_by_key_subkey(address, old_storage.key)?
770                        .filter(|storage| storage.key == old_storage.key)
771                        .unwrap_or_default();
772                    entry.insert((
773                        (old_storage.value, old_storage.is_private),
774                        (new_storage.value, new_storage.is_private),
775                    ));
776                }
777                hash_map::Entry::Occupied(mut entry) => {
778                    entry.get_mut().0 = (old_storage.value, old_storage.is_private);
779                }
780            };
781
782            reverts
783                .entry(block_number)
784                .or_default()
785                .entry(address)
786                .or_default()
787                .1
788                .push(old_storage);
789        }
790
791        Ok((state, reverts))
792    }
793}
794
795impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
796    /// Commit database transaction.
797    pub fn commit(self) -> ProviderResult<bool> {
798        Ok(self.tx.commit()?)
799    }
800
801    /// Load shard and remove it. If list is empty, last shard was full or
802    /// there are no shards at all.
803    fn take_shard<T>(
804        &self,
805        cursor: &mut <TX as DbTxMut>::CursorMut<T>,
806        key: T::Key,
807    ) -> ProviderResult<Vec<u64>>
808    where
809        T: Table<Value = BlockNumberList>,
810    {
811        if let Some((_, list)) = cursor.seek_exact(key)? {
812            // delete old shard so new one can be inserted.
813            cursor.delete_current()?;
814            let list = list.iter().collect::<Vec<_>>();
815            return Ok(list)
816        }
817        Ok(Vec::new())
818    }
819
820    /// Insert history index to the database.
821    ///
822    /// For each updated partial key, this function removes the last shard from
823    /// the database (if any), appends the new indices to it, chunks the resulting integer list and
824    /// inserts the new shards back into the database.
825    ///
826    /// This function is used by history indexing stages.
827    fn append_history_index<P, T>(
828        &self,
829        index_updates: impl IntoIterator<Item = (P, impl IntoIterator<Item = u64>)>,
830        mut sharded_key_factory: impl FnMut(P, BlockNumber) -> T::Key,
831    ) -> ProviderResult<()>
832    where
833        P: Copy,
834        T: Table<Value = BlockNumberList>,
835    {
836        let mut cursor = self.tx.cursor_write::<T>()?;
837        for (partial_key, indices) in index_updates {
838            let mut last_shard =
839                self.take_shard::<T>(&mut cursor, sharded_key_factory(partial_key, u64::MAX))?;
840            last_shard.extend(indices);
841            // Chunk indices and insert them in shards of N size.
842            let mut chunks = last_shard.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD).peekable();
843            while let Some(list) = chunks.next() {
844                let highest_block_number = if chunks.peek().is_some() {
845                    *list.last().expect("`chunks` does not return empty list")
846                } else {
847                    // Insert last list with `u64::MAX`.
848                    u64::MAX
849                };
850                cursor.insert(
851                    sharded_key_factory(partial_key, highest_block_number),
852                    &BlockNumberList::new_pre_sorted(list.iter().copied()),
853                )?;
854            }
855        }
856        Ok(())
857    }
858}
859
860impl<TX: DbTx, N: NodeTypes> AccountReader for DatabaseProvider<TX, N> {
861    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
862        Ok(self.tx.get_by_encoded_key::<tables::PlainAccountState>(address)?)
863    }
864}
865
866impl<TX: DbTx, N: NodeTypes> AccountExtReader for DatabaseProvider<TX, N> {
867    fn changed_accounts_with_range(
868        &self,
869        range: impl RangeBounds<BlockNumber>,
870    ) -> ProviderResult<BTreeSet<Address>> {
871        self.tx
872            .cursor_read::<tables::AccountChangeSets>()?
873            .walk_range(range)?
874            .map(|entry| {
875                entry.map(|(_, account_before)| account_before.address).map_err(Into::into)
876            })
877            .collect()
878    }
879
880    fn basic_accounts(
881        &self,
882        iter: impl IntoIterator<Item = Address>,
883    ) -> ProviderResult<Vec<(Address, Option<Account>)>> {
884        let mut plain_accounts = self.tx.cursor_read::<tables::PlainAccountState>()?;
885        Ok(iter
886            .into_iter()
887            .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
888            .collect::<Result<Vec<_>, _>>()?)
889    }
890
891    fn changed_accounts_and_blocks_with_range(
892        &self,
893        range: RangeInclusive<BlockNumber>,
894    ) -> ProviderResult<BTreeMap<Address, Vec<u64>>> {
895        let mut changeset_cursor = self.tx.cursor_read::<tables::AccountChangeSets>()?;
896
897        let account_transitions = changeset_cursor.walk_range(range)?.try_fold(
898            BTreeMap::new(),
899            |mut accounts: BTreeMap<Address, Vec<u64>>, entry| -> ProviderResult<_> {
900                let (index, account) = entry?;
901                accounts.entry(account.address).or_default().push(index);
902                Ok(accounts)
903            },
904        )?;
905
906        Ok(account_transitions)
907    }
908}
909
910impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N> {
911    fn storage_changeset(
912        &self,
913        block_number: BlockNumber,
914    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
915        let range = block_number..=block_number;
916        let storage_range = BlockNumberAddress::range(range);
917        self.tx
918            .cursor_dup_read::<tables::StorageChangeSets>()?
919            .walk_range(storage_range)?
920            .map(|result| -> ProviderResult<_> { Ok(result?) })
921            .collect()
922    }
923}
924
925impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
926    fn account_block_changeset(
927        &self,
928        block_number: BlockNumber,
929    ) -> ProviderResult<Vec<AccountBeforeTx>> {
930        let range = block_number..=block_number;
931        self.tx
932            .cursor_read::<tables::AccountChangeSets>()?
933            .walk_range(range)?
934            .map(|result| -> ProviderResult<_> {
935                let (_, account_before) = result?;
936                Ok(account_before)
937            })
938            .collect()
939    }
940}
941
942impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
943    for DatabaseProvider<TX, N>
944{
945    type Header = HeaderTy<N>;
946
947    fn local_tip_header(
948        &self,
949        highest_uninterrupted_block: BlockNumber,
950    ) -> ProviderResult<SealedHeader<Self::Header>> {
951        let static_file_provider = self.static_file_provider();
952
953        // Make sure Headers static file is at the same height. If it's further, this
954        // input execution was interrupted previously and we need to unwind the static file.
955        let next_static_file_block_num = static_file_provider
956            .get_highest_static_file_block(StaticFileSegment::Headers)
957            .map(|id| id + 1)
958            .unwrap_or_default();
959        let next_block = highest_uninterrupted_block + 1;
960
961        match next_static_file_block_num.cmp(&next_block) {
962            // The node shutdown between an executed static file commit and before the database
963            // commit, so we need to unwind the static files.
964            Ordering::Greater => {
965                let mut static_file_producer =
966                    static_file_provider.latest_writer(StaticFileSegment::Headers)?;
967                static_file_producer.prune_headers(next_static_file_block_num - next_block)?;
968                // Since this is a database <-> static file inconsistency, we commit the change
969                // straight away.
970                static_file_producer.commit()?
971            }
972            Ordering::Less => {
973                // There's either missing or corrupted files.
974                return Err(ProviderError::HeaderNotFound(next_static_file_block_num.into()))
975            }
976            Ordering::Equal => {}
977        }
978
979        let local_head = static_file_provider
980            .sealed_header(highest_uninterrupted_block)?
981            .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
982
983        Ok(local_head)
984    }
985}
986
987impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderProvider for DatabaseProvider<TX, N> {
988    type Header = HeaderTy<N>;
989
990    fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
991        if let Some(num) = self.block_number(*block_hash)? {
992            Ok(self.header_by_number(num)?)
993        } else {
994            Ok(None)
995        }
996    }
997
998    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
999        self.static_file_provider.get_with_static_file_or_database(
1000            StaticFileSegment::Headers,
1001            num,
1002            |static_file| static_file.header_by_number(num),
1003            || Ok(self.tx.get::<tables::Headers<Self::Header>>(num)?),
1004        )
1005    }
1006
1007    fn header_td(&self, block_hash: &BlockHash) -> ProviderResult<Option<U256>> {
1008        if let Some(num) = self.block_number(*block_hash)? {
1009            self.header_td_by_number(num)
1010        } else {
1011            Ok(None)
1012        }
1013    }
1014
1015    fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult<Option<U256>> {
1016        if self.chain_spec.is_paris_active_at_block(number) {
1017            if let Some(td) = self.chain_spec.final_paris_total_difficulty() {
1018                // if this block is higher than the final paris(merge) block, return the final paris
1019                // difficulty
1020                return Ok(Some(td))
1021            }
1022        }
1023
1024        self.static_file_provider.get_with_static_file_or_database(
1025            StaticFileSegment::Headers,
1026            number,
1027            |static_file| static_file.header_td_by_number(number),
1028            || Ok(self.tx.get::<tables::HeaderTerminalDifficulties>(number)?.map(|td| td.0)),
1029        )
1030    }
1031
1032    fn headers_range(
1033        &self,
1034        range: impl RangeBounds<BlockNumber>,
1035    ) -> ProviderResult<Vec<Self::Header>> {
1036        self.static_file_provider.get_range_with_static_file_or_database(
1037            StaticFileSegment::Headers,
1038            to_range(range),
1039            |static_file, range, _| static_file.headers_range(range),
1040            |range, _| self.cursor_read_collect::<tables::Headers<Self::Header>>(range),
1041            |_| true,
1042        )
1043    }
1044
1045    fn sealed_header(
1046        &self,
1047        number: BlockNumber,
1048    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1049        self.static_file_provider.get_with_static_file_or_database(
1050            StaticFileSegment::Headers,
1051            number,
1052            |static_file| static_file.sealed_header(number),
1053            || {
1054                if let Some(header) = self.header_by_number(number)? {
1055                    let hash = self
1056                        .block_hash(number)?
1057                        .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
1058                    Ok(Some(SealedHeader::new(header, hash)))
1059                } else {
1060                    Ok(None)
1061                }
1062            },
1063        )
1064    }
1065
1066    fn sealed_headers_while(
1067        &self,
1068        range: impl RangeBounds<BlockNumber>,
1069        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1070    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1071        self.static_file_provider.get_range_with_static_file_or_database(
1072            StaticFileSegment::Headers,
1073            to_range(range),
1074            |static_file, range, predicate| static_file.sealed_headers_while(range, predicate),
1075            |range, mut predicate| {
1076                let mut headers = vec![];
1077                for entry in
1078                    self.tx.cursor_read::<tables::Headers<Self::Header>>()?.walk_range(range)?
1079                {
1080                    let (number, header) = entry?;
1081                    let hash = self
1082                        .block_hash(number)?
1083                        .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?;
1084                    let sealed = SealedHeader::new(header, hash);
1085                    if !predicate(&sealed) {
1086                        break
1087                    }
1088                    headers.push(sealed);
1089                }
1090                Ok(headers)
1091            },
1092            predicate,
1093        )
1094    }
1095}
1096
1097impl<TX: DbTx + 'static, N: NodeTypes> BlockHashReader for DatabaseProvider<TX, N> {
1098    fn block_hash(&self, number: u64) -> ProviderResult<Option<B256>> {
1099        self.static_file_provider.get_with_static_file_or_database(
1100            StaticFileSegment::Headers,
1101            number,
1102            |static_file| static_file.block_hash(number),
1103            || Ok(self.tx.get::<tables::CanonicalHeaders>(number)?),
1104        )
1105    }
1106
1107    fn canonical_hashes_range(
1108        &self,
1109        start: BlockNumber,
1110        end: BlockNumber,
1111    ) -> ProviderResult<Vec<B256>> {
1112        self.static_file_provider.get_range_with_static_file_or_database(
1113            StaticFileSegment::Headers,
1114            start..end,
1115            |static_file, range, _| static_file.canonical_hashes_range(range.start, range.end),
1116            |range, _| self.cursor_read_collect::<tables::CanonicalHeaders>(range),
1117            |_| true,
1118        )
1119    }
1120}
1121
1122impl<TX: DbTx + 'static, N: NodeTypes> BlockNumReader for DatabaseProvider<TX, N> {
1123    fn chain_info(&self) -> ProviderResult<ChainInfo> {
1124        let best_number = self.best_block_number()?;
1125        let best_hash = self.block_hash(best_number)?.unwrap_or_default();
1126        Ok(ChainInfo { best_hash, best_number })
1127    }
1128
1129    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1130        // The best block number is tracked via the finished stage which gets updated in the same tx
1131        // when new blocks committed
1132        Ok(self
1133            .get_stage_checkpoint(StageId::Finish)?
1134            .map(|checkpoint| checkpoint.block_number)
1135            .unwrap_or_default())
1136    }
1137
1138    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1139        Ok(self
1140            .tx
1141            .cursor_read::<tables::CanonicalHeaders>()?
1142            .last()?
1143            .map(|(num, _)| num)
1144            .max(
1145                self.static_file_provider.get_highest_static_file_block(StaticFileSegment::Headers),
1146            )
1147            .unwrap_or_default())
1148    }
1149
1150    fn block_number(&self, hash: B256) -> ProviderResult<Option<BlockNumber>> {
1151        Ok(self.tx.get::<tables::HeaderNumbers>(hash)?)
1152    }
1153}
1154
1155impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvider<TX, N> {
1156    type Block = BlockTy<N>;
1157
1158    fn find_block_by_hash(
1159        &self,
1160        hash: B256,
1161        source: BlockSource,
1162    ) -> ProviderResult<Option<Self::Block>> {
1163        if source.is_canonical() {
1164            self.block(hash.into())
1165        } else {
1166            Ok(None)
1167        }
1168    }
1169
1170    /// Returns the block with matching number from database.
1171    ///
1172    /// If the header for this block is not found, this returns `None`.
1173    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1174    /// will return None.
1175    fn block(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1176        if let Some(number) = self.convert_hash_or_number(id)? {
1177            if let Some(header) = self.header_by_number(number)? {
1178                // If the body indices are not found, this means that the transactions either do not
1179                // exist in the database yet, or they do exit but are not indexed.
1180                // If they exist but are not indexed, we don't have enough
1181                // information to return the block anyways, so we return `None`.
1182                let Some(transactions) = self.transactions_by_block(number.into())? else {
1183                    return Ok(None)
1184                };
1185
1186                let body = self
1187                    .storage
1188                    .reader()
1189                    .read_block_bodies(self, vec![(&header, transactions)])?
1190                    .pop()
1191                    .ok_or(ProviderError::InvalidStorageOutput)?;
1192
1193                return Ok(Some(Self::Block::new(header, body)))
1194            }
1195        }
1196
1197        Ok(None)
1198    }
1199
1200    fn pending_block(&self) -> ProviderResult<Option<SealedBlock<Self::Block>>> {
1201        Ok(None)
1202    }
1203
1204    fn pending_block_with_senders(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1205        Ok(None)
1206    }
1207
1208    fn pending_block_and_receipts(
1209        &self,
1210    ) -> ProviderResult<Option<(SealedBlock<Self::Block>, Vec<Self::Receipt>)>> {
1211        Ok(None)
1212    }
1213
1214    /// Returns the block with senders with matching number or hash from database.
1215    ///
1216    /// **NOTE: The transactions have invalid hashes, since they would need to be calculated on the
1217    /// spot, and we want fast querying.**
1218    ///
1219    /// If the header for this block is not found, this returns `None`.
1220    /// If the header is found, but the transactions either do not exist, or are not indexed, this
1221    /// will return None.
1222    fn recovered_block(
1223        &self,
1224        id: BlockHashOrNumber,
1225        transaction_kind: TransactionVariant,
1226    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1227        self.recovered_block(
1228            id,
1229            transaction_kind,
1230            |block_number| self.header_by_number(block_number),
1231            |header, body, senders| {
1232                Self::Block::new(header, body)
1233                    // Note: we're using unchecked here because we know the block contains valid txs
1234                    // wrt to its height and can ignore the s value check so pre
1235                    // EIP-2 txs are allowed
1236                    .try_into_recovered_unchecked(senders)
1237                    .map(Some)
1238                    .map_err(|_| ProviderError::SenderRecoveryError)
1239            },
1240        )
1241    }
1242
1243    fn sealed_block_with_senders(
1244        &self,
1245        id: BlockHashOrNumber,
1246        transaction_kind: TransactionVariant,
1247    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1248        self.recovered_block(
1249            id,
1250            transaction_kind,
1251            |block_number| self.sealed_header(block_number),
1252            |header, body, senders| {
1253                Self::Block::new_sealed(header, body)
1254                    // Note: we're using unchecked here because we know the block contains valid txs
1255                    // wrt to its height and can ignore the s value check so pre
1256                    // EIP-2 txs are allowed
1257                    .try_with_senders_unchecked(senders)
1258                    .map(Some)
1259                    .map_err(|_| ProviderError::SenderRecoveryError)
1260            },
1261        )
1262    }
1263
1264    fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1265        self.block_range(
1266            range,
1267            |range| self.headers_range(range),
1268            |header, body, _| Ok(Self::Block::new(header, body)),
1269        )
1270    }
1271
1272    fn block_with_senders_range(
1273        &self,
1274        range: RangeInclusive<BlockNumber>,
1275    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1276        self.block_with_senders_range(
1277            range,
1278            |range| self.headers_range(range),
1279            |header, body, senders| {
1280                Self::Block::new(header, body)
1281                    .try_into_recovered_unchecked(senders)
1282                    .map_err(|_| ProviderError::SenderRecoveryError)
1283            },
1284        )
1285    }
1286
1287    fn recovered_block_range(
1288        &self,
1289        range: RangeInclusive<BlockNumber>,
1290    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1291        self.block_with_senders_range(
1292            range,
1293            |range| self.sealed_headers_range(range),
1294            |header, body, senders| {
1295                Self::Block::new_sealed(header, body)
1296                    .try_with_senders(senders)
1297                    .map_err(|_| ProviderError::SenderRecoveryError)
1298            },
1299        )
1300    }
1301}
1302
1303impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProviderExt
1304    for DatabaseProvider<TX, N>
1305{
1306    /// Recovers transaction hashes by walking through `Transactions` table and
1307    /// calculating them in a parallel manner. Returned unsorted.
1308    fn transaction_hashes_by_range(
1309        &self,
1310        tx_range: Range<TxNumber>,
1311    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1312        self.static_file_provider.get_range_with_static_file_or_database(
1313            StaticFileSegment::Transactions,
1314            tx_range,
1315            |static_file, range, _| static_file.transaction_hashes_by_range(range),
1316            |tx_range, _| {
1317                let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<TxTy<N>>>()?;
1318                let tx_range_size = tx_range.clone().count();
1319                let tx_walker = tx_cursor.walk_range(tx_range)?;
1320
1321                let chunk_size = (tx_range_size / rayon::current_num_threads()).max(1);
1322                let mut channels = Vec::with_capacity(chunk_size);
1323                let mut transaction_count = 0;
1324
1325                #[inline]
1326                fn calculate_hash<T>(
1327                    entry: Result<(TxNumber, T), DatabaseError>,
1328                    rlp_buf: &mut Vec<u8>,
1329                ) -> Result<(B256, TxNumber), Box<ProviderError>>
1330                where
1331                    T: Encodable2718,
1332                {
1333                    let (tx_id, tx) = entry.map_err(|e| Box::new(e.into()))?;
1334                    tx.encode_2718(rlp_buf);
1335                    Ok((keccak256(rlp_buf), tx_id))
1336                }
1337
1338                for chunk in &tx_walker.chunks(chunk_size) {
1339                    let (tx, rx) = mpsc::channel();
1340                    channels.push(rx);
1341
1342                    // Note: Unfortunate side-effect of how chunk is designed in itertools (it is
1343                    // not Send)
1344                    let chunk: Vec<_> = chunk.collect();
1345                    transaction_count += chunk.len();
1346
1347                    // Spawn the task onto the global rayon pool
1348                    // This task will send the results through the channel after it has calculated
1349                    // the hash.
1350                    rayon::spawn(move || {
1351                        let mut rlp_buf = Vec::with_capacity(128);
1352                        for entry in chunk {
1353                            rlp_buf.clear();
1354                            let _ = tx.send(calculate_hash(entry, &mut rlp_buf));
1355                        }
1356                    });
1357                }
1358                let mut tx_list = Vec::with_capacity(transaction_count);
1359
1360                // Iterate over channels and append the tx hashes unsorted
1361                for channel in channels {
1362                    while let Ok(tx) = channel.recv() {
1363                        let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
1364                        tx_list.push((tx_hash, tx_id));
1365                    }
1366                }
1367
1368                Ok(tx_list)
1369            },
1370            |_| true,
1371        )
1372    }
1373}
1374
1375// Calculates the hash of the given transaction
1376impl<TX: DbTx + 'static, N: NodeTypesForProvider> TransactionsProvider for DatabaseProvider<TX, N> {
1377    type Transaction = TxTy<N>;
1378
1379    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1380        Ok(self.tx.get::<tables::TransactionHashNumbers>(tx_hash)?)
1381    }
1382
1383    fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1384        self.static_file_provider.get_with_static_file_or_database(
1385            StaticFileSegment::Transactions,
1386            id,
1387            |static_file| static_file.transaction_by_id(id),
1388            || Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
1389        )
1390    }
1391
1392    fn transaction_by_id_unhashed(
1393        &self,
1394        id: TxNumber,
1395    ) -> ProviderResult<Option<Self::Transaction>> {
1396        self.static_file_provider.get_with_static_file_or_database(
1397            StaticFileSegment::Transactions,
1398            id,
1399            |static_file| static_file.transaction_by_id_unhashed(id),
1400            || Ok(self.tx.get::<tables::Transactions<Self::Transaction>>(id)?),
1401        )
1402    }
1403
1404    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1405        if let Some(id) = self.transaction_id(hash)? {
1406            Ok(self.transaction_by_id_unhashed(id)?)
1407        } else {
1408            Ok(None)
1409        }
1410    }
1411
1412    fn transaction_by_hash_with_meta(
1413        &self,
1414        tx_hash: TxHash,
1415    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1416        let mut transaction_cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
1417        if let Some(transaction_id) = self.transaction_id(tx_hash)? {
1418            if let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? {
1419                if let Some(block_number) =
1420                    transaction_cursor.seek(transaction_id).map(|b| b.map(|(_, bn)| bn))?
1421                {
1422                    if let Some(sealed_header) = self.sealed_header(block_number)? {
1423                        let (header, block_hash) = sealed_header.split();
1424                        if let Some(block_body) = self.block_body_indices(block_number)? {
1425                            // the index of the tx in the block is the offset:
1426                            // len([start..tx_id])
1427                            // NOTE: `transaction_id` is always `>=` the block's first
1428                            // index
1429                            let index = transaction_id - block_body.first_tx_num();
1430
1431                            let meta = TransactionMeta {
1432                                tx_hash,
1433                                index,
1434                                block_hash,
1435                                block_number,
1436                                base_fee: header.base_fee_per_gas(),
1437                                excess_blob_gas: header.excess_blob_gas(),
1438                                timestamp: header.timestamp(),
1439                            };
1440
1441                            return Ok(Some((transaction, meta)))
1442                        }
1443                    }
1444                }
1445            }
1446        }
1447
1448        Ok(None)
1449    }
1450
1451    fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1452        let mut cursor = self.tx.cursor_read::<tables::TransactionBlocks>()?;
1453        Ok(cursor.seek(id)?.map(|(_, bn)| bn))
1454    }
1455
1456    fn transactions_by_block(
1457        &self,
1458        id: BlockHashOrNumber,
1459    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1460        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;
1461
1462        if let Some(block_number) = self.convert_hash_or_number(id)? {
1463            if let Some(body) = self.block_body_indices(block_number)? {
1464                let tx_range = body.tx_num_range();
1465                return if tx_range.is_empty() {
1466                    Ok(Some(Vec::new()))
1467                } else {
1468                    Ok(Some(self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)?))
1469                }
1470            }
1471        }
1472        Ok(None)
1473    }
1474
1475    fn transactions_by_block_range(
1476        &self,
1477        range: impl RangeBounds<BlockNumber>,
1478    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1479        let range = to_range(range);
1480        let mut tx_cursor = self.tx.cursor_read::<tables::Transactions<Self::Transaction>>()?;
1481
1482        self.block_body_indices_range(range.start..=range.end.saturating_sub(1))?
1483            .into_iter()
1484            .map(|body| {
1485                let tx_num_range = body.tx_num_range();
1486                if tx_num_range.is_empty() {
1487                    Ok(Vec::new())
1488                } else {
1489                    Ok(self
1490                        .transactions_by_tx_range_with_cursor(tx_num_range, &mut tx_cursor)?
1491                        .into_iter()
1492                        .collect())
1493                }
1494            })
1495            .collect()
1496    }
1497
1498    fn transactions_by_tx_range(
1499        &self,
1500        range: impl RangeBounds<TxNumber>,
1501    ) -> ProviderResult<Vec<Self::Transaction>> {
1502        self.transactions_by_tx_range_with_cursor(
1503            range,
1504            &mut self.tx.cursor_read::<tables::Transactions<_>>()?,
1505        )
1506    }
1507
1508    fn senders_by_tx_range(
1509        &self,
1510        range: impl RangeBounds<TxNumber>,
1511    ) -> ProviderResult<Vec<Address>> {
1512        self.cursor_read_collect::<tables::TransactionSenders>(range)
1513    }
1514
1515    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1516        Ok(self.tx.get::<tables::TransactionSenders>(id)?)
1517    }
1518}
1519
1520impl<TX: DbTx + 'static, N: NodeTypesForProvider> ReceiptProvider for DatabaseProvider<TX, N> {
1521    type Receipt = ReceiptTy<N>;
1522
1523    fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1524        self.static_file_provider.get_with_static_file_or_database(
1525            StaticFileSegment::Receipts,
1526            id,
1527            |static_file| static_file.receipt(id),
1528            || Ok(self.tx.get::<tables::Receipts<Self::Receipt>>(id)?),
1529        )
1530    }
1531
1532    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1533        if let Some(id) = self.transaction_id(hash)? {
1534            self.receipt(id)
1535        } else {
1536            Ok(None)
1537        }
1538    }
1539
1540    fn receipts_by_block(
1541        &self,
1542        block: BlockHashOrNumber,
1543    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1544        if let Some(number) = self.convert_hash_or_number(block)? {
1545            if let Some(body) = self.block_body_indices(number)? {
1546                let tx_range = body.tx_num_range();
1547                return if tx_range.is_empty() {
1548                    Ok(Some(Vec::new()))
1549                } else {
1550                    self.receipts_by_tx_range(tx_range).map(Some)
1551                }
1552            }
1553        }
1554        Ok(None)
1555    }
1556
1557    fn receipts_by_tx_range(
1558        &self,
1559        range: impl RangeBounds<TxNumber>,
1560    ) -> ProviderResult<Vec<Self::Receipt>> {
1561        self.static_file_provider.get_range_with_static_file_or_database(
1562            StaticFileSegment::Receipts,
1563            to_range(range),
1564            |static_file, range, _| static_file.receipts_by_tx_range(range),
1565            |range, _| self.cursor_read_collect::<tables::Receipts<Self::Receipt>>(range),
1566            |_| true,
1567        )
1568    }
1569}
1570
1571impl<TX: DbTx + 'static, N: NodeTypes<ChainSpec: EthereumHardforks>> WithdrawalsProvider
1572    for DatabaseProvider<TX, N>
1573{
1574    fn withdrawals_by_block(
1575        &self,
1576        id: BlockHashOrNumber,
1577        timestamp: u64,
1578    ) -> ProviderResult<Option<Withdrawals>> {
1579        if self.chain_spec.is_shanghai_active_at_timestamp(timestamp) {
1580            if let Some(number) = self.convert_hash_or_number(id)? {
1581                return self.static_file_provider.get_with_static_file_or_database(
1582                    StaticFileSegment::BlockMeta,
1583                    number,
1584                    |static_file| static_file.withdrawals_by_block(number.into(), timestamp),
1585                    || {
1586                        // If we are past shanghai, then all blocks should have a withdrawal list,
1587                        // even if empty
1588                        let withdrawals = self
1589                            .tx
1590                            .get::<tables::BlockWithdrawals>(number)
1591                            .map(|w| w.map(|w| w.withdrawals))?
1592                            .unwrap_or_default();
1593                        Ok(Some(withdrawals))
1594                    },
1595                )
1596            }
1597        }
1598        Ok(None)
1599    }
1600}
1601
1602impl<TX: DbTx + 'static, N: NodeTypesForProvider> OmmersProvider for DatabaseProvider<TX, N> {
1603    /// Returns the ommers for the block with matching id from the database.
1604    ///
1605    /// If the block is not found, this returns `None`.
1606    /// If the block exists, but doesn't contain ommers, this returns `None`.
1607    fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<Self::Header>>> {
1608        if let Some(number) = self.convert_hash_or_number(id)? {
1609            // If the Paris (Merge) hardfork block is known and block is after it, return empty
1610            // ommers.
1611            if self.chain_spec.is_paris_active_at_block(number) {
1612                return Ok(Some(Vec::new()))
1613            }
1614
1615            return self.static_file_provider.get_with_static_file_or_database(
1616                StaticFileSegment::BlockMeta,
1617                number,
1618                |static_file| static_file.ommers(id),
1619                || Ok(self.tx.get::<tables::BlockOmmers<Self::Header>>(number)?.map(|o| o.ommers)),
1620            )
1621        }
1622
1623        Ok(None)
1624    }
1625}
1626
1627impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockBodyIndicesProvider
1628    for DatabaseProvider<TX, N>
1629{
1630    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1631        self.static_file_provider.get_with_static_file_or_database(
1632            StaticFileSegment::BlockMeta,
1633            num,
1634            |static_file| static_file.block_body_indices(num),
1635            || Ok(self.tx.get::<tables::BlockBodyIndices>(num)?),
1636        )
1637    }
1638
1639    fn block_body_indices_range(
1640        &self,
1641        range: RangeInclusive<BlockNumber>,
1642    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1643        self.static_file_provider.get_range_with_static_file_or_database(
1644            StaticFileSegment::BlockMeta,
1645            *range.start()..*range.end() + 1,
1646            |static_file, range, _| {
1647                static_file.block_body_indices_range(range.start..=range.end.saturating_sub(1))
1648            },
1649            |range, _| self.cursor_read_collect::<tables::BlockBodyIndices>(range),
1650            |_| true,
1651        )
1652    }
1653}
1654
1655impl<TX: DbTx, N: NodeTypes> StageCheckpointReader for DatabaseProvider<TX, N> {
1656    fn get_stage_checkpoint(&self, id: StageId) -> ProviderResult<Option<StageCheckpoint>> {
1657        Ok(self.tx.get::<tables::StageCheckpoints>(id.to_string())?)
1658    }
1659
1660    /// Get stage checkpoint progress.
1661    fn get_stage_checkpoint_progress(&self, id: StageId) -> ProviderResult<Option<Vec<u8>>> {
1662        Ok(self.tx.get::<tables::StageCheckpointProgresses>(id.to_string())?)
1663    }
1664
1665    fn get_all_checkpoints(&self) -> ProviderResult<Vec<(String, StageCheckpoint)>> {
1666        self.tx
1667            .cursor_read::<tables::StageCheckpoints>()?
1668            .walk(None)?
1669            .collect::<Result<Vec<(String, StageCheckpoint)>, _>>()
1670            .map_err(ProviderError::Database)
1671    }
1672}
1673
1674impl<TX: DbTxMut, N: NodeTypes> StageCheckpointWriter for DatabaseProvider<TX, N> {
1675    /// Save stage checkpoint.
1676    fn save_stage_checkpoint(
1677        &self,
1678        id: StageId,
1679        checkpoint: StageCheckpoint,
1680    ) -> ProviderResult<()> {
1681        Ok(self.tx.put::<tables::StageCheckpoints>(id.to_string(), checkpoint)?)
1682    }
1683
1684    /// Save stage checkpoint progress.
1685    fn save_stage_checkpoint_progress(
1686        &self,
1687        id: StageId,
1688        checkpoint: Vec<u8>,
1689    ) -> ProviderResult<()> {
1690        Ok(self.tx.put::<tables::StageCheckpointProgresses>(id.to_string(), checkpoint)?)
1691    }
1692
1693    fn update_pipeline_stages(
1694        &self,
1695        block_number: BlockNumber,
1696        drop_stage_checkpoint: bool,
1697    ) -> ProviderResult<()> {
1698        // iterate over all existing stages in the table and update its progress.
1699        let mut cursor = self.tx.cursor_write::<tables::StageCheckpoints>()?;
1700        for stage_id in StageId::ALL {
1701            let (_, checkpoint) = cursor.seek_exact(stage_id.to_string())?.unwrap_or_default();
1702            cursor.upsert(
1703                stage_id.to_string(),
1704                &StageCheckpoint {
1705                    block_number,
1706                    ..if drop_stage_checkpoint { Default::default() } else { checkpoint }
1707                },
1708            )?;
1709        }
1710
1711        Ok(())
1712    }
1713}
1714
1715impl<TX: DbTx + 'static, N: NodeTypes> StorageReader for DatabaseProvider<TX, N> {
1716    fn plain_state_storages(
1717        &self,
1718        addresses_with_keys: impl IntoIterator<Item = (Address, impl IntoIterator<Item = B256>)>,
1719    ) -> ProviderResult<Vec<(Address, Vec<StorageEntry>)>> {
1720        let mut plain_storage = self.tx.cursor_dup_read::<tables::PlainStorageState>()?;
1721
1722        addresses_with_keys
1723            .into_iter()
1724            .map(|(address, storage)| {
1725                storage
1726                    .into_iter()
1727                    .map(|key| -> ProviderResult<_> {
1728                        Ok(plain_storage
1729                            .seek_by_key_subkey(address, key)?
1730                            .filter(|v| v.key == key)
1731                            .unwrap_or_else(|| StorageEntry { key, ..Default::default() }))
1732                    })
1733                    .collect::<ProviderResult<Vec<_>>>()
1734                    .map(|storage| (address, storage))
1735            })
1736            .collect::<ProviderResult<Vec<(_, _)>>>()
1737    }
1738
1739    fn changed_storages_with_range(
1740        &self,
1741        range: RangeInclusive<BlockNumber>,
1742    ) -> ProviderResult<BTreeMap<Address, BTreeSet<B256>>> {
1743        self.tx
1744            .cursor_read::<tables::StorageChangeSets>()?
1745            .walk_range(BlockNumberAddress::range(range))?
1746            // fold all storages and save its old state so we can remove it from HashedStorage
1747            // it is needed as it is dup table.
1748            .try_fold(BTreeMap::new(), |mut accounts: BTreeMap<Address, BTreeSet<B256>>, entry| {
1749                let (BlockNumberAddress((_, address)), storage_entry) = entry?;
1750                accounts.entry(address).or_default().insert(storage_entry.key);
1751                Ok(accounts)
1752            })
1753    }
1754
1755    fn changed_storages_and_blocks_with_range(
1756        &self,
1757        range: RangeInclusive<BlockNumber>,
1758    ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>> {
1759        let mut changeset_cursor = self.tx.cursor_read::<tables::StorageChangeSets>()?;
1760
1761        let storage_changeset_lists =
1762            changeset_cursor.walk_range(BlockNumberAddress::range(range))?.try_fold(
1763                BTreeMap::new(),
1764                |mut storages: BTreeMap<(Address, B256), Vec<u64>>, entry| -> ProviderResult<_> {
1765                    let (index, storage) = entry?;
1766                    storages
1767                        .entry((index.address(), storage.key))
1768                        .or_default()
1769                        .push(index.block_number());
1770                    Ok(storages)
1771                },
1772            )?;
1773
1774        Ok(storage_changeset_lists)
1775    }
1776}
1777
1778impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
1779    for DatabaseProvider<TX, N>
1780{
1781    type Receipt = ReceiptTy<N>;
1782
1783    fn write_state(
1784        &self,
1785        execution_outcome: &ExecutionOutcome<Self::Receipt>,
1786        is_value_known: OriginalValuesKnown,
1787        write_receipts_to: StorageLocation,
1788    ) -> ProviderResult<()> {
1789        let first_block = execution_outcome.first_block();
1790        let block_count = execution_outcome.len() as u64;
1791        let last_block = execution_outcome.last_block();
1792        let block_range = first_block..=last_block;
1793
1794        let tip = self.last_block_number()?.max(last_block);
1795
1796        let (plain_state, reverts) =
1797            execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
1798
1799        self.write_state_reverts(reverts, first_block)?;
1800        self.write_state_changes(plain_state)?;
1801
1802        // Fetch the first transaction number for each block in the range
1803        let block_indices: Vec<_> = self
1804            .block_body_indices_range(block_range)?
1805            .into_iter()
1806            .map(|b| b.first_tx_num)
1807            .collect();
1808
1809        // Ensure all expected blocks are present.
1810        if block_indices.len() < block_count as usize {
1811            let missing_blocks = block_count - block_indices.len() as u64;
1812            return Err(ProviderError::BlockBodyIndicesNotFound(
1813                last_block.saturating_sub(missing_blocks - 1),
1814            ));
1815        }
1816
1817        let has_receipts_pruning = self.prune_modes.has_receipts_pruning();
1818
1819        // Prepare receipts cursor if we are going to write receipts to the database
1820        //
1821        // We are writing to database if requested or if there's any kind of receipt pruning
1822        // configured
1823        let mut receipts_cursor = (write_receipts_to.database() || has_receipts_pruning)
1824            .then(|| self.tx.cursor_write::<tables::Receipts<Self::Receipt>>())
1825            .transpose()?;
1826
1827        // Prepare receipts static writer if we are going to write receipts to static files
1828        //
1829        // We are writing to static files if requested and if there's no receipt pruning configured
1830        let mut receipts_static_writer = (write_receipts_to.static_files() &&
1831            !has_receipts_pruning)
1832            .then(|| self.static_file_provider.get_writer(first_block, StaticFileSegment::Receipts))
1833            .transpose()?;
1834
1835        let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
1836        let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
1837
1838        // All receipts from the last 128 blocks are required for blockchain tree, even with
1839        // [`PruneSegment::ContractLogs`].
1840        let prunable_receipts =
1841            PruneMode::Distance(MINIMUM_PRUNING_DISTANCE).should_prune(first_block, tip);
1842
1843        // Prepare set of addresses which logs should not be pruned.
1844        let mut allowed_addresses: HashSet<Address, _> = HashSet::new();
1845        for (_, addresses) in contract_log_pruner.range(..first_block) {
1846            allowed_addresses.extend(addresses.iter().copied());
1847        }
1848
1849        for (idx, (receipts, first_tx_index)) in
1850            execution_outcome.receipts.iter().zip(block_indices).enumerate()
1851        {
1852            let block_number = first_block + idx as u64;
1853
1854            // Increment block number for receipts static file writer
1855            if let Some(writer) = receipts_static_writer.as_mut() {
1856                writer.increment_block(block_number)?;
1857            }
1858
1859            // Skip writing receipts if pruning configuration requires us to.
1860            if prunable_receipts &&
1861                self.prune_modes
1862                    .receipts
1863                    .is_some_and(|mode| mode.should_prune(block_number, tip))
1864            {
1865                continue
1866            }
1867
1868            // If there are new addresses to retain after this block number, track them
1869            if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
1870                allowed_addresses.extend(new_addresses.iter().copied());
1871            }
1872
1873            for (idx, receipt) in receipts.iter().enumerate() {
1874                let receipt_idx = first_tx_index + idx as u64;
1875                // Skip writing receipt if log filter is active and it does not have any logs to
1876                // retain
1877                if prunable_receipts &&
1878                    has_contract_log_filter &&
1879                    !receipt.logs().iter().any(|log| allowed_addresses.contains(&log.address))
1880                {
1881                    continue
1882                }
1883
1884                if let Some(writer) = &mut receipts_static_writer {
1885                    writer.append_receipt(receipt_idx, receipt)?;
1886                }
1887
1888                if let Some(cursor) = &mut receipts_cursor {
1889                    cursor.append(receipt_idx, receipt)?;
1890                }
1891            }
1892        }
1893
1894        Ok(())
1895    }
1896
1897    fn write_state_reverts(
1898        &self,
1899        reverts: PlainStateReverts,
1900        first_block: BlockNumber,
1901    ) -> ProviderResult<()> {
1902        // Write storage changes
1903        tracing::trace!("Writing storage changes");
1904        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1905        let mut storage_changeset_cursor =
1906            self.tx_ref().cursor_dup_write::<tables::StorageChangeSets>()?;
1907        for (block_index, mut storage_changes) in reverts.storage.into_iter().enumerate() {
1908            let block_number = first_block + block_index as BlockNumber;
1909
1910            tracing::trace!(block_number, "Writing block change");
1911            // sort changes by address.
1912            storage_changes.par_sort_unstable_by_key(|a| a.address);
1913            for PlainStorageRevert { address, wiped, storage_revert } in storage_changes {
1914                let storage_id = BlockNumberAddress((block_number, address));
1915
1916                let mut storage = storage_revert
1917                    .into_iter()
1918                    .map(|(k, v)| (B256::new(k.to_be_bytes()), v))
1919                    .collect::<Vec<_>>();
1920                // sort storage slots by key.
1921                storage.par_sort_unstable_by_key(|a| a.0);
1922
1923                // If we are writing the primary storage wipe transition, the pre-existing plain
1924                // storage state has to be taken from the database and written to storage history.
1925                // See [StorageWipe::Primary] for more details.
1926                let mut wiped_storage = Vec::new();
1927                if wiped {
1928                    tracing::trace!(?address, "Wiping storage");
1929                    if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
1930                        wiped_storage.push((entry.key, entry.into()));
1931                        while let Some(entry) = storages_cursor.next_dup_val()? {
1932                            wiped_storage.push((entry.key, entry.into()))
1933                        }
1934                    }
1935                }
1936
1937                tracing::trace!(?address, ?storage, "Writing storage reverts");
1938                for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
1939                    storage_changeset_cursor.append_dup(
1940                        storage_id,
1941                        StorageEntry { key, value: value.value, is_private: value.is_private },
1942                    )?;
1943                }
1944            }
1945        }
1946
1947        // Write account changes
1948        tracing::trace!("Writing account changes");
1949        let mut account_changeset_cursor =
1950            self.tx_ref().cursor_dup_write::<tables::AccountChangeSets>()?;
1951
1952        for (block_index, mut account_block_reverts) in reverts.accounts.into_iter().enumerate() {
1953            let block_number = first_block + block_index as BlockNumber;
1954            // Sort accounts by address.
1955            account_block_reverts.par_sort_by_key(|a| a.0);
1956
1957            for (address, info) in account_block_reverts {
1958                account_changeset_cursor.append_dup(
1959                    block_number,
1960                    AccountBeforeTx { address, info: info.map(Into::into) },
1961                )?;
1962            }
1963        }
1964
1965        Ok(())
1966    }
1967
1968    fn write_state_changes(&self, mut changes: StateChangeset) -> ProviderResult<()> {
1969        // sort all entries so they can be written to database in more performant way.
1970        // and take smaller memory footprint.
1971        changes.accounts.par_sort_by_key(|a| a.0);
1972        changes.storage.par_sort_by_key(|a| a.address);
1973        changes.contracts.par_sort_by_key(|a| a.0);
1974
1975        // Write new account state
1976        tracing::trace!(len = changes.accounts.len(), "Writing new account state");
1977        let mut accounts_cursor = self.tx_ref().cursor_write::<tables::PlainAccountState>()?;
1978        // write account to database.
1979        for (address, account) in changes.accounts {
1980            if let Some(account) = account {
1981                tracing::trace!(?address, "Updating plain state account");
1982                accounts_cursor.upsert(address, &account.into())?;
1983            } else if accounts_cursor.seek_exact(address)?.is_some() {
1984                tracing::trace!(?address, "Deleting plain state account");
1985                accounts_cursor.delete_current()?;
1986            }
1987        }
1988
1989        // Write bytecode
1990        tracing::trace!(len = changes.contracts.len(), "Writing bytecodes");
1991        let mut bytecodes_cursor = self.tx_ref().cursor_write::<tables::Bytecodes>()?;
1992        for (hash, bytecode) in changes.contracts {
1993            bytecodes_cursor.upsert(hash, &Bytecode(bytecode))?;
1994        }
1995
1996        // Write new storage state and wipe storage if needed.
1997        tracing::trace!(len = changes.storage.len(), "Writing new storage state");
1998        let mut storages_cursor = self.tx_ref().cursor_dup_write::<tables::PlainStorageState>()?;
1999        for PlainStorageChangeset { address, wipe_storage, storage } in changes.storage {
2000            // Wiping of storage.
2001            if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
2002                storages_cursor.delete_current_duplicates()?;
2003            }
2004            // cast storages to B256.
2005            let mut storage = storage
2006                .into_iter()
2007                .map(|(k, value)| StorageEntry {
2008                    key: k.into(),
2009                    value: value.value,
2010                    is_private: value.is_private,
2011                })
2012                .collect::<Vec<_>>();
2013            // sort storage slots by key.
2014            storage.par_sort_unstable_by_key(|a| a.key);
2015
2016            for entry in storage {
2017                tracing::trace!(?address, ?entry.key, "Updating plain state storage");
2018                if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? {
2019                    if db_entry.key == entry.key {
2020                        storages_cursor.delete_current()?;
2021                    }
2022                }
2023
2024                if !entry.value.is_zero() {
2025                    storages_cursor.upsert(address, &entry)?;
2026                }
2027            }
2028        }
2029
2030        Ok(())
2031    }
2032
2033    fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
2034        // Write hashed account updates.
2035        let mut hashed_accounts_cursor = self.tx_ref().cursor_write::<tables::HashedAccounts>()?;
2036        for (hashed_address, account) in hashed_state.accounts().accounts_sorted() {
2037            if let Some(account) = account {
2038                hashed_accounts_cursor.upsert(hashed_address, &account)?;
2039            } else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
2040                hashed_accounts_cursor.delete_current()?;
2041            }
2042        }
2043
2044        // Write hashed storage changes.
2045        let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
2046        let mut hashed_storage_cursor =
2047            self.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
2048        for (hashed_address, storage) in sorted_storages {
2049            if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
2050                hashed_storage_cursor.delete_current_duplicates()?;
2051            }
2052
2053            for (hashed_slot, value) in storage.storage_slots_sorted() {
2054                let entry = StorageEntry {
2055                    key: hashed_slot,
2056                    value: value.value,
2057                    is_private: value.is_private,
2058                };
2059                if let Some(db_entry) =
2060                    hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)?
2061                {
2062                    if db_entry.key == entry.key {
2063                        hashed_storage_cursor.delete_current()?;
2064                    }
2065                }
2066
2067                if !entry.value.is_zero() {
2068                    hashed_storage_cursor.upsert(*hashed_address, &entry)?;
2069                }
2070            }
2071        }
2072
2073        Ok(())
2074    }
2075
2076    /// Remove the last N blocks of state.
2077    ///
2078    /// The latest state will be unwound
2079    ///
2080    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2081    ///    transaction ids.
2082    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2083    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2084    ///    the changesets.
2085    ///    - In order to have both the old and new values in the changesets, we also access the
2086    ///      plain state tables.
2087    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2088    ///    we:
2089    ///     1. Take the old value from the changeset
2090    ///     2. Take the new value from the plain state
2091    ///     3. Save the old value to the local state
2092    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2093    ///    have seen before we:
2094    ///     1. Take the old value from the changeset
2095    ///     2. Take the new value from the local state
2096    ///     3. Set the local state to the value in the changeset
2097    fn remove_state_above(
2098        &self,
2099        block: BlockNumber,
2100        remove_receipts_from: StorageLocation,
2101    ) -> ProviderResult<()> {
2102        let range = block + 1..=self.last_block_number()?;
2103
2104        if range.is_empty() {
2105            return Ok(());
2106        }
2107
2108        // We are not removing block meta as it is used to get block changesets.
2109        let block_bodies = self.block_body_indices_range(range.clone())?;
2110
2111        // get transaction receipts
2112        let from_transaction_num =
2113            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2114
2115        let storage_range = BlockNumberAddress::range(range.clone());
2116
2117        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2118        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2119
2120        // This is not working for blocks that are not at tip. as plain state is not the last
2121        // state of end range. We should rename the functions or add support to access
2122        // History state. Accessing history state can be tricky but we are not gaining
2123        // anything.
2124        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2125        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2126
2127        let (state, _) = self.populate_bundle_state(
2128            account_changeset,
2129            storage_changeset,
2130            &mut plain_accounts_cursor,
2131            &mut plain_storage_cursor,
2132        )?;
2133
2134        // iterate over local plain state remove all account and all storages.
2135        for (address, (old_account, new_account, storage)) in &state {
2136            // revert account if needed.
2137            if old_account != new_account {
2138                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2139                if let Some(account) = old_account {
2140                    plain_accounts_cursor.upsert(*address, account)?;
2141                } else if existing_entry.is_some() {
2142                    plain_accounts_cursor.delete_current()?;
2143                }
2144            }
2145
2146            // revert storages
2147            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2148                let storage_entry = StorageEntry {
2149                    key: *storage_key,
2150                    value: old_storage_value.0,
2151                    is_private: old_storage_value.1,
2152                };
2153                // delete previous value
2154                // TODO: This does not use dupsort features
2155                if plain_storage_cursor
2156                    .seek_by_key_subkey(*address, *storage_key)?
2157                    .filter(|s| s.key == *storage_key)
2158                    .is_some()
2159                {
2160                    plain_storage_cursor.delete_current()?
2161                }
2162
2163                // insert value if needed
2164                if !FlaggedStorage::new(old_storage_value.0, old_storage_value.1).is_zero() {
2165                    plain_storage_cursor.upsert(*address, &storage_entry)?;
2166                }
2167            }
2168        }
2169
2170        self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
2171
2172        Ok(())
2173    }
2174
2175    /// Take the last N blocks of state, recreating the [`ExecutionOutcome`].
2176    ///
2177    /// The latest state will be unwound and returned back with all the blocks
2178    ///
2179    /// 1. Iterate over the [`BlockBodyIndices`][tables::BlockBodyIndices] table to get all the
2180    ///    transaction ids.
2181    /// 2. Iterate over the [`StorageChangeSets`][tables::StorageChangeSets] table and the
2182    ///    [`AccountChangeSets`][tables::AccountChangeSets] tables in reverse order to reconstruct
2183    ///    the changesets.
2184    ///    - In order to have both the old and new values in the changesets, we also access the
2185    ///      plain state tables.
2186    /// 3. While iterating over the changeset tables, if we encounter a new account or storage slot,
2187    ///    we:
2188    ///     1. Take the old value from the changeset
2189    ///     2. Take the new value from the plain state
2190    ///     3. Save the old value to the local state
2191    /// 4. While iterating over the changeset tables, if we encounter an account/storage slot we
2192    ///    have seen before we:
2193    ///     1. Take the old value from the changeset
2194    ///     2. Take the new value from the local state
2195    ///     3. Set the local state to the value in the changeset
2196    fn take_state_above(
2197        &self,
2198        block: BlockNumber,
2199        remove_receipts_from: StorageLocation,
2200    ) -> ProviderResult<ExecutionOutcome<Self::Receipt>> {
2201        let range = block + 1..=self.last_block_number()?;
2202
2203        if range.is_empty() {
2204            return Ok(ExecutionOutcome::default())
2205        }
2206        let start_block_number = *range.start();
2207
2208        // We are not removing block meta as it is used to get block changesets.
2209        let block_bodies = self.block_body_indices_range(range.clone())?;
2210
2211        // get transaction receipts
2212        let from_transaction_num =
2213            block_bodies.first().expect("already checked if there are blocks").first_tx_num();
2214        let to_transaction_num =
2215            block_bodies.last().expect("already checked if there are blocks").last_tx_num();
2216
2217        let storage_range = BlockNumberAddress::range(range.clone());
2218
2219        let storage_changeset = self.take::<tables::StorageChangeSets>(storage_range)?;
2220        let account_changeset = self.take::<tables::AccountChangeSets>(range)?;
2221
2222        // This is not working for blocks that are not at tip. as plain state is not the last
2223        // state of end range. We should rename the functions or add support to access
2224        // History state. Accessing history state can be tricky but we are not gaining
2225        // anything.
2226        let mut plain_accounts_cursor = self.tx.cursor_write::<tables::PlainAccountState>()?;
2227        let mut plain_storage_cursor = self.tx.cursor_dup_write::<tables::PlainStorageState>()?;
2228
2229        // populate bundle state and reverts from changesets / state cursors, to iterate over,
2230        // remove, and return later
2231        let (state, reverts) = self.populate_bundle_state(
2232            account_changeset,
2233            storage_changeset,
2234            &mut plain_accounts_cursor,
2235            &mut plain_storage_cursor,
2236        )?;
2237
2238        // iterate over local plain state remove all account and all storages.
2239        for (address, (old_account, new_account, storage)) in &state {
2240            // revert account if needed.
2241            if old_account != new_account {
2242                let existing_entry = plain_accounts_cursor.seek_exact(*address)?;
2243                if let Some(account) = old_account {
2244                    plain_accounts_cursor.upsert(*address, account)?;
2245                } else if existing_entry.is_some() {
2246                    plain_accounts_cursor.delete_current()?;
2247                }
2248            }
2249
2250            // revert storages
2251            for (storage_key, (old_storage_value, _new_storage_value)) in storage {
2252                let storage_entry = StorageEntry {
2253                    key: *storage_key,
2254                    value: old_storage_value.0,
2255                    is_private: old_storage_value.1,
2256                };
2257                // delete previous value
2258                // TODO: This does not use dupsort features
2259                if plain_storage_cursor
2260                    .seek_by_key_subkey(*address, *storage_key)?
2261                    .filter(|s| s.key == *storage_key)
2262                    .is_some()
2263                {
2264                    plain_storage_cursor.delete_current()?
2265                }
2266
2267                // insert value if needed
2268                if !FlaggedStorage::new(old_storage_value.0, old_storage_value.1).is_zero() {
2269                    plain_storage_cursor.upsert(*address, &storage_entry)?;
2270                }
2271            }
2272        }
2273
2274        // Collect receipts into tuples (tx_num, receipt) to correctly handle pruned receipts
2275        let mut receipts_iter = self
2276            .static_file_provider
2277            .get_range_with_static_file_or_database(
2278                StaticFileSegment::Receipts,
2279                from_transaction_num..to_transaction_num + 1,
2280                |static_file, range, _| {
2281                    static_file
2282                        .receipts_by_tx_range(range.clone())
2283                        .map(|r| range.into_iter().zip(r).collect())
2284                },
2285                |range, _| {
2286                    self.tx
2287                        .cursor_read::<tables::Receipts<Self::Receipt>>()?
2288                        .walk_range(range)?
2289                        .map(|r| r.map_err(Into::into))
2290                        .collect()
2291                },
2292                |_| true,
2293            )?
2294            .into_iter()
2295            .peekable();
2296
2297        let mut receipts = Vec::with_capacity(block_bodies.len());
2298        // loop break if we are at the end of the blocks.
2299        for block_body in block_bodies {
2300            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
2301            for num in block_body.tx_num_range() {
2302                if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
2303                    block_receipts.push(receipts_iter.next().unwrap().1);
2304                }
2305            }
2306            receipts.push(block_receipts);
2307        }
2308
2309        self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?;
2310
2311        Ok(ExecutionOutcome::new_init(
2312            state,
2313            reverts,
2314            Vec::new(),
2315            receipts,
2316            start_block_number,
2317            Vec::new(),
2318        ))
2319    }
2320}
2321
2322impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> TrieWriter for DatabaseProvider<TX, N> {
2323    /// Writes trie updates. Returns the number of entries modified.
2324    fn write_trie_updates(&self, trie_updates: &TrieUpdates) -> ProviderResult<usize> {
2325        if trie_updates.is_empty() {
2326            return Ok(0)
2327        }
2328
2329        // Track the number of inserted entries.
2330        let mut num_entries = 0;
2331
2332        // Merge updated and removed nodes. Updated nodes must take precedence.
2333        let mut account_updates = trie_updates
2334            .removed_nodes_ref()
2335            .iter()
2336            .filter_map(|n| {
2337                (!trie_updates.account_nodes_ref().contains_key(n)).then_some((n, None))
2338            })
2339            .collect::<Vec<_>>();
2340        account_updates.extend(
2341            trie_updates.account_nodes_ref().iter().map(|(nibbles, node)| (nibbles, Some(node))),
2342        );
2343        // Sort trie node updates.
2344        account_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
2345
2346        let tx = self.tx_ref();
2347        let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
2348        for (key, updated_node) in account_updates {
2349            let nibbles = StoredNibbles(key.clone());
2350            match updated_node {
2351                Some(node) => {
2352                    if !nibbles.0.is_empty() {
2353                        num_entries += 1;
2354                        account_trie_cursor.upsert(nibbles, node)?;
2355                    }
2356                }
2357                None => {
2358                    num_entries += 1;
2359                    if account_trie_cursor.seek_exact(nibbles)?.is_some() {
2360                        account_trie_cursor.delete_current()?;
2361                    }
2362                }
2363            }
2364        }
2365
2366        num_entries += self.write_storage_trie_updates(trie_updates.storage_tries_ref())?;
2367
2368        Ok(num_entries)
2369    }
2370}
2371
2372impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> StorageTrieWriter for DatabaseProvider<TX, N> {
2373    /// Writes storage trie updates from the given storage trie map. First sorts the storage trie
2374    /// updates by the hashed address, writing in sorted order.
2375    fn write_storage_trie_updates(
2376        &self,
2377        storage_tries: &B256Map<StorageTrieUpdates>,
2378    ) -> ProviderResult<usize> {
2379        let mut num_entries = 0;
2380        let mut storage_tries = Vec::from_iter(storage_tries);
2381        storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
2382        let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2383        for (hashed_address, storage_trie_updates) in storage_tries {
2384            let mut db_storage_trie_cursor =
2385                DatabaseStorageTrieCursor::new(cursor, *hashed_address);
2386            num_entries +=
2387                db_storage_trie_cursor.write_storage_trie_updates(storage_trie_updates)?;
2388            cursor = db_storage_trie_cursor.cursor;
2389        }
2390
2391        Ok(num_entries)
2392    }
2393
2394    fn write_individual_storage_trie_updates(
2395        &self,
2396        hashed_address: B256,
2397        updates: &StorageTrieUpdates,
2398    ) -> ProviderResult<usize> {
2399        if updates.is_empty() {
2400            return Ok(0)
2401        }
2402
2403        let cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
2404        let mut trie_db_cursor = DatabaseStorageTrieCursor::new(cursor, hashed_address);
2405        Ok(trie_db_cursor.write_storage_trie_updates(updates)?)
2406    }
2407}
2408
2409impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HashingWriter for DatabaseProvider<TX, N> {
2410    fn unwind_account_hashing<'a>(
2411        &self,
2412        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2413    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2414        // Aggregate all block changesets and make a list of accounts that have been changed.
2415        // Note that collecting and then reversing the order is necessary to ensure that the
2416        // changes are applied in the correct order.
2417        let hashed_accounts = changesets
2418            .into_iter()
2419            .map(|(_, e)| (keccak256(e.address), e.info))
2420            .collect::<Vec<_>>()
2421            .into_iter()
2422            .rev()
2423            .collect::<BTreeMap<_, _>>();
2424
2425        // Apply values to HashedState, and remove the account if it's None.
2426        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2427        for (hashed_address, account) in &hashed_accounts {
2428            if let Some(account) = account {
2429                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2430            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2431                hashed_accounts_cursor.delete_current()?;
2432            }
2433        }
2434
2435        Ok(hashed_accounts)
2436    }
2437
2438    fn unwind_account_hashing_range(
2439        &self,
2440        range: impl RangeBounds<BlockNumber>,
2441    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2442        let changesets = self
2443            .tx
2444            .cursor_read::<tables::AccountChangeSets>()?
2445            .walk_range(range)?
2446            .collect::<Result<Vec<_>, _>>()?;
2447        self.unwind_account_hashing(changesets.iter())
2448    }
2449
2450    fn insert_account_for_hashing(
2451        &self,
2452        changesets: impl IntoIterator<Item = (Address, Option<Account>)>,
2453    ) -> ProviderResult<BTreeMap<B256, Option<Account>>> {
2454        let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccounts>()?;
2455        let hashed_accounts =
2456            changesets.into_iter().map(|(ad, ac)| (keccak256(ad), ac)).collect::<BTreeMap<_, _>>();
2457        for (hashed_address, account) in &hashed_accounts {
2458            if let Some(account) = account {
2459                hashed_accounts_cursor.upsert(*hashed_address, account)?;
2460            } else if hashed_accounts_cursor.seek_exact(*hashed_address)?.is_some() {
2461                hashed_accounts_cursor.delete_current()?;
2462            }
2463        }
2464        Ok(hashed_accounts)
2465    }
2466
2467    fn unwind_storage_hashing(
2468        &self,
2469        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2470    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2471        // Aggregate all block changesets and make list of accounts that have been changed.
2472        let mut hashed_storages = changesets
2473            .into_iter()
2474            .map(|(BlockNumberAddress((_, address)), storage_entry)| {
2475                (
2476                    keccak256(address),
2477                    keccak256(storage_entry.key),
2478                    storage_entry.value,
2479                    storage_entry.is_private,
2480                )
2481            })
2482            .collect::<Vec<_>>();
2483        hashed_storages.sort_by_key(|(ha, hk, _, _)| (*ha, *hk));
2484
2485        // Apply values to HashedState, and remove the account if it's None.
2486        let mut hashed_storage_keys: HashMap<B256, BTreeSet<B256>> =
2487            HashMap::with_capacity_and_hasher(hashed_storages.len(), Default::default());
2488        let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2489        for (hashed_address, key, value, is_private) in hashed_storages.into_iter().rev() {
2490            hashed_storage_keys.entry(hashed_address).or_default().insert(key);
2491
2492            if hashed_storage
2493                .seek_by_key_subkey(hashed_address, key)?
2494                .filter(|entry| entry.key == key)
2495                .is_some()
2496            {
2497                hashed_storage.delete_current()?;
2498            }
2499
2500            if !value.is_zero() {
2501                hashed_storage.upsert(hashed_address, &StorageEntry { key, value, is_private })?;
2502            }
2503        }
2504        Ok(hashed_storage_keys)
2505    }
2506
2507    fn unwind_storage_hashing_range(
2508        &self,
2509        range: impl RangeBounds<BlockNumberAddress>,
2510    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2511        let changesets = self
2512            .tx
2513            .cursor_read::<tables::StorageChangeSets>()?
2514            .walk_range(range)?
2515            .collect::<Result<Vec<_>, _>>()?;
2516        self.unwind_storage_hashing(changesets.into_iter())
2517    }
2518
2519    fn insert_storage_for_hashing(
2520        &self,
2521        storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
2522    ) -> ProviderResult<HashMap<B256, BTreeSet<B256>>> {
2523        // hash values
2524        let hashed_storages =
2525            storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
2526                let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
2527                    map.insert(keccak256(entry.key), entry.value);
2528                    map
2529                });
2530                map.insert(keccak256(address), storage);
2531                map
2532            });
2533
2534        let hashed_storage_keys =
2535            HashMap::from_iter(hashed_storages.iter().map(|(hashed_address, entries)| {
2536                (*hashed_address, BTreeSet::from_iter(entries.keys().copied()))
2537            }));
2538
2539        let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorages>()?;
2540        // Hash the address and key and apply them to HashedStorage (if Storage is None
2541        // just remove it);
2542        hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
2543            storage.into_iter().try_for_each(|(key, value)| -> ProviderResult<()> {
2544                if hashed_storage_cursor
2545                    .seek_by_key_subkey(hashed_address, key)?
2546                    .filter(|entry| entry.key == key)
2547                    .is_some()
2548                {
2549                    hashed_storage_cursor.delete_current()?;
2550                }
2551
2552                if !value.is_zero() {
2553                    hashed_storage_cursor.upsert(
2554                        hashed_address,
2555                        &StorageEntry { key, value, ..Default::default() },
2556                    )?;
2557                }
2558                Ok(())
2559            })
2560        })?;
2561
2562        Ok(hashed_storage_keys)
2563    }
2564
2565    fn insert_hashes(
2566        &self,
2567        range: RangeInclusive<BlockNumber>,
2568        end_block_hash: B256,
2569        expected_state_root: B256,
2570    ) -> ProviderResult<()> {
2571        // Initialize prefix sets.
2572        let mut account_prefix_set = PrefixSetMut::default();
2573        let mut storage_prefix_sets: HashMap<B256, PrefixSetMut> = HashMap::default();
2574        let mut destroyed_accounts = HashSet::default();
2575
2576        let mut durations_recorder = metrics::DurationsRecorder::default();
2577
2578        // storage hashing stage
2579        {
2580            let lists = self.changed_storages_with_range(range.clone())?;
2581            let storages = self.plain_state_storages(lists)?;
2582            let storage_entries = self.insert_storage_for_hashing(storages)?;
2583            for (hashed_address, hashed_slots) in storage_entries {
2584                account_prefix_set.insert(Nibbles::unpack(hashed_address));
2585                for slot in hashed_slots {
2586                    storage_prefix_sets
2587                        .entry(hashed_address)
2588                        .or_default()
2589                        .insert(Nibbles::unpack(slot));
2590                }
2591            }
2592        }
2593        durations_recorder.record_relative(metrics::Action::InsertStorageHashing);
2594
2595        // account hashing stage
2596        {
2597            let lists = self.changed_accounts_with_range(range.clone())?;
2598            let accounts = self.basic_accounts(lists)?;
2599            let hashed_addresses = self.insert_account_for_hashing(accounts)?;
2600            for (hashed_address, account) in hashed_addresses {
2601                account_prefix_set.insert(Nibbles::unpack(hashed_address));
2602                if account.is_none() {
2603                    destroyed_accounts.insert(hashed_address);
2604                }
2605            }
2606        }
2607        durations_recorder.record_relative(metrics::Action::InsertAccountHashing);
2608
2609        // merkle tree
2610        {
2611            // This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
2612            // are pre-loaded.
2613            let prefix_sets = TriePrefixSets {
2614                account_prefix_set: account_prefix_set.freeze(),
2615                storage_prefix_sets: storage_prefix_sets
2616                    .into_iter()
2617                    .map(|(k, v)| (k, v.freeze()))
2618                    .collect(),
2619                destroyed_accounts,
2620            };
2621            let (state_root, trie_updates) = StateRoot::from_tx(&self.tx)
2622                .with_prefix_sets(prefix_sets)
2623                .root_with_updates()
2624                .map_err(reth_db_api::DatabaseError::from)?;
2625            if state_root != expected_state_root {
2626                return Err(ProviderError::StateRootMismatch(Box::new(RootMismatch {
2627                    root: GotExpected { got: state_root, expected: expected_state_root },
2628                    block_number: *range.end(),
2629                    block_hash: end_block_hash,
2630                })))
2631            }
2632            self.write_trie_updates(&trie_updates)?;
2633        }
2634        durations_recorder.record_relative(metrics::Action::InsertMerkleTree);
2635
2636        debug!(target: "providers::db", ?range, actions = ?durations_recorder.actions, "Inserted hashes");
2637
2638        Ok(())
2639    }
2640}
2641
2642impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvider<TX, N> {
2643    fn unwind_account_history_indices<'a>(
2644        &self,
2645        changesets: impl Iterator<Item = &'a (BlockNumber, AccountBeforeTx)>,
2646    ) -> ProviderResult<usize> {
2647        let mut last_indices = changesets
2648            .into_iter()
2649            .map(|(index, account)| (account.address, *index))
2650            .collect::<Vec<_>>();
2651        last_indices.sort_by_key(|(a, _)| *a);
2652
2653        // Unwind the account history index.
2654        let mut cursor = self.tx.cursor_write::<tables::AccountsHistory>()?;
2655        for &(address, rem_index) in &last_indices {
2656            let partial_shard = unwind_history_shards::<_, tables::AccountsHistory, _>(
2657                &mut cursor,
2658                ShardedKey::last(address),
2659                rem_index,
2660                |sharded_key| sharded_key.key == address,
2661            )?;
2662
2663            // Check the last returned partial shard.
2664            // If it's not empty, the shard needs to be reinserted.
2665            if !partial_shard.is_empty() {
2666                cursor.insert(
2667                    ShardedKey::last(address),
2668                    &BlockNumberList::new_pre_sorted(partial_shard),
2669                )?;
2670            }
2671        }
2672
2673        let changesets = last_indices.len();
2674        Ok(changesets)
2675    }
2676
2677    fn unwind_account_history_indices_range(
2678        &self,
2679        range: impl RangeBounds<BlockNumber>,
2680    ) -> ProviderResult<usize> {
2681        let changesets = self
2682            .tx
2683            .cursor_read::<tables::AccountChangeSets>()?
2684            .walk_range(range)?
2685            .collect::<Result<Vec<_>, _>>()?;
2686        self.unwind_account_history_indices(changesets.iter())
2687    }
2688
2689    fn insert_account_history_index(
2690        &self,
2691        account_transitions: impl IntoIterator<Item = (Address, impl IntoIterator<Item = u64>)>,
2692    ) -> ProviderResult<()> {
2693        self.append_history_index::<_, tables::AccountsHistory>(
2694            account_transitions,
2695            ShardedKey::new,
2696        )
2697    }
2698
2699    fn unwind_storage_history_indices(
2700        &self,
2701        changesets: impl Iterator<Item = (BlockNumberAddress, StorageEntry)>,
2702    ) -> ProviderResult<usize> {
2703        let mut storage_changesets = changesets
2704            .into_iter()
2705            .map(|(BlockNumberAddress((bn, address)), storage)| (address, storage.key, bn))
2706            .collect::<Vec<_>>();
2707        storage_changesets.sort_by_key(|(address, key, _)| (*address, *key));
2708
2709        let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?;
2710        for &(address, storage_key, rem_index) in &storage_changesets {
2711            let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>(
2712                &mut cursor,
2713                StorageShardedKey::last(address, storage_key),
2714                rem_index,
2715                |storage_sharded_key| {
2716                    storage_sharded_key.address == address &&
2717                        storage_sharded_key.sharded_key.key == storage_key
2718                },
2719            )?;
2720
2721            // Check the last returned partial shard.
2722            // If it's not empty, the shard needs to be reinserted.
2723            if !partial_shard.is_empty() {
2724                cursor.insert(
2725                    StorageShardedKey::last(address, storage_key),
2726                    &BlockNumberList::new_pre_sorted(partial_shard),
2727                )?;
2728            }
2729        }
2730
2731        let changesets = storage_changesets.len();
2732        Ok(changesets)
2733    }
2734
2735    fn unwind_storage_history_indices_range(
2736        &self,
2737        range: impl RangeBounds<BlockNumberAddress>,
2738    ) -> ProviderResult<usize> {
2739        let changesets = self
2740            .tx
2741            .cursor_read::<tables::StorageChangeSets>()?
2742            .walk_range(range)?
2743            .collect::<Result<Vec<_>, _>>()?;
2744        self.unwind_storage_history_indices(changesets.into_iter())
2745    }
2746
2747    fn insert_storage_history_index(
2748        &self,
2749        storage_transitions: impl IntoIterator<Item = ((Address, B256), impl IntoIterator<Item = u64>)>,
2750    ) -> ProviderResult<()> {
2751        self.append_history_index::<_, tables::StoragesHistory>(
2752            storage_transitions,
2753            |(address, storage_key), highest_block_number| {
2754                StorageShardedKey::new(address, storage_key, highest_block_number)
2755            },
2756        )
2757    }
2758
2759    fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
2760        // account history stage
2761        {
2762            let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
2763            self.insert_account_history_index(indices)?;
2764        }
2765
2766        // storage history stage
2767        {
2768            let indices = self.changed_storages_and_blocks_with_range(range)?;
2769            self.insert_storage_history_index(indices)?;
2770        }
2771
2772        Ok(())
2773    }
2774}
2775
2776impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockExecutionWriter
2777    for DatabaseProvider<TX, N>
2778{
2779    fn take_block_and_execution_above(
2780        &self,
2781        block: BlockNumber,
2782        remove_from: StorageLocation,
2783    ) -> ProviderResult<Chain<Self::Primitives>> {
2784        let range = block + 1..=self.last_block_number()?;
2785
2786        self.unwind_trie_state_range(range.clone())?;
2787
2788        // get execution res
2789        let execution_state = self.take_state_above(block, remove_from)?;
2790
2791        let blocks = self.recovered_block_range(range)?;
2792
2793        // remove block bodies it is needed for both get block range and get block execution results
2794        // that is why it is deleted afterwards.
2795        self.remove_blocks_above(block, remove_from)?;
2796
2797        // Update pipeline progress
2798        self.update_pipeline_stages(block, true)?;
2799
2800        Ok(Chain::new(blocks, execution_state, None))
2801    }
2802
2803    fn remove_block_and_execution_above(
2804        &self,
2805        block: BlockNumber,
2806        remove_from: StorageLocation,
2807    ) -> ProviderResult<()> {
2808        let range = block + 1..=self.last_block_number()?;
2809
2810        self.unwind_trie_state_range(range)?;
2811
2812        // remove execution res
2813        self.remove_state_above(block, remove_from)?;
2814
2815        // remove block bodies it is needed for both get block range and get block execution results
2816        // that is why it is deleted afterwards.
2817        self.remove_blocks_above(block, remove_from)?;
2818
2819        // Update pipeline progress
2820        self.update_pipeline_stages(block, true)?;
2821
2822        Ok(())
2823    }
2824}
2825
2826impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWriter
2827    for DatabaseProvider<TX, N>
2828{
2829    type Block = BlockTy<N>;
2830    type Receipt = ReceiptTy<N>;
2831
2832    /// Inserts the block into the database, always modifying the following tables:
2833    /// * [`CanonicalHeaders`](tables::CanonicalHeaders)
2834    /// * [`Headers`](tables::Headers)
2835    /// * [`HeaderNumbers`](tables::HeaderNumbers)
2836    /// * [`HeaderTerminalDifficulties`](tables::HeaderTerminalDifficulties)
2837    /// * [`BlockBodyIndices`](tables::BlockBodyIndices)
2838    ///
2839    /// If there are transactions in the block, the following tables will be modified:
2840    /// * [`Transactions`](tables::Transactions)
2841    /// * [`TransactionBlocks`](tables::TransactionBlocks)
2842    ///
2843    /// If ommers are not empty, this will modify [`BlockOmmers`](tables::BlockOmmers).
2844    /// If withdrawals are not empty, this will modify
2845    /// [`BlockWithdrawals`](tables::BlockWithdrawals).
2846    ///
2847    /// If the provider has __not__ configured full sender pruning, this will modify
2848    /// [`TransactionSenders`](tables::TransactionSenders).
2849    ///
2850    /// If the provider has __not__ configured full transaction lookup pruning, this will modify
2851    /// [`TransactionHashNumbers`](tables::TransactionHashNumbers).
2852    fn insert_block(
2853        &self,
2854        block: RecoveredBlock<Self::Block>,
2855        write_to: StorageLocation,
2856    ) -> ProviderResult<StoredBlockBodyIndices> {
2857        let block_number = block.number();
2858
2859        let mut durations_recorder = metrics::DurationsRecorder::default();
2860
2861        // total difficulty
2862        let ttd = if block_number == 0 {
2863            block.header().difficulty()
2864        } else {
2865            let parent_block_number = block_number - 1;
2866            let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
2867            durations_recorder.record_relative(metrics::Action::GetParentTD);
2868            parent_ttd + block.header().difficulty()
2869        };
2870
2871        if write_to.database() {
2872            self.tx.put::<tables::CanonicalHeaders>(block_number, block.hash())?;
2873            durations_recorder.record_relative(metrics::Action::InsertCanonicalHeaders);
2874
2875            // Put header with canonical hashes.
2876            self.tx.put::<tables::Headers<HeaderTy<N>>>(block_number, block.header().clone())?;
2877            durations_recorder.record_relative(metrics::Action::InsertHeaders);
2878
2879            self.tx.put::<tables::HeaderTerminalDifficulties>(block_number, ttd.into())?;
2880            durations_recorder.record_relative(metrics::Action::InsertHeaderTerminalDifficulties);
2881        }
2882
2883        if write_to.static_files() {
2884            let mut writer =
2885                self.static_file_provider.get_writer(block_number, StaticFileSegment::Headers)?;
2886            writer.append_header(block.header(), ttd, &block.hash())?;
2887        }
2888
2889        self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
2890        durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
2891
2892        let mut next_tx_num = self
2893            .tx
2894            .cursor_read::<tables::TransactionBlocks>()?
2895            .last()?
2896            .map(|(n, _)| n + 1)
2897            .unwrap_or_default();
2898        durations_recorder.record_relative(metrics::Action::GetNextTxNum);
2899        let first_tx_num = next_tx_num;
2900
2901        let tx_count = block.body().transaction_count() as u64;
2902
2903        // Ensures we have all the senders for the block's transactions.
2904        for (transaction, sender) in block.body().transactions_iter().zip(block.senders_iter()) {
2905            let hash = transaction.tx_hash();
2906
2907            if self.prune_modes.sender_recovery.as_ref().is_none_or(|m| !m.is_full()) {
2908                self.tx.put::<tables::TransactionSenders>(next_tx_num, *sender)?;
2909            }
2910
2911            if self.prune_modes.transaction_lookup.is_none_or(|m| !m.is_full()) {
2912                self.tx.put::<tables::TransactionHashNumbers>(*hash, next_tx_num)?;
2913            }
2914            next_tx_num += 1;
2915        }
2916
2917        self.append_block_bodies(vec![(block_number, Some(block.into_body()))], write_to)?;
2918
2919        debug!(
2920            target: "providers::db",
2921            ?block_number,
2922            actions = ?durations_recorder.actions,
2923            "Inserted block"
2924        );
2925
2926        Ok(StoredBlockBodyIndices { first_tx_num, tx_count })
2927    }
2928
2929    fn append_block_bodies(
2930        &self,
2931        bodies: Vec<(BlockNumber, Option<BodyTy<N>>)>,
2932        write_to: StorageLocation,
2933    ) -> ProviderResult<()> {
2934        let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) };
2935
2936        // Initialize writer if we will be writing transactions to staticfiles
2937        let mut tx_static_writer = write_to
2938            .static_files()
2939            .then(|| {
2940                self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)
2941            })
2942            .transpose()?;
2943
2944        let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
2945        let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
2946
2947        // Initialize cursor if we will be writing transactions to database
2948        let mut tx_cursor = write_to
2949            .database()
2950            .then(|| self.tx.cursor_write::<tables::Transactions<TxTy<N>>>())
2951            .transpose()?;
2952
2953        // Get id for the next tx_num or zero if there are no transactions.
2954        let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
2955
2956        for (block_number, body) in &bodies {
2957            // Increment block on static file header.
2958            if let Some(writer) = tx_static_writer.as_mut() {
2959                writer.increment_block(*block_number)?;
2960            }
2961
2962            let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default();
2963            let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };
2964
2965            let mut durations_recorder = metrics::DurationsRecorder::default();
2966
2967            // insert block meta
2968            block_indices_cursor.append(*block_number, &block_indices)?;
2969
2970            durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
2971
2972            let Some(body) = body else { continue };
2973
2974            // write transaction block index
2975            if !body.transactions().is_empty() {
2976                tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
2977                durations_recorder.record_relative(metrics::Action::InsertTransactionBlocks);
2978            }
2979
2980            // write transactions
2981            for transaction in body.transactions() {
2982                if let Some(writer) = tx_static_writer.as_mut() {
2983                    writer.append_transaction(next_tx_num, transaction)?;
2984                }
2985                if let Some(cursor) = tx_cursor.as_mut() {
2986                    cursor.append(next_tx_num, transaction)?;
2987                }
2988
2989                // Increment transaction id for each transaction.
2990                next_tx_num += 1;
2991            }
2992
2993            debug!(
2994                target: "providers::db",
2995                ?block_number,
2996                actions = ?durations_recorder.actions,
2997                "Inserted block body"
2998            );
2999        }
3000
3001        self.storage.writer().write_block_bodies(self, bodies, write_to)?;
3002
3003        Ok(())
3004    }
3005
3006    fn remove_blocks_above(
3007        &self,
3008        block: BlockNumber,
3009        remove_from: StorageLocation,
3010    ) -> ProviderResult<()> {
3011        for hash in self.canonical_hashes_range(block + 1, self.last_block_number()? + 1)? {
3012            self.tx.delete::<tables::HeaderNumbers>(hash, None)?;
3013        }
3014
3015        // Only prune canonical headers after we've removed the block hashes as we rely on data from
3016        // this table in `canonical_hashes_range`.
3017        self.remove::<tables::CanonicalHeaders>(block + 1..)?;
3018        self.remove::<tables::Headers<HeaderTy<N>>>(block + 1..)?;
3019        self.remove::<tables::HeaderTerminalDifficulties>(block + 1..)?;
3020
3021        // First transaction to be removed
3022        let unwind_tx_from = self
3023            .block_body_indices(block)?
3024            .map(|b| b.next_tx_num())
3025            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3026
3027        // Last transaction to be removed
3028        let unwind_tx_to = self
3029            .tx
3030            .cursor_read::<tables::BlockBodyIndices>()?
3031            .last()?
3032            // shouldn't happen because this was OK above
3033            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?
3034            .1
3035            .last_tx_num();
3036
3037        if unwind_tx_from <= unwind_tx_to {
3038            for (hash, _) in self.transaction_hashes_by_range(unwind_tx_from..(unwind_tx_to + 1))? {
3039                self.tx.delete::<tables::TransactionHashNumbers>(hash, None)?;
3040            }
3041        }
3042
3043        self.remove::<tables::TransactionSenders>(unwind_tx_from..)?;
3044
3045        self.remove_bodies_above(block, remove_from)?;
3046
3047        Ok(())
3048    }
3049
3050    fn remove_bodies_above(
3051        &self,
3052        block: BlockNumber,
3053        remove_from: StorageLocation,
3054    ) -> ProviderResult<()> {
3055        self.storage.writer().remove_block_bodies_above(self, block, remove_from)?;
3056
3057        // First transaction to be removed
3058        let unwind_tx_from = self
3059            .block_body_indices(block)?
3060            .map(|b| b.next_tx_num())
3061            .ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
3062
3063        self.remove::<tables::BlockBodyIndices>(block + 1..)?;
3064        self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
3065
3066        if remove_from.database() {
3067            self.remove::<tables::Transactions<TxTy<N>>>(unwind_tx_from..)?;
3068        }
3069
3070        if remove_from.static_files() {
3071            let static_file_tx_num = self
3072                .static_file_provider
3073                .get_highest_static_file_tx(StaticFileSegment::Transactions);
3074
3075            let to_delete = static_file_tx_num
3076                .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from))
3077                .unwrap_or_default();
3078
3079            self.static_file_provider
3080                .latest_writer(StaticFileSegment::Transactions)?
3081                .prune_transactions(to_delete, block)?;
3082        }
3083
3084        Ok(())
3085    }
3086
3087    /// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
3088    fn append_blocks_with_state(
3089        &self,
3090        blocks: Vec<RecoveredBlock<Self::Block>>,
3091        execution_outcome: &ExecutionOutcome<Self::Receipt>,
3092        hashed_state: HashedPostStateSorted,
3093        trie_updates: TrieUpdates,
3094    ) -> ProviderResult<()> {
3095        if blocks.is_empty() {
3096            debug!(target: "providers::db", "Attempted to append empty block range");
3097            return Ok(())
3098        }
3099
3100        let first_number = blocks.first().unwrap().number();
3101
3102        let last = blocks.last().unwrap();
3103        let last_block_number = last.number();
3104
3105        let mut durations_recorder = metrics::DurationsRecorder::default();
3106
3107        // Insert the blocks
3108        for block in blocks {
3109            self.insert_block(block, StorageLocation::Database)?;
3110            durations_recorder.record_relative(metrics::Action::InsertBlock);
3111        }
3112
3113        self.write_state(execution_outcome, OriginalValuesKnown::No, StorageLocation::Database)?;
3114        durations_recorder.record_relative(metrics::Action::InsertState);
3115
3116        // insert hashes and intermediate merkle nodes
3117        self.write_hashed_state(&hashed_state)?;
3118        self.write_trie_updates(&trie_updates)?;
3119        durations_recorder.record_relative(metrics::Action::InsertHashes);
3120
3121        self.update_history_indices(first_number..=last_block_number)?;
3122        durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
3123
3124        // Update pipeline progress
3125        self.update_pipeline_stages(last_block_number, false)?;
3126        durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
3127
3128        debug!(target: "providers::db", range = ?first_number..=last_block_number, actions = ?durations_recorder.actions, "Appended blocks");
3129
3130        Ok(())
3131    }
3132}
3133
3134impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvider<TX, N> {
3135    fn get_prune_checkpoint(
3136        &self,
3137        segment: PruneSegment,
3138    ) -> ProviderResult<Option<PruneCheckpoint>> {
3139        Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
3140    }
3141
3142    fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
3143        Ok(self
3144            .tx
3145            .cursor_read::<tables::PruneCheckpoints>()?
3146            .walk(None)?
3147            .collect::<Result<_, _>>()?)
3148    }
3149}
3150
3151impl<TX: DbTxMut, N: NodeTypes> PruneCheckpointWriter for DatabaseProvider<TX, N> {
3152    fn save_prune_checkpoint(
3153        &self,
3154        segment: PruneSegment,
3155        checkpoint: PruneCheckpoint,
3156    ) -> ProviderResult<()> {
3157        Ok(self.tx.put::<tables::PruneCheckpoints>(segment, checkpoint)?)
3158    }
3159}
3160
3161impl<TX: DbTx + 'static, N: NodeTypesForProvider> StatsReader for DatabaseProvider<TX, N> {
3162    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3163        let db_entries = self.tx.entries::<T>()?;
3164        let static_file_entries = match self.static_file_provider.count_entries::<T>() {
3165            Ok(entries) => entries,
3166            Err(ProviderError::UnsupportedProvider) => 0,
3167            Err(err) => return Err(err),
3168        };
3169
3170        Ok(db_entries + static_file_entries)
3171    }
3172}
3173
3174impl<TX: DbTx + 'static, N: NodeTypes> ChainStateBlockReader for DatabaseProvider<TX, N> {
3175    fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3176        let mut finalized_blocks = self
3177            .tx
3178            .cursor_read::<tables::ChainState>()?
3179            .walk(Some(tables::ChainStateKey::LastFinalizedBlock))?
3180            .take(1)
3181            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3182
3183        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3184        Ok(last_finalized_block_number)
3185    }
3186
3187    fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
3188        let mut finalized_blocks = self
3189            .tx
3190            .cursor_read::<tables::ChainState>()?
3191            .walk(Some(tables::ChainStateKey::LastSafeBlockBlock))?
3192            .take(1)
3193            .collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;
3194
3195        let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
3196        Ok(last_finalized_block_number)
3197    }
3198}
3199
3200impl<TX: DbTxMut, N: NodeTypes> ChainStateBlockWriter for DatabaseProvider<TX, N> {
3201    fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3202        Ok(self
3203            .tx
3204            .put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
3205    }
3206
3207    fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
3208        Ok(self
3209            .tx
3210            .put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlockBlock, block_number)?)
3211    }
3212}
3213
3214impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider<TX, N> {
3215    type Tx = TX;
3216
3217    fn tx_ref(&self) -> &Self::Tx {
3218        &self.tx
3219    }
3220
3221    fn tx_mut(&mut self) -> &mut Self::Tx {
3222        &mut self.tx
3223    }
3224
3225    fn into_tx(self) -> Self::Tx {
3226        self.tx
3227    }
3228
3229    fn prune_modes_ref(&self) -> &PruneModes {
3230        self.prune_modes_ref()
3231    }
3232}