reth_engine_tree/tree/
instrumented_state.rs

1//! Implements a state provider that tracks latency metrics.
2use alloy_primitives::{Address, StorageKey, B256};
3use metrics::{Gauge, Histogram};
4use reth_errors::ProviderResult;
5use reth_metrics::Metrics;
6use reth_primitives_traits::{Account, Bytecode};
7use reth_provider::{
8    AccountReader, BlockHashReader, HashedPostStateProvider, StateProofProvider, StateProvider,
9    StateRootProvider, StorageRootProvider,
10};
11use reth_trie::{
12    updates::TrieUpdates, AccountProof, HashedPostState, HashedStorage, MultiProof,
13    MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
14};
15use std::{
16    sync::atomic::{AtomicU64, Ordering},
17    time::{Duration, Instant},
18};
19
20// Seismic imports not used upstream
21use revm_state::FlaggedStorage as StorageValue;
22
/// Nanoseconds per second
const NANOS_PER_SEC: u32 = 1_000_000_000;

/// An atomic version of [`Duration`], using an [`AtomicU64`] to store the total nanoseconds in the
/// duration.
#[derive(Default)]
pub(crate) struct AtomicDuration {
    /// The total number of nanoseconds accumulated so far.
    ///
    /// We would have to accumulate 584 years of nanoseconds to overflow a u64, so this is
    /// sufficiently large for our use case. We don't expect to be adding arbitrary durations to
    /// this value.
    nanos: AtomicU64,
}

impl AtomicDuration {
    /// Returns a zero duration.
    pub(crate) const fn zero() -> Self {
        Self { nanos: AtomicU64::new(0) }
    }

    /// Returns the accumulated time as a [`Duration`].
    pub(crate) fn duration(&self) -> Duration {
        // `Duration::from_nanos` splits the counter into whole seconds and sub-second nanoseconds,
        // so no manual division or narrowing cast is needed here.
        Duration::from_nanos(self.nanos.load(Ordering::Relaxed))
    }

