reth_engine_tree/tree/payload_processor/
multiproof.rs

1//! Multiproof task related functionality.
2
3use crate::tree::payload_processor::executor::WorkloadExecutor;
4use alloy_evm::block::StateChangeSource;
5use alloy_primitives::{
6    keccak256,
7    map::{B256Set, HashSet},
8    B256,
9};
10use derive_more::derive::Deref;
11use metrics::Histogram;
12use reth_errors::ProviderError;
13use reth_metrics::Metrics;
14use reth_provider::{
15    providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, FactoryTx,
16    StateCommitmentProvider,
17};
18use reth_revm::state::EvmState;
19use reth_trie::{
20    prefix_set::TriePrefixSetsMut, updates::TrieUpdatesSorted, DecodedMultiProof, HashedPostState,
21    HashedPostStateSorted, HashedStorage, MultiProofTargets, TrieInput,
22};
23use reth_trie_parallel::{proof::ParallelProof, proof_task::ProofTaskManagerHandle};
24use std::{
25    collections::{BTreeMap, VecDeque},
26    ops::DerefMut,
27    sync::{
28        mpsc::{channel, Receiver, Sender},
29        Arc,
30    },
31    time::{Duration, Instant},
32};
33use tracing::{debug, error, trace};
34
/// The size of proof targets chunk to spawn in one calculation.
///
/// Used as the argument to `chunks(..)` when prefetch targets and state updates are split up;
/// every resulting chunk is handed to the multiproof manager as its own task.
const MULTIPROOF_TARGETS_CHUNK_SIZE: usize = 10;
37
38/// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the
39/// state.
40#[derive(Default, Debug)]
41pub struct SparseTrieUpdate {
42    /// The state update that was used to calculate the proof
43    pub(crate) state: HashedPostState,
44    /// The calculated multiproof
45    pub(crate) multiproof: DecodedMultiProof,
46}
47
48impl SparseTrieUpdate {
49    /// Returns true if the update is empty.
50    pub(super) fn is_empty(&self) -> bool {
51        self.state.is_empty() && self.multiproof.is_empty()
52    }
53
54    /// Construct update from multiproof.
55    #[cfg(test)]
56    pub(super) fn from_multiproof(multiproof: reth_trie::MultiProof) -> alloy_rlp::Result<Self> {
57        Ok(Self { multiproof: multiproof.try_into()?, ..Default::default() })
58    }
59
60    /// Extend update with contents of the other.
61    pub(super) fn extend(&mut self, other: Self) {
62        self.state.extend(other.state);
63        self.multiproof.extend(other.multiproof);
64    }
65}
66
67/// Common configuration for multi proof tasks
68#[derive(Debug, Clone)]
69pub(super) struct MultiProofConfig<Factory> {
70    /// View over the state in the database.
71    pub consistent_view: ConsistentDbView<Factory>,
72    /// The sorted collection of cached in-memory intermediate trie nodes that
73    /// can be reused for computation.
74    pub nodes_sorted: Arc<TrieUpdatesSorted>,
75    /// The sorted in-memory overlay hashed state.
76    pub state_sorted: Arc<HashedPostStateSorted>,
77    /// The collection of prefix sets for the computation. Since the prefix sets _always_
78    /// invalidate the in-memory nodes, not all keys from `state_sorted` might be present here,
79    /// if we have cached nodes for them.
80    pub prefix_sets: Arc<TriePrefixSetsMut>,
81}
82
83impl<Factory> MultiProofConfig<Factory> {
84    /// Creates a new state root config from the consistent view and the trie input.
85    pub(super) fn new_from_input(
86        consistent_view: ConsistentDbView<Factory>,
87        input: TrieInput,
88    ) -> Self {
89        Self {
90            consistent_view,
91            nodes_sorted: Arc::new(input.nodes.into_sorted()),
92            state_sorted: Arc::new(input.state.into_sorted()),
93            prefix_sets: Arc::new(input.prefix_sets),
94        }
95    }
96}
97
/// Messages used internally by the multi proof task.
///
/// Senders for this message type are handed out by `MultiProofTask::state_root_message_sender`,
/// and spawned proof calculations report their results through clones of the same sender.
#[derive(Debug)]
pub(super) enum MultiProofMessage {
    /// Prefetch proof targets
    PrefetchProofs(MultiProofTargets),
    /// New state update from transaction execution with its source
    StateUpdate(StateChangeSource, EvmState),
    /// State update that can be applied to the sparse trie without any new proofs.
    ///
    /// It can be the case when all accounts and storage slots from the state update were already
    /// fetched and revealed. It is also what a queued task sends back when it turns out to have
    /// no proof targets at all.
    EmptyProof {
        /// The index of this proof in the sequence of state updates
        sequence_number: u64,
        /// The state update that was used to calculate the proof
        state: HashedPostState,
    },
    /// Proof calculation completed for a specific state update
    ProofCalculated(Box<ProofCalculated>),
    /// Error during proof calculation
    ProofCalculationError(ProviderError),
    /// Signals state update stream end.
    ///
    /// This is triggered by block execution, indicating that no additional state updates are
    /// expected. `StateHookSender` sends it from its `Drop` implementation.
    FinishedStateUpdates,
}
125
/// Message about completion of proof calculation for a specific state update
///
/// Built by the spawned proof calculations when they succeed.
#[derive(Debug)]
pub(super) struct ProofCalculated {
    /// The index of this proof in the sequence of state updates
    sequence_number: u64,
    /// Sparse trie update: the state update the proof was requested for, together with the
    /// calculated multiproof.
    update: SparseTrieUpdate,
    /// The time taken to calculate the proof.
    elapsed: Duration,
}
136
137/// Handle to track proof calculation ordering.
138#[derive(Debug, Default)]
139struct ProofSequencer {
140    /// The next proof sequence number to be produced.
141    next_sequence: u64,
142    /// The next sequence number expected to be delivered.
143    next_to_deliver: u64,
144    /// Buffer for out-of-order proofs and corresponding state updates
145    pending_proofs: BTreeMap<u64, SparseTrieUpdate>,
146}
147
148impl ProofSequencer {
149    /// Gets the next sequence number and increments the counter
150    const fn next_sequence(&mut self) -> u64 {
151        let seq = self.next_sequence;
152        self.next_sequence += 1;
153        seq
154    }
155
156    /// Adds a proof with the corresponding state update and returns all sequential proofs and state
157    /// updates if we have a continuous sequence
158    fn add_proof(&mut self, sequence: u64, update: SparseTrieUpdate) -> Vec<SparseTrieUpdate> {
159        if sequence >= self.next_to_deliver {
160            self.pending_proofs.insert(sequence, update);
161        }
162
163        // return early if we don't have the next expected proof
164        if !self.pending_proofs.contains_key(&self.next_to_deliver) {
165            return Vec::new()
166        }
167
168        let mut consecutive_proofs = Vec::with_capacity(self.pending_proofs.len());
169        let mut current_sequence = self.next_to_deliver;
170
171        // keep collecting proofs and state updates as long as we have consecutive sequence numbers
172        while let Some(pending) = self.pending_proofs.remove(&current_sequence) {
173            consecutive_proofs.push(pending);
174            current_sequence += 1;
175
176            // if we don't have the next number, stop collecting
177            if !self.pending_proofs.contains_key(&current_sequence) {
178                break;
179            }
180        }
181
182        self.next_to_deliver += consecutive_proofs.len() as u64;
183
184        consecutive_proofs
185    }
186
187    /// Returns true if we still have pending proofs
188    pub(crate) fn has_pending(&self) -> bool {
189        !self.pending_proofs.is_empty()
190    }
191}
192
/// A wrapper for the sender that signals completion when dropped.
///
/// This type is intended to be used in combination with the evm executor statehook.
/// It should be dropped once the block has been executed, i.e. after the last state update has
/// been sent. Dropping it triggers the exit condition of the multi proof task.
#[derive(Deref, Debug)]
pub(super) struct StateHookSender(Sender<MultiProofMessage>);

