1use crate::tree::{
4 cached_state::{CachedStateMetrics, ProviderCacheBuilder, ProviderCaches, SavedCache},
5 payload_processor::{
6 prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmTaskEvent},
7 sparse_trie::StateRootComputeOutcome,
8 },
9 sparse_trie::SparseTrieTask,
10 StateProviderBuilder, TreeConfig,
11};
12use alloy_consensus::{transaction::Recovered, BlockHeader};
13use alloy_evm::block::StateChangeSource;
14use alloy_primitives::B256;
15use executor::WorkloadExecutor;
16use multiproof::*;
17use parking_lot::RwLock;
18use prewarm::PrewarmMetrics;
19use reth_evm::{ConfigureEvm, OnStateHook, SpecFor};
20use reth_primitives_traits::{NodePrimitives, SealedHeaderFor};
21use reth_provider::{
22 providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, StateCommitmentProvider,
23 StateProviderFactory, StateReader,
24};
25use reth_revm::{db::BundleState, state::EvmState};
26use reth_trie::TrieInput;
27use reth_trie_parallel::{
28 proof_task::{ProofTaskCtx, ProofTaskManager},
29 root::ParallelStateRootError,
30};
31use std::{
32 collections::VecDeque,
33 sync::{
34 atomic::AtomicBool,
35 mpsc::{self, channel, Sender},
36 Arc,
37 },
38};
39
40use super::precompile_cache::PrecompileCacheMap;
41
42pub mod executor;
43pub mod multiproof;
44pub mod prewarm;
45pub mod sparse_trie;
46
47#[derive(Debug, Clone)]
49pub struct PayloadProcessor<N, Evm>
50where
51 N: NodePrimitives,
52 Evm: ConfigureEvm<Primitives = N>,
53{
54 executor: WorkloadExecutor,
56 execution_cache: ExecutionCache,
58 trie_metrics: MultiProofTaskMetrics,
60 cross_block_cache_size: u64,
62 disable_transaction_prewarming: bool,
64 evm_config: Evm,
66 precompile_cache_enabled: bool,
68 precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
70 _marker: std::marker::PhantomData<N>,
71}
72
73impl<N, Evm> PayloadProcessor<N, Evm>
74where
75 N: NodePrimitives,
76 Evm: ConfigureEvm<Primitives = N>,
77{
78 pub fn new(
80 executor: WorkloadExecutor,
81 evm_config: Evm,
82 config: &TreeConfig,
83 precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
84 ) -> Self {
85 Self {
86 executor,
87 execution_cache: Default::default(),
88 trie_metrics: Default::default(),
89 cross_block_cache_size: config.cross_block_cache_size(),
90 disable_transaction_prewarming: config.disable_caching_and_prewarming(),
91 evm_config,
92 precompile_cache_enabled: config.precompile_cache_enabled(),
93 precompile_cache_map,
94 _marker: Default::default(),
95 }
96 }
97}
98
99impl<N, Evm> PayloadProcessor<N, Evm>
100where
101 N: NodePrimitives,
102 Evm: ConfigureEvm<Primitives = N> + 'static,
103{
104 pub fn spawn<P>(
137 &self,
138 header: SealedHeaderFor<N>,
139 transactions: VecDeque<Recovered<N::SignedTx>>,
140 provider_builder: StateProviderBuilder<N, P>,
141 consistent_view: ConsistentDbView<P>,
142 trie_input: TrieInput,
143 config: &TreeConfig,
144 ) -> PayloadHandle
145 where
146 P: DatabaseProviderFactory<Provider: BlockReader>
147 + BlockReader
148 + StateProviderFactory
149 + StateReader
150 + StateCommitmentProvider
151 + Clone
152 + 'static,
153 {
154 let (to_sparse_trie, sparse_trie_rx) = channel();
155 let state_root_config = MultiProofConfig::new_from_input(consistent_view, trie_input);
157
158 let task_ctx = ProofTaskCtx::new(
160 state_root_config.nodes_sorted.clone(),
161 state_root_config.state_sorted.clone(),
162 state_root_config.prefix_sets.clone(),
163 );
164 let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
165 let proof_task = ProofTaskManager::new(
166 self.executor.handle().clone(),
167 state_root_config.consistent_view.clone(),
168 task_ctx,
169 max_proof_task_concurrency,
170 );
171
172 let max_multi_proof_task_concurrency = max_proof_task_concurrency / 2;
175 let multi_proof_task = MultiProofTask::new(
176 state_root_config,
177 self.executor.clone(),
178 proof_task.handle(),
179 to_sparse_trie,
180 max_multi_proof_task_concurrency,
181 );
182
183 let to_multi_proof = Some(multi_proof_task.state_root_message_sender());
185
186 let prewarm_handle =
187 self.spawn_caching_with(header, transactions, provider_builder, to_multi_proof.clone());
188
189 self.executor.spawn_blocking(move || {
191 multi_proof_task.run();
192 });
193
194 let mut sparse_trie_task = SparseTrieTask::new(
195 self.executor.clone(),
196 sparse_trie_rx,
197 proof_task.handle(),
198 self.trie_metrics.clone(),
199 );
200
201 let (state_root_tx, state_root_rx) = channel();
203 self.executor.spawn_blocking(move || {
204 let res = sparse_trie_task.run();
205 let _ = state_root_tx.send(res);
206 });
207
208 self.executor.spawn_blocking(move || {
210 if let Err(err) = proof_task.run() {
211 tracing::error!(
213 target: "engine::root",
214 ?err,
215 "Storage proof task returned an error"
216 );
217 }
218 });
219
220 PayloadHandle { to_multi_proof, prewarm_handle, state_root: Some(state_root_rx) }
221 }
222
223 pub(super) fn spawn_cache_exclusive<P>(
227 &self,
228 header: SealedHeaderFor<N>,
229 transactions: VecDeque<Recovered<N::SignedTx>>,
230 provider_builder: StateProviderBuilder<N, P>,
231 ) -> PayloadHandle
232 where
233 P: BlockReader
234 + StateProviderFactory
235 + StateReader
236 + StateCommitmentProvider
237 + Clone
238 + 'static,
239 {
240 let prewarm_handle = self.spawn_caching_with(header, transactions, provider_builder, None);
241 PayloadHandle { to_multi_proof: None, prewarm_handle, state_root: None }
242 }
243
244 fn spawn_caching_with<P>(
246 &self,
247 header: SealedHeaderFor<N>,
248 mut transactions: VecDeque<Recovered<N::SignedTx>>,
249 provider_builder: StateProviderBuilder<N, P>,
250 to_multi_proof: Option<Sender<MultiProofMessage>>,
251 ) -> CacheTaskHandle
252 where
253 P: BlockReader
254 + StateProviderFactory
255 + StateReader
256 + StateCommitmentProvider
257 + Clone
258 + 'static,
259 {
260 if self.disable_transaction_prewarming {
261 transactions.clear();
264 }
265
266 let (cache, cache_metrics) = self.cache_for(header.parent_hash()).split();
267 let prewarm_ctx = PrewarmContext {
269 header,
270 evm_config: self.evm_config.clone(),
271 cache: cache.clone(),
272 cache_metrics: cache_metrics.clone(),
273 provider: provider_builder,
274 metrics: PrewarmMetrics::default(),
275 terminate_execution: Arc::new(AtomicBool::new(false)),
276 precompile_cache_enabled: self.precompile_cache_enabled,
277 precompile_cache_map: self.precompile_cache_map.clone(),
278 };
279
280 let prewarm_task = PrewarmCacheTask::new(
281 self.executor.clone(),
282 self.execution_cache.clone(),
283 prewarm_ctx,
284 to_multi_proof,
285 transactions,
286 );
287 let to_prewarm_task = prewarm_task.actions_tx();
288
289 self.executor.spawn_blocking(move || {
291 prewarm_task.run();
292 });
293 CacheTaskHandle { cache, to_prewarm_task: Some(to_prewarm_task), cache_metrics }
294 }
295
296 fn cache_for(&self, parent_hash: B256) -> SavedCache {
301 self.execution_cache.get_cache_for(parent_hash).unwrap_or_else(|| {
302 let cache = ProviderCacheBuilder::default().build_caches(self.cross_block_cache_size);
303 SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
304 })
305 }
306}
307
308#[derive(Debug)]
310pub struct PayloadHandle {
311 to_multi_proof: Option<Sender<MultiProofMessage>>,
313 prewarm_handle: CacheTaskHandle,
315 state_root: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
317}
318
319impl PayloadHandle {
320 pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
326 self.state_root
327 .take()
328 .expect("state_root is None")
329 .recv()
330 .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
331 }
332
333 pub fn state_hook(&self) -> impl OnStateHook {
337 let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new);
339
340 move |source: StateChangeSource, state: &EvmState| {
341 if let Some(sender) = &to_multi_proof {
342 let _ = sender.send(MultiProofMessage::StateUpdate(source, state.clone()));
343 }
344 }
345 }
346
347 pub(super) fn caches(&self) -> ProviderCaches {
349 self.prewarm_handle.cache.clone()
350 }
351
352 pub(super) fn cache_metrics(&self) -> CachedStateMetrics {
353 self.prewarm_handle.cache_metrics.clone()
354 }
355
356 pub(super) fn stop_prewarming_execution(&self) {
360 self.prewarm_handle.stop_prewarming_execution()
361 }
362
363 pub(super) fn terminate_caching(&mut self, block_output: Option<BundleState>) {
367 self.prewarm_handle.terminate_caching(block_output)
368 }
369}
370
371#[derive(Debug)]
373pub(crate) struct CacheTaskHandle {
374 cache: ProviderCaches,
376 cache_metrics: CachedStateMetrics,
378 to_prewarm_task: Option<Sender<PrewarmTaskEvent>>,
380}
381
382impl CacheTaskHandle {
383 pub(super) fn stop_prewarming_execution(&self) {
387 self.to_prewarm_task
388 .as_ref()
389 .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
390 }
391
392 pub(super) fn terminate_caching(&mut self, block_output: Option<BundleState>) {
396 self.to_prewarm_task
397 .take()
398 .map(|tx| tx.send(PrewarmTaskEvent::Terminate { block_output }).ok());
399 }
400}
401
402impl Drop for CacheTaskHandle {
403 fn drop(&mut self) {
404 self.terminate_caching(None);
406 }
407}
408
409#[derive(Clone, Debug, Default)]
417struct ExecutionCache {
418 inner: Arc<RwLock<Option<SavedCache>>>,
420}
421
422impl ExecutionCache {
423 pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
425 let cache = self.inner.read();
426 cache
427 .as_ref()
428 .and_then(|cache| (cache.executed_block_hash() == parent_hash).then(|| cache.clone()))
429 }
430
431 #[expect(unused)]
433 pub(crate) fn clear(&self) {
434 self.inner.write().take();
435 }
436
437 pub(crate) fn save_cache(&self, cache: SavedCache) {
439 self.inner.write().replace(cache);
440 }
441}
442
#[cfg(test)]
mod tests {
    use crate::tree::{
        payload_processor::{
            evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
        },
        precompile_cache::PrecompileCacheMap,
        StateProviderBuilder, TreeConfig,
    };
    use alloy_evm::block::StateChangeSource;
    use rand::Rng;
    use reth_chainspec::ChainSpec;
    use reth_db_common::init::init_genesis;
    use reth_ethereum_primitives::EthPrimitives;
    use reth_evm::OnStateHook;
    use reth_evm_ethereum::EthEvmConfig;
    use reth_primitives_traits::{Account, StorageEntry};
    use reth_provider::{
        providers::{BlockchainProvider, ConsistentDbView},
        test_utils::create_test_provider_factory_with_chain_spec,
        ChainSpecProvider, HashingWriter,
    };
    use reth_testing_utils::generators;
    use reth_trie::{test_utils::state_root, HashedPostState, TrieInput};
    use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
    use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
    use std::sync::Arc;

