reth_engine_tree/tree/payload_processor/
mod.rs

1//! Entrypoint for payload processing.
2
3use crate::tree::{
4    cached_state::{CachedStateMetrics, ProviderCacheBuilder, ProviderCaches, SavedCache},
5    payload_processor::{
6        prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmTaskEvent},
7        sparse_trie::StateRootComputeOutcome,
8    },
9    sparse_trie::SparseTrieTask,
10    StateProviderBuilder, TreeConfig,
11};
12use alloy_consensus::{transaction::Recovered, BlockHeader};
13use alloy_evm::block::StateChangeSource;
14use alloy_primitives::B256;
15use executor::WorkloadExecutor;
16use multiproof::*;
17use parking_lot::RwLock;
18use prewarm::PrewarmMetrics;
19use reth_evm::{ConfigureEvm, OnStateHook, SpecFor};
20use reth_primitives_traits::{NodePrimitives, SealedHeaderFor};
21use reth_provider::{
22    providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, StateCommitmentProvider,
23    StateProviderFactory, StateReader,
24};
25use reth_revm::{db::BundleState, state::EvmState};
26use reth_trie::TrieInput;
27use reth_trie_parallel::{
28    proof_task::{ProofTaskCtx, ProofTaskManager},
29    root::ParallelStateRootError,
30};
31use std::{
32    collections::VecDeque,
33    sync::{
34        atomic::AtomicBool,
35        mpsc::{self, channel, Sender},
36        Arc,
37    },
38};
39
40use super::precompile_cache::PrecompileCacheMap;
41
42pub mod executor;
43pub mod multiproof;
44pub mod prewarm;
45pub mod sparse_trie;
46
47/// Entrypoint for executing the payload.
48#[derive(Debug, Clone)]
49pub struct PayloadProcessor<N, Evm>
50where
51    N: NodePrimitives,
52    Evm: ConfigureEvm<Primitives = N>,
53{
54    /// The executor used by to spawn tasks.
55    executor: WorkloadExecutor,
56    /// The most recent cache used for execution.
57    execution_cache: ExecutionCache,
58    /// Metrics for trie operations
59    trie_metrics: MultiProofTaskMetrics,
60    /// Cross-block cache size in bytes.
61    cross_block_cache_size: u64,
62    /// Whether transactions should not be executed on prewarming task.
63    disable_transaction_prewarming: bool,
64    /// Determines how to configure the evm for execution.
65    evm_config: Evm,
66    /// whether precompile cache should be enabled.
67    precompile_cache_enabled: bool,
68    /// Precompile cache map.
69    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
70    _marker: std::marker::PhantomData<N>,
71}
72
73impl<N, Evm> PayloadProcessor<N, Evm>
74where
75    N: NodePrimitives,
76    Evm: ConfigureEvm<Primitives = N>,
77{
78    /// Creates a new payload processor.
79    pub fn new(
80        executor: WorkloadExecutor,
81        evm_config: Evm,
82        config: &TreeConfig,
83        precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
84    ) -> Self {
85        Self {
86            executor,
87            execution_cache: Default::default(),
88            trie_metrics: Default::default(),
89            cross_block_cache_size: config.cross_block_cache_size(),
90            disable_transaction_prewarming: config.disable_caching_and_prewarming(),
91            evm_config,
92            precompile_cache_enabled: config.precompile_cache_enabled(),
93            precompile_cache_map,
94            _marker: Default::default(),
95        }
96    }
97}
98
99impl<N, Evm> PayloadProcessor<N, Evm>
100where
101    N: NodePrimitives,
102    Evm: ConfigureEvm<Primitives = N> + 'static,
103{
104    /// Spawns all background tasks and returns a handle connected to the tasks.
105    ///
106    /// - Transaction prewarming task
107    /// - State root task
108    /// - Sparse trie task
109    ///
110    /// # Transaction prewarming task
111    ///
112    /// Responsible for feeding state updates to the multi proof task.
113    ///
114    /// This task runs until:
115    ///  - externally cancelled (e.g. sequential block execution is complete)
116    ///
117    /// ## Multi proof task
118    ///
119    /// Responsible for preparing sparse trie messages for the sparse trie task.
120    /// A state update (e.g. tx output) is converted into a multiproof calculation that returns an
121    /// output back to this task.
122    ///
123    /// Receives updates from sequential execution.
124    /// This task runs until it receives a shutdown signal, which should be after the block
125    /// was fully executed.
126    ///
127    /// ## Sparse trie task
128    ///
129    /// Responsible for calculating the state root based on the received [`SparseTrieUpdate`].
130    ///
131    /// This task runs until there are no further updates to process.
132    ///
133    ///
134    /// This returns a handle to await the final state root and to interact with the tasks (e.g.
135    /// canceling)
136    pub fn spawn<P>(
137        &self,
138        header: SealedHeaderFor<N>,
139        transactions: VecDeque<Recovered<N::SignedTx>>,
140        provider_builder: StateProviderBuilder<N, P>,
141        consistent_view: ConsistentDbView<P>,
142        trie_input: TrieInput,
143        config: &TreeConfig,
144    ) -> PayloadHandle
145    where
146        P: DatabaseProviderFactory<Provider: BlockReader>
147            + BlockReader
148            + StateProviderFactory
149            + StateReader
150            + StateCommitmentProvider
151            + Clone
152            + 'static,
153    {
154        let (to_sparse_trie, sparse_trie_rx) = channel();
155        // spawn multiproof task
156        let state_root_config = MultiProofConfig::new_from_input(consistent_view, trie_input);
157
158        // Create and spawn the storage proof task
159        let task_ctx = ProofTaskCtx::new(
160            state_root_config.nodes_sorted.clone(),
161            state_root_config.state_sorted.clone(),
162            state_root_config.prefix_sets.clone(),
163        );
164        let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
165        let proof_task = ProofTaskManager::new(
166            self.executor.handle().clone(),
167            state_root_config.consistent_view.clone(),
168            task_ctx,
169            max_proof_task_concurrency,
170        );
171
172        // We set it to half of the proof task concurrency, because often for each multiproof we
173        // spawn one Tokio task for the account proof, and one Tokio task for the storage proof.
174        let max_multi_proof_task_concurrency = max_proof_task_concurrency / 2;
175        let multi_proof_task = MultiProofTask::new(
176            state_root_config,
177            self.executor.clone(),
178            proof_task.handle(),
179            to_sparse_trie,
180            max_multi_proof_task_concurrency,
181        );
182
183        // wire the multiproof task to the prewarm task
184        let to_multi_proof = Some(multi_proof_task.state_root_message_sender());
185
186        let prewarm_handle =
187            self.spawn_caching_with(header, transactions, provider_builder, to_multi_proof.clone());
188
189        // spawn multi-proof task
190        self.executor.spawn_blocking(move || {
191            multi_proof_task.run();
192        });
193
194        let mut sparse_trie_task = SparseTrieTask::new(
195            self.executor.clone(),
196            sparse_trie_rx,
197            proof_task.handle(),
198            self.trie_metrics.clone(),
199        );
200
201        // wire the sparse trie to the state root response receiver
202        let (state_root_tx, state_root_rx) = channel();
203        self.executor.spawn_blocking(move || {
204            let res = sparse_trie_task.run();
205            let _ = state_root_tx.send(res);
206        });
207
208        // spawn the proof task
209        self.executor.spawn_blocking(move || {
210            if let Err(err) = proof_task.run() {
211                // At least log if there is an error at any point
212                tracing::error!(
213                    target: "engine::root",
214                    ?err,
215                    "Storage proof task returned an error"
216                );
217            }
218        });
219
220        PayloadHandle { to_multi_proof, prewarm_handle, state_root: Some(state_root_rx) }
221    }
222
223    /// Spawn cache prewarming exclusively.
224    ///
225    /// Returns a [`PayloadHandle`] to communicate with the task.
226    pub(super) fn spawn_cache_exclusive<P>(
227        &self,
228        header: SealedHeaderFor<N>,
229        transactions: VecDeque<Recovered<N::SignedTx>>,
230        provider_builder: StateProviderBuilder<N, P>,
231    ) -> PayloadHandle
232    where
233        P: BlockReader
234            + StateProviderFactory
235            + StateReader
236            + StateCommitmentProvider
237            + Clone
238            + 'static,
239    {
240        let prewarm_handle = self.spawn_caching_with(header, transactions, provider_builder, None);
241        PayloadHandle { to_multi_proof: None, prewarm_handle, state_root: None }
242    }
243
244    /// Spawn prewarming optionally wired to the multiproof task for target updates.
245    fn spawn_caching_with<P>(
246        &self,
247        header: SealedHeaderFor<N>,
248        mut transactions: VecDeque<Recovered<N::SignedTx>>,
249        provider_builder: StateProviderBuilder<N, P>,
250        to_multi_proof: Option<Sender<MultiProofMessage>>,
251    ) -> CacheTaskHandle
252    where
253        P: BlockReader
254            + StateProviderFactory
255            + StateReader
256            + StateCommitmentProvider
257            + Clone
258            + 'static,
259    {
260        if self.disable_transaction_prewarming {
261            // if no transactions should be executed we clear them but still spawn the task for
262            // caching updates
263            transactions.clear();
264        }
265
266        let (cache, cache_metrics) = self.cache_for(header.parent_hash()).split();
267        // configure prewarming
268        let prewarm_ctx = PrewarmContext {
269            header,
270            evm_config: self.evm_config.clone(),
271            cache: cache.clone(),
272            cache_metrics: cache_metrics.clone(),
273            provider: provider_builder,
274            metrics: PrewarmMetrics::default(),
275            terminate_execution: Arc::new(AtomicBool::new(false)),
276            precompile_cache_enabled: self.precompile_cache_enabled,
277            precompile_cache_map: self.precompile_cache_map.clone(),
278        };
279
280        let prewarm_task = PrewarmCacheTask::new(
281            self.executor.clone(),
282            self.execution_cache.clone(),
283            prewarm_ctx,
284            to_multi_proof,
285            transactions,
286        );
287        let to_prewarm_task = prewarm_task.actions_tx();
288
289        // spawn pre-warm task
290        self.executor.spawn_blocking(move || {
291            prewarm_task.run();
292        });
293        CacheTaskHandle { cache, to_prewarm_task: Some(to_prewarm_task), cache_metrics }
294    }
295
296    /// Returns the cache for the given parent hash.
297    ///
298    /// If the given hash is different then what is recently cached, then this will create a new
299    /// instance.
300    fn cache_for(&self, parent_hash: B256) -> SavedCache {
301        self.execution_cache.get_cache_for(parent_hash).unwrap_or_else(|| {
302            let cache = ProviderCacheBuilder::default().build_caches(self.cross_block_cache_size);
303            SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
304        })
305    }
306}
307
308/// Handle to all the spawned tasks.
309#[derive(Debug)]
310pub struct PayloadHandle {
311    /// Channel for evm state updates
312    to_multi_proof: Option<Sender<MultiProofMessage>>,
313    // must include the receiver of the state root wired to the sparse trie
314    prewarm_handle: CacheTaskHandle,
315    /// Receiver for the state root
316    state_root: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
317}
318
319impl PayloadHandle {
320    /// Awaits the state root
321    ///
322    /// # Panics
323    ///
324    /// If payload processing was started without background tasks.
325    pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
326        self.state_root
327            .take()
328            .expect("state_root is None")
329            .recv()
330            .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
331    }
332
333    /// Returns a state hook to be used to send state updates to this task.
334    ///
335    /// If a multiproof task is spawned the hook will notify it about new states.
336    pub fn state_hook(&self) -> impl OnStateHook {
337        // convert the channel into a `StateHookSender` that emits an event on drop
338        let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new);
339
340        move |source: StateChangeSource, state: &EvmState| {
341            if let Some(sender) = &to_multi_proof {
342                let _ = sender.send(MultiProofMessage::StateUpdate(source, state.clone()));
343            }
344        }
345    }
346
347    /// Returns a clone of the caches used by prewarming
348    pub(super) fn caches(&self) -> ProviderCaches {
349        self.prewarm_handle.cache.clone()
350    }
351
352    pub(super) fn cache_metrics(&self) -> CachedStateMetrics {
353        self.prewarm_handle.cache_metrics.clone()
354    }
355
356    /// Terminates the pre-warming transaction processing.
357    ///
358    /// Note: This does not terminate the task yet.
359    pub(super) fn stop_prewarming_execution(&self) {
360        self.prewarm_handle.stop_prewarming_execution()
361    }
362
363    /// Terminates the entire caching task.
364    ///
365    /// If the [`BundleState`] is provided it will update the shared cache.
366    pub(super) fn terminate_caching(&mut self, block_output: Option<BundleState>) {
367        self.prewarm_handle.terminate_caching(block_output)
368    }
369}
370
371/// Access to the spawned [`PrewarmCacheTask`].
372#[derive(Debug)]
373pub(crate) struct CacheTaskHandle {
374    /// The shared cache the task operates with.
375    cache: ProviderCaches,
376    /// Metrics for the caches
377    cache_metrics: CachedStateMetrics,
378    /// Channel to the spawned prewarm task if any
379    to_prewarm_task: Option<Sender<PrewarmTaskEvent>>,
380}
381
382impl CacheTaskHandle {
383    /// Terminates the pre-warming transaction processing.
384    ///
385    /// Note: This does not terminate the task yet.
386    pub(super) fn stop_prewarming_execution(&self) {
387        self.to_prewarm_task
388            .as_ref()
389            .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
390    }
391
392    /// Terminates the entire pre-warming task.
393    ///
394    /// If the [`BundleState`] is provided it will update the shared cache.
395    pub(super) fn terminate_caching(&mut self, block_output: Option<BundleState>) {
396        self.to_prewarm_task
397            .take()
398            .map(|tx| tx.send(PrewarmTaskEvent::Terminate { block_output }).ok());
399    }
400}
401
402impl Drop for CacheTaskHandle {
403    fn drop(&mut self) {
404        // Ensure we always terminate on drop
405        self.terminate_caching(None);
406    }
407}
408
409/// Shared access to most recently used cache.
410///
411/// This cache is intended to used for processing the payload in the following manner:
412///  - Get Cache if the payload's parent block matches the parent block
413///  - Update cache upon successful payload execution
414///
415/// This process assumes that payloads are received sequentially.
416#[derive(Clone, Debug, Default)]
417struct ExecutionCache {
418    /// Guarded cloneable cache identified by a block hash.
419    inner: Arc<RwLock<Option<SavedCache>>>,
420}
421
422impl ExecutionCache {
423    /// Returns the cache if the currently store cache is for the given `parent_hash`
424    pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
425        let cache = self.inner.read();
426        cache
427            .as_ref()
428            .and_then(|cache| (cache.executed_block_hash() == parent_hash).then(|| cache.clone()))
429    }
430
431    /// Clears the tracked cache
432    #[expect(unused)]
433    pub(crate) fn clear(&self) {
434        self.inner.write().take();
435    }
436
437    /// Stores the provider cache
438    pub(crate) fn save_cache(&self, cache: SavedCache) {
439        self.inner.write().replace(cache);
440    }
441}
442
443#[cfg(test)]
444mod tests {
445    use crate::tree::{
446        payload_processor::{
447            evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
448        },
449        precompile_cache::PrecompileCacheMap,
450        StateProviderBuilder, TreeConfig,
451    };
452    use alloy_evm::block::StateChangeSource;
453    use rand::Rng;
454    use reth_chainspec::ChainSpec;
455    use reth_db_common::init::init_genesis;
456    use reth_ethereum_primitives::EthPrimitives;
457    use reth_evm::OnStateHook;
458    use reth_evm_ethereum::EthEvmConfig;
459    use reth_primitives_traits::{Account, StorageEntry};
460    use reth_provider::{
461        providers::{BlockchainProvider, ConsistentDbView},
462        test_utils::create_test_provider_factory_with_chain_spec,
463        ChainSpecProvider, HashingWriter,
464    };
465    use reth_testing_utils::generators;
466    use reth_trie::{test_utils::state_root, HashedPostState, TrieInput};
467    use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
468    use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
469    use std::sync::Arc;
470
471    fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
472        let mut rng = generators::rng();
473        let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
474        let mut updates = Vec::new();
475
476        for _ in 0..updates_per_account {
477            let num_accounts_in_update = rng.random_range(1..=num_accounts);
478            let mut state_update = EvmState::default();
479
480            let selected_addresses = &all_addresses[0..num_accounts_in_update];
481
482            for &address in selected_addresses {
483                let mut storage = HashMap::default();
484                if rng.random_bool(0.7) {
485                    for _ in 0..rng.random_range(1..10) {
486                        let slot = U256::from(rng.random::<u64>());
487                        storage.insert(
488                            slot,
489                            EvmStorageSlot::new_changed(
490                                U256::ZERO.into(),
491                                U256::from(rng.random::<u64>()).into(),
492                            ),
493                        );
494                    }
495                }
496
497                let account = revm_state::Account {
498                    info: AccountInfo {
499                        balance: U256::from(rng.random::<u64>()),
500                        nonce: rng.random::<u64>(),
501                        code_hash: KECCAK_EMPTY,
502                        code: Some(Default::default()),
503                    },
504                    storage,
505                    status: AccountStatus::Touched,
506                };
507
508                state_update.insert(address, account);
509            }
510
511            updates.push(state_update);
512        }
513
514        updates
515    }
516
517    #[test]
518    fn test_state_root() {
519        reth_tracing::init_test_tracing();
520
521        let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
522        let genesis_hash = init_genesis(&factory).unwrap();
523
524        let state_updates = create_mock_state_updates(10, 10);
525        let mut hashed_state = HashedPostState::default();
526        let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
527            HashMap::default();
528
529        {
530            let provider_rw = factory.provider_rw().expect("failed to get provider");
531
532            for update in &state_updates {
533                let account_updates = update.iter().map(|(address, account)| {
534                    (*address, Some(Account::from_revm_account(account)))
535                });
536                provider_rw
537                    .insert_account_for_hashing(account_updates)
538                    .expect("failed to insert accounts");
539
540                let storage_updates = update.iter().map(|(address, account)| {
541                    let storage_entries = account.storage.iter().map(|(slot, value)| {
542                        StorageEntry { key: B256::from(*slot), value: value.present_value }
543                    });
544                    (*address, storage_entries)
545                });
546                provider_rw
547                    .insert_storage_for_hashing(storage_updates)
548                    .expect("failed to insert storage");
549            }
550            provider_rw.commit().expect("failed to commit changes");
551        }
552
553        for update in &state_updates {
554            hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));
555            for (address, account) in update {
556                let storage: HashMap<B256, U256> = account
557                    .storage
558                    .iter()
559                    .map(|(k, v)| (B256::from(*k), v.present_value.value))
560                    .collect();
561
562                let entry = accumulated_state.entry(*address).or_default();
563                entry.0 = Account::from_revm_account(account);
564                entry.1.extend(storage);
565            }
566        }
567
568        let payload_processor = PayloadProcessor::<EthPrimitives, _>::new(
569            WorkloadExecutor::default(),
570            EthEvmConfig::new(factory.chain_spec()),
571            &TreeConfig::default(),
572            PrecompileCacheMap::default(),
573        );
574        let provider = BlockchainProvider::new(factory).unwrap();
575        let mut handle = payload_processor.spawn(
576            Default::default(),
577            Default::default(),
578            StateProviderBuilder::new(provider.clone(), genesis_hash, None),
579            ConsistentDbView::new_with_latest_tip(provider).unwrap(),
580            TrieInput::from_state(hashed_state),
581            &TreeConfig::default(),
582        );
583
584        let mut state_hook = handle.state_hook();
585
586        for (i, update) in state_updates.into_iter().enumerate() {
587            state_hook.on_state(StateChangeSource::Transaction(i), &update);
588        }
589        drop(state_hook);
590
591        let root_from_task = handle.state_root().expect("task failed").state_root;
592        let root_from_regular = state_root(accumulated_state);
593
594        assert_eq!(
595            root_from_task, root_from_regular,
596            "State root mismatch: task={root_from_task}, base={root_from_regular}"
597        );
598    }
599}