impl StateHookSender {
    /// Wraps `inner` so that [`MultiProofMessage::FinishedStateUpdates`] is sent through it when
    /// the wrapper is dropped.
    pub(crate) const fn new(inner: Sender<MultiProofMessage>) -> Self {
        Self(inner)
    }
}

impl Drop for StateHookSender {
    fn drop(&mut self) {
        // Send completion signal when the sender is dropped.
        // The send result is discarded on purpose: a failed send is not reported anywhere.
        let _ = self.0.send(MultiProofMessage::FinishedStateUpdates);
    }
}
213
214pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState {
215    let mut hashed_state = HashedPostState::with_capacity(update.len());
216
217    for (address, account) in update {
218        if account.is_touched() {
219            let hashed_address = keccak256(address);
220            trace!(target: "engine::root", ?address, ?hashed_address, "Adding account to state update");
221
222            let destroyed = account.is_selfdestructed();
223            let info = if destroyed { None } else { Some(account.info.into()) };
224            hashed_state.accounts.insert(hashed_address, info);
225
226            let mut changed_storage_iter = account
227                .storage
228                .into_iter()
229                .filter(|(_slot, value)| value.is_changed())
230                .map(|(slot, value)| (keccak256(B256::from(slot)), value.present_value))
231                .peekable();
232
233            if destroyed {
234                hashed_state.storages.insert(hashed_address, HashedStorage::new(true));
235            } else if changed_storage_iter.peek().is_some() {
236                hashed_state
237                    .storages
238                    .insert(hashed_address, HashedStorage::from_iter(false, changed_storage_iter));
239            }
240        }
241    }
242
243    hashed_state
244}
245
246/// A pending multiproof task, either [`StorageMultiproofInput`] or [`MultiproofInput`].
247#[derive(Debug)]
248enum PendingMultiproofTask<Factory> {
249    /// A storage multiproof task input.
250    Storage(StorageMultiproofInput<Factory>),
251    /// A regular multiproof task input.
252    Regular(MultiproofInput<Factory>),
253}
254
255impl<Factory> PendingMultiproofTask<Factory> {
256    /// Returns the proof sequence number of the task.
257    const fn proof_sequence_number(&self) -> u64 {
258        match self {
259            Self::Storage(input) => input.proof_sequence_number,
260            Self::Regular(input) => input.proof_sequence_number,
261        }
262    }
263
264    /// Returns whether or not the proof targets are empty.
265    fn proof_targets_is_empty(&self) -> bool {
266        match self {
267            Self::Storage(input) => input.proof_targets.is_empty(),
268            Self::Regular(input) => input.proof_targets.is_empty(),
269        }
270    }
271
272    /// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
273    fn send_empty_proof(self) {
274        match self {
275            Self::Storage(input) => input.send_empty_proof(),
276            Self::Regular(input) => input.send_empty_proof(),
277        }
278    }
279}
280
281impl<Factory> From<StorageMultiproofInput<Factory>> for PendingMultiproofTask<Factory> {
282    fn from(input: StorageMultiproofInput<Factory>) -> Self {
283        Self::Storage(input)
284    }
285}
286
287impl<Factory> From<MultiproofInput<Factory>> for PendingMultiproofTask<Factory> {
288    fn from(input: MultiproofInput<Factory>) -> Self {
289        Self::Regular(input)
290    }
291}
292
293/// Input parameters for spawning a dedicated storage multiproof calculation.
294#[derive(Debug)]
295struct StorageMultiproofInput<Factory> {
296    config: MultiProofConfig<Factory>,
297    source: Option<StateChangeSource>,
298    hashed_state_update: HashedPostState,
299    hashed_address: B256,
300    proof_targets: B256Set,
301    proof_sequence_number: u64,
302    state_root_message_sender: Sender<MultiProofMessage>,
303}
304
305impl<Factory> StorageMultiproofInput<Factory> {
306    /// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
307    fn send_empty_proof(self) {
308        let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
309            sequence_number: self.proof_sequence_number,
310            state: self.hashed_state_update,
311        });
312    }
313}
314
315/// Input parameters for spawning a multiproof calculation.
316#[derive(Debug)]
317struct MultiproofInput<Factory> {
318    config: MultiProofConfig<Factory>,
319    source: Option<StateChangeSource>,
320    hashed_state_update: HashedPostState,
321    proof_targets: MultiProofTargets,
322    proof_sequence_number: u64,
323    state_root_message_sender: Sender<MultiProofMessage>,
324}
325
326impl<Factory> MultiproofInput<Factory> {
327    /// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
328    fn send_empty_proof(self) {
329        let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
330            sequence_number: self.proof_sequence_number,
331            state: self.hashed_state_update,
332        });
333    }
334}
335
/// Manages concurrent multiproof calculations.
///
/// Takes care of not having more calculations in flight than a given maximum
/// concurrency. Further calculation requests are queued and spawned later, after
/// availability has been signaled.
#[derive(Debug)]
pub struct MultiproofManager<Factory: DatabaseProviderFactory> {
    /// Maximum number of concurrent calculations.
    max_concurrent: usize,
    /// Currently running calculations.
    inflight: usize,
    /// Queued calculations, spawned in FIFO order as running ones complete.
    pending: VecDeque<PendingMultiproofTask<Factory>>,
    /// Executor for tasks
    executor: WorkloadExecutor,
    /// Sender to the storage proof task. A clone is moved into every spawned calculation.
    storage_proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
    /// Metrics
    metrics: MultiProofTaskMetrics,
}
355
356impl<Factory> MultiproofManager<Factory>
357where
358    Factory:
359        DatabaseProviderFactory<Provider: BlockReader> + StateCommitmentProvider + Clone + 'static,
360{
361    /// Creates a new [`MultiproofManager`].
362    fn new(
363        executor: WorkloadExecutor,
364        metrics: MultiProofTaskMetrics,
365        storage_proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
366        max_concurrent: usize,
367    ) -> Self {
368        Self {
369            pending: VecDeque::with_capacity(max_concurrent),
370            max_concurrent,
371            executor,
372            inflight: 0,
373            metrics,
374            storage_proof_task_handle,
375        }
376    }
377
378    /// Spawns a new multiproof calculation or enqueues it for later if
379    /// `max_concurrent` are already inflight.
380    fn spawn_or_queue(&mut self, input: PendingMultiproofTask<Factory>) {
381        // If there are no proof targets, we can just send an empty multiproof back immediately
382        if input.proof_targets_is_empty() {
383            debug!(
384                sequence_number = input.proof_sequence_number(),
385                "No proof targets, sending empty multiproof back immediately"
386            );
387            input.send_empty_proof();
388            return
389        }
390
391        if self.inflight >= self.max_concurrent {
392            self.pending.push_back(input);
393            self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
394            return;
395        }
396
397        self.spawn_multiproof_task(input);
398    }
399
400    /// Signals that a multiproof calculation has finished and there's room to
401    /// spawn a new calculation if needed.
402    fn on_calculation_complete(&mut self) {
403        self.inflight = self.inflight.saturating_sub(1);
404        self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
405
406        if let Some(input) = self.pending.pop_front() {
407            self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
408            self.spawn_multiproof_task(input);
409        }
410    }
411
412    /// Spawns a multiproof task, dispatching to `spawn_storage_proof` if the input is a storage
413    /// multiproof, and dispatching to `spawn_multiproof` otherwise.
414    fn spawn_multiproof_task(&mut self, input: PendingMultiproofTask<Factory>) {
415        match input {
416            PendingMultiproofTask::Storage(storage_input) => {
417                self.spawn_storage_proof(storage_input);
418            }
419            PendingMultiproofTask::Regular(multiproof_input) => {
420                self.spawn_multiproof(multiproof_input);
421            }
422        }
423    }
424
425    /// Spawns a single storage proof calculation task.
426    fn spawn_storage_proof(&mut self, storage_multiproof_input: StorageMultiproofInput<Factory>) {
427        let StorageMultiproofInput {
428            config,
429            source,
430            hashed_state_update,
431            hashed_address,
432            proof_targets,
433            proof_sequence_number,
434            state_root_message_sender,
435        } = storage_multiproof_input;
436
437        let storage_proof_task_handle = self.storage_proof_task_handle.clone();
438
439        self.executor.spawn_blocking(move || {
440            let storage_targets = proof_targets.len();
441
442            trace!(
443                target: "engine::root",
444                proof_sequence_number,
445                ?proof_targets,
446                storage_targets,
447                "Starting dedicated storage proof calculation",
448            );
449            let start = Instant::now();
450            let result = ParallelProof::new(
451                config.consistent_view,
452                config.nodes_sorted,
453                config.state_sorted,
454                config.prefix_sets,
455                storage_proof_task_handle.clone(),
456            )
457            .with_branch_node_masks(true)
458            .decoded_storage_proof(hashed_address, proof_targets);
459            let elapsed = start.elapsed();
460            trace!(
461                target: "engine::root",
462                proof_sequence_number,
463                ?elapsed,
464                ?source,
465                storage_targets,
466                "Storage multiproofs calculated",
467            );
468
469            match result {
470                Ok(proof) => {
471                    let _ = state_root_message_sender.send(MultiProofMessage::ProofCalculated(
472                        Box::new(ProofCalculated {
473                            sequence_number: proof_sequence_number,
474                            update: SparseTrieUpdate {
475                                state: hashed_state_update,
476                                multiproof: DecodedMultiProof::from_storage_proof(
477                                    hashed_address,
478                                    proof,
479                                ),
480                            },
481                            elapsed,
482                        }),
483                    ));
484                }
485                Err(error) => {
486                    let _ = state_root_message_sender
487                        .send(MultiProofMessage::ProofCalculationError(error.into()));
488                }
489            }
490        });
491
492        self.inflight += 1;
493        self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
494    }
495
496    /// Spawns a single multiproof calculation task.
497    fn spawn_multiproof(&mut self, multiproof_input: MultiproofInput<Factory>) {
498        let MultiproofInput {
499            config,
500            source,
501            hashed_state_update,
502            proof_targets,
503            proof_sequence_number,
504            state_root_message_sender,
505        } = multiproof_input;
506        let storage_proof_task_handle = self.storage_proof_task_handle.clone();
507
508        self.executor.spawn_blocking(move || {
509            let account_targets = proof_targets.len();
510            let storage_targets = proof_targets.values().map(|slots| slots.len()).sum::<usize>();
511
512            trace!(
513                target: "engine::root",
514                proof_sequence_number,
515                ?proof_targets,
516                account_targets,
517                storage_targets,
518                "Starting multiproof calculation",
519            );
520            let start = Instant::now();
521            let result = ParallelProof::new(
522                config.consistent_view,
523                config.nodes_sorted,
524                config.state_sorted,
525                config.prefix_sets,
526                storage_proof_task_handle.clone(),
527            )
528            .with_branch_node_masks(true)
529            .decoded_multiproof(proof_targets);
530            let elapsed = start.elapsed();
531            trace!(
532                target: "engine::root",
533                proof_sequence_number,
534                ?elapsed,
535                ?source,
536                account_targets,
537                storage_targets,
538                "Multiproof calculated",
539            );
540
541            match result {
542                Ok(proof) => {
543                    let _ = state_root_message_sender.send(MultiProofMessage::ProofCalculated(
544                        Box::new(ProofCalculated {
545                            sequence_number: proof_sequence_number,
546                            update: SparseTrieUpdate {
547                                state: hashed_state_update,
548                                multiproof: proof,
549                            },
550                            elapsed,
551                        }),
552                    ));
553                }
554                Err(error) => {
555                    let _ = state_root_message_sender
556                        .send(MultiProofMessage::ProofCalculationError(error.into()));
557                }
558            }
559        });
560
561        self.inflight += 1;
562        self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
563    }
564}
565
/// Metrics recorded by the multiproof task and by its [`MultiproofManager`].
// NOTE(review): the `Metrics` derive presumably turns the field doc comments below into metric
// descriptions — confirm before rewording any of them.
#[derive(Metrics, Clone)]
#[metrics(scope = "tree.root")]
pub(crate) struct MultiProofTaskMetrics {
    /// Histogram of inflight multiproofs.
    pub inflight_multiproofs_histogram: Histogram,
    /// Histogram of pending multiproofs.
    pub pending_multiproofs_histogram: Histogram,