    /// Adds a [`Duration`] to the atomic duration.
    ///
    /// A `duration` that cannot be expressed as a `u64` number of nanoseconds (roughly 584 years
    /// or more) is clamped to `u64::MAX` nanoseconds instead of overflowing the conversion.
    pub(crate) fn add_duration(&self, duration: Duration) {
        // this is `as_nanos` but without the `as u128`; saturating arithmetic keeps an oversized
        // input from panicking (debug builds) or silently wrapping (release builds)
        let total_nanos = duration
            .as_secs()
            .saturating_mul(u64::from(NANOS_PER_SEC))
            .saturating_add(u64::from(duration.subsec_nanos()));
        // fold the converted nanoseconds into the running total
        self.nanos.fetch_add(total_nanos, Ordering::Relaxed);
    }
}
63
64/// A wrapper of a state provider and latency metrics.
65pub(crate) struct InstrumentedStateProvider<S> {
66    /// The state provider
67    state_provider: S,
68
69    /// Metrics for the instrumented state provider
70    metrics: StateProviderMetrics,
71
72    /// The total time we spend fetching storage over the lifetime of this state provider
73    total_storage_fetch_latency: AtomicDuration,
74
75    /// The total time we spend fetching code over the lifetime of this state provider
76    total_code_fetch_latency: AtomicDuration,
77
78    /// The total time we spend fetching accounts over the lifetime of this state provider
79    total_account_fetch_latency: AtomicDuration,
80}
81
82impl<S> InstrumentedStateProvider<S>
83where
84    S: StateProvider,
85{
86    /// Creates a new [`InstrumentedStateProvider`] from a state provider
87    pub(crate) fn from_state_provider(state_provider: S) -> Self {
88        Self {
89            state_provider,
90            metrics: StateProviderMetrics::default(),
91            total_storage_fetch_latency: AtomicDuration::zero(),
92            total_code_fetch_latency: AtomicDuration::zero(),
93            total_account_fetch_latency: AtomicDuration::zero(),
94        }
95    }
96}
97
98impl<S> InstrumentedStateProvider<S> {
99    /// Records the latency for a storage fetch, and increments the duration counter for the storage
100    /// fetch.
101    fn record_storage_fetch(&self, latency: Duration) {
102        self.metrics.storage_fetch_latency.record(latency);
103        self.total_storage_fetch_latency.add_duration(latency);
104    }
105
106    /// Records the latency for a code fetch, and increments the duration counter for the code
107    /// fetch.
108    fn record_code_fetch(&self, latency: Duration) {
109        self.metrics.code_fetch_latency.record(latency);
110        self.total_code_fetch_latency.add_duration(latency);
111    }
112
113    /// Records the latency for an account fetch, and increments the duration counter for the
114    /// account fetch.
115    fn record_account_fetch(&self, latency: Duration) {
116        self.metrics.account_fetch_latency.record(latency);
117        self.total_account_fetch_latency.add_duration(latency);
118    }
119
120    /// Records the total latencies into their respective gauges and histograms.
121    pub(crate) fn record_total_latency(&self) {
122        let total_storage_fetch_latency = self.total_storage_fetch_latency.duration();
123        self.metrics.total_storage_fetch_latency.record(total_storage_fetch_latency);
124        self.metrics
125            .total_storage_fetch_latency_gauge
126            .set(total_storage_fetch_latency.as_secs_f64());
127
128        let total_code_fetch_latency = self.total_code_fetch_latency.duration();
129        self.metrics.total_code_fetch_latency.record(total_code_fetch_latency);
130        self.metrics.total_code_fetch_latency_gauge.set(total_code_fetch_latency.as_secs_f64());
131
132        let total_account_fetch_latency = self.total_account_fetch_latency.duration();
133        self.metrics.total_account_fetch_latency.record(total_account_fetch_latency);
134        self.metrics
135            .total_account_fetch_latency_gauge
136            .set(total_account_fetch_latency.as_secs_f64());
137    }
138}
139
/// Metrics for the instrumented state provider
// NOTE(review): the `Metrics` derive presumably uses each field's `///` comment as that metric's
// description — confirm before rewording any of the doc comments below.
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.state_provider")]
pub(crate) struct StateProviderMetrics {
    // Per-call latencies: written by the `record_*_fetch` helpers of `InstrumentedStateProvider`.

    /// A histogram of the time it takes to get a storage value
    storage_fetch_latency: Histogram,

    /// A histogram of the time it takes to get a code value
    code_fetch_latency: Histogram,

    /// A histogram of the time it takes to get an account value
    account_fetch_latency: Histogram,

    // Accumulated totals: written by `InstrumentedStateProvider::record_total_latency`, which
    // reports each total to both a histogram and a gauge.

    /// A histogram of the total time we spend fetching storage over the lifetime of this state
    /// provider
    total_storage_fetch_latency: Histogram,

    /// A gauge of the total time we spend fetching storage over the lifetime of this state
    /// provider
    total_storage_fetch_latency_gauge: Gauge,

    /// A histogram of the total time we spend fetching code over the lifetime of this state
    /// provider
    total_code_fetch_latency: Histogram,

    /// A gauge of the total time we spend fetching code over the lifetime of this state provider
    total_code_fetch_latency_gauge: Gauge,

    /// A histogram of the total time we spend fetching accounts over the lifetime of this state
    /// provider
    total_account_fetch_latency: Histogram,

