1use crate::{execute::Executor, system_calls::OnStateHook};
6use alloy_consensus::BlockHeader;
7use metrics::{Counter, Gauge, Histogram};
8use reth_execution_types::{BlockExecutionInput, BlockExecutionOutput};
9use reth_metrics::Metrics;
10use reth_primitives::BlockWithSenders;
11use revm_primitives::EvmState;
12use std::time::Instant;
13
14struct MeteredStateHook {
16 metrics: ExecutorMetrics,
17 inner_hook: Box<dyn OnStateHook>,
18}
19
20impl OnStateHook for MeteredStateHook {
21 fn on_state(&mut self, state: &EvmState) {
22 let accounts = state.keys().len();
24 let storage_slots = state.values().map(|account| account.storage.len()).sum::<usize>();
25 let bytecodes = state
26 .values()
27 .filter(|account| !account.info.is_empty_code_hash())
28 .collect::<Vec<_>>()
29 .len();
30
31 self.metrics.accounts_loaded_histogram.record(accounts as f64);
32 self.metrics.storage_slots_loaded_histogram.record(storage_slots as f64);
33 self.metrics.bytecodes_loaded_histogram.record(bytecodes as f64);
34
35 self.inner_hook.on_state(state);
37 }
38}
39
40#[derive(Metrics, Clone)]
43#[metrics(scope = "sync.execution")]
44pub struct ExecutorMetrics {
45 pub gas_processed_total: Counter,
47 pub gas_per_second: Gauge,
49
50 pub execution_histogram: Histogram,
52 pub execution_duration: Gauge,
54
55 pub accounts_loaded_histogram: Histogram,
57 pub storage_slots_loaded_histogram: Histogram,
59 pub bytecodes_loaded_histogram: Histogram,
61
62 pub accounts_updated_histogram: Histogram,
64 pub storage_slots_updated_histogram: Histogram,
66 pub bytecodes_updated_histogram: Histogram,
68}
69
70impl ExecutorMetrics {
71 fn metered<F, R, B>(&self, block: &BlockWithSenders<B>, f: F) -> R
72 where
73 F: FnOnce() -> R,
74 B: reth_primitives_traits::Block,
75 {
76 let execute_start = Instant::now();
78 let output = f();
79 let execution_duration = execute_start.elapsed().as_secs_f64();
80
81 self.gas_processed_total.increment(block.header().gas_used());
83 self.gas_per_second.set(block.header().gas_used() as f64 / execution_duration);
84 self.execution_histogram.record(execution_duration);
85 self.execution_duration.set(execution_duration);
86
87 output
88 }
89
90 pub fn execute_metered<'a, E, DB, O, Error, B>(
98 &self,
99 executor: E,
100 input: BlockExecutionInput<'a, BlockWithSenders<B>>,
101 state_hook: Box<dyn OnStateHook>,
102 ) -> Result<BlockExecutionOutput<O>, Error>
103 where
104 E: Executor<
105 DB,
106 Input<'a> = BlockExecutionInput<'a, BlockWithSenders<B>>,
107 Output = BlockExecutionOutput<O>,
108 Error = Error,
109 >,
110 B: reth_primitives_traits::Block,
111 {
112 let wrapper = MeteredStateHook { metrics: self.clone(), inner_hook: state_hook };
116
117 let block = input.block;
119
120 let output = self.metered(block, || executor.execute_with_state_hook(input, wrapper))?;
122
123 let accounts = output.state.state.len();
125 let storage_slots =
126 output.state.state.values().map(|account| account.storage.len()).sum::<usize>();
127 let bytecodes = output.state.contracts.len();
128
129 self.accounts_updated_histogram.record(accounts as f64);
130 self.storage_slots_updated_histogram.record(storage_slots as f64);
131 self.bytecodes_updated_histogram.record(bytecodes as f64);
132
133 Ok(output)
134 }
135
136 pub fn metered_one<F, R, B>(
138 &self,
139 input: BlockExecutionInput<'_, BlockWithSenders<B>>,
140 f: F,
141 ) -> R
142 where
143 F: FnOnce(BlockExecutionInput<'_, BlockWithSenders<B>>) -> R,
144 B: reth_primitives_traits::Block,
145 {
146 self.metered(input.block, || f(input))
147 }
148}
149
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_eips::eip7685::Requests;
    use metrics_util::debugging::{DebugValue, DebuggingRecorder, Snapshotter};
    use revm::db::BundleState;
    use revm_primitives::{
        Account, AccountInfo, AccountStatus, EvmState, EvmStorage, EvmStorageSlot, FlaggedStorage,
        B256, U256,
    };
    use std::sync::mpsc;

    /// Executor stand-in that executes nothing and only feeds `state` to a state hook.
    struct MockExecutor {
        /// State handed to the hook in `execute_with_state_hook`.
        state: EvmState,
    }

    /// Builds the empty output returned by every `MockExecutor` entry point.
    fn empty_output() -> BlockExecutionOutput<()> {
        BlockExecutionOutput {
            state: BundleState::default(),
            receipts: vec![],
            requests: Requests::default(),
            gas_used: 0,
        }
    }

    impl Executor<()> for MockExecutor {
        type Input<'a>
            = BlockExecutionInput<'a, BlockWithSenders>
        where
            Self: 'a;
        type Output = BlockExecutionOutput<()>;
        type Error = std::convert::Infallible;

        fn execute(self, _input: Self::Input<'_>) -> Result<Self::Output, Self::Error> {
            Ok(empty_output())
        }

        fn execute_with_state_closure<F>(
            self,
            _input: Self::Input<'_>,
            _state: F,
        ) -> Result<Self::Output, Self::Error>
        where
            F: FnMut(&revm::State<()>),
        {
            Ok(empty_output())
        }

        fn execute_with_state_hook<F>(
            self,
            _input: Self::Input<'_>,
            mut hook: F,
        ) -> Result<Self::Output, Self::Error>
        where
            F: OnStateHook + 'static,
        {
            // Invoke the hook exactly once with the stored state before reporting success.
            hook.on_state(&self.state);

            Ok(empty_output())
        }
    }

    /// State hook that sends a fixed value over a channel each time it is invoked.
    struct ChannelStateHook {
        output: i32,
        sender: mpsc::Sender<i32>,
    }

    impl OnStateHook for ChannelStateHook {
        fn on_state(&mut self, _state: &EvmState) {
            // A send failure (receiver dropped) is deliberately ignored.
            let _ = self.sender.send(self.output);
        }
    }

    /// Installs a debugging recorder as the global recorder and returns its snapshotter.
    fn setup_test_recorder() -> Snapshotter {
        let recorder = DebuggingRecorder::new();
        let snapshotter = recorder.snapshotter();
        recorder.install().unwrap();
        snapshotter
    }

    #[test]
    fn test_executor_metrics_hook_metrics_recorded() {
        let snapshotter = setup_test_recorder();
        let metrics = ExecutorMetrics::default();

        let block: BlockWithSenders = BlockWithSenders::default();
        let input = BlockExecutionInput { block: &block, total_difficulty: Default::default() };

        let (tx, _rx) = mpsc::channel();
        let expected_output = 42;
        let state_hook = Box::new(ChannelStateHook { sender: tx, output: expected_output });

        // One account with a single storage slot and a random code hash.
        let storage = EvmStorage::from_iter([(
            U256::from(1),
            EvmStorageSlot::new(FlaggedStorage::new_from_value(2)),
        )]);
        let account = Account {
            info: AccountInfo {
                balance: U256::from(100),
                nonce: 10,
                code_hash: B256::random(),
                code: Default::default(),
            },
            storage,
            status: AccountStatus::Loaded,
        };
        let mut state = EvmState::default();
        state.insert(Default::default(), account);

        let executor = MockExecutor { state };
        let _result = metrics.execute_metered(executor, input, state_hook).unwrap();

        for (key, _unit, _description, value) in snapshotter.snapshot().into_vec() {
            let metric_name = key.key().name();
            let is_loaded_histogram = matches!(
                metric_name,
                "sync.execution.accounts_loaded_histogram" |
                    "sync.execution.storage_slots_loaded_histogram" |
                    "sync.execution.bytecodes_loaded_histogram"
            );
            if !is_loaded_histogram {
                continue;
            }

            if let DebugValue::Histogram(vs) = value {
                assert!(
                    vs.iter().any(|v| v.into_inner() > 0.0),
                    "metric {metric_name} not recorded"
                );
            }
        }
    }

    #[test]
    fn test_executor_metrics_hook_called() {
        let metrics = ExecutorMetrics::default();

        let block: BlockWithSenders = BlockWithSenders::default();
        let input = BlockExecutionInput { block: &block, total_difficulty: Default::default() };

        let (tx, rx) = mpsc::channel();
        let expected_output = 42;
        let state_hook = Box::new(ChannelStateHook { sender: tx, output: expected_output });

        let executor = MockExecutor { state: EvmState::default() };
        let _result = metrics.execute_metered(executor, input, state_hook).unwrap();

        // The hook must have fired during execution, leaving its value in the channel.
        let actual_output = rx.try_recv().unwrap();
        assert_eq!(actual_output, expected_output);
    }
}