    /// Builds `updates_per_account` random state updates over `num_accounts` random addresses.
    ///
    /// Each update touches a random-length prefix (at least one entry) of the address list.
    /// Every touched account gets a random balance and nonce and, with probability 0.7,
    /// between one and nine random storage slot changes.
    fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
        let mut rng = generators::rng();
        let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
        let mut updates = Vec::with_capacity(updates_per_account);

        for _ in 0..updates_per_account {
            let touched = rng.random_range(1..=num_accounts);
            let mut state_update = EvmState::default();

            for &address in all_addresses.iter().take(touched) {
                let mut storage = HashMap::default();
                if rng.random_bool(0.7) {
                    let slot_count = rng.random_range(1..10);
                    for _ in 0..slot_count {
                        let slot = U256::from(rng.random::<u64>());
                        let changed = EvmStorageSlot::new_changed(
                            U256::ZERO.into(),
                            U256::from(rng.random::<u64>()).into(),
                        );
                        storage.insert(slot, changed);
                    }
                }

                let info = AccountInfo {
                    balance: U256::from(rng.random::<u64>()),
                    nonce: rng.random::<u64>(),
                    code_hash: KECCAK_EMPTY,
                    code: Some(Default::default()),
                };
                let account = revm_state::Account { info, storage, status: AccountStatus::Touched };

                state_update.insert(address, account);
            }

            updates.push(state_update);
        }

