1use alloy_primitives::{Address, StorageKey, B256};
3use metrics::{Gauge, Histogram};
4use reth_errors::ProviderResult;
5use reth_metrics::Metrics;
6use reth_primitives_traits::{Account, Bytecode};
7use reth_provider::{
8 AccountReader, BlockHashReader, HashedPostStateProvider, StateProofProvider, StateProvider,
9 StateRootProvider, StorageRootProvider,
10};
11use reth_trie::{
12 updates::TrieUpdates, AccountProof, HashedPostState, HashedStorage, MultiProof,
13 MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
14};
15use std::{
16 sync::atomic::{AtomicU64, Ordering},
17 time::{Duration, Instant},
18};
19
20use revm_state::FlaggedStorage as StorageValue;
22
/// Number of nanoseconds in one second.
const NANOS_PER_SEC: u32 = 1_000_000_000;

/// A [`Duration`] accumulator stored as a nanosecond count inside a single [`AtomicU64`], so it
/// can be updated through a shared reference.
#[derive(Default)]
pub(crate) struct AtomicDuration {
    /// Accumulated time, in nanoseconds.
    nanos: AtomicU64,
}

impl AtomicDuration {
    /// Creates an accumulator holding a zero duration.
    pub(crate) const fn zero() -> Self {
        Self { nanos: AtomicU64::new(0) }
    }

    /// Returns the accumulated time as a [`Duration`].
    pub(crate) fn duration(&self) -> Duration {
        Duration::from_nanos(self.nanos.load(Ordering::Relaxed))
    }

