1use crate::tree::payload_processor::executor::WorkloadExecutor;
4use alloy_evm::block::StateChangeSource;
5use alloy_primitives::{
6 keccak256,
7 map::{B256Set, HashSet},
8 B256,
9};
10use derive_more::derive::Deref;
11use metrics::Histogram;
12use reth_errors::ProviderError;
13use reth_metrics::Metrics;
14use reth_provider::{
15 providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, FactoryTx,
16 StateCommitmentProvider,
17};
18use reth_revm::state::EvmState;
19use reth_trie::{
20 prefix_set::TriePrefixSetsMut, updates::TrieUpdatesSorted, DecodedMultiProof, HashedPostState,
21 HashedPostStateSorted, HashedStorage, MultiProofTargets, TrieInput,
22};
23use reth_trie_parallel::{proof::ParallelProof, proof_task::ProofTaskManagerHandle};
24use std::{
25 collections::{BTreeMap, VecDeque},
26 ops::DerefMut,
27 sync::{
28 mpsc::{channel, Receiver, Sender},
29 Arc,
30 },
31 time::{Duration, Instant},
32};
33use tracing::{debug, error, trace};
34
/// Chunk size passed to `chunks(..)` when proof targets or state updates are split into
/// separate multiproof tasks before being spawned or queued.
const MULTIPROOF_TARGETS_CHUNK_SIZE: usize = 10;
37
38#[derive(Default, Debug)]
41pub struct SparseTrieUpdate {
42 pub(crate) state: HashedPostState,
44 pub(crate) multiproof: DecodedMultiProof,
46}
47
48impl SparseTrieUpdate {
49 pub(super) fn is_empty(&self) -> bool {
51 self.state.is_empty() && self.multiproof.is_empty()
52 }
53
54 #[cfg(test)]
56 pub(super) fn from_multiproof(multiproof: reth_trie::MultiProof) -> alloy_rlp::Result<Self> {
57 Ok(Self { multiproof: multiproof.try_into()?, ..Default::default() })
58 }
59
60 pub(super) fn extend(&mut self, other: Self) {
62 self.state.extend(other.state);
63 self.multiproof.extend(other.multiproof);
64 }
65}
66
67#[derive(Debug, Clone)]
69pub(super) struct MultiProofConfig<Factory> {
70 pub consistent_view: ConsistentDbView<Factory>,
72 pub nodes_sorted: Arc<TrieUpdatesSorted>,
75 pub state_sorted: Arc<HashedPostStateSorted>,
77 pub prefix_sets: Arc<TriePrefixSetsMut>,
81}
82
83impl<Factory> MultiProofConfig<Factory> {
84 pub(super) fn new_from_input(
86 consistent_view: ConsistentDbView<Factory>,
87 input: TrieInput,
88 ) -> Self {
89 Self {
90 consistent_view,
91 nodes_sorted: Arc::new(input.nodes.into_sorted()),
92 state_sorted: Arc::new(input.state.into_sorted()),
93 prefix_sets: Arc::new(input.prefix_sets),
94 }
95 }
96}
97
98#[derive(Debug)]
100pub(super) enum MultiProofMessage {
101 PrefetchProofs(MultiProofTargets),
103 StateUpdate(StateChangeSource, EvmState),
105 EmptyProof {
110 sequence_number: u64,
112 state: HashedPostState,
114 },
115 ProofCalculated(Box<ProofCalculated>),
117 ProofCalculationError(ProviderError),
119 FinishedStateUpdates,
124}
125
126#[derive(Debug)]
128pub(super) struct ProofCalculated {
129 sequence_number: u64,
131 update: SparseTrieUpdate,
133 elapsed: Duration,
135}
136
137#[derive(Debug, Default)]
139struct ProofSequencer {
140 next_sequence: u64,
142 next_to_deliver: u64,
144 pending_proofs: BTreeMap<u64, SparseTrieUpdate>,
146}
147
148impl ProofSequencer {
149 const fn next_sequence(&mut self) -> u64 {
151 let seq = self.next_sequence;
152 self.next_sequence += 1;
153 seq
154 }
155
156 fn add_proof(&mut self, sequence: u64, update: SparseTrieUpdate) -> Vec<SparseTrieUpdate> {
159 if sequence >= self.next_to_deliver {
160 self.pending_proofs.insert(sequence, update);
161 }
162
163 if !self.pending_proofs.contains_key(&self.next_to_deliver) {
165 return Vec::new()
166 }
167
168 let mut consecutive_proofs = Vec::with_capacity(self.pending_proofs.len());
169 let mut current_sequence = self.next_to_deliver;
170
171 while let Some(pending) = self.pending_proofs.remove(¤t_sequence) {
173 consecutive_proofs.push(pending);
174 current_sequence += 1;
175
176 if !self.pending_proofs.contains_key(¤t_sequence) {
178 break;
179 }
180 }
181
182 self.next_to_deliver += consecutive_proofs.len() as u64;
183
184 consecutive_proofs
185 }
186
187 pub(crate) fn has_pending(&self) -> bool {
189 !self.pending_proofs.is_empty()
190 }
191}
192
193#[derive(Deref, Debug)]
199pub(super) struct StateHookSender(Sender<MultiProofMessage>);
200
201impl StateHookSender {
202 pub(crate) const fn new(inner: Sender<MultiProofMessage>) -> Self {
203 Self(inner)
204 }
205}
206
207impl Drop for StateHookSender {
208 fn drop(&mut self) {
209 let _ = self.0.send(MultiProofMessage::FinishedStateUpdates);
211 }
212}
213
214pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostState {
215 let mut hashed_state = HashedPostState::with_capacity(update.len());
216
217 for (address, account) in update {
218 if account.is_touched() {
219 let hashed_address = keccak256(address);
220 trace!(target: "engine::root", ?address, ?hashed_address, "Adding account to state update");
221
222 let destroyed = account.is_selfdestructed();
223 let info = if destroyed { None } else { Some(account.info.into()) };
224 hashed_state.accounts.insert(hashed_address, info);
225
226 let mut changed_storage_iter = account
227 .storage
228 .into_iter()
229 .filter(|(_slot, value)| value.is_changed())
230 .map(|(slot, value)| (keccak256(B256::from(slot)), value.present_value))
231 .peekable();
232
233 if destroyed {
234 hashed_state.storages.insert(hashed_address, HashedStorage::new(true));
235 } else if changed_storage_iter.peek().is_some() {
236 hashed_state
237 .storages
238 .insert(hashed_address, HashedStorage::from_iter(false, changed_storage_iter));
239 }
240 }
241 }
242
243 hashed_state
244}
245
246#[derive(Debug)]
248enum PendingMultiproofTask<Factory> {
249 Storage(StorageMultiproofInput<Factory>),
251 Regular(MultiproofInput<Factory>),
253}
254
255impl<Factory> PendingMultiproofTask<Factory> {
256 const fn proof_sequence_number(&self) -> u64 {
258 match self {
259 Self::Storage(input) => input.proof_sequence_number,
260 Self::Regular(input) => input.proof_sequence_number,
261 }
262 }
263
264 fn proof_targets_is_empty(&self) -> bool {
266 match self {
267 Self::Storage(input) => input.proof_targets.is_empty(),
268 Self::Regular(input) => input.proof_targets.is_empty(),
269 }
270 }
271
272 fn send_empty_proof(self) {
274 match self {
275 Self::Storage(input) => input.send_empty_proof(),
276 Self::Regular(input) => input.send_empty_proof(),
277 }
278 }
279}
280
281impl<Factory> From<StorageMultiproofInput<Factory>> for PendingMultiproofTask<Factory> {
282 fn from(input: StorageMultiproofInput<Factory>) -> Self {
283 Self::Storage(input)
284 }
285}
286
287impl<Factory> From<MultiproofInput<Factory>> for PendingMultiproofTask<Factory> {
288 fn from(input: MultiproofInput<Factory>) -> Self {
289 Self::Regular(input)
290 }
291}
292
293#[derive(Debug)]
295struct StorageMultiproofInput<Factory> {
296 config: MultiProofConfig<Factory>,
297 source: Option<StateChangeSource>,
298 hashed_state_update: HashedPostState,
299 hashed_address: B256,
300 proof_targets: B256Set,
301 proof_sequence_number: u64,
302 state_root_message_sender: Sender<MultiProofMessage>,
303}
304
305impl<Factory> StorageMultiproofInput<Factory> {
306 fn send_empty_proof(self) {
308 let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
309 sequence_number: self.proof_sequence_number,
310 state: self.hashed_state_update,
311 });
312 }
313}
314
315#[derive(Debug)]
317struct MultiproofInput<Factory> {
318 config: MultiProofConfig<Factory>,
319 source: Option<StateChangeSource>,
320 hashed_state_update: HashedPostState,
321 proof_targets: MultiProofTargets,
322 proof_sequence_number: u64,
323 state_root_message_sender: Sender<MultiProofMessage>,
324}
325
326impl<Factory> MultiproofInput<Factory> {
327 fn send_empty_proof(self) {
329 let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
330 sequence_number: self.proof_sequence_number,
331 state: self.hashed_state_update,
332 });
333 }
334}
335
336#[derive(Debug)]
341pub struct MultiproofManager<Factory: DatabaseProviderFactory> {
342 max_concurrent: usize,
344 inflight: usize,
346 pending: VecDeque<PendingMultiproofTask<Factory>>,
348 executor: WorkloadExecutor,
350 storage_proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
352 metrics: MultiProofTaskMetrics,
354}
355
356impl<Factory> MultiproofManager<Factory>
357where
358 Factory:
359 DatabaseProviderFactory<Provider: BlockReader> + StateCommitmentProvider + Clone + 'static,
360{
361 fn new(
363 executor: WorkloadExecutor,
364 metrics: MultiProofTaskMetrics,
365 storage_proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
366 max_concurrent: usize,
367 ) -> Self {
368 Self {
369 pending: VecDeque::with_capacity(max_concurrent),
370 max_concurrent,
371 executor,
372 inflight: 0,
373 metrics,
374 storage_proof_task_handle,
375 }
376 }
377
378 fn spawn_or_queue(&mut self, input: PendingMultiproofTask<Factory>) {
381 if input.proof_targets_is_empty() {
383 debug!(
384 sequence_number = input.proof_sequence_number(),
385 "No proof targets, sending empty multiproof back immediately"
386 );
387 input.send_empty_proof();
388 return
389 }
390
391 if self.inflight >= self.max_concurrent {
392 self.pending.push_back(input);
393 self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
394 return;
395 }
396
397 self.spawn_multiproof_task(input);
398 }
399
400 fn on_calculation_complete(&mut self) {
403 self.inflight = self.inflight.saturating_sub(1);
404 self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
405
406 if let Some(input) = self.pending.pop_front() {
407 self.metrics.pending_multiproofs_histogram.record(self.pending.len() as f64);
408 self.spawn_multiproof_task(input);
409 }
410 }
411
412 fn spawn_multiproof_task(&mut self, input: PendingMultiproofTask<Factory>) {
415 match input {
416 PendingMultiproofTask::Storage(storage_input) => {
417 self.spawn_storage_proof(storage_input);
418 }
419 PendingMultiproofTask::Regular(multiproof_input) => {
420 self.spawn_multiproof(multiproof_input);
421 }
422 }
423 }
424
425 fn spawn_storage_proof(&mut self, storage_multiproof_input: StorageMultiproofInput<Factory>) {
427 let StorageMultiproofInput {
428 config,
429 source,
430 hashed_state_update,
431 hashed_address,
432 proof_targets,
433 proof_sequence_number,
434 state_root_message_sender,
435 } = storage_multiproof_input;
436
437 let storage_proof_task_handle = self.storage_proof_task_handle.clone();
438
439 self.executor.spawn_blocking(move || {
440 let storage_targets = proof_targets.len();
441
442 trace!(
443 target: "engine::root",
444 proof_sequence_number,
445 ?proof_targets,
446 storage_targets,
447 "Starting dedicated storage proof calculation",
448 );
449 let start = Instant::now();
450 let result = ParallelProof::new(
451 config.consistent_view,
452 config.nodes_sorted,
453 config.state_sorted,
454 config.prefix_sets,
455 storage_proof_task_handle.clone(),
456 )
457 .with_branch_node_masks(true)
458 .decoded_storage_proof(hashed_address, proof_targets);
459 let elapsed = start.elapsed();
460 trace!(
461 target: "engine::root",
462 proof_sequence_number,
463 ?elapsed,
464 ?source,
465 storage_targets,
466 "Storage multiproofs calculated",
467 );
468
469 match result {
470 Ok(proof) => {
471 let _ = state_root_message_sender.send(MultiProofMessage::ProofCalculated(
472 Box::new(ProofCalculated {
473 sequence_number: proof_sequence_number,
474 update: SparseTrieUpdate {
475 state: hashed_state_update,
476 multiproof: DecodedMultiProof::from_storage_proof(
477 hashed_address,
478 proof,
479 ),
480 },
481 elapsed,
482 }),
483 ));
484 }
485 Err(error) => {
486 let _ = state_root_message_sender
487 .send(MultiProofMessage::ProofCalculationError(error.into()));
488 }
489 }
490 });
491
492 self.inflight += 1;
493 self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
494 }
495
496 fn spawn_multiproof(&mut self, multiproof_input: MultiproofInput<Factory>) {
498 let MultiproofInput {
499 config,
500 source,
501 hashed_state_update,
502 proof_targets,
503 proof_sequence_number,
504 state_root_message_sender,
505 } = multiproof_input;
506 let storage_proof_task_handle = self.storage_proof_task_handle.clone();
507
508 self.executor.spawn_blocking(move || {
509 let account_targets = proof_targets.len();
510 let storage_targets = proof_targets.values().map(|slots| slots.len()).sum::<usize>();
511
512 trace!(
513 target: "engine::root",
514 proof_sequence_number,
515 ?proof_targets,
516 account_targets,
517 storage_targets,
518 "Starting multiproof calculation",
519 );
520 let start = Instant::now();
521 let result = ParallelProof::new(
522 config.consistent_view,
523 config.nodes_sorted,
524 config.state_sorted,
525 config.prefix_sets,
526 storage_proof_task_handle.clone(),
527 )
528 .with_branch_node_masks(true)
529 .decoded_multiproof(proof_targets);
530 let elapsed = start.elapsed();
531 trace!(
532 target: "engine::root",
533 proof_sequence_number,
534 ?elapsed,
535 ?source,
536 account_targets,
537 storage_targets,
538 "Multiproof calculated",
539 );
540
541 match result {
542 Ok(proof) => {
543 let _ = state_root_message_sender.send(MultiProofMessage::ProofCalculated(
544 Box::new(ProofCalculated {
545 sequence_number: proof_sequence_number,
546 update: SparseTrieUpdate {
547 state: hashed_state_update,
548 multiproof: proof,
549 },
550 elapsed,
551 }),
552 ));
553 }
554 Err(error) => {
555 let _ = state_root_message_sender
556 .send(MultiProofMessage::ProofCalculationError(error.into()));
557 }
558 }
559 });
560
561 self.inflight += 1;
562 self.metrics.inflight_multiproofs_histogram.record(self.inflight as f64);
563 }
564}
565
566#[derive(Metrics, Clone)]
567#[metrics(scope = "tree.root")]
568pub(crate) struct MultiProofTaskMetrics {
569 pub inflight_multiproofs_histogram: Histogram,
571 pub pending_multiproofs_histogram: Histogram,
573
574 pub prefetch_proof_targets_accounts_histogram: Histogram,
576 pub prefetch_proof_targets_storages_histogram: Histogram,
578 pub prefetch_proof_chunks_histogram: Histogram,
580
581 pub state_update_proof_targets_accounts_histogram: Histogram,
583 pub state_update_proof_targets_storages_histogram: Histogram,
585 pub state_update_proof_chunks_histogram: Histogram,
587
588 pub proof_calculation_duration_histogram: Histogram,
590
591 pub sparse_trie_update_duration_histogram: Histogram,
593 pub sparse_trie_final_update_duration_histogram: Histogram,
595 pub sparse_trie_total_duration_histogram: Histogram,
597
598 pub state_updates_received_histogram: Histogram,
600 pub proofs_processed_histogram: Histogram,
602 pub multiproof_task_total_duration_histogram: Histogram,
604 pub first_update_wait_time_histogram: Histogram,
606 pub last_proof_wait_time_histogram: Histogram,
608}
609
610#[derive(Debug)]
620pub(super) struct MultiProofTask<Factory: DatabaseProviderFactory> {
621 config: MultiProofConfig<Factory>,
623 rx: Receiver<MultiProofMessage>,
625 tx: Sender<MultiProofMessage>,
627 to_sparse_trie: Sender<SparseTrieUpdate>,
629 fetched_proof_targets: MultiProofTargets,
631 proof_sequencer: ProofSequencer,
633 multiproof_manager: MultiproofManager<Factory>,
635 metrics: MultiProofTaskMetrics,
637}
638
639impl<Factory> MultiProofTask<Factory>
640where
641 Factory:
642 DatabaseProviderFactory<Provider: BlockReader> + StateCommitmentProvider + Clone + 'static,
643{
644 pub(super) fn new(
646 config: MultiProofConfig<Factory>,
647 executor: WorkloadExecutor,
648 proof_task_handle: ProofTaskManagerHandle<FactoryTx<Factory>>,
649 to_sparse_trie: Sender<SparseTrieUpdate>,
650 max_concurrency: usize,
651 ) -> Self {
652 let (tx, rx) = channel();
653 let metrics = MultiProofTaskMetrics::default();
654
655 Self {
656 config,
657 rx,
658 tx,
659 to_sparse_trie,
660 fetched_proof_targets: Default::default(),
661 proof_sequencer: ProofSequencer::default(),
662 multiproof_manager: MultiproofManager::new(
663 executor,
664 metrics.clone(),
665 proof_task_handle,
666 max_concurrency,
667 ),
668 metrics,
669 }
670 }
671
672 pub(super) fn state_root_message_sender(&self) -> Sender<MultiProofMessage> {
674 self.tx.clone()
675 }
676
677 fn on_prefetch_proof(&mut self, targets: MultiProofTargets) -> u64 {
681 let proof_targets = self.get_prefetch_proof_targets(targets);
682 self.fetched_proof_targets.extend_ref(&proof_targets);
683
684 self.metrics.prefetch_proof_targets_accounts_histogram.record(proof_targets.len() as f64);
685 self.metrics
686 .prefetch_proof_targets_storages_histogram
687 .record(proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
688
689 let mut chunks = 0;
691 for proof_targets_chunk in proof_targets.chunks(MULTIPROOF_TARGETS_CHUNK_SIZE) {
692 self.multiproof_manager.spawn_or_queue(
693 MultiproofInput {
694 config: self.config.clone(),
695 source: None,
696 hashed_state_update: Default::default(),
697 proof_targets: proof_targets_chunk,
698 proof_sequence_number: self.proof_sequencer.next_sequence(),
699 state_root_message_sender: self.tx.clone(),
700 }
701 .into(),
702 );
703 chunks += 1;
704 }
705 self.metrics.prefetch_proof_chunks_histogram.record(chunks as f64);
706
707 chunks
708 }
709
710 fn is_done(
712 &self,
713 proofs_processed: u64,
714 state_update_proofs_requested: u64,
715 prefetch_proofs_requested: u64,
716 updates_finished: bool,
717 ) -> bool {
718 let all_proofs_processed =
719 proofs_processed >= state_update_proofs_requested + prefetch_proofs_requested;
720 let no_pending = !self.proof_sequencer.has_pending();
721 debug!(
722 target: "engine::root",
723 proofs_processed,
724 state_update_proofs_requested,
725 prefetch_proofs_requested,
726 no_pending,
727 updates_finished,
728 "Checking end condition"
729 );
730 all_proofs_processed && no_pending && updates_finished
731 }
732
733 fn get_prefetch_proof_targets(&self, mut targets: MultiProofTargets) -> MultiProofTargets {
735 let mut duplicates = 0;
739
740 targets.retain(|hashed_address, target_storage| {
742 let keep = self
743 .fetched_proof_targets
744 .get(hashed_address)
745 .is_none_or(|fetched_storage| {
747 !target_storage.is_subset(fetched_storage)
749 });
750
751 if !keep {
752 duplicates += target_storage.len();
753 }
754
755 keep
756 });
757
758 for (hashed_address, target_storage) in targets.deref_mut() {
760 let Some(fetched_storage) = self.fetched_proof_targets.get(hashed_address) else {
761 continue
764 };
765
766 let prev_target_storage_len = target_storage.len();
767
768 target_storage.retain(|slot| !fetched_storage.contains(slot));
772
773 duplicates += prev_target_storage_len - target_storage.len();
774 }
775
776 if duplicates > 0 {
777 trace!(target: "engine::root", duplicates, "Removed duplicate prefetch proof targets");
778 }
779
780 targets
781 }
782
783 fn on_state_update(&mut self, source: StateChangeSource, update: EvmState) -> u64 {
787 let hashed_state_update = evm_state_to_hashed_post_state(update);
788 let (fetched_state_update, not_fetched_state_update) =
791 hashed_state_update.partition_by_targets(&self.fetched_proof_targets);
792
793 let mut state_updates = 0;
794 if !fetched_state_update.is_empty() {
797 let _ = self.tx.send(MultiProofMessage::EmptyProof {
798 sequence_number: self.proof_sequencer.next_sequence(),
799 state: fetched_state_update,
800 });
801 state_updates += 1;
802 }
803
804 let mut chunks = 0;
806 let mut spawned_proof_targets = MultiProofTargets::default();
807 for chunk in not_fetched_state_update.chunks(MULTIPROOF_TARGETS_CHUNK_SIZE) {
808 let proof_targets = get_proof_targets(&chunk, &self.fetched_proof_targets);
809 spawned_proof_targets.extend_ref(&proof_targets);
810
811 self.multiproof_manager.spawn_or_queue(
812 MultiproofInput {
813 config: self.config.clone(),
814 source: Some(source),
815 hashed_state_update: chunk,
816 proof_targets,
817 proof_sequence_number: self.proof_sequencer.next_sequence(),
818 state_root_message_sender: self.tx.clone(),
819 }
820 .into(),
821 );
822 chunks += 1;
823 }
824
825 self.metrics
826 .state_update_proof_targets_accounts_histogram
827 .record(spawned_proof_targets.len() as f64);
828 self.metrics
829 .state_update_proof_targets_storages_histogram
830 .record(spawned_proof_targets.values().map(|slots| slots.len()).sum::<usize>() as f64);
831 self.metrics.state_update_proof_chunks_histogram.record(chunks as f64);
832
833 self.fetched_proof_targets.extend(spawned_proof_targets);
834
835 state_updates + chunks
836 }
837
838 fn on_proof(
840 &mut self,
841 sequence_number: u64,
842 update: SparseTrieUpdate,
843 ) -> Option<SparseTrieUpdate> {
844 let ready_proofs = self.proof_sequencer.add_proof(sequence_number, update);
845
846 ready_proofs
847 .into_iter()
848 .reduce(|mut acc_update, update| {
850 acc_update.extend(update);
851 acc_update
852 })
853 .filter(|proof| !proof.is_empty())
855 }
856
857 pub(crate) fn run(mut self) {
892 let mut prefetch_proofs_requested = 0;
894 let mut state_update_proofs_requested = 0;
895 let mut proofs_processed = 0;
896
897 let mut updates_finished = false;
898
899 let start = Instant::now();
901
902 let mut first_update_time = None;
904 let mut updates_finished_time = None;
906
907 loop {
908 trace!(target: "engine::root", "entering main channel receiving loop");
909 match self.rx.recv() {
910 Ok(message) => match message {
911 MultiProofMessage::PrefetchProofs(targets) => {
912 trace!(target: "engine::root", "processing MultiProofMessage::PrefetchProofs");
913 if first_update_time.is_none() {
914 self.metrics
916 .first_update_wait_time_histogram
917 .record(start.elapsed().as_secs_f64());
918 first_update_time = Some(Instant::now());
919 debug!(target: "engine::root", "Started state root calculation");
920 }
921
922 let account_targets = targets.len();
923 let storage_targets =
924 targets.values().map(|slots| slots.len()).sum::<usize>();
925 prefetch_proofs_requested += self.on_prefetch_proof(targets);
926 debug!(
927 target: "engine::root",
928 account_targets,
929 storage_targets,
930 prefetch_proofs_requested,
931 "Prefetching proofs"
932 );
933 }
934 MultiProofMessage::StateUpdate(source, update) => {
935 trace!(target: "engine::root", "processing MultiProofMessage::StateUpdate");
936 if first_update_time.is_none() {
937 self.metrics
939 .first_update_wait_time_histogram
940 .record(start.elapsed().as_secs_f64());
941 first_update_time = Some(Instant::now());
942 debug!(target: "engine::root", "Started state root calculation");
943 }
944
945 let len = update.len();
946 state_update_proofs_requested += self.on_state_update(source, update);
947 debug!(
948 target: "engine::root",
949 ?source,
950 len,
951 ?state_update_proofs_requested,
952 "Received new state update"
953 );
954 }
955 MultiProofMessage::FinishedStateUpdates => {
956 trace!(target: "engine::root", "processing MultiProofMessage::FinishedStateUpdates");
957 updates_finished = true;
958 updates_finished_time = Some(Instant::now());
959 if self.is_done(
960 proofs_processed,
961 state_update_proofs_requested,
962 prefetch_proofs_requested,
963 updates_finished,
964 ) {
965 debug!(
966 target: "engine::root",
967 "State updates finished and all proofs processed, ending calculation"
968 );
969 break
970 }
971 }
972 MultiProofMessage::EmptyProof { sequence_number, state } => {
973 trace!(target: "engine::root", "processing MultiProofMessage::EmptyProof");
974
975 proofs_processed += 1;
976
977 if let Some(combined_update) = self.on_proof(
978 sequence_number,
979 SparseTrieUpdate { state, multiproof: Default::default() },
980 ) {
981 let _ = self.to_sparse_trie.send(combined_update);
982 }
983
984 if self.is_done(
985 proofs_processed,
986 state_update_proofs_requested,
987 prefetch_proofs_requested,
988 updates_finished,
989 ) {
990 debug!(
991 target: "engine::root",
992 "State updates finished and all proofs processed, ending calculation"
993 );
994 break
995 }
996 }
997 MultiProofMessage::ProofCalculated(proof_calculated) => {
998 trace!(target: "engine::root", "processing
999 MultiProofMessage::ProofCalculated");
1000
1001 proofs_processed += 1;
1004
1005 self.metrics
1006 .proof_calculation_duration_histogram
1007 .record(proof_calculated.elapsed);
1008
1009 debug!(
1010 target: "engine::root",
1011 sequence = proof_calculated.sequence_number,
1012 total_proofs = proofs_processed,
1013 "Processing calculated proof"
1014 );
1015
1016 self.multiproof_manager.on_calculation_complete();
1017
1018 if let Some(combined_update) =
1019 self.on_proof(proof_calculated.sequence_number, proof_calculated.update)
1020 {
1021 let _ = self.to_sparse_trie.send(combined_update);
1022 }
1023
1024 if self.is_done(
1025 proofs_processed,
1026 state_update_proofs_requested,
1027 prefetch_proofs_requested,
1028 updates_finished,
1029 ) {
1030 debug!(
1031 target: "engine::root",
1032 "State updates finished and all proofs processed, ending calculation");
1033 break
1034 }
1035 }
1036 MultiProofMessage::ProofCalculationError(err) => {
1037 error!(
1038 target: "engine::root",
1039 ?err,
1040 "proof calculation error"
1041 );
1042 return
1043 }
1044 },
1045 Err(_) => {
1046 error!(
1049 target: "engine::root",
1050 "Internal message channel closed unexpectedly"
1051 );
1052 }
1053 }
1054 }
1055
1056 debug!(
1057 target: "engine::root",
1058 total_updates = state_update_proofs_requested,
1059 total_proofs = proofs_processed,
1060 total_time = ?first_update_time.map(|t|t.elapsed()),
1061 time_since_updates_finished = ?updates_finished_time.map(|t|t.elapsed()),
1062 "All proofs processed, ending calculation"
1063 );
1064
1065 self.metrics.state_updates_received_histogram.record(state_update_proofs_requested as f64);
1067 self.metrics.proofs_processed_histogram.record(proofs_processed as f64);
1068 if let Some(total_time) = first_update_time.map(|t| t.elapsed()) {
1069 self.metrics.multiproof_task_total_duration_histogram.record(total_time);
1070 }
1071
1072 if let Some(updates_finished_time) = updates_finished_time {
1073 self.metrics
1074 .last_proof_wait_time_histogram
1075 .record(updates_finished_time.elapsed().as_secs_f64());
1076 }
1077 }
1078}
1079
1080fn get_proof_targets(
1084 state_update: &HashedPostState,
1085 fetched_proof_targets: &MultiProofTargets,
1086) -> MultiProofTargets {
1087 let mut targets = MultiProofTargets::default();
1088
1089 for &hashed_address in state_update.accounts.keys() {
1091 if !fetched_proof_targets.contains_key(&hashed_address) {
1092 targets.insert(hashed_address, HashSet::default());
1093 }
1094 }
1095
1096 for (hashed_address, storage) in &state_update.storages {
1098 let fetched = fetched_proof_targets.get(hashed_address);
1099 let mut changed_slots = storage
1100 .storage
1101 .keys()
1102 .filter(|slot| !fetched.is_some_and(|f| f.contains(*slot)))
1103 .peekable();
1104
1105 if storage.wiped && fetched.is_none() {
1107 targets.entry(*hashed_address).or_default();
1108 }
1109
1110 if changed_slots.peek().is_some() {
1111 targets.entry(*hashed_address).or_default().extend(changed_slots);
1112 }
1113 }
1114
1115 targets
1116}
1117
1118#[cfg(test)]
1119mod tests {
1120 use super::*;
1121 use alloy_primitives::map::B256Set;
1122 use reth_provider::{providers::ConsistentDbView, test_utils::create_test_provider_factory};
1123 use reth_trie::{MultiProof, TrieInput};
1124 use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofTaskManager};
1125 use revm_primitives::{B256, U256};
1126 use std::sync::Arc;
1127
1128 fn create_state_root_config<F>(factory: F, input: TrieInput) -> MultiProofConfig<F>
1129 where
1130 F: DatabaseProviderFactory<Provider: BlockReader>
1131 + StateCommitmentProvider
1132 + Clone
1133 + 'static,
1134 {
1135 let consistent_view = ConsistentDbView::new(factory, None);
1136 let nodes_sorted = Arc::new(input.nodes.clone().into_sorted());
1137 let state_sorted = Arc::new(input.state.clone().into_sorted());
1138 let prefix_sets = Arc::new(input.prefix_sets);
1139
1140 MultiProofConfig { consistent_view, nodes_sorted, state_sorted, prefix_sets }
1141 }
1142
1143 fn create_test_state_root_task<F>(factory: F) -> MultiProofTask<F>
1144 where
1145 F: DatabaseProviderFactory<Provider: BlockReader>
1146 + StateCommitmentProvider
1147 + Clone
1148 + 'static,
1149 {
1150 let executor = WorkloadExecutor::default();
1151 let config = create_state_root_config(factory, TrieInput::default());
1152 let task_ctx = ProofTaskCtx::new(
1153 config.nodes_sorted.clone(),
1154 config.state_sorted.clone(),
1155 config.prefix_sets.clone(),
1156 );
1157 let proof_task = ProofTaskManager::new(
1158 executor.handle().clone(),
1159 config.consistent_view.clone(),
1160 task_ctx,
1161 1,
1162 );
1163 let channel = channel();
1164
1165 MultiProofTask::new(config, executor, proof_task.handle(), channel.0, 1)
1166 }
1167
/// Proofs that arrive in order are delivered one by one and never buffered.
#[test]
fn test_add_proof_in_sequence() {
    let mut sequencer = ProofSequencer::default();
    let proof1 = MultiProof::default();
    let proof2 = MultiProof::default();
    sequencer.next_sequence = 2;

    let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
    assert_eq!(ready.len(), 1);
    assert!(!sequencer.has_pending());

    let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2).unwrap());
    assert_eq!(ready.len(), 1);
    assert!(!sequencer.has_pending());
}
1183
/// A proof that arrives early is held back until all of its predecessors have arrived.
#[test]
fn test_add_proof_out_of_order() {
    let mut sequencer = ProofSequencer::default();
    let proof1 = MultiProof::default();
    let proof2 = MultiProof::default();
    let proof3 = MultiProof::default();
    sequencer.next_sequence = 3;

    // Sequence 2 arrives first and has to wait.
    let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3).unwrap());
    assert_eq!(ready.len(), 0);
    assert!(sequencer.has_pending());

    // Sequence 0 is delivered alone because 1 is still missing.
    let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
    assert_eq!(ready.len(), 1);
    assert!(sequencer.has_pending());

    // Sequence 1 closes the gap and releases 2 as well.
    let ready = sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proof2).unwrap());
    assert_eq!(ready.len(), 2);
    assert!(!sequencer.has_pending());
}
1204
/// A proof behind a gap stays buffered while the gap is open.
#[test]
fn test_add_proof_with_gaps() {
    let mut sequencer = ProofSequencer::default();
    let proof1 = MultiProof::default();
    let proof3 = MultiProof::default();
    sequencer.next_sequence = 3;

    let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
    assert_eq!(ready.len(), 1);

    // Sequence 1 never arrives, so sequence 2 cannot be delivered.
    let ready = sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proof3).unwrap());
    assert_eq!(ready.len(), 0);
    assert!(sequencer.has_pending());
}
1219
/// A proof for an already delivered sequence number is discarded.
#[test]
fn test_add_proof_duplicate_sequence() {
    let mut sequencer = ProofSequencer::default();
    let proof1 = MultiProof::default();
    let proof2 = MultiProof::default();

    let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof1).unwrap());
    assert_eq!(ready.len(), 1);

    // Sequence 0 has been delivered already; the duplicate is neither returned nor buffered.
    let ready = sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proof2).unwrap());
    assert_eq!(ready.len(), 0);
    assert!(!sequencer.has_pending());
}
1233
/// Buffered proofs are released as one batch once the first missing proof arrives.
#[test]
fn test_add_proof_batch_processing() {
    let mut sequencer = ProofSequencer::default();
    let proofs: Vec<_> = (0..5).map(|_| MultiProof::default()).collect();
    sequencer.next_sequence = 5;

    // Everything except sequence 0 arrives first and is buffered.
    sequencer.add_proof(4, SparseTrieUpdate::from_multiproof(proofs[4].clone()).unwrap());
    sequencer.add_proof(2, SparseTrieUpdate::from_multiproof(proofs[2].clone()).unwrap());
    sequencer.add_proof(1, SparseTrieUpdate::from_multiproof(proofs[1].clone()).unwrap());
    sequencer.add_proof(3, SparseTrieUpdate::from_multiproof(proofs[3].clone()).unwrap());

    let ready =
        sequencer.add_proof(0, SparseTrieUpdate::from_multiproof(proofs[0].clone()).unwrap());
    assert_eq!(ready.len(), 5);
    assert!(!sequencer.has_pending());
}
1250
1251 fn create_get_proof_targets_state() -> HashedPostState {
1252 let mut state = HashedPostState::default();
1253
1254 let addr1 = B256::random();
1255 let addr2 = B256::random();
1256 state.accounts.insert(addr1, Some(Default::default()));
1257 state.accounts.insert(addr2, Some(Default::default()));
1258
1259 let mut storage = HashedStorage::default();
1260 let slot1 = B256::random();
1261 let slot2 = B256::random();
1262 storage.storage.insert(slot1, U256::ZERO.into());
1263 storage.storage.insert(slot2, U256::from(1).into());
1264 state.storages.insert(addr1, storage);
1265
1266 state
1267 }
1268
/// With nothing fetched yet, the targets must contain exactly one entry per account in the
/// state.
#[test]
fn test_get_proof_targets_new_account_targets() {
    let post_state = create_get_proof_targets_state();
    let already_fetched = MultiProofTargets::default();

    let proof_targets = get_proof_targets(&post_state, &already_fetched);

    assert_eq!(proof_targets.len(), post_state.accounts.len());
    // Every account of the state must show up as a target key.
    assert!(post_state
        .accounts
        .keys()
        .all(|hashed_address| proof_targets.contains_key(hashed_address)));
}
1282
/// With nothing fetched yet, every storage slot in the state must be listed under its account
/// in the targets.
#[test]
fn test_get_proof_targets_new_storage_targets() {
    let post_state = create_get_proof_targets_state();
    let already_fetched = MultiProofTargets::default();

    let proof_targets = get_proof_targets(&post_state, &already_fetched);

    for (hashed_address, hashed_storage) in &post_state.storages {
        assert!(proof_targets.contains_key(hashed_address));

        // The target slot set must match the storage slots one-to-one.
        let slots = &proof_targets[hashed_address];
        assert_eq!(slots.len(), hashed_storage.storage.len());
        assert!(hashed_storage.storage.keys().all(|slot| slots.contains(slot)));
    }
}
1300
/// An account without storage that is already recorded in the fetched targets must be absent
/// from the result, shrinking the target count by one.
#[test]
fn test_get_proof_targets_filter_already_fetched_accounts() {
    let post_state = create_get_proof_targets_state();
    let mut already_fetched = MultiProofTargets::default();

    // Pick an account that has no storage entry in the state.
    let fetched_account = post_state
        .accounts
        .keys()
        .copied()
        .find(|hashed_address| !post_state.storages.contains_key(hashed_address))
        .expect("Should have an account without storage");

    // Record it as fetched, with an empty slot set.
    already_fetched.insert(fetched_account, HashSet::default());

    let proof_targets = get_proof_targets(&post_state, &already_fetched);

    assert!(!proof_targets.contains_key(&fetched_account));
    assert_eq!(proof_targets.len(), post_state.accounts.len() - 1);
}
1323
/// A storage slot already recorded in the fetched targets must be dropped from the account's
/// target slots, while the remaining slots are kept.
#[test]
fn test_get_proof_targets_filter_already_fetched_storage() {
    let post_state = create_get_proof_targets_state();
    let mut already_fetched = MultiProofTargets::default();

    // Mark one slot of the first storage entry as fetched.
    let (hashed_address, hashed_storage) = post_state.storages.iter().next().unwrap();
    let known_slot = *hashed_storage.storage.keys().next().unwrap();
    already_fetched.insert(*hashed_address, std::iter::once(known_slot).collect());

    let proof_targets = get_proof_targets(&post_state, &already_fetched);

    let remaining_slots = &proof_targets[hashed_address];
    assert!(!remaining_slots.contains(&known_slot));
    assert_eq!(remaining_slots.len(), hashed_storage.storage.len() - 1);
}
1343
/// A default state combined with default fetched targets must produce no proof targets.
#[test]
fn test_get_proof_targets_empty_state() {
    let empty_state = HashedPostState::default();
    let nothing_fetched = MultiProofTargets::default();

    let proof_targets = get_proof_targets(&empty_state, &nothing_fetched);

    assert!(proof_targets.is_empty());
}
1353
/// Two accounts, one of which has two storage slots with one slot already fetched: the result
/// must contain the other account, drop the fetched slot, and keep the unfetched slot.
#[test]
fn test_get_proof_targets_mixed_fetched_state() {
    let mut post_state = HashedPostState::default();
    let mut already_fetched = MultiProofTargets::default();

    let account_with_storage = B256::random();
    let account_without_storage = B256::random();
    let fetched_slot = B256::random();
    let unfetched_slot = B256::random();

    for hashed_address in [account_with_storage, account_without_storage] {
        post_state.accounts.insert(hashed_address, Some(Default::default()));
    }

    let mut hashed_storage = HashedStorage::default();
    hashed_storage.storage.insert(fetched_slot, U256::ZERO.into());
    hashed_storage.storage.insert(unfetched_slot, U256::from(1).into());
    post_state.storages.insert(account_with_storage, hashed_storage);

    // Only one of the two slots has been fetched so far.
    already_fetched.insert(account_with_storage, std::iter::once(fetched_slot).collect());

    let proof_targets = get_proof_targets(&post_state, &already_fetched);

    assert!(proof_targets.contains_key(&account_without_storage));
    let remaining_slots = &proof_targets[&account_with_storage];
    assert!(!remaining_slots.contains(&fetched_slot));
    assert!(remaining_slots.contains(&unfetched_slot));
}
1382
/// An address that appears only in `storages` (not in `accounts`) and has not been fetched
/// must still be returned as a target, together with both of its slots.
#[test]
fn test_get_proof_targets_unmodified_account_with_storage() {
    let mut post_state = HashedPostState::default();
    let already_fetched = MultiProofTargets::default();

    let hashed_address = B256::random();
    let first_slot = B256::random();
    let second_slot = B256::random();

    // Storage only: no matching entry is added to `accounts`.
    let mut hashed_storage = HashedStorage::default();
    hashed_storage.storage.insert(first_slot, U256::from(1).into());
    hashed_storage.storage.insert(second_slot, U256::from(2).into());
    post_state.storages.insert(hashed_address, hashed_storage);

    // Preconditions of the scenario.
    assert!(!post_state.accounts.contains_key(&hashed_address));
    assert!(!already_fetched.contains_key(&hashed_address));

    let proof_targets = get_proof_targets(&post_state, &already_fetched);

    assert!(proof_targets.contains_key(&hashed_address));

    let slots = &proof_targets[&hashed_address];
    assert_eq!(slots.len(), 2);
    for expected_slot in [first_slot, second_slot] {
        assert!(slots.contains(&expected_slot));
    }
}
1412
/// When none of the requested targets overlap with the already fetched ones, the prefetch
/// targets must equal the request, both before and after an unrelated account is recorded as
/// fetched.
#[test]
fn test_get_prefetch_proof_targets_no_duplicates() {
    let mut state_root_task = create_test_state_root_task(create_test_provider_factory());

    // Request two accounts with one slot each.
    let mut requested = MultiProofTargets::default();
    let first_account = B256::random();
    let second_account = B256::random();
    let first_slot = B256::random();
    let second_slot = B256::random();
    requested.insert(first_account, std::iter::once(first_slot).collect());
    requested.insert(second_account, std::iter::once(second_slot).collect());

    let to_prefetch = state_root_task.get_prefetch_proof_targets(requested.clone());
    assert_eq!(to_prefetch, requested);

    // Record a different account and slot as already fetched.
    let unrelated_account = B256::random();
    let unrelated_slot = B256::random();
    state_root_task
        .fetched_proof_targets
        .insert(unrelated_account, std::iter::once(unrelated_slot).collect());

    let to_prefetch = state_root_task.get_prefetch_proof_targets(requested.clone());
    assert_eq!(to_prefetch, requested);
}
1446
/// An account whose requested slots are all already fetched must be dropped from the prefetch
/// targets; once a new slot is requested for it, it must reappear with only that new slot.
#[test]
fn test_get_prefetch_proof_targets_remove_subset() {
    let mut state_root_task = create_test_state_root_task(create_test_provider_factory());

    // Request two accounts with one slot each.
    let mut requested = MultiProofTargets::default();
    let first_account = B256::random();
    let second_account = B256::random();
    let first_slot = B256::random();
    let second_slot = B256::random();
    requested.insert(first_account, std::iter::once(first_slot).collect());
    requested.insert(second_account, std::iter::once(second_slot).collect());

    // The first account's only requested slot is already fetched.
    state_root_task
        .fetched_proof_targets
        .insert(first_account, std::iter::once(first_slot).collect());

    let to_prefetch = state_root_task.get_prefetch_proof_targets(requested.clone());

    assert_eq!(to_prefetch.len(), 1);
    assert!(!to_prefetch.contains_key(&first_account));
    assert!(to_prefetch.contains_key(&second_account));

    // Request an additional slot for the first account.
    let extra_slot = B256::random();
    requested.get_mut(&first_account).unwrap().insert(extra_slot);

    let to_prefetch = state_root_task.get_prefetch_proof_targets(requested.clone());

    assert_eq!(to_prefetch.len(), 2);

    let expected_first = std::iter::once(extra_slot).collect::<B256Set>();
    assert!(to_prefetch.contains_key(&first_account));
    assert_eq!(*to_prefetch.get(&first_account).unwrap(), expected_first);

    let expected_second = std::iter::once(second_slot).collect::<B256Set>();
    assert!(to_prefetch.contains_key(&second_account));
    assert_eq!(*to_prefetch.get(&second_account).unwrap(), expected_second);
}
1493}