reth_engine_tree/tree/payload_processor/
prewarm.rs1use crate::tree::{
4 cached_state::{CachedStateMetrics, CachedStateProvider, ProviderCaches, SavedCache},
5 payload_processor::{
6 executor::WorkloadExecutor, multiproof::MultiProofMessage, ExecutionCache,
7 },
8 precompile_cache::PrecompileCacheMap,
9 StateProviderBuilder,
10};
11use alloy_consensus::transaction::Recovered;
12use alloy_evm::Database;
13use alloy_primitives::{keccak256, map::B256Set, B256};
14use itertools::Itertools;
15use metrics::{Gauge, Histogram};
16use reth_evm::{ConfigureEvm, Evm, EvmFor, SpecFor};
17use reth_metrics::Metrics;
18use reth_primitives_traits::{header::SealedHeaderFor, NodePrimitives, SignedTransaction};
19use reth_provider::{BlockReader, StateCommitmentProvider, StateProviderFactory, StateReader};
20use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmState};
21use reth_trie::MultiProofTargets;
22use std::{
23 collections::VecDeque,
24 sync::{
25 atomic::{AtomicBool, Ordering},
26 mpsc::{channel, Receiver, Sender},
27 Arc,
28 },
29 time::Instant,
30};
31use tracing::{debug, trace};
32
33pub(super) struct PrewarmCacheTask<N, P, Evm>
38where
39 N: NodePrimitives,
40 Evm: ConfigureEvm<Primitives = N>,
41{
42 executor: WorkloadExecutor,
44 execution_cache: ExecutionCache,
46 pending: VecDeque<Recovered<N::SignedTx>>,
48 ctx: PrewarmContext<N, P, Evm>,
50 max_concurrency: usize,
52 to_multi_proof: Option<Sender<MultiProofMessage>>,
54 actions_rx: Receiver<PrewarmTaskEvent>,
56 actions_tx: Sender<PrewarmTaskEvent>,
58 prewarm_outcomes_left: usize,
60}
61
62impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
63where
64 N: NodePrimitives,
65 P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone + 'static,
66 Evm: ConfigureEvm<Primitives = N> + 'static,
67{
68 pub(super) fn new(
70 executor: WorkloadExecutor,
71 execution_cache: ExecutionCache,
72 ctx: PrewarmContext<N, P, Evm>,
73 to_multi_proof: Option<Sender<MultiProofMessage>>,
74 pending: VecDeque<Recovered<N::SignedTx>>,
75 ) -> Self {
76 let (actions_tx, actions_rx) = channel();
77 Self {
78 executor,
79 execution_cache,
80 pending,
81 ctx,
82 max_concurrency: 64,
83 to_multi_proof,
84 actions_rx,
85 actions_tx,
86 prewarm_outcomes_left: 0,
87 }
88 }
89
90 pub(super) fn actions_tx(&self) -> Sender<PrewarmTaskEvent> {
92 self.actions_tx.clone()
93 }
94
95 fn spawn_all(&mut self) {
97 let chunk_size = (self.pending.len() / self.max_concurrency).max(1);
98
99 for chunk in &self.pending.drain(..).chunks(chunk_size) {
100 let sender = self.actions_tx.clone();
101 let ctx = self.ctx.clone();
102 let pending_chunk = chunk.collect::<Vec<_>>();
103
104 self.prewarm_outcomes_left += pending_chunk.len();
105 self.executor.spawn_blocking(move || {
106 ctx.transact_batch(&pending_chunk, sender);
107 });
108 }
109 }
110
111 fn send_multi_proof_targets(&self, targets: Option<MultiProofTargets>) {
113 if let Some((proof_targets, to_multi_proof)) = targets.zip(self.to_multi_proof.as_ref()) {
114 let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(proof_targets));
115 }
116 }
117
118 fn save_cache(self, state: BundleState) {
120 let start = Instant::now();
121 let cache = SavedCache::new(
122 self.ctx.header.hash(),
123 self.ctx.cache.clone(),
124 self.ctx.cache_metrics.clone(),
125 );
126 if cache.cache().insert_state(&state).is_err() {
127 return
128 }
129
130 cache.update_metrics();
131
132 debug!(target: "engine::caching", "Updated state caches");
133
134 self.execution_cache.save_cache(cache);
136 self.ctx.metrics.cache_saving_duration.set(start.elapsed().as_secs_f64());
137 }
138
139 fn drop_actions_tx(&mut self) {
145 self.actions_tx = channel().0;
146 }
147
148 pub(super) fn run(mut self) {
153 self.ctx.metrics.transactions.set(self.pending.len() as f64);
154 self.ctx.metrics.transactions_histogram.record(self.pending.len() as f64);
155
156 self.spawn_all();
158
159 self.drop_actions_tx();
163
164 let mut final_block_output = None;
165 while let Ok(event) = self.actions_rx.recv() {
166 match event {
167 PrewarmTaskEvent::TerminateTransactionExecution => {
168 self.ctx.terminate_execution.store(true, Ordering::Relaxed);
170 }
171 PrewarmTaskEvent::Outcome { proof_targets } => {
172 self.send_multi_proof_targets(proof_targets);
174
175 self.prewarm_outcomes_left -= 1;
177
178 if self.prewarm_outcomes_left == 0 && final_block_output.is_some() {
179 break
181 }
182 }
183 PrewarmTaskEvent::Terminate { block_output } => {
184 final_block_output = Some(block_output);
185
186 if self.prewarm_outcomes_left == 0 {
187 break
189 }
190 }
191 }
192 }
193
194 if let Some(Some(state)) = final_block_output {
196 self.save_cache(state);
197 }
198 }
199}
200
201#[derive(Debug, Clone)]
203pub(super) struct PrewarmContext<N, P, Evm>
204where
205 N: NodePrimitives,
206 Evm: ConfigureEvm<Primitives = N>,
207{
208 pub(super) header: SealedHeaderFor<N>,
209 pub(super) evm_config: Evm,
210 pub(super) cache: ProviderCaches,
211 pub(super) cache_metrics: CachedStateMetrics,
212 pub(super) provider: StateProviderBuilder<N, P>,
214 pub(super) metrics: PrewarmMetrics,
215 pub(super) terminate_execution: Arc<AtomicBool>,
217 pub(super) precompile_cache_enabled: bool,
218 pub(super) precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
219}
220
221impl<N, P, Evm> PrewarmContext<N, P, Evm>
222where
223 N: NodePrimitives,
224 P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone + 'static,
225 Evm: ConfigureEvm<Primitives = N> + 'static,
226{
227 fn evm_for_ctx(
230 self,
231 ) -> Option<(EvmFor<Evm, impl Database>, Evm, PrewarmMetrics, Arc<AtomicBool>)> {
232 #[allow(unused_variables)]
233 let Self {
234 header,
235 evm_config,
236 cache: caches,
237 cache_metrics,
238 provider,
239 metrics,
240 terminate_execution,
241 precompile_cache_enabled,
242 precompile_cache_map,
243 } = self;
244
245 let state_provider = match provider.build() {
246 Ok(provider) => provider,
247 Err(err) => {
248 trace!(
249 target: "engine::tree",
250 %err,
251 "Failed to build state provider in prewarm thread"
252 );
253 return None
254 }
255 };
256
257 let state_provider =
259 CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);
260
261 let state_provider = StateProviderDatabase::new(state_provider);
262
263 let mut evm_env = evm_config.evm_env(&header);
264
265 evm_env.cfg_env.disable_nonce_check = true;
268
269 let evm = evm_config.evm_with_env(state_provider, evm_env);
273 Some((evm, evm_config, metrics, terminate_execution))
284 }
285
286 fn transact_batch(self, txs: &[Recovered<N::SignedTx>], sender: Sender<PrewarmTaskEvent>) {
294 let Some((mut evm, evm_config, metrics, terminate_execution)) = self.evm_for_ctx() else {
295 return
296 };
297
298 for tx in txs {
299 if terminate_execution.load(Ordering::Relaxed) {
302 let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: None });
303 return
304 }
305
306 let tx_env = evm_config.tx_env(tx);
308 let start = Instant::now();
309 let res = match evm.transact(tx_env) {
310 Ok(res) => res,
311 Err(err) => {
312 trace!(
313 target: "engine::tree",
314 %err,
315 tx_hash=%tx.tx_hash(),
316 sender=%tx.signer(),
317 "Error when executing prewarm transaction",
318 );
319 return
320 }
321 };
322 metrics.execution_duration.record(start.elapsed());
323
324 let (targets, storage_targets) = multiproof_targets_from_state(res.state);
325 metrics.prefetch_storage_targets.record(storage_targets as f64);
326 metrics.total_runtime.record(start.elapsed());
327
328 let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
329 }
330 }
331}
332
333fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) {
336 let mut targets = MultiProofTargets::with_capacity(state.len());
337 let mut storage_targets = 0;
338 for (addr, account) in state {
339 if !account.is_touched() || account.is_selfdestructed() {
347 continue
348 }
349
350 let mut storage_set =
351 B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
352 for (key, slot) in account.storage {
353 if !slot.is_changed() {
355 continue
356 }
357
358 storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
359 }
360
361 storage_targets += storage_set.len();
362 targets.insert(keccak256(addr), storage_set);
363 }
364
365 (targets, storage_targets)
366}
367
368pub(super) enum PrewarmTaskEvent {
370 TerminateTransactionExecution,
372 Terminate {
375 block_output: Option<BundleState>,
377 },
378 Outcome {
380 proof_targets: Option<MultiProofTargets>,
382 },
383}
384
385#[derive(Metrics, Clone)]
387#[metrics(scope = "sync.prewarm")]
388pub(crate) struct PrewarmMetrics {
389 pub(crate) transactions: Gauge,
391 pub(crate) transactions_histogram: Histogram,
393 pub(crate) total_runtime: Histogram,
395 pub(crate) execution_duration: Histogram,
397 pub(crate) prefetch_storage_targets: Histogram,
399 pub(crate) cache_saving_duration: Gauge,
401}