    /// Histogram of the number of prefetch proof target accounts.
    pub prefetch_proof_targets_accounts_histogram: Histogram,
    /// Histogram of the number of prefetch proof target storages.
    pub prefetch_proof_targets_storages_histogram: Histogram,
    /// Histogram of the number of prefetch proof target chunks.
    pub prefetch_proof_chunks_histogram: Histogram,

    /// Histogram of the number of state update proof target accounts.
    pub state_update_proof_targets_accounts_histogram: Histogram,
    /// Histogram of the number of state update proof target storages.
    pub state_update_proof_targets_storages_histogram: Histogram,
    /// Histogram of the number of state update proof target chunks.
    pub state_update_proof_chunks_histogram: Histogram,

    /// Histogram of proof calculation durations.
    pub proof_calculation_duration_histogram: Histogram,

    /// Histogram of sparse trie update durations.
    pub sparse_trie_update_duration_histogram: Histogram,
    /// Histogram of sparse trie final update durations.
    pub sparse_trie_final_update_duration_histogram: Histogram,
    /// Histogram of sparse trie total durations.
    pub sparse_trie_total_duration_histogram: Histogram,

    /// Histogram of state updates received.
    pub state_updates_received_histogram: Histogram,
    /// Histogram of proofs processed.
    pub proofs_processed_histogram: Histogram,
    /// Histogram of total time spent in the multiproof task.
    pub multiproof_task_total_duration_histogram: Histogram,
    /// Total time spent waiting for the first state update or prefetch request.
    pub first_update_wait_time_histogram: Histogram,
    /// Total time spent waiting for the last proof result.
    pub last_proof_wait_time_histogram: Histogram,
}
609
/// Standalone task that receives a transaction state stream and updates relevant
/// data structures to calculate state root.
///
/// It is responsible for initializing a blinded sparse trie and subscribing to the
/// transaction state stream. As it receives transaction execution results, it
/// fetches the proofs for relevant accounts from the database and reveals them
/// to the tree.
/// Then it updates relevant leaves according to the result of the transaction.
/// This feeds updates to the sparse trie task.
#[derive(Debug)]
pub(super) struct MultiProofTask<Factory: DatabaseProviderFactory> {
    /// Task configuration.
    config: MultiProofConfig<Factory>,
    /// Receiver for state root related messages.
    rx: Receiver<MultiProofMessage>,
    /// Sender for state root related messages. Clones of it are given to spawned proof
    /// calculations and to callers of `state_root_message_sender`.
    tx: Sender<MultiProofMessage>,
    /// Sender for state updates emitted by this type.
    to_sparse_trie: Sender<SparseTrieUpdate>,
    /// Proof targets that have been already fetched.
    fetched_proof_targets: MultiProofTargets,
    /// Proof sequencing handler.
    proof_sequencer: ProofSequencer,
    /// Manages calculation of multiproofs.
    multiproof_manager: MultiproofManager<Factory>,
    /// multi proof task metrics
    metrics: MultiProofTaskMetrics,
}
638
639impl<Factory> MultiProofTask<Factory>
640where
641    Factory:
642        DatabaseProviderFactory<Provider: BlockReader> + StateCommitmentProvider + Clone + 'static,
643{
644    /// Creates a new multi proof task with the unified message channel
645    pub(super) fn new(
646        config: MultiProofConfig<Factory>,
647        executor: WorkloadExecutor,
648        proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
649        to_sparse_trie: Sender<SparseTrieUpdate>,
650        max_concurrency: usize,
651    ) -> Self {
652        let (tx, rx) = channel();
653        let metrics = MultiProofTaskMetrics::default();
654
655        Self {
656            config,
657            rx,
658            tx,
659            to_sparse_trie,
660            fetched_proof_targets: Default::default(),
661            proof_sequencer: ProofSequencer::default(),
662            multiproof_manager: MultiproofManager::new(
663                executor,
664                metrics.clone(),
665                proof_task_handle,
666                max_concurrency,
667            ),
668            metrics,
669        }
670    }
671
672    /// Returns a [`Sender`] that can be used to send arbitrary [`MultiProofMessage`]s to this task.
673    pub(super) fn state_root_message_sender(&self) -> Sender<MultiProofMessage> {
674        self.tx.clone()
675    }
676
677    /// Handles request for proof prefetch.
678    ///
679    /// Returns a number of proofs that were spawned.
680    fn on_prefetch_proof(&mut self, targets: MultiProofTargets) -> u64 {
681        let proof_targets = self.get_prefetch_proof_targets(targets);
682        self.fetched_proof_targets.extend_ref(&proof_targets);
683
684        self.metrics.prefetch_proof_targets_accounts_histogram.record(proof_targets.len() as f64);
685        self.metrics
686            .prefetch_proof_targets_storages_histogram
687            .record(proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
688
689        // Process proof targets in chunks.
690        let mut chunks = 0;
691        for proof_targets_chunk in proof_targets.chunks(MULTIPROOF_TARGETS_CHUNK_SIZE) {
692            self.multiproof_manager.spawn_or_queue(
693                MultiproofInput {
694                    config: self.config.clone(),
695                    source: None,
696                    hashed_state_update: Default::default(),
697                    proof_targets: proof_targets_chunk,
698                    proof_sequence_number: self.proof_sequencer.next_sequence(),
699                    state_root_message_sender: self.tx.clone(),
700                }
701                .into(),
702            );
703            chunks += 1;
704        }
705        self.metrics.prefetch_proof_chunks_histogram.record(chunks as f64);
706
707        chunks
708    }
709
710    // Returns true if all state updates finished and all proofs processed.
711    fn is_done(
712        &self,
713        proofs_processed: u64,
714        state_update_proofs_requested: u64,
715        prefetch_proofs_requested: u64,
716        updates_finished: bool,
717    ) -> bool {
718        let all_proofs_processed =
719            proofs_processed >= state_update_proofs_requested + prefetch_proofs_requested;
720        let no_pending = !self.proof_sequencer.has_pending();
721        debug!(
722            target: "engine::root",
723            proofs_processed,
724            state_update_proofs_requested,
725            prefetch_proofs_requested,
726            no_pending,
727            updates_finished,
728            "Checking end condition"
729        );
730        all_proofs_processed && no_pending && updates_finished
731    }
732
733    /// Calls `get_proof_targets` with existing proof targets for prefetching.
734    fn get_prefetch_proof_targets(&self, mut targets: MultiProofTargets) -> MultiProofTargets {
735        // Here we want to filter out any targets that are already fetched
736        //
737        // This means we need to remove any storage slots that have already been fetched
738        let mut duplicates = 0;
739
740        // First remove all storage targets that are subsets of already fetched storage slots
741        targets.retain(|hashed_address, target_storage| {
742            let keep = self
743                .fetched_proof_targets
744                .get(hashed_address)
745                // do NOT remove if None, because that means the account has not been fetched yet
746                .is_none_or(|fetched_storage| {
747                    // remove if a subset
748                    !target_storage.is_subset(fetched_storage)
749                });
750
751            if !keep {
752                duplicates += target_storage.len();
753            }
754
755            keep
756        });
757
758        // For all non-subset remaining targets, we have to calculate the difference
759        for (hashed_address, target_storage) in targets.deref_mut() {
760            let Some(fetched_storage) = self.fetched_proof_targets.get(hashed_address) else {
761                // this means the account has not been fetched yet, so we must fetch everything
762                // associated with this account
763                continue
764            };
765
766            let prev_target_storage_len = target_storage.len();
767
768            // keep only the storage slots that have not been fetched yet
769            //
770            // we already removed subsets, so this should only remove duplicates
771            target_storage.retain(|slot| !fetched_storage.contains(slot));
772
773            duplicates += prev_target_storage_len - target_storage.len();
774        }
775
776        if duplicates > 0 {
777            trace!(target: "engine::root", duplicates, "Removed duplicate prefetch proof targets");
778        }
779
780        targets
781    }
782
783    /// Handles state updates.
784    ///
785    /// Returns a number of proofs that were spawned.
786    fn on_state_update(&mut self, source: StateChangeSource, update: EvmState) -> u64 {
787        let hashed_state_update = evm_state_to_hashed_post_state(update);
788        // Split the state update into already fetched and not fetched according to the proof
789        // targets.
790        let (fetched_state_update, not_fetched_state_update) =
791            hashed_state_update.partition_by_targets(&self.fetched_proof_targets);
792
793        let mut state_updates = 0;
794        // If there are any accounts or storage slots that we already fetched the proofs for,
795        // send them immediately, as they don't require spawning any additional multiproofs.
796        if !fetched_state_update.is_empty() {
797            let _ = self.tx.send(MultiProofMessage::EmptyProof {
798                sequence_number: self.proof_sequencer.next_sequence(),
799                state: fetched_state_update,
800            });
801            state_updates += 1;
802        }
803
804        // Process state updates in chunks.
805        let mut chunks = 0;
806        let mut spawned_proof_targets = MultiProofTargets::default();
807        for chunk in not_fetched_state_update.chunks(MULTIPROOF_TARGETS_CHUNK_SIZE) {
808            let proof_targets = get_proof_targets(&chunk, &self.fetched_proof_targets);
809            spawned_proof_targets.extend_ref(&proof_targets);
810
811            self.multiproof_manager.spawn_or_queue(
812                MultiproofInput {
813                    config: self.config.clone(),
814                    source: Some(source),
815                    hashed_state_update: chunk,
816                    proof_targets,
817                    proof_sequence_number: self.proof_sequencer.next_sequence(),
818                    state_root_message_sender: self.tx.clone(),
819                }
820                .into(),
821            );
822            chunks += 1;
823        }
824
825        self.metrics
826            .state_update_proof_targets_accounts_histogram
827            .record(spawned_proof_targets.len() as f64);
828        self.metrics
829            .state_update_proof_targets_storages_histogram
830            .record(spawned_proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
831        self.metrics.state_update_proof_chunks_histogram.record(chunks as f64);
832
833        self.fetched_proof_targets.extend(spawned_proof_targets);
834
835        state_updates + chunks
836    }
837
838    /// Handler for new proof calculated, aggregates all the existing sequential proofs.
839    fn on_proof(
840        &mut self,
841        sequence_number: u64,
842        update: SparseTrieUpdate,
843    ) -> Option<SparseTrieUpdate> {
844        let ready_proofs = self.proof_sequencer.add_proof(sequence_number, update);
845
846        ready_proofs
847            .into_iter()
848            // Merge all ready proofs and state updates
849            .reduce(|mut acc_update, update| {
850                acc_update.extend(update);
851                acc_update
852            })
853            // Return None if the resulting proof is empty
854            .filter(|proof| !proof.is_empty())
855    }
856
857    /// Starts the main loop that handles all incoming messages, fetches proofs, applies them to the
858    /// sparse trie, updates the sparse trie, and eventually returns the state root.
859    ///
860    /// The lifecycle is the following:
861    /// 1. Either [`MultiProofMessage::PrefetchProofs`] or [`MultiProofMessage::StateUpdate`] is
862    ///    received from the engine.
863    ///    * For [`MultiProofMessage::StateUpdate`], the state update is hashed with
864    ///      [`evm_state_to_hashed_post_state`], and then (proof targets)[`MultiProofTargets`] are
865    ///      extracted with [`get_proof_targets`].
866    ///    * For both messages, proof targets are deduplicated according to `fetched_proof_targets`,
867    ///      so that the proofs for accounts and storage slots that were already fetched are not
868    ///      requested again.
869    /// 2. Using the proof targets, a new multiproof is calculated using
870    ///    [`MultiproofManager::spawn_or_queue`].
871    ///    * If the list of proof targets is empty, the [`MultiProofMessage::EmptyProof`] message is
872    ///      sent back to this task along with the original state update.
873    ///    * Otherwise, the multiproof is calculated and the [`MultiProofMessage::ProofCalculated`]
874    ///      message is sent back to this task along with the resulting multiproof, proof targets
875    ///      and original state update.
876    /// 3. Either [`MultiProofMessage::EmptyProof`] or [`MultiProofMessage::ProofCalculated`] is
877    ///    received.
878    ///    * The multiproof is added to the (proof sequencer)[`ProofSequencer`].
879    ///    * If the proof sequencer has a contiguous sequence of multiproofs in the same order as
880    ///      state updates arrived (i.e. transaction order), such sequence is returned.
881    /// 4. Once there's a sequence of contiguous multiproofs along with the proof targets and state
882    ///    updates associated with them, a [`SparseTrieUpdate`] is generated and sent to the sparse
883    ///    trie task.
884    /// 5. Steps above are repeated until this task receives a
885    ///    [`MultiProofMessage::FinishedStateUpdates`].
886    ///    * Once this message is received, on every [`MultiProofMessage::EmptyProof`] and
887    ///      [`MultiProofMessage::ProofCalculated`] message, we check if there are any proofs are
888    ///      currently being calculated, or if there are any pending proofs in the proof sequencer
889    ///      left to be revealed by checking the pending tasks.
890    /// 6. This task exits after all pending proofs are processed.
891    pub(crate) fn run(mut self) {
892        // TODO convert those into fields
893        let mut prefetch_proofs_requested = 0;
894        let mut state_update_proofs_requested = 0;
895        let mut proofs_processed = 0;
896
897        let mut updates_finished = false;
898
899        // Timestamp before the first state update or prefetch was received
900        let start = Instant::now();
901
902        // Timestamp when the first state update or prefetch was received
903        let mut first_update_time = None;
904        // Timestamp when state updates have finished
905        let mut updates_finished_time = None;
906
907        loop {
908            trace!(target: "engine::root", "entering main channel receiving loop");
909            match self.rx.recv() {
910                Ok(message) => match message {
911                    MultiProofMessage::PrefetchProofs(targets) => {
912                        trace!(target: "engine::root", "processing MultiProofMessage::PrefetchProofs");
913                        if first_update_time.is_none() {
914                            // record the wait time
915                            self.metrics
916                                .first_update_wait_time_histogram
917                                .record(start.elapsed().as_secs_f64());
918                            first_update_time = Some(Instant::now());
919                            debug!(target: "engine::root", "Started state root calculation");
920                        }
921
922                        let account_targets = targets.len();
923                        let storage_targets =
924                            targets.values().map(|slots| slots.len()).sum::<usize>();
925                        prefetch_proofs_requested += self.on_prefetch_proof(targets);
926                        debug!(
927                            target: "engine::root",
928                            account_targets,
929                            storage_targets,
930                            prefetch_proofs_requested,
931                            "Prefetching proofs"
932                        );
933                    }
934                    MultiProofMessage::StateUpdate(source, update) => {
935                        trace!(target: "engine::root", "processing MultiProofMessage::StateUpdate");
936                        if first_update_time.is_none() {
937                            // record the wait time
938                            self.metrics
939                                .first_update_wait_time_histogram
940                                .record(start.elapsed().as_secs_f64());
941                            first_update_time = Some(Instant::now());
942                            debug!(target: "engine::root", "Started state root calculation");
943                        }
944
945                        let len = update.len();
946                        state_update_proofs_requested += self.on_state_update(source, update);
947                        debug!(
948                            target: "engine::root",
949                            ?source,
950                            len,
951                            ?state_update_proofs_requested,
952                            "Received new state update"
953                        );
954                    }
955                    MultiProofMessage::FinishedStateUpdates => {
956                        trace!(target: "engine::root", "processing MultiProofMessage::FinishedStateUpdates");
957                        updates_finished = true;
958                        updates_finished_time = Some(Instant::now());
959                        if self.is_done(
960                            proofs_processed,
961                            state_update_proofs_requested,
962                            prefetch_proofs_requested,
963                            updates_finished,
964                        ) {
965                            debug!(
966                                target: "engine::root",
967                                "State updates finished and all proofs processed, ending calculation"
968                            );
969                            break
970                        }
971                    }
972                    MultiProofMessage::EmptyProof { sequence_number, state } => {
973                        trace!(target: "engine::root", "processing MultiProofMessage::EmptyProof");
974
975                        proofs_processed += 1;
976
977                        if let Some(combined_update) = self.on_proof(
978                            sequence_number,
979                            SparseTrieUpdate { state, multiproof: Default::default() },
980                        ) {
981                            let _ = self.to_sparse_trie.send(combined_update);
982                        }
983
984                        if self.is_done(
985                            proofs_processed,
986                            state_update_proofs_requested,
987                            prefetch_proofs_requested,
988                            updates_finished,
989                        ) {
990                            debug!(
991                                target: "engine::root",
992                                "State updates finished and all proofs processed, ending calculation"
993                            );
994                            break
995                        }
996                    }
997                    MultiProofMessage::ProofCalculated(proof_calculated) => {
998                        trace!(target: "engine::root", "processing
999        MultiProofMessage::ProofCalculated");
1000
1001                        // we increment proofs_processed for both state updates and prefetches,
1002                        // because both are used for the root termination condition.
1003                        proofs_processed += 1;
1004
1005                        self.metrics
1006                            .proof_calculation_duration_histogram
1007                            .record(proof_calculated.elapsed);
1008
1009                        debug!(
1010                            target: "engine::root",
1011                            sequence = proof_calculated.sequence_number,
1012                            total_proofs = proofs_processed,
1013                            "Processing calculated proof"
1014                        );
1015
1016                        self.multiproof_manager.on_calculation_complete();
1017
1018                        if let Some(combined_update) =
1019                            self.on_proof(proof_calculated.sequence_number, proof_calculated.update)
1020                        {
1021                            let _ = self.to_sparse_trie.send(combined_update);
1022                        }
1023
1024                        if self.is_done(
1025                            proofs_processed,
1026                            state_update_proofs_requested,
1027                            prefetch_proofs_requested,
1028                            updates_finished,
1029                        ) {
1030                            debug!(
1031                                target: "engine::root",
1032                                "State updates finished and all proofs processed, ending calculation");
1033                            break
1034                        }
1035                    }
1036                    MultiProofMessage::ProofCalculationError(err) => {
1037                        error!(
1038                            target: "engine::root",
1039                            ?err,
1040                            "proof calculation error"
1041                        );
1042                        return
1043                    }
1044                },
1045                Err(_) => {
1046                    // this means our internal message channel is closed, which shouldn't happen
1047                    // in normal operation since we hold both ends
1048                    error!(
1049                        target: "engine::root",
1050                        "Internal message channel closed unexpectedly"
1051                    );
1052                }
1053            }
1054        }
1055
1056        debug!(
1057            target: "engine::root",
1058            total_updates = state_update_proofs_requested,
1059            total_proofs = proofs_processed,
1060            total_time = ?first_update_time.map(|t|t.elapsed()),
1061            time_since_updates_finished = ?updates_finished_time.map(|t|t.elapsed()),
1062            "All proofs processed, ending calculation"
1063        );
1064
1065        // update total metrics on finish
1066        self.metrics.state_updates_received_histogram.record(state_update_proofs_requested as f64);
1067        self.metrics.proofs_processed_histogram.record(proofs_processed as f64);
1068        if let Some(total_time) = first_update_time.map(|t| t.elapsed()) {
1069            self.metrics.multiproof_task_total_duration_histogram.record(total_time);
1070        }
1071
1072        if let Some(updates_finished_time) = updates_finished_time {
1073            self.metrics
1074                .last_proof_wait_time_histogram
1075                .record(updates_finished_time.elapsed().as_secs_f64());
1076        }
1077    }
1078}
1079
1080/// Returns accounts only with those storages that were not already fetched, and
1081/// if there are no such storages and the account itself was already fetched, the
1082/// account shouldn't be included.
1083fn get_proof_targets(
1084    state_update: &HashedPostState,
1085    fetched_proof_targets: &MultiProofTargets,
1086) -> MultiProofTargets {
1087    let mut targets = MultiProofTargets::default();
1088
1089    // first collect all new accounts (not previously fetched)
1090    for &hashed_address in state_update.accounts.keys() {
1091        if !fetched_proof_targets.contains_key(&hashed_address) {
1092            targets.insert(hashed_address, HashSet::default());
1093        }
1094    }
1095
1096    // then process storage slots for all accounts in the state update
1097    for (hashed_address, storage) in &state_update.storages {
1098        let fetched = fetched_proof_targets.get(hashed_address);
1099        let mut changed_slots = storage
1100            .storage
1101            .keys()
1102            .filter(|slot| !fetched.is_some_and(|f| f.contains(*slot)))
1103            .peekable();
1104
1105        // If the storage is wiped, we still need to fetch the account proof.
1106        if storage.wiped && fetched.is_none() {
1107            targets.entry(*hashed_address).or_default();
1108        }
1109
1110        if changed_slots.peek().is_some() {
1111            targets.entry(*hashed_address).or_default().extend(changed_slots);
1112        }
1113    }
1114
1115    targets
1116}
1117
1118#[cfg(test)]
1119mod tests {
1120    use super::*;
1121    use alloy_primitives::map::B256Set;
1122    use reth_provider::{providers::ConsistentDbView, test_utils::create_test_provider_factory};
1123    use reth_trie::{MultiProof, TrieInput};
1124    use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofTaskManager};
1125    use revm_primitives::{B256, U256};
1126    use std::sync::Arc;
1127
1128    fn create_state_root_config<F>(factory: F, input: TrieInput) -> MultiProofConfig<F>
1129    where
1130        F: DatabaseProviderFactory<Provider: BlockReader>
1131            + StateCommitmentProvider
1132            + Clone
1133            + 'static,
1134    {
1135        let consistent_view = ConsistentDbView::new(factory, None);
1136        let nodes_sorted = Arc::new(input.nodes.clone().into_sorted());
1137        let state_sorted = Arc::new(input.state.clone().into_sorted());
1138        let prefix_sets = Arc::new(input.prefix_sets);
1139
1140        MultiProofConfig { consistent_view, nodes_sorted, state_sorted, prefix_sets }
1141    }
1142
1143    fn create_test_state_root_task<F>(factory: F) -> MultiProofTask<F>
1144    where
1145        F: DatabaseProviderFactory<Provider: BlockReader>
1146            + StateCommitmentProvider
1147            + Clone
1148            + 'static,
1149    {
1150        let executor = WorkloadExecutor::default();
1151        let config = create_state_root_config(factory, TrieInput::default());
1152        let task_ctx = ProofTaskCtx::new(
1153            config.nodes_sorted.clone(),
1154            config.state_sorted.clone(),
1155            config.prefix_sets.clone(),
1156        );
1157        let proof_task = ProofTaskManager::new(
1158            executor.handle().clone(),
1159            config.consistent_view.clone(),
1160            task_ctx,
1161            1,
1162        );
1163        let channel = channel();
1164
1165        MultiProofTask::new(config, executor, proof_task.handle(), channel.0, 1)
1166    }
1167
1168    #[test]
1169    fn test_add_proof_in_sequence() {
1170        let mut sequencer = ProofSequencer::default();
1171        let proof1 = MultiProof::default();
1172        let proof2 = MultiProof::default();
1173        sequencer.next_sequence = 2;
1174
1175        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1176        assert_eq!(ready.len(), 1);
1177        assert!(!sequencer.has_pending());
1178
1179        let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1180        assert_eq!(ready.len(), 1);
1181        assert!(!sequencer.has_pending());
1182    }
1183
1184    #[test]
1185    fn test_add_proof_out_of_order() {
1186        let mut sequencer = ProofSequencer::default();
1187        let proof1 = MultiProof::default();
1188        let proof2 = MultiProof::default();
1189        let proof3 = MultiProof::default();
1190        sequencer.next_sequence = 3;
1191
1192        let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3).unwrap());
1193        assert_eq!(ready.len(), 0);
1194        assert!(sequencer.has_pending());
1195
1196        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1197        assert_eq!(ready.len(), 1);
1198        assert!(sequencer.has_pending());
1199
1200        let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1201        assert_eq!(ready.len(), 2);
1202        assert!(!sequencer.has_pending());
1203    }
1204
1205    #[test]
1206    fn test_add_proof_with_gaps() {
1207        let mut sequencer = ProofSequencer::default();
1208        let proof1 = MultiProof::default();
1209        let proof3 = MultiProof::default();
1210        sequencer.next_sequence = 3;
1211
1212        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1213        assert_eq!(ready.len(), 1);
1214
1215        let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3).unwrap());
1216        assert_eq!(ready.len(), 0);
1217        assert!(sequencer.has_pending());
1218    }
1219
1220    #[test]
1221    fn test_add_proof_duplicate_sequence() {
1222        let mut sequencer = ProofSequencer::default();
1223        let proof1 = MultiProof::default();
1224        let proof2 = MultiProof::default();
1225
1226        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
1227        assert_eq!(ready.len(), 1);
1228
1229        let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof2).unwrap());
1230        assert_eq!(ready.len(), 0);
1231        assert!(!sequencer.has_pending());
1232    }
1233
1234    #[test]
1235    fn test_add_proof_batch_processing() {
1236        let mut sequencer = ProofSequencer::default();
1237        let proofs: Vec<_> = (0..5).map(|_| MultiProof::default()).collect();
1238        sequencer.next_sequence = 5;
1239
1240        sequencer.add_proof(4, SparseTrieUpdate::from_multiproof(proofs[4].clone()).unwrap());
1241        sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proofs[2].clone()).unwrap());
1242        sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proofs[1].clone()).unwrap());
1243        sequencer.add_proof(3, SparseTrieUpdate::from_multiproof(proofs[3].clone()).unwrap());
1244
1245        let ready =
1246            sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proofs[0].clone()).unwrap());
1247        assert_eq!(ready.len(), 5);
1248        assert!(!sequencer.has_pending());
1249    }
1250
1251    fn create_get_proof_targets_state() -> HashedPostState {
1252        let mut state = HashedPostState::default();
1253
1254        let addr1 = B256::random();
1255        let addr2 = B256::random();
1256        state.accounts.insert(addr1, Some(Default::default()));
1257        state.accounts.insert(addr2, Some(Default::default()));
1258
1259        let mut storage = HashedStorage::default();
1260        let slot1 = B256::random();
1261        let slot2 = B256::random();
1262        storage.storage.insert(slot1, U256::ZERO.into());
1263        storage.storage.insert(slot2, U256::from(1).into());
1264        state.storages.insert(addr1, storage);
1265
1266        state
1267    }
1268
1269    #[test]
1270    fn test_get_proof_targets_new_account_targets() {
1271        let state = create_get_proof_targets_state();
1272        let fetched = MultiProofTargets::default();
1273
1274        let targets = get_proof_targets(&state, &fetched);
1275
1276        // should return all accounts as targets since nothing was fetched before
1277        assert_eq!(targets.len(), state.accounts.len());
1278        for addr in state.accounts.keys() {
1279            assert!(targets.contains_key(addr));
1280        }
1281    }
1282
1283    #[test]
1284    fn test_get_proof_targets_new_storage_targets() {
1285        let state = create_get_proof_targets_state();
1286        let fetched = MultiProofTargets::default();
1287
1288        let targets = get_proof_targets(&state, &fetched);
1289
1290        // verify storage slots are included for accounts with storage
1291        for (addr, storage) in &state.storages {
1292            assert!(targets.contains_key(addr));
1293            let target_slots = &targets[addr];
1294            assert_eq!(target_slots.len(), storage.storage.len());
1295            for slot in storage.storage.keys() {
1296                assert!(target_slots.contains(slot));
1297            }
1298        }
1299    }
1300
1301    #[test]
1302    fn test_get_proof_targets_filter_already_fetched_accounts() {
1303        let state = create_get_proof_targets_state();
1304        let mut fetched = MultiProofTargets::default();
1305
1306        // select an account that has no storage updates
1307        let fetched_addr = state
1308            .accounts
1309            .keys()
1310            .find(|&&addr| !state.storages.contains_key(&addr))
1311            .expect("Should have an account without storage");
1312
1313        // mark the account as already fetched
1314        fetched.insert(*fetched_addr, HashSet::default());
1315
1316        let targets = get_proof_targets(&state, &fetched);
1317
1318        // should not include the already fetched account since it has no storage updates
1319        assert!(!targets.contains_key(fetched_addr));
1320        // other accounts should still be included
1321        assert_eq!(targets.len(), state.accounts.len() - 1);
1322    }
1323
1324    #[test]
1325    fn test_get_proof_targets_filter_already_fetched_storage() {
1326        let state = create_get_proof_targets_state();
1327        let mut fetched = MultiProofTargets::default();
1328
1329        // mark one storage slot as already fetched
1330        let (addr, storage) = state.storages.iter().next().unwrap();
1331        let mut fetched_slots = HashSet::default();
1332        let fetched_slot = *storage.storage.keys().next().unwrap();
1333        fetched_slots.insert(fetched_slot);
1334        fetched.insert(*addr, fetched_slots);
1335
1336        let targets = get_proof_targets(&state, &fetched);
1337
1338        // should not include the already fetched storage slot
1339        let target_slots = &targets[addr];
1340        assert!(!target_slots.contains(&fetched_slot));
1341        assert_eq!(target_slots.len(), storage.storage.len() - 1);
1342    }
1343
1344    #[test]
1345    fn test_get_proof_targets_empty_state() {
1346        let state = HashedPostState::default();
1347        let fetched = MultiProofTargets::default();
1348
1349        let targets = get_proof_targets(&state, &fetched);
1350
1351        assert!(targets.is_empty());
1352    }
1353
1354    #[test]
1355    fn test_get_proof_targets_mixed_fetched_state() {
1356        let mut state = HashedPostState::default();
1357        let mut fetched = MultiProofTargets::default();
1358
1359        let addr1 = B256::random();
1360        let addr2 = B256::random();
1361        let slot1 = B256::random();
1362        let slot2 = B256::random();
1363
1364        state.accounts.insert(addr1, Some(Default::default()));
1365        state.accounts.insert(addr2, Some(Default::default()));
1366
1367        let mut storage = HashedStorage::default();
1368        storage.storage.insert(slot1, U256::ZERO.into());
1369        storage.storage.insert(slot2, U256::from(1).into());
1370        state.storages.insert(addr1, storage);
1371
1372        let mut fetched_slots = HashSet::default();
1373        fetched_slots.insert(slot1);
1374        fetched.insert(addr1, fetched_slots);
1375
1376        let targets = get_proof_targets(&state, &fetched);
1377
1378        assert!(targets.contains_key(&addr2));
1379        assert!(!targets[&addr1].contains(&slot1));
1380        assert!(targets[&addr1].contains(&slot2));
1381    }
1382
1383    #[test]
1384    fn test_get_proof_targets_unmodified_account_with_storage() {
1385        let mut state = HashedPostState::default();
1386        let fetched = MultiProofTargets::default();
1387
1388        let addr = B256::random();
1389        let slot1 = B256::random();
1390        let slot2 = B256::random();
1391
1392        // don't add the account to state.accounts (simulating unmodified account)
1393        // but add storage updates for this account
1394        let mut storage = HashedStorage::default();
1395        storage.storage.insert(slot1, U256::from(1).into());
1396        storage.storage.insert(slot2, U256::from(2).into());
1397        state.storages.insert(addr, storage);
1398
1399        assert!(!state.accounts.contains_key(&addr));
1400        assert!(!fetched.contains_key(&addr));
1401
1402        let targets = get_proof_targets(&state, &fetched);
1403
1404        // verify that we still get the storage slots for the unmodified account
1405        assert!(targets.contains_key(&addr));
1406
1407        let target_slots = &targets[&addr];
1408        assert_eq!(target_slots.len(), 2);
1409        assert!(target_slots.contains(&slot1));
1410        assert!(target_slots.contains(&slot2));
1411    }
1412
1413    #[test]
1414    fn test_get_prefetch_proof_targets_no_duplicates() {
1415        let test_provider_factory = create_test_provider_factory();
1416        let mut test_state_root_task = create_test_state_root_task(test_provider_factory);
1417
1418        // populate some targets
1419        let mut targets = MultiProofTargets::default();
1420        let addr1 = B256::random();
1421        let addr2 = B256::random();
1422        let slot1 = B256::random();
1423        let slot2 = B256::random();
1424        targets.insert(addr1, vec![slot1].into_iter().collect());
1425        targets.insert(addr2, vec![slot2].into_iter().collect());
1426
1427        let prefetch_proof_targets =
1428            test_state_root_task.get_prefetch_proof_targets(targets.clone());
1429
1430        // check that the prefetch proof targets are the same because there are no fetched proof
1431        // targets yet
1432        assert_eq!(prefetch_proof_targets, targets);
1433
1434        // add a different addr and slot to fetched proof targets
1435        let addr3 = B256::random();
1436        let slot3 = B256::random();
1437        test_state_root_task.fetched_proof_targets.insert(addr3, vec![slot3].into_iter().collect());
1438
1439        let prefetch_proof_targets =
1440            test_state_root_task.get_prefetch_proof_targets(targets.clone());
1441
1442        // check that the prefetch proof targets are the same because the fetched proof targets
1443        // don't overlap with the prefetch targets
1444        assert_eq!(prefetch_proof_targets, targets);
1445    }
1446
1447    #[test]
1448    fn test_get_prefetch_proof_targets_remove_subset() {
1449        let test_provider_factory = create_test_provider_factory();
1450        let mut test_state_root_task = create_test_state_root_task(test_provider_factory);
1451
1452        // populate some targe
1453        let mut targets = MultiProofTargets::default();
1454        let addr1 = B256::random();
1455        let addr2 = B256::random();
1456        let slot1 = B256::random();
1457        let slot2 = B256::random();
1458        targets.insert(addr1, vec![slot1].into_iter().collect());
1459        targets.insert(addr2, vec![slot2].into_iter().collect());
1460
1461        // add a subset of the first target to fetched proof targets
1462        test_state_root_task.fetched_proof_targets.insert(addr1, vec![slot1].into_iter().collect());
1463
1464        let prefetch_proof_targets =
1465            test_state_root_task.get_prefetch_proof_targets(targets.clone());
1466
1467        // check that the prefetch proof targets do not include the subset
1468        assert_eq!(prefetch_proof_targets.len(), 1);
1469        assert!(!prefetch_proof_targets.contains_key(&addr1));
1470        assert!(prefetch_proof_targets.contains_key(&addr2));
1471
1472        // now add one more slot to the prefetch targets
1473        let slot3 = B256::random();
1474        targets.get_mut(&addr1).unwrap().insert(slot3);
1475
1476        let prefetch_proof_targets =
1477            test_state_root_task.get_prefetch_proof_targets(targets.clone());
1478
1479        // check that the prefetch proof targets do not include the subset
1480        // but include the new slot
1481        assert_eq!(prefetch_proof_targets.len(), 2);
1482        assert!(prefetch_proof_targets.contains_key(&addr1));
1483        assert_eq!(
1484            *prefetch_proof_targets.get(&addr1).unwrap(),
1485            vec![slot3].into_iter().collect::<B256Set>()
1486        );
1487        assert!(prefetch_proof_targets.contains_key(&addr2));
1488        assert_eq!(
1489            *prefetch_proof_targets.get(&addr2).unwrap(),
1490            vec![slot2].into_iter().collect::<B256Set>()
1491        );
1492    }
1493}