1use crate::{Database, OnStateHook};
6use alloy_consensus::BlockHeader;
7use alloy_evm::{
8 block::{BlockExecutor, StateChangeSource},
9 Evm,
10};
11use core::borrow::BorrowMut;
12use metrics::{Counter, Gauge, Histogram};
13use reth_execution_errors::BlockExecutionError;
14use reth_execution_types::BlockExecutionOutput;
15use reth_metrics::Metrics;
16use reth_primitives_traits::{Block, BlockBody, RecoveredBlock};
17use revm::{
18 database::{states::bundle_state::BundleRetention, State},
19 state::EvmState,
20};
21use std::time::Instant;
22
/// State hook wrapper that records "loaded" metrics for every state change before
/// forwarding the change to the wrapped hook.
struct MeteredStateHook {
    /// Metrics handle whose `*_loaded_histogram` fields are updated in `on_state`.
    metrics: ExecutorMetrics,
    /// The wrapped hook; it receives the same `source` and `state` after metrics are recorded.
    inner_hook: Box<dyn OnStateHook>,
}
28
29impl OnStateHook for MeteredStateHook {
30 fn on_state(&mut self, source: StateChangeSource, state: &EvmState) {
31 let accounts = state.keys().len();
33 let storage_slots = state.values().map(|account| account.storage.len()).sum::<usize>();
34 let bytecodes = state.values().filter(|account| !account.info.is_empty_code_hash()).count();
35
36 self.metrics.accounts_loaded_histogram.record(accounts as f64);
37 self.metrics.storage_slots_loaded_histogram.record(storage_slots as f64);
38 self.metrics.bytecodes_loaded_histogram.record(bytecodes as f64);
39
40 self.inner_hook.on_state(source, state);
42 }
43}
44
/// Metrics recorded while executing blocks, registered under the `sync.execution` scope.
// NOTE(review): the `Metrics` derive presumably picks up the field doc comments below as
// metric descriptions — confirm against the `reth_metrics` derive documentation.
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.execution")]
pub struct ExecutorMetrics {
    /// Running total of gas processed, incremented by each metered block's `gas_used`.
    pub gas_processed_total: Counter,
    /// Gas used by the most recently metered block divided by its execution time in seconds.
    pub gas_per_second: Gauge,
    /// Histogram of the gas used by metered blocks.
    pub gas_used_histogram: Histogram,

    /// Histogram of the time, in seconds, taken by metered executions.
    pub execution_histogram: Histogram,
    /// Time, in seconds, taken by the most recently metered execution.
    pub execution_duration: Gauge,

    /// Histogram of the number of accounts seen in each state-hook notification.
    pub accounts_loaded_histogram: Histogram,
    /// Histogram of the number of storage slots seen in each state-hook notification.
    pub storage_slots_loaded_histogram: Histogram,
    /// Histogram of the number of accounts with a non-empty code hash seen in each
    /// state-hook notification.
    pub bytecodes_loaded_histogram: Histogram,