    /// A gauge of the total time we spend fetching accounts over the lifetime of this state
    /// provider
    total_account_fetch_latency_gauge: Gauge,
}
176
177impl<S: AccountReader> AccountReader for InstrumentedStateProvider<S> {
178    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
179        let start = Instant::now();
180        let res = self.state_provider.basic_account(address);
181        self.record_account_fetch(start.elapsed());
182        res
183    }
184}
185
186impl<S: StateProvider> StateProvider for InstrumentedStateProvider<S> {
187    fn storage(
188        &self,
189        account: Address,
190        storage_key: StorageKey,
191    ) -> ProviderResult<Option<StorageValue>> {
192        let start = Instant::now();
193        let res = self.state_provider.storage(account, storage_key);
194        self.record_storage_fetch(start.elapsed());
195        res
196    }
197
198    fn bytecode_by_hash(&self, code_hash: &B256) -> ProviderResult<Option<Bytecode>> {
199        let start = Instant::now();
200        let res = self.state_provider.bytecode_by_hash(code_hash);
201        self.record_code_fetch(start.elapsed());
202        res
203    }
204}
205
206impl<S: StateRootProvider> StateRootProvider for InstrumentedStateProvider<S> {
207    fn state_root(&self, hashed_state: HashedPostState) -> ProviderResult<B256> {
208        self.state_provider.state_root(hashed_state)
209    }
210
211    fn state_root_from_nodes(&self, input: TrieInput) -> ProviderResult<B256> {
212        self.state_provider.state_root_from_nodes(input)
213    }
214
215    fn state_root_with_updates(
216        &self,
217        hashed_state: HashedPostState,
218    ) -> ProviderResult<(B256, TrieUpdates)> {
219        self.state_provider.state_root_with_updates(hashed_state)
220    }
221
222    fn state_root_from_nodes_with_updates(
223        &self,
224        input: TrieInput,
225    ) -> ProviderResult<(B256, TrieUpdates)> {
226        self.state_provider.state_root_from_nodes_with_updates(input)
227    }
228}
229
230impl<S: StateProofProvider> StateProofProvider for InstrumentedStateProvider<S> {
231    fn proof(
232        &self,
233        input: TrieInput,
234        address: Address,
235        slots: &[B256],
236    ) -> ProviderResult<AccountProof> {
237        self.state_provider.proof(input, address, slots)
238    }
239
240    fn multiproof(
241        &self,
242        input: TrieInput,
243        targets: MultiProofTargets,
244    ) -> ProviderResult<MultiProof> {
245        self.state_provider.multiproof(input, targets)
246    }
247
248    fn witness(
249        &self,
250        input: TrieInput,
251        target: HashedPostState,
252    ) -> ProviderResult<Vec<alloy_primitives::Bytes>> {
253        self.state_provider.witness(input, target)
254    }
255}
256
257impl<S: StorageRootProvider> StorageRootProvider for InstrumentedStateProvider<S> {
258    fn storage_root(
259        &self,
260        address: Address,
261        hashed_storage: HashedStorage,
262    ) -> ProviderResult<B256> {
263        self.state_provider.storage_root(address, hashed_storage)
264    }
265
266    fn storage_proof(
267        &self,
268        address: Address,
269        slot: B256,
270        hashed_storage: HashedStorage,
271    ) -> ProviderResult<StorageProof> {
272        self.state_provider.storage_proof(address, slot, hashed_storage)
273    }
274
275    fn storage_multiproof(
276        &self,
277        address: Address,
278        slots: &[B256],
279        hashed_storage: HashedStorage,
280    ) -> ProviderResult<StorageMultiProof> {
281        self.state_provider.storage_multiproof(address, slots, hashed_storage)
282    }
283}
284
285impl<S: BlockHashReader> BlockHashReader for InstrumentedStateProvider<S> {
286    fn block_hash(&self, number: alloy_primitives::BlockNumber) -> ProviderResult<Option<B256>> {
287        self.state_provider.block_hash(number)
288    }
289
290    fn canonical_hashes_range(
291        &self,
292        start: alloy_primitives::BlockNumber,
293        end: alloy_primitives::BlockNumber,
294    ) -> ProviderResult<Vec<B256>> {
295        self.state_provider.canonical_hashes_range(start, end)
296    }
297}
298
299impl<S: HashedPostStateProvider> HashedPostStateProvider for InstrumentedStateProvider<S> {
300    fn hashed_post_state(&self, bundle_state: &reth_revm::db::BundleState) -> HashedPostState {
301        self.state_provider.hashed_post_state(bundle_state)
302    }
303}