    /// Adds `duration` to the accumulated total.
    ///
    /// A `u64` nanosecond count tops out at roughly 584 years. Inputs longer than that are
    /// clamped to `u64::MAX` nanoseconds rather than overflowing the conversion, which would
    /// panic in debug builds and silently wrap in release builds.
    pub(crate) fn add_duration(&self, duration: Duration) {
        let total_nanos = duration
            .as_secs()
            .saturating_mul(u64::from(NANOS_PER_SEC))
            .saturating_add(u64::from(duration.subsec_nanos()));
        // `fetch_add` wraps if the running total itself exceeds `u64::MAX`; it never panics.
        self.nanos.fetch_add(total_nanos, Ordering::Relaxed);
    }
}
63
64pub(crate) struct InstrumentedStateProvider<S> {
66 state_provider: S,
68
69 metrics: StateProviderMetrics,
71
72 total_storage_fetch_latency: AtomicDuration,
74
75 total_code_fetch_latency: AtomicDuration,
77
78 total_account_fetch_latency: AtomicDuration,
80}
81
82impl<S> InstrumentedStateProvider<S>
83where
84 S: StateProvider,
85{
86 pub(crate) fn from_state_provider(state_provider: S) -> Self {
88 Self {
89 state_provider,
90 metrics: StateProviderMetrics::default(),
91 total_storage_fetch_latency: AtomicDuration::zero(),
92 total_code_fetch_latency: AtomicDuration::zero(),
93 total_account_fetch_latency: AtomicDuration::zero(),
94 }
95 }
96}
97
98impl<S> InstrumentedStateProvider<S> {
99 fn record_storage_fetch(&self, latency: Duration) {
102 self.metrics.storage_fetch_latency.record(latency);
103 self.total_storage_fetch_latency.add_duration(latency);
104 }
105
106 fn record_code_fetch(&self, latency: Duration) {
109 self.metrics.code_fetch_latency.record(latency);
110 self.total_code_fetch_latency.add_duration(latency);
111 }
112
113 fn record_account_fetch(&self, latency: Duration) {
116 self.metrics.account_fetch_latency.record(latency);
117 self.total_account_fetch_latency.add_duration(latency);
118 }
119
120 pub(crate) fn record_total_latency(&self) {
122 let total_storage_fetch_latency = self.total_storage_fetch_latency.duration();
123 self.metrics.total_storage_fetch_latency.record(total_storage_fetch_latency);
124 self.metrics
125 .total_storage_fetch_latency_gauge
126 .set(total_storage_fetch_latency.as_secs_f64());
127
128 let total_code_fetch_latency = self.total_code_fetch_latency.duration();
129 self.metrics.total_code_fetch_latency.record(total_code_fetch_latency);
130 self.metrics.total_code_fetch_latency_gauge.set(total_code_fetch_latency.as_secs_f64());
131
132 let total_account_fetch_latency = self.total_account_fetch_latency.duration();
133 self.metrics.total_account_fetch_latency.record(total_account_fetch_latency);
134 self.metrics
135 .total_account_fetch_latency_gauge
136 .set(total_account_fetch_latency.as_secs_f64());
137 }
138}
139
140#[derive(Metrics, Clone)]
142#[metrics(scope = "sync.state_provider")]
143pub(crate) struct StateProviderMetrics {
144 storage_fetch_latency: Histogram,
146
147 code_fetch_latency: Histogram,
149
150 account_fetch_latency: Histogram,
152
153 total_storage_fetch_latency: Histogram,
156
157 total_storage_fetch_latency_gauge: Gauge,
160
161 total_code_fetch_latency: Histogram,
164
165 total_code_fetch_latency_gauge: Gauge,
167
168 total_account_fetch_latency: Histogram,
171
172 total_account_fetch_latency_gauge: Gauge,
175}
176
177impl<S: AccountReader> AccountReader for InstrumentedStateProvider<S> {
178 fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
179 let start = Instant::now();
180 let res = self.state_provider.basic_account(address);
181 self.record_account_fetch(start.elapsed());
182 res
183 }
184}
185
186impl<S: StateProvider> StateProvider for InstrumentedStateProvider<S> {
187 fn storage(
188 &self,
189 account: Address,
190 storage_key: StorageKey,
191 ) -> ProviderResult<Option<StorageValue>> {
192 let start = Instant::now();
193 let res = self.state_provider.storage(account, storage_key);
194 self.record_storage_fetch(start.elapsed());
195 res
196 }
197
198 fn bytecode_by_hash(&self, code_hash: &B256) -> ProviderResult<Option<Bytecode>> {
199 let start = Instant::now();
200 let res = self.state_provider.bytecode_by_hash(code_hash);
201 self.record_code_fetch(start.elapsed());
202 res
203 }
204}
205
206impl<S: StateRootProvider> StateRootProvider for InstrumentedStateProvider<S> {
207 fn state_root(&self, hashed_state: HashedPostState) -> ProviderResult<B256> {
208 self.state_provider.state_root(hashed_state)
209 }
210
211 fn state_root_from_nodes(&self, input: TrieInput) -> ProviderResult<B256> {
212 self.state_provider.state_root_from_nodes(input)
213 }
214
215 fn state_root_with_updates(
216 &self,
217 hashed_state: HashedPostState,
218 ) -> ProviderResult<(B256, TrieUpdates)> {
219 self.state_provider.state_root_with_updates(hashed_state)
220 }
221
222 fn state_root_from_nodes_with_updates(
223 &self,
224 input: TrieInput,
225 ) -> ProviderResult<(B256, TrieUpdates)> {
226 self.state_provider.state_root_from_nodes_with_updates(input)
227 }
228}
229
230impl<S: StateProofProvider> StateProofProvider for InstrumentedStateProvider<S> {
231 fn proof(
232 &self,
233 input: TrieInput,
234 address: Address,
235 slots: &[B256],
236 ) -> ProviderResult<AccountProof> {
237 self.state_provider.proof(input, address, slots)
238 }
239
240 fn multiproof(
241 &self,
242 input: TrieInput,
243 targets: MultiProofTargets,
244 ) -> ProviderResult<MultiProof> {
245 self.state_provider.multiproof(input, targets)
246 }
247
248 fn witness(
249 &self,
250 input: TrieInput,
251 target: HashedPostState,
252 ) -> ProviderResult<Vec<alloy_primitives::Bytes>> {
253 self.state_provider.witness(input, target)
254 }
255}
256
257impl<S: StorageRootProvider> StorageRootProvider for InstrumentedStateProvider<S> {
258 fn storage_root(
259 &self,
260 address: Address,
261 hashed_storage: HashedStorage,
262 ) -> ProviderResult<B256> {
263 self.state_provider.storage_root(address, hashed_storage)
264 }
265
266 fn storage_proof(
267 &self,
268 address: Address,
269 slot: B256,
270 hashed_storage: HashedStorage,
271 ) -> ProviderResult<StorageProof> {
272 self.state_provider.storage_proof(address, slot, hashed_storage)
273 }
274
275 fn storage_multiproof(
276 &self,
277 address: Address,
278 slots: &[B256],
279 hashed_storage: HashedStorage,
280 ) -> ProviderResult<StorageMultiProof> {
281 self.state_provider.storage_multiproof(address, slots, hashed_storage)
282 }
283}
284
285impl<S: BlockHashReader> BlockHashReader for InstrumentedStateProvider<S> {
286 fn block_hash(&self, number: alloy_primitives::BlockNumber) -> ProviderResult<Option<B256>> {
287 self.state_provider.block_hash(number)
288 }
289
290 fn canonical_hashes_range(
291 &self,
292 start: alloy_primitives::BlockNumber,
293 end: alloy_primitives::BlockNumber,
294 ) -> ProviderResult<Vec<B256>> {
295 self.state_provider.canonical_hashes_range(start, end)
296 }
297}
298
299impl<S: HashedPostStateProvider> HashedPostStateProvider for InstrumentedStateProvider<S> {
300 fn hashed_post_state(&self, bundle_state: &reth_revm::db::BundleState) -> HashedPostState {
301 self.state_provider.hashed_post_state(bundle_state)
302 }
303}