    /// Histogram of the number of accounts in the bundle state produced by an execution.
    pub accounts_updated_histogram: Histogram,
    /// Histogram of the number of storage slots in the bundle state produced by an execution.
    pub storage_slots_updated_histogram: Histogram,
    /// Histogram of the number of contracts in the bundle state produced by an execution.
    pub bytecodes_updated_histogram: Histogram,
}
76
77impl ExecutorMetrics {
78 fn metered<F, R, B>(&self, block: &RecoveredBlock<B>, f: F) -> R
79 where
80 F: FnOnce() -> R,
81 B: reth_primitives_traits::Block,
82 {
83 let execute_start = Instant::now();
85 let output = f();
86 let execution_duration = execute_start.elapsed().as_secs_f64();
87
88 self.gas_processed_total.increment(block.header().gas_used());
90 self.gas_per_second.set(block.header().gas_used() as f64 / execution_duration);
91 self.gas_used_histogram.record(block.header().gas_used() as f64);
92 self.execution_histogram.record(execution_duration);
93 self.execution_duration.set(execution_duration);
94
95 output
96 }
97
98 pub fn execute_metered<E, DB>(
106 &self,
107 executor: E,
108 input: &RecoveredBlock<impl Block<Body: BlockBody<Transaction = E::Transaction>>>,
109 state_hook: Box<dyn OnStateHook>,
110 ) -> Result<BlockExecutionOutput<E::Receipt>, BlockExecutionError>
111 where
112 DB: Database,
113 E: BlockExecutor<Evm: Evm<DB: BorrowMut<State<DB>>>>,
114 {
115 let wrapper = MeteredStateHook { metrics: self.clone(), inner_hook: state_hook };
119
120 let mut executor = executor.with_state_hook(Some(Box::new(wrapper)));
121
122 let (mut db, result) = self.metered(input, || {
124 executor.apply_pre_execution_changes()?;
125 for tx in input.transactions_recovered() {
126 executor.execute_transaction(tx)?;
127 }
128 executor.finish().map(|(evm, result)| (evm.into_db(), result))
129 })?;
130
131 db.borrow_mut().merge_transitions(BundleRetention::Reverts);
133 let output = BlockExecutionOutput { result, state: db.borrow_mut().take_bundle() };
134
135 let accounts = output.state.state.len();
137 let storage_slots =
138 output.state.state.values().map(|account| account.storage.len()).sum::<usize>();
139 let bytecodes = output.state.contracts.len();
140
141 self.accounts_updated_histogram.record(accounts as f64);
142 self.storage_slots_updated_histogram.record(storage_slots as f64);
143 self.bytecodes_updated_histogram.record(bytecodes as f64);
144
145 Ok(output)
146 }
147
148 pub fn metered_one<F, R, B>(&self, input: &RecoveredBlock<B>, f: F) -> R
150 where
151 F: FnOnce(&RecoveredBlock<B>) -> R,
152 B: reth_primitives_traits::Block,
153 {
154 self.metered(input, || f(input))
155 }
156}
157
158#[cfg(test)]
159mod tests {
160 use super::*;
161 use alloy_eips::eip7685::Requests;
162 use alloy_evm::{block::CommitChanges, EthEvm};
163 use alloy_primitives::{B256, U256};
164 use metrics_util::debugging::{DebugValue, DebuggingRecorder, Snapshotter};
165 use reth_ethereum_primitives::{Receipt, TransactionSigned};
166 use reth_execution_types::BlockExecutionResult;
167 use revm::{
168 context::result::ExecutionResult,
169 database::State,
170 database_interface::EmptyDB,
171 inspector::NoOpInspector,
172 state::{Account, AccountInfo, AccountStatus, EvmStorage, EvmStorageSlot},
173 Context, MainBuilder, MainContext,
174 };
175 use std::sync::mpsc;
176
177 use revm::state::FlaggedStorage;
179
    /// Minimal [`BlockExecutor`] implementation used by the tests in this module.
    struct MockExecutor {
        /// State passed to the hook when `finish` is called.
        state: EvmState,
        /// Hook stored by `set_state_hook`, if any.
        hook: Option<Box<dyn OnStateHook>>,
        /// EVM instance returned from `finish` and exposed through `evm`/`evm_mut`.
        evm: EthEvm<State<EmptyDB>, NoOpInspector>,
    }
186
187 impl MockExecutor {
188 fn new(state: EvmState) -> Self {
189 let db = State::builder()
190 .with_database(EmptyDB::default())
191 .with_bundle_update()
192 .without_state_clear()
193 .build();
194 let evm = EthEvm::new(
195 Context::mainnet().with_db(db).build_mainnet_with_inspector(NoOpInspector {}),
196 false,
197 );
198 Self { state, hook: None, evm }
199 }
200 }
201
202 impl BlockExecutor for MockExecutor {
203 type Transaction = TransactionSigned;
204 type Receipt = Receipt;
205 type Evm = EthEvm<State<EmptyDB>, NoOpInspector>;
206
207 fn apply_pre_execution_changes(&mut self) -> Result<(), BlockExecutionError> {
208 Ok(())
209 }
210
211 fn execute_transaction_with_commit_condition(
212 &mut self,
213 _tx: impl alloy_evm::block::ExecutableTx<Self>,
214 _f: impl FnOnce(&ExecutionResult<<Self::Evm as Evm>::HaltReason>) -> CommitChanges,
215 ) -> Result<Option<u64>, BlockExecutionError> {
216 Ok(Some(0))
217 }
218
219 fn finish(
220 self,
221 ) -> Result<(Self::Evm, BlockExecutionResult<Self::Receipt>), BlockExecutionError> {
222 let Self { evm, hook, .. } = self;
223
224 if let Some(mut hook) = hook {
226 hook.on_state(StateChangeSource::Transaction(0), &self.state);
227 }
228
229 Ok((
230 evm,
231 BlockExecutionResult {
232 receipts: vec![],
233 requests: Requests::default(),
234 gas_used: 0,
235 },
236 ))
237 }
238
239 fn set_state_hook(&mut self, hook: Option<Box<dyn OnStateHook>>) {
240 self.hook = hook;
241 }
242
243 fn evm(&self) -> &Self::Evm {
244 &self.evm
245 }
246
247 fn evm_mut(&mut self) -> &mut Self::Evm {
248 &mut self.evm
249 }
250 }
251
    /// Test hook that reports each invocation by sending a fixed value over a channel.
    struct ChannelStateHook {
        /// Value sent on every `on_state` call.
        output: i32,
        /// Sending half of the channel the test listens on.
        sender: mpsc::Sender<i32>,
    }
256
257 impl OnStateHook for ChannelStateHook {
258 fn on_state(&mut self, _source: StateChangeSource, _state: &EvmState) {
259 let _ = self.sender.send(self.output);
260 }
261 }
262
263 fn setup_test_recorder() -> Snapshotter {
264 let recorder = DebuggingRecorder::new();
265 let snapshotter = recorder.snapshotter();
266 recorder.install().unwrap();
267 snapshotter
268 }
269
270 #[test]
271 fn test_executor_metrics_hook_metrics_recorded() {
272 let snapshotter = setup_test_recorder();
273 let metrics = ExecutorMetrics::default();
274 let input = RecoveredBlock::<reth_ethereum_primitives::Block>::default();
275
276 let (tx, _rx) = mpsc::channel();
277 let expected_output = 42;
278 let state_hook = Box::new(ChannelStateHook { sender: tx, output: expected_output });
279
280 let state = {
281 let mut state = EvmState::default();
282 let storage = EvmStorage::from_iter([(
283 U256::from(1),
284 EvmStorageSlot::new(FlaggedStorage::new_from_value(2)),
285 )]);
286 state.insert(
287 Default::default(),
288 Account {
289 info: AccountInfo {
290 balance: U256::from(100),
291 nonce: 10,
292 code_hash: B256::random(),
293 code: Default::default(),
294 },
295 storage,
296 status: AccountStatus::Loaded,
297 },
298 );
299 state
300 };
301 let executor = MockExecutor::new(state);
302 let _result = metrics.execute_metered::<_, EmptyDB>(executor, &input, state_hook).unwrap();
303
304 let snapshot = snapshotter.snapshot().into_vec();
305
306 for metric in snapshot {
307 let metric_name = metric.0.key().name();
308 if metric_name == "sync.execution.accounts_loaded_histogram" ||
309 metric_name == "sync.execution.storage_slots_loaded_histogram" ||
310 metric_name == "sync.execution.bytecodes_loaded_histogram"
311 {
312 if let DebugValue::Histogram(vs) = metric.3 {
313 assert!(
314 vs.iter().any(|v| v.into_inner() > 0.0),
315 "metric {metric_name} not recorded"
316 );
317 }
318 }
319 }
320 }
321
322 #[test]
323 fn test_executor_metrics_hook_called() {
324 let metrics = ExecutorMetrics::default();
325 let input = RecoveredBlock::<reth_ethereum_primitives::Block>::default();
326
327 let (tx, rx) = mpsc::channel();
328 let expected_output = 42;
329 let state_hook = Box::new(ChannelStateHook { sender: tx, output: expected_output });
330
331 let state = EvmState::default();
332
333 let executor = MockExecutor::new(state);
334 let _result = metrics.execute_metered::<_, EmptyDB>(executor, &input, state_hook).unwrap();
335
336 let actual_output = rx.try_recv().unwrap();
337 assert_eq!(actual_output, expected_output);
338 }
339}