reth_engine_tree/tree/payload_processor/
sparse_trie.rs1use crate::tree::payload_processor::{
4 executor::WorkloadExecutor,
5 multiproof::{MultiProofTaskMetrics, SparseTrieUpdate},
6};
7use alloy_primitives::B256;
8use rayon::iter::{ParallelBridge, ParallelIterator};
9use reth_trie::{updates::TrieUpdates, Nibbles};
10use reth_trie_parallel::root::ParallelStateRootError;
11use reth_trie_sparse::{
12 blinded::{BlindedProvider, BlindedProviderFactory},
13 errors::{SparseStateTrieResult, SparseTrieErrorKind},
14 SparseStateTrie,
15};
16use std::{
17 sync::mpsc,
18 time::{Duration, Instant},
19};
20use tracing::{debug, trace, trace_span};
21
22const SPARSE_TRIE_INCREMENTAL_LEVEL: usize = 2;
25
26pub(super) struct SparseTrieTask<BPF>
28where
29 BPF: BlindedProviderFactory + Send + Sync,
30 BPF::AccountNodeProvider: BlindedProvider + Send + Sync,
31 BPF::StorageNodeProvider: BlindedProvider + Send + Sync,
32{
33 #[expect(unused)] pub(super) executor: WorkloadExecutor,
36 pub(super) updates: mpsc::Receiver<SparseTrieUpdate>,
38 pub(super) trie: SparseStateTrie<BPF>,
42 pub(super) metrics: MultiProofTaskMetrics,
43}
44
45impl<BPF> SparseTrieTask<BPF>
46where
47 BPF: BlindedProviderFactory + Send + Sync,
48 BPF::AccountNodeProvider: BlindedProvider + Send + Sync,
49 BPF::StorageNodeProvider: BlindedProvider + Send + Sync,
50{
51 pub(super) fn new(
53 executor: WorkloadExecutor,
54 updates: mpsc::Receiver<SparseTrieUpdate>,
55 blinded_provider_factory: BPF,
56 metrics: MultiProofTaskMetrics,
57 ) -> Self {
58 Self {
59 executor,
60 updates,
61 metrics,
62 trie: SparseStateTrie::new(blinded_provider_factory).with_updates(true),
63 }
64 }
65
66 pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
75 let now = Instant::now();
76
77 let mut num_iterations = 0;
78
79 while let Ok(mut update) = self.updates.recv() {
80 num_iterations += 1;
81 let mut num_updates = 1;
82 while let Ok(next) = self.updates.try_recv() {
83 update.extend(next);
84 num_updates += 1;
85 }
86
87 debug!(
88 target: "engine::root",
89 num_updates,
90 account_proofs = update.multiproof.account_subtree.len(),
91 storage_proofs = update.multiproof.storages.len(),
92 "Updating sparse trie"
93 );
94
95 let elapsed = update_sparse_trie(&mut self.trie, update).map_err(|e| {
96 ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
97 })?;
98 self.metrics.sparse_trie_update_duration_histogram.record(elapsed);
99 trace!(target: "engine::root", ?elapsed, num_iterations, "Root calculation completed");
100 }
101
102 debug!(target: "engine::root", num_iterations, "All proofs processed, ending calculation");
103
104 let start = Instant::now();
105 let (state_root, trie_updates) = self.trie.root_with_updates().map_err(|e| {
106 ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
107 })?;
108
109 self.metrics.sparse_trie_final_update_duration_histogram.record(start.elapsed());
110 self.metrics.sparse_trie_total_duration_histogram.record(now.elapsed());
111
112 Ok(StateRootComputeOutcome { state_root, trie_updates })
113 }
114}
115
116#[derive(Debug)]
119pub struct StateRootComputeOutcome {
120 pub state_root: B256,
122 pub trie_updates: TrieUpdates,
124}
125
126pub(crate) fn update_sparse_trie<BPF>(
128 trie: &mut SparseStateTrie<BPF>,
129 SparseTrieUpdate { mut state, multiproof }: SparseTrieUpdate,
130) -> SparseStateTrieResult<Duration>
131where
132 BPF: BlindedProviderFactory + Send + Sync,
133 BPF::AccountNodeProvider: BlindedProvider + Send + Sync,
134 BPF::StorageNodeProvider: BlindedProvider + Send + Sync,
135{
136 trace!(target: "engine::root::sparse", "Updating sparse trie");
137 let started_at = Instant::now();
138
139 trie.reveal_decoded_multiproof(multiproof)?;
141 let reveal_multiproof_elapsed = started_at.elapsed();
142 trace!(
143 target: "engine::root::sparse",
144 ?reveal_multiproof_elapsed,
145 "Done revealing multiproof"
146 );
147
148 let (tx, rx) = mpsc::channel();
150 state
151 .storages
152 .into_iter()
153 .map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
154 .par_bridge()
155 .map(|(address, storage, storage_trie)| {
156 let span = trace_span!(target: "engine::root::sparse", "Storage trie", ?address);
157 let _enter = span.enter();
158 trace!(target: "engine::root::sparse", "Updating storage");
159 let mut storage_trie = storage_trie.ok_or(SparseTrieErrorKind::Blind)?;
160
161 if storage.wiped {
162 trace!(target: "engine::root::sparse", "Wiping storage");
163 storage_trie.wipe()?;
164 }
165 for (slot, value) in storage.storage {
166 let slot_nibbles = Nibbles::unpack(slot);
167 if value.is_zero() {
168 trace!(target: "engine::root::sparse", ?slot, "Removing storage slot");
169 storage_trie.remove_leaf(&slot_nibbles)?;
170 } else {
171 trace!(target: "engine::root::sparse", ?slot, "Updating storage slot");
172 storage_trie.update_leaf(
173 slot_nibbles,
174 alloy_rlp::encode_fixed_size(&value.value).to_vec(),
175 value.is_private,
176 )?;
177 }
178 }
179
180 storage_trie.root();
181
182 SparseStateTrieResult::Ok((address, storage_trie))
183 })
184 .for_each_init(|| tx.clone(), |tx, result| tx.send(result).unwrap());
185 drop(tx);
186
187 for result in rx {
189 let (address, storage_trie) = result?;
190 trie.insert_storage_trie(address, storage_trie);
191
192 if let Some(account) = state.accounts.remove(&address) {
193 trace!(target: "engine::root::sparse", ?address, "Updating account and its storage root");
196 trie.update_account(address, account.unwrap_or_default())?;
197 } else if trie.is_account_revealed(address) {
198 trace!(target: "engine::root::sparse", ?address, "Updating account storage root");
200 trie.update_account_storage_root(address)?;
201 }
202 }
203
204 for (address, account) in state.accounts {
206 trace!(target: "engine::root::sparse", ?address, "Updating account");
207 trie.update_account(address, account.unwrap_or_default())?;
208 }
209
210 let elapsed_before = started_at.elapsed();
211 trace!(
212 target: "engine::root::sparse",
213 level=SPARSE_TRIE_INCREMENTAL_LEVEL,
214 "Calculating intermediate nodes below trie level"
215 );
216 trie.calculate_below_level(SPARSE_TRIE_INCREMENTAL_LEVEL);
217
218 let elapsed = started_at.elapsed();
219 let below_level_elapsed = elapsed - elapsed_before;
220 trace!(
221 target: "engine::root::sparse",
222 level=SPARSE_TRIE_INCREMENTAL_LEVEL,
223 ?below_level_elapsed,
224 "Intermediate nodes calculated"
225 );
226
227 Ok(elapsed)
228}