        updates
    }

    /// Feeds random state updates through the state hook and checks that the root computed by
    /// the spawned tasks equals the root computed directly from the accumulated state.
    #[test]
    fn test_state_root() {
        reth_tracing::init_test_tracing();

        let chain_spec = Arc::new(ChainSpec::default());
        let factory = create_test_provider_factory_with_chain_spec(chain_spec);
        let genesis_hash = init_genesis(&factory).unwrap();

        let state_updates = create_mock_state_updates(10, 10);

        // Write every update's accounts and storage through the hashing writer.
        {
            let provider_rw = factory.provider_rw().expect("failed to get provider");

            for update in &state_updates {
                let accounts = update
                    .iter()
                    .map(|(address, account)| (*address, Some(Account::from_revm_account(account))));
                provider_rw.insert_account_for_hashing(accounts).expect("failed to insert accounts");

                let storages = update.iter().map(|(address, account)| {
                    let entries = account.storage.iter().map(|(slot, value)| StorageEntry {
                        key: B256::from(*slot),
                        value: value.present_value,
                    });
                    (*address, entries)
                });
                provider_rw.insert_storage_for_hashing(storages).expect("failed to insert storage");
            }
            provider_rw.commit().expect("failed to commit changes");
        }

        // Fold the updates into the hashed post state and into the plain reference state.
        let mut hashed_state = HashedPostState::default();
        let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
            HashMap::default();
        for update in &state_updates {
            hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));
            for (address, account) in update {
                let (latest_account, slots) = accumulated_state.entry(*address).or_default();
                *latest_account = Account::from_revm_account(account);
                slots.extend(
                    account.storage.iter().map(|(k, v)| (B256::from(*k), v.present_value.value)),
                );
            }
        }

        let tree_config = TreeConfig::default();
        let payload_processor = PayloadProcessor::<EthPrimitives, _>::new(
            WorkloadExecutor::default(),
            EthEvmConfig::new(factory.chain_spec()),
            &tree_config,
            PrecompileCacheMap::default(),
        );
        let provider = BlockchainProvider::new(factory).unwrap();
        let mut handle = payload_processor.spawn(
            Default::default(),
            Default::default(),
            StateProviderBuilder::new(provider.clone(), genesis_hash, None),
            ConsistentDbView::new_with_latest_tip(provider).unwrap(),
            TrieInput::from_state(hashed_state),
            &tree_config,
        );

        let mut state_hook = handle.state_hook();
        for (tx_index, update) in state_updates.into_iter().enumerate() {
            state_hook.on_state(StateChangeSource::Transaction(tx_index), &update);
        }
        // The hook is released before the root is awaited.
        drop(state_hook);

        let root_from_task = handle.state_root().expect("task failed").state_root;
        let root_from_regular = state_root(accumulated_state);

        assert_eq!(
            root_from_task, root_from_regular,
            "State root mismatch: task={root_from_task}, base={root_from_regular}"
        );
    }
}