reth_engine_tree/tree/payload_processor/
sparse_trie.rs

1//! Sparse Trie task related functionality.
2
3use crate::tree::payload_processor::{
4    executor::WorkloadExecutor,
5    multiproof::{MultiProofTaskMetrics, SparseTrieUpdate},
6};
7use alloy_primitives::B256;
8use rayon::iter::{ParallelBridge, ParallelIterator};
9use reth_trie::{updates::TrieUpdates, Nibbles};
10use reth_trie_parallel::root::ParallelStateRootError;
11use reth_trie_sparse::{
12    blinded::{BlindedProvider, BlindedProviderFactory},
13    errors::{SparseStateTrieResult, SparseTrieErrorKind},
14    SparseStateTrie,
15};
16use std::{
17    sync::mpsc,
18    time::{Duration, Instant},
19};
20use tracing::{debug, trace, trace_span};
21
/// The level below which the sparse trie hashes are calculated in
/// [`update_sparse_trie`].
///
/// The value is handed to `calculate_below_level` at the end of every
/// [`update_sparse_trie`] call and is also attached to the surrounding trace events.
const SPARSE_TRIE_INCREMENTAL_LEVEL: usize = 2;
25
/// A task responsible for populating the sparse trie.
///
/// The task drains [`SparseTrieUpdate`]s from the `updates` channel, applies them to the owned
/// [`SparseStateTrie`] and, once the channel is closed, computes the state root together with
/// the trie updates (see [`Self::run`]).
pub(super) struct SparseTrieTask<BPF>
where
    BPF: BlindedProviderFactory + Send + Sync,
    BPF::AccountNodeProvider: BlindedProvider + Send + Sync,
    BPF::StorageNodeProvider: BlindedProvider + Send + Sync,
{
    /// Executor used to spawn subtasks.
    #[expect(unused)] // TODO use this for spawning trie tasks
    pub(super) executor: WorkloadExecutor,
    /// Receives updates from the state root task.
    pub(super) updates: mpsc::Receiver<SparseTrieUpdate>,
    /// Sparse Trie initialized with the blinded provider factory.
    ///
    /// It's kept as a field on the struct to prevent blocking on de-allocation in [`Self::run`].
    pub(super) trie: SparseStateTrie<BPF>,
    /// Metrics of the task.
    ///
    /// [`Self::run`] records the duration of every sparse trie update, of the final root
    /// calculation and of the whole run into its histograms.
    pub(super) metrics: MultiProofTaskMetrics,
}
44
45impl<BPF> SparseTrieTask<BPF>
46where
47    BPF: BlindedProviderFactory + Send + Sync,
48    BPF::AccountNodeProvider: BlindedProvider + Send + Sync,
49    BPF::StorageNodeProvider: BlindedProvider + Send + Sync,
50{
51    /// Creates a new sparse trie task.
52    pub(super) fn new(
53        executor: WorkloadExecutor,
54        updates: mpsc::Receiver<SparseTrieUpdate>,
55        blinded_provider_factory: BPF,
56        metrics: MultiProofTaskMetrics,
57    ) -> Self {
58        Self {
59            executor,
60            updates,
61            metrics,
62            trie: SparseStateTrie::new(blinded_provider_factory).with_updates(true),
63        }
64    }
65
66    /// Runs the sparse trie task to completion.
67    ///
68    /// This waits for new incoming [`SparseTrieUpdate`].
69    ///
70    /// This concludes once the last trie update has been received.
71    ///
72    /// NOTE: This function does not take `self` by value to prevent blocking on [`SparseStateTrie`]
73    /// drop.
74    pub(super) fn run(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
75        let now = Instant::now();
76
77        let mut num_iterations = 0;
78
79        while let Ok(mut update) = self.updates.recv() {
80            num_iterations += 1;
81            let mut num_updates = 1;
82            while let Ok(next) = self.updates.try_recv() {
83                update.extend(next);
84                num_updates += 1;
85            }
86
87            debug!(
88                target: "engine::root",
89                num_updates,
90                account_proofs = update.multiproof.account_subtree.len(),
91                storage_proofs = update.multiproof.storages.len(),
92                "Updating sparse trie"
93            );
94
95            let elapsed = update_sparse_trie(&mut self.trie, update).map_err(|e| {
96                ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
97            })?;
98            self.metrics.sparse_trie_update_duration_histogram.record(elapsed);
99            trace!(target: "engine::root", ?elapsed, num_iterations, "Root calculation completed");
100        }
101
102        debug!(target: "engine::root", num_iterations, "All proofs processed, ending calculation");
103
104        let start = Instant::now();
105        let (state_root, trie_updates) = self.trie.root_with_updates().map_err(|e| {
106            ParallelStateRootError::Other(format!("could not calculate state root: {e:?}"))
107        })?;
108
109        self.metrics.sparse_trie_final_update_duration_histogram.record(start.elapsed());
110        self.metrics.sparse_trie_total_duration_histogram.record(now.elapsed());
111
112        Ok(StateRootComputeOutcome { state_root, trie_updates })
113    }
114}
115
/// Outcome of the state root computation, including the state root itself with
/// the trie updates.
///
/// Both values are produced by a single `root_with_updates` call on the sparse trie at the end
/// of `SparseTrieTask::run`.
#[derive(Debug)]
pub struct StateRootComputeOutcome {
    /// The state root.
    pub state_root: B256,
    /// The trie updates.
    pub trie_updates: TrieUpdates,
}
125
126/// Updates the sparse trie with the given proofs and state, and returns the elapsed time.
127pub(crate) fn update_sparse_trie<BPF>(
128    trie: &mut SparseStateTrie<BPF>,
129    SparseTrieUpdate { mut state, multiproof }: SparseTrieUpdate,
130) -> SparseStateTrieResult<Duration>
131where
132    BPF: BlindedProviderFactory + Send + Sync,
133    BPF::AccountNodeProvider: BlindedProvider + Send + Sync,
134    BPF::StorageNodeProvider: BlindedProvider + Send + Sync,
135{
136    trace!(target: "engine::root::sparse", "Updating sparse trie");
137    let started_at = Instant::now();
138
139    // Reveal new accounts and storage slots.
140    trie.reveal_decoded_multiproof(multiproof)?;
141    let reveal_multiproof_elapsed = started_at.elapsed();
142    trace!(
143        target: "engine::root::sparse",
144        ?reveal_multiproof_elapsed,
145        "Done revealing multiproof"
146    );
147
148    // Update storage slots with new values and calculate storage roots.
149    let (tx, rx) = mpsc::channel();
150    state
151        .storages
152        .into_iter()
153        .map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
154        .par_bridge()
155        .map(|(address, storage, storage_trie)| {
156            let span = trace_span!(target: "engine::root::sparse", "Storage trie", ?address);
157            let _enter = span.enter();
158            trace!(target: "engine::root::sparse", "Updating storage");
159            let mut storage_trie = storage_trie.ok_or(SparseTrieErrorKind::Blind)?;
160
161            if storage.wiped {
162                trace!(target: "engine::root::sparse", "Wiping storage");
163                storage_trie.wipe()?;
164            }
165            for (slot, value) in storage.storage {
166                let slot_nibbles = Nibbles::unpack(slot);
167                if value.is_zero() {
168                    trace!(target: "engine::root::sparse", ?slot, "Removing storage slot");
169                    storage_trie.remove_leaf(&slot_nibbles)?;
170                } else {
171                    trace!(target: "engine::root::sparse", ?slot, "Updating storage slot");
172                    storage_trie.update_leaf(
173                        slot_nibbles,
174                        alloy_rlp::encode_fixed_size(&value.value).to_vec(),
175                        value.is_private,
176                    )?;
177                }
178            }
179
180            storage_trie.root();
181
182            SparseStateTrieResult::Ok((address, storage_trie))
183        })
184        .for_each_init(|| tx.clone(), |tx, result| tx.send(result).unwrap());
185    drop(tx);
186
187    // Update account storage roots
188    for result in rx {
189        let (address, storage_trie) = result?;
190        trie.insert_storage_trie(address, storage_trie);
191
192        if let Some(account) = state.accounts.remove(&address) {
193            // If the account itself has an update, remove it from the state update and update in
194            // one go instead of doing it down below.
195            trace!(target: "engine::root::sparse", ?address, "Updating account and its storage root");
196            trie.update_account(address, account.unwrap_or_default())?;
197        } else if trie.is_account_revealed(address) {
198            // Otherwise, if the account is revealed, only update its storage root.
199            trace!(target: "engine::root::sparse", ?address, "Updating account storage root");
200            trie.update_account_storage_root(address)?;
201        }
202    }
203
204    // Update accounts
205    for (address, account) in state.accounts {
206        trace!(target: "engine::root::sparse", ?address, "Updating account");
207        trie.update_account(address, account.unwrap_or_default())?;
208    }
209
210    let elapsed_before = started_at.elapsed();
211    trace!(
212        target: "engine::root::sparse",
213        level=SPARSE_TRIE_INCREMENTAL_LEVEL,
214        "Calculating intermediate nodes below trie level"
215    );
216    trie.calculate_below_level(SPARSE_TRIE_INCREMENTAL_LEVEL);
217
218    let elapsed = started_at.elapsed();
219    let below_level_elapsed = elapsed - elapsed_before;
220    trace!(
221        target: "engine::root::sparse",
222        level=SPARSE_TRIE_INCREMENTAL_LEVEL,
223        ?below_level_elapsed,
224        "Intermediate nodes calculated"
225    );
226
227    Ok(elapsed)
228}