reth_engine_tree/tree/payload_processor/
prewarm.rs

1//! Caching and prewarming related functionality.
2
3use crate::tree::{
4    cached_state::{CachedStateMetrics, CachedStateProvider, ProviderCaches, SavedCache},
5    payload_processor::{
6        executor::WorkloadExecutor, multiproof::MultiProofMessage, ExecutionCache,
7    },
8    precompile_cache::PrecompileCacheMap,
9    StateProviderBuilder,
10};
11use alloy_consensus::transaction::Recovered;
12use alloy_evm::Database;
13use alloy_primitives::{keccak256, map::B256Set, B256};
14use itertools::Itertools;
15use metrics::{Gauge, Histogram};
16use reth_evm::{ConfigureEvm, Evm, EvmFor, SpecFor};
17use reth_metrics::Metrics;
18use reth_primitives_traits::{header::SealedHeaderFor, NodePrimitives, SignedTransaction};
19use reth_provider::{BlockReader, StateCommitmentProvider, StateProviderFactory, StateReader};
20use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmState};
21use reth_trie::MultiProofTargets;
22use std::{
23    collections::VecDeque,
24    sync::{
25        atomic::{AtomicBool, Ordering},
26        mpsc::{channel, Receiver, Sender},
27        Arc,
28    },
29    time::Instant,
30};
31use tracing::{debug, trace};
32
33/// A task that is responsible for caching and prewarming the cache by executing transactions
34/// individually in parallel.
35///
36/// Note: This task runs until cancelled externally.
37pub(super) struct PrewarmCacheTask<N, P, Evm>
38where
39    N: NodePrimitives,
40    Evm: ConfigureEvm<Primitives = N>,
41{
42    /// The executor used to spawn execution tasks.
43    executor: WorkloadExecutor,
44    /// Shared execution cache.
45    execution_cache: ExecutionCache,
46    /// Transactions pending execution.
47    pending: VecDeque<Recovered<N::SignedTx>>,
48    /// Context provided to execution tasks
49    ctx: PrewarmContext<N, P, Evm>,
50    /// How many transactions should be executed in parallel
51    max_concurrency: usize,
52    /// Sender to emit evm state outcome messages, if any.
53    to_multi_proof: Option<Sender<MultiProofMessage>>,
54    /// Receiver for events produced by tx execution
55    actions_rx: Receiver<PrewarmTaskEvent>,
56    /// Sender the transactions use to send their result back
57    actions_tx: Sender<PrewarmTaskEvent>,
58    /// Total prewarming tasks spawned
59    prewarm_outcomes_left: usize,
60}
61
62impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
63where
64    N: NodePrimitives,
65    P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone + 'static,
66    Evm: ConfigureEvm<Primitives = N> + 'static,
67{
68    /// Initializes the task with the given transactions pending execution
69    pub(super) fn new(
70        executor: WorkloadExecutor,
71        execution_cache: ExecutionCache,
72        ctx: PrewarmContext<N, P, Evm>,
73        to_multi_proof: Option<Sender<MultiProofMessage>>,
74        pending: VecDeque<Recovered<N::SignedTx>>,
75    ) -> Self {
76        let (actions_tx, actions_rx) = channel();
77        Self {
78            executor,
79            execution_cache,
80            pending,
81            ctx,
82            max_concurrency: 64,
83            to_multi_proof,
84            actions_rx,
85            actions_tx,
86            prewarm_outcomes_left: 0,
87        }
88    }
89
90    /// Returns the sender that can communicate with this task.
91    pub(super) fn actions_tx(&self) -> Sender<PrewarmTaskEvent> {
92        self.actions_tx.clone()
93    }
94
95    /// Spawns all pending transactions as blocking tasks by first chunking them.
96    fn spawn_all(&mut self) {
97        let chunk_size = (self.pending.len() / self.max_concurrency).max(1);
98
99        for chunk in &self.pending.drain(..).chunks(chunk_size) {
100            let sender = self.actions_tx.clone();
101            let ctx = self.ctx.clone();
102            let pending_chunk = chunk.collect::<Vec<_>>();
103
104            self.prewarm_outcomes_left += pending_chunk.len();
105            self.executor.spawn_blocking(move || {
106                ctx.transact_batch(&pending_chunk, sender);
107            });
108        }
109    }
110
111    /// If configured and the tx returned proof targets, emit the targets the transaction produced
112    fn send_multi_proof_targets(&self, targets: Option<MultiProofTargets>) {
113        if let Some((proof_targets, to_multi_proof)) = targets.zip(self.to_multi_proof.as_ref()) {
114            let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(proof_targets));
115        }
116    }
117
118    /// Save the state to the shared cache for the given block.
119    fn save_cache(self, state: BundleState) {
120        let start = Instant::now();
121        let cache = SavedCache::new(
122            self.ctx.header.hash(),
123            self.ctx.cache.clone(),
124            self.ctx.cache_metrics.clone(),
125        );
126        if cache.cache().insert_state(&state).is_err() {
127            return
128        }
129
130        cache.update_metrics();
131
132        debug!(target: "engine::caching", "Updated state caches");
133
134        // update the cache for the executed block
135        self.execution_cache.save_cache(cache);
136        self.ctx.metrics.cache_saving_duration.set(start.elapsed().as_secs_f64());
137    }
138
139    /// Removes the `actions_tx` currently stored in the struct, replacing it with a new one that
140    /// does not point to any active receiver.
141    ///
142    /// This is used to drop the `actions_tx` after all tasks have been spawned, and should not be
143    /// used in any context other than the `run` method.
144    fn drop_actions_tx(&mut self) {
145        self.actions_tx = channel().0;
146    }
147
148    /// Executes the task.
149    ///
150    /// This will execute the transactions until all transactions have been processed or the task
151    /// was cancelled.
152    pub(super) fn run(mut self) {
153        self.ctx.metrics.transactions.set(self.pending.len() as f64);
154        self.ctx.metrics.transactions_histogram.record(self.pending.len() as f64);
155
156        // spawn execution tasks.
157        self.spawn_all();
158
159        // drop the actions sender after we've spawned all execution tasks. This is so that the
160        // following loop can terminate even if one of the prewarm tasks ends in an error (i.e.,
161        // does not return an Outcome) or panics.
162        self.drop_actions_tx();
163
164        let mut final_block_output = None;
165        while let Ok(event) = self.actions_rx.recv() {
166            match event {
167                PrewarmTaskEvent::TerminateTransactionExecution => {
168                    // stop tx processing
169                    self.ctx.terminate_execution.store(true, Ordering::Relaxed);
170                }
171                PrewarmTaskEvent::Outcome { proof_targets } => {
172                    // completed executing a set of transactions
173                    self.send_multi_proof_targets(proof_targets);
174
175                    // decrement the number of tasks left
176                    self.prewarm_outcomes_left -= 1;
177
178                    if self.prewarm_outcomes_left == 0 && final_block_output.is_some() {
179                        // all tasks are done, and we have the block output, we can exit
180                        break
181                    }
182                }
183                PrewarmTaskEvent::Terminate { block_output } => {
184                    final_block_output = Some(block_output);
185
186                    if self.prewarm_outcomes_left == 0 {
187                        // all tasks are done, we can exit, which will save caches and exit
188                        break
189                    }
190                }
191            }
192        }
193
194        // save caches and finish
195        if let Some(Some(state)) = final_block_output {
196            self.save_cache(state);
197        }
198    }
199}
200
201/// Context required by tx execution tasks.
202#[derive(Debug, Clone)]
203pub(super) struct PrewarmContext<N, P, Evm>
204where
205    N: NodePrimitives,
206    Evm: ConfigureEvm<Primitives = N>,
207{
208    pub(super) header: SealedHeaderFor<N>,
209    pub(super) evm_config: Evm,
210    pub(super) cache: ProviderCaches,
211    pub(super) cache_metrics: CachedStateMetrics,
212    /// Provider to obtain the state
213    pub(super) provider: StateProviderBuilder<N, P>,
214    pub(super) metrics: PrewarmMetrics,
215    /// An atomic bool that tells prewarm tasks to not start any more execution.
216    pub(super) terminate_execution: Arc<AtomicBool>,
217    pub(super) precompile_cache_enabled: bool,
218    pub(super) precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
219}
220
221impl<N, P, Evm> PrewarmContext<N, P, Evm>
222where
223    N: NodePrimitives,
224    P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone + 'static,
225    Evm: ConfigureEvm<Primitives = N> + 'static,
226{
227    /// Splits this context into an evm, an evm config, metrics, and the atomic bool for terminating
228    /// execution.
229    fn evm_for_ctx(
230        self,
231    ) -> Option<(EvmFor<Evm, impl Database>, Evm, PrewarmMetrics, Arc<AtomicBool>)> {
232        #[allow(unused_variables)]
233        let Self {
234            header,
235            evm_config,
236            cache: caches,
237            cache_metrics,
238            provider,
239            metrics,
240            terminate_execution,
241            precompile_cache_enabled,
242            precompile_cache_map,
243        } = self;
244
245        let state_provider = match provider.build() {
246            Ok(provider) => provider,
247            Err(err) => {
248                trace!(
249                    target: "engine::tree",
250                    %err,
251                    "Failed to build state provider in prewarm thread"
252                );
253                return None
254            }
255        };
256
257        // Use the caches to create a new provider with caching
258        let state_provider =
259            CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);
260
261        let state_provider = StateProviderDatabase::new(state_provider);
262
263        let mut evm_env = evm_config.evm_env(&header);
264
265        // we must disable the nonce check so that we can execute the transaction even if the nonce
266        // doesn't match what's on chain.
267        evm_env.cfg_env.disable_nonce_check = true;
268
269        // seismic upstream merge: we do not enable precompile cache since it breaks our stateful
270        // precompiles create a new executor and disable nonce checks in the env
271        // let spec_id = *evm_env.spec_id();
272        let evm = evm_config.evm_with_env(state_provider, evm_env);
273        // if precompile_cache_enabled {
274        //     evm.precompiles_mut().map_precompiles(|address, precompile| {
275        //         CachedPrecompile::wrap(
276        //             precompile,
277        //             precompile_cache_map.cache_for_address(*address),
278        //             spec_id,
279        //         )
280        //     });
281        // }
282
283        Some((evm, evm_config, metrics, terminate_execution))
284    }
285
286    /// Transacts the vec of transactions and returns the state outcome.
287    ///
288    /// Returns `None` if executing the transactions failed to a non Revert error.
289    /// Returns the touched+modified state of the transaction.
290    ///
291    /// Note: Since here are no ordering guarantees this won't the state the txs produce when
292    /// executed sequentially.
293    fn transact_batch(self, txs: &[Recovered<N::SignedTx>], sender: Sender<PrewarmTaskEvent>) {
294        let Some((mut evm, evm_config, metrics, terminate_execution)) = self.evm_for_ctx() else {
295            return
296        };
297
298        for tx in txs {
299            // If the task was cancelled, stop execution, send an empty result to notify the task,
300            // and exit.
301            if terminate_execution.load(Ordering::Relaxed) {
302                let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None });
303                return
304            }
305
306            // create the tx env
307            let tx_env = evm_config.tx_env(tx);
308            let start = Instant::now();
309            let res = match evm.transact(tx_env) {
310                Ok(res) => res,
311                Err(err) => {
312                    trace!(
313                        target: "engine::tree",
314                        %err,
315                        tx_hash=%tx.tx_hash(),
316                        sender=%tx.signer(),
317                        "Error when executing prewarm transaction",
318                    );
319                    return
320                }
321            };
322            metrics.execution_duration.record(start.elapsed());
323
324            let (targets, storage_targets) = multiproof_targets_from_state(res.state);
325            metrics.prefetch_storage_targets.record(storage_targets as f64);
326            metrics.total_runtime.record(start.elapsed());
327
328            let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
329        }
330    }
331}
332
333/// Returns a set of [`MultiProofTargets`] and the total amount of storage targets, based on the
334/// given state.
335fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) {
336    let mut targets = MultiProofTargets::with_capacity(state.len());
337    let mut storage_targets = 0;
338    for (addr, account) in state {
339        // if the account was not touched, or if the account was selfdestructed, do not
340        // fetch proofs for it
341        //
342        // Since selfdestruct can only happen in the same transaction, we can skip
343        // prefetching proofs for selfdestructed accounts
344        //
345        // See: https://eips.ethereum.org/EIPS/eip-6780
346        if !account.is_touched() || account.is_selfdestructed() {
347            continue
348        }
349
350        let mut storage_set =
351            B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
352        for (key, slot) in account.storage {
353            // do nothing if unchanged
354            if !slot.is_changed() {
355                continue
356            }
357
358            storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
359        }
360
361        storage_targets += storage_set.len();
362        targets.insert(keccak256(addr), storage_set);
363    }
364
365    (targets, storage_targets)
366}
367
368/// The events the pre-warm task can handle.
369pub(super) enum PrewarmTaskEvent {
370    /// Forcefully terminate all remaining transaction execution.
371    TerminateTransactionExecution,
372    /// Forcefully terminate the task on demand and update the shared cache with the given output
373    /// before exiting.
374    Terminate {
375        /// The final block state output.
376        block_output: Option<BundleState>,
377    },
378    /// The outcome of a pre-warm task
379    Outcome {
380        /// The prepared proof targets based on the evm state outcome
381        proof_targets: Option<MultiProofTargets>,
382    },
383}
384
385/// Metrics for transactions prewarming.
386#[derive(Metrics, Clone)]
387#[metrics(scope = "sync.prewarm")]
388pub(crate) struct PrewarmMetrics {
389    /// The number of transactions to prewarm
390    pub(crate) transactions: Gauge,
391    /// A histogram of the number of transactions to prewarm
392    pub(crate) transactions_histogram: Histogram,
393    /// A histogram of duration per transaction prewarming
394    pub(crate) total_runtime: Histogram,
395    /// A histogram of EVM execution duration per transaction prewarming
396    pub(crate) execution_duration: Histogram,
397    /// A histogram for prefetch targets per transaction prewarming
398    pub(crate) prefetch_storage_targets: Histogram,
399    /// A histogram of duration for cache saving
400    pub(crate) cache_saving_duration: Gauge,
401}