reth_db/
metrics.rs

1use crate::Tables;
2use metrics::{Gauge, Histogram};
3use reth_metrics::{metrics::Counter, Metrics};
4use rustc_hash::FxHashMap;
5use std::time::{Duration, Instant};
6use strum::{EnumCount, EnumIter, IntoEnumIterator};
7
/// Size in bytes that a value must exceed for the operation's duration to be recorded.
const LARGE_VALUE_THRESHOLD_BYTES: usize = 4 * 1024;
9
10/// Caches metric handles for database environment to make sure handles are not re-created
11/// on every operation.
12///
13/// Requires a metric recorder to be registered before creating an instance of this struct.
14/// Otherwise, metric recording will no-op.
15#[derive(Debug)]
16pub(crate) struct DatabaseEnvMetrics {
17    /// Caches `OperationMetrics` handles for each table and operation tuple.
18    operations: FxHashMap<(&'static str, Operation), OperationMetrics>,
19    /// Caches `TransactionMetrics` handles for counters grouped by only transaction mode.
20    /// Updated both at tx open and close.
21    transactions: FxHashMap<TransactionMode, TransactionMetrics>,
22    /// Caches `TransactionOutcomeMetrics` handles for counters grouped by transaction mode and
23    /// outcome. Can only be updated at tx close, as outcome is only known at that point.
24    transaction_outcomes:
25        FxHashMap<(TransactionMode, TransactionOutcome), TransactionOutcomeMetrics>,
26}
27
28impl DatabaseEnvMetrics {
29    pub(crate) fn new() -> Self {
30        // Pre-populate metric handle maps with all possible combinations of labels
31        // to avoid runtime locks on the map when recording metrics.
32        Self {
33            operations: Self::generate_operation_handles(),
34            transactions: Self::generate_transaction_handles(),
35            transaction_outcomes: Self::generate_transaction_outcome_handles(),
36        }
37    }
38
39    /// Generate a map of all possible operation handles for each table and operation tuple.
40    /// Used for tracking all operation metrics.
41    fn generate_operation_handles() -> FxHashMap<(&'static str, Operation), OperationMetrics> {
42        let mut operations = FxHashMap::with_capacity_and_hasher(
43            Tables::COUNT * Operation::COUNT,
44            Default::default(),
45        );
46        for table in Tables::ALL {
47            for operation in Operation::iter() {
48                operations.insert(
49                    (table.name(), operation),
50                    OperationMetrics::new_with_labels(&[
51                        (Labels::Table.as_str(), table.name()),
52                        (Labels::Operation.as_str(), operation.as_str()),
53                    ]),
54                );
55            }
56        }
57        operations
58    }
59
60    /// Generate a map of all possible transaction modes to metric handles.
61    /// Used for tracking a counter of open transactions.
62    fn generate_transaction_handles() -> FxHashMap<TransactionMode, TransactionMetrics> {
63        TransactionMode::iter()
64            .map(|mode| {
65                (
66                    mode,
67                    TransactionMetrics::new_with_labels(&[(
68                        Labels::TransactionMode.as_str(),
69                        mode.as_str(),
70                    )]),
71                )
72            })
73            .collect()
74    }
75
76    /// Generate a map of all possible transaction mode and outcome handles.
77    /// Used for tracking various stats for finished transactions (e.g. commit duration).
78    fn generate_transaction_outcome_handles(
79    ) -> FxHashMap<(TransactionMode, TransactionOutcome), TransactionOutcomeMetrics> {
80        let mut transaction_outcomes = FxHashMap::with_capacity_and_hasher(
81            TransactionMode::COUNT * TransactionOutcome::COUNT,
82            Default::default(),
83        );
84        for mode in TransactionMode::iter() {
85            for outcome in TransactionOutcome::iter() {
86                transaction_outcomes.insert(
87                    (mode, outcome),
88                    TransactionOutcomeMetrics::new_with_labels(&[
89                        (Labels::TransactionMode.as_str(), mode.as_str()),
90                        (Labels::TransactionOutcome.as_str(), outcome.as_str()),
91                    ]),
92                );
93            }
94        }
95        transaction_outcomes
96    }
97
98    /// Record a metric for database operation executed in `f`.
99    /// Panics if a metric recorder is not found for the given table and operation.
100    pub(crate) fn record_operation<R>(
101        &self,
102        table: &'static str,
103        operation: Operation,
104        value_size: Option<usize>,
105        f: impl FnOnce() -> R,
106    ) -> R {
107        if let Some(metrics) = self.operations.get(&(table, operation)) {
108            metrics.record(value_size, f)
109        } else {
110            f()
111        }
112    }
113
114    /// Record metrics for opening a database transaction.
115    pub(crate) fn record_opened_transaction(&self, mode: TransactionMode) {
116        self.transactions
117            .get(&mode)
118            .expect("transaction mode metric handle not found")
119            .record_open();
120    }
121
122    /// Record metrics for closing a database transactions.
123    #[cfg(feature = "mdbx")]
124    pub(crate) fn record_closed_transaction(
125        &self,
126        mode: TransactionMode,
127        outcome: TransactionOutcome,
128        open_duration: Duration,
129        close_duration: Option<Duration>,
130        commit_latency: Option<reth_libmdbx::CommitLatency>,
131    ) {
132        self.transactions
133            .get(&mode)
134            .expect("transaction mode metric handle not found")
135            .record_close();
136
137        self.transaction_outcomes
138            .get(&(mode, outcome))
139            .expect("transaction outcome metric handle not found")
140            .record(open_duration, close_duration, commit_latency);
141    }
142}
143
144/// Transaction mode for the database, either read-only or read-write.
145#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCount, EnumIter)]
146pub(crate) enum TransactionMode {
147    /// Read-only transaction mode.
148    ReadOnly,
149    /// Read-write transaction mode.
150    ReadWrite,
151}
152
153impl TransactionMode {
154    /// Returns the transaction mode as a string.
155    pub(crate) const fn as_str(&self) -> &'static str {
156        match self {
157            Self::ReadOnly => "read-only",
158            Self::ReadWrite => "read-write",
159        }
160    }
161
162    /// Returns `true` if the transaction mode is read-only.
163    pub(crate) const fn is_read_only(&self) -> bool {
164        matches!(self, Self::ReadOnly)
165    }
166}
167
168/// Transaction outcome after a database operation - commit, abort, or drop.
169#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCount, EnumIter)]
170pub(crate) enum TransactionOutcome {
171    /// Successful commit of the transaction.
172    Commit,
173    /// Aborted transaction.
174    Abort,
175    /// Dropped transaction.
176    Drop,
177}
178
179impl TransactionOutcome {
180    /// Returns the transaction outcome as a string.
181    pub(crate) const fn as_str(&self) -> &'static str {
182        match self {
183            Self::Commit => "commit",
184            Self::Abort => "abort",
185            Self::Drop => "drop",
186        }
187    }
188
189    /// Returns `true` if the transaction outcome is a commit.
190    pub(crate) const fn is_commit(&self) -> bool {
191        matches!(self, Self::Commit)
192    }
193}
194
195/// Types of operations conducted on the database: get, put, delete, and various cursor operations.
196#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCount, EnumIter)]
197pub(crate) enum Operation {
198    /// Database get operation.
199    Get,
200    /// Database put operation.
201    Put,
202    /// Database delete operation.
203    Delete,
204    /// Database cursor upsert operation.
205    CursorUpsert,
206    /// Database cursor insert operation.
207    CursorInsert,
208    /// Database cursor append operation.
209    CursorAppend,
210    /// Database cursor append duplicates operation.
211    CursorAppendDup,
212    /// Database cursor delete current operation.
213    CursorDeleteCurrent,
214    /// Database cursor delete current duplicates operation.
215    CursorDeleteCurrentDuplicates,
216}
217
218impl Operation {
219    /// Returns the operation as a string.
220    pub(crate) const fn as_str(&self) -> &'static str {
221        match self {
222            Self::Get => "get",
223            Self::Put => "put",
224            Self::Delete => "delete",
225            Self::CursorUpsert => "cursor-upsert",
226            Self::CursorInsert => "cursor-insert",
227            Self::CursorAppend => "cursor-append",
228            Self::CursorAppendDup => "cursor-append-dup",
229            Self::CursorDeleteCurrent => "cursor-delete-current",
230            Self::CursorDeleteCurrentDuplicates => "cursor-delete-current-duplicates",
231        }
232    }
233}
234
/// Names of the metric labels attached to the database metric handles.
enum Labels {
    /// Label carrying the table name.
    Table,
    /// Label carrying the transaction mode.
    TransactionMode,
    /// Label carrying the transaction outcome.
    TransactionOutcome,
    /// Label carrying the database operation.
    Operation,
}

impl Labels {
    /// Returns the label key as it is passed to the metric handles.
    pub(crate) const fn as_str(&self) -> &'static str {
        match *self {
            Self::Table => "table",
            Self::TransactionMode => "mode",
            Self::TransactionOutcome => "outcome",
            Self::Operation => "operation",
        }
    }
}
258
259#[derive(Metrics, Clone)]
260#[metrics(scope = "database.transaction")]
261pub(crate) struct TransactionMetrics {
262    /// Total number of currently open database transactions
263    open_total: Gauge,
264}
265
266impl TransactionMetrics {
267    pub(crate) fn record_open(&self) {
268        self.open_total.increment(1.0);
269    }
270
271    pub(crate) fn record_close(&self) {
272        self.open_total.decrement(1.0);
273    }
274}
275
276#[derive(Metrics, Clone)]
277#[metrics(scope = "database.transaction")]
278pub(crate) struct TransactionOutcomeMetrics {
279    /// The time a database transaction has been open
280    open_duration_seconds: Histogram,
281    /// The time it took to close a database transaction
282    close_duration_seconds: Histogram,
283    /// The time it took to prepare a transaction commit
284    commit_preparation_duration_seconds: Histogram,
285    /// Duration of GC update during transaction commit by wall clock
286    commit_gc_wallclock_duration_seconds: Histogram,
287    /// The time it took to conduct audit of a transaction commit
288    commit_audit_duration_seconds: Histogram,
289    /// The time it took to write dirty/modified data pages to a filesystem during transaction
290    /// commit
291    commit_write_duration_seconds: Histogram,
292    /// The time it took to sync written data to the disk/storage during transaction commit
293    commit_sync_duration_seconds: Histogram,
294    /// The time it took to release resources during transaction commit
295    commit_ending_duration_seconds: Histogram,
296    /// The total duration of a transaction commit
297    commit_whole_duration_seconds: Histogram,
298    /// User-mode CPU time spent on GC update during transaction commit
299    commit_gc_cputime_duration_seconds: Histogram,
300}
301
302impl TransactionOutcomeMetrics {
303    /// Record transaction closing with the duration it was open and the duration it took to close
304    /// it.
305    #[cfg(feature = "mdbx")]
306    pub(crate) fn record(
307        &self,
308        open_duration: Duration,
309        close_duration: Option<Duration>,
310        commit_latency: Option<reth_libmdbx::CommitLatency>,
311    ) {
312        self.open_duration_seconds.record(open_duration);
313
314        if let Some(close_duration) = close_duration {
315            self.close_duration_seconds.record(close_duration)
316        }
317
318        if let Some(commit_latency) = commit_latency {
319            self.commit_preparation_duration_seconds.record(commit_latency.preparation());
320            self.commit_gc_wallclock_duration_seconds.record(commit_latency.gc_wallclock());
321            self.commit_audit_duration_seconds.record(commit_latency.audit());
322            self.commit_write_duration_seconds.record(commit_latency.write());
323            self.commit_sync_duration_seconds.record(commit_latency.sync());
324            self.commit_ending_duration_seconds.record(commit_latency.ending());
325            self.commit_whole_duration_seconds.record(commit_latency.whole());
326            self.commit_gc_cputime_duration_seconds.record(commit_latency.gc_cputime());
327        }
328    }
329}
330
331#[derive(Metrics, Clone)]
332#[metrics(scope = "database.operation")]
333pub(crate) struct OperationMetrics {
334    /// Total number of database operations made
335    calls_total: Counter,
336    /// The time it took to execute a database operation (`put/upsert/insert/append/append_dup`)
337    /// with value larger than [`LARGE_VALUE_THRESHOLD_BYTES`] bytes.
338    large_value_duration_seconds: Histogram,
339}
340
341impl OperationMetrics {
342    /// Record operation metric.
343    ///
344    /// The duration it took to execute the closure is recorded only if the provided `value_size` is
345    /// larger than [`LARGE_VALUE_THRESHOLD_BYTES`].
346    pub(crate) fn record<R>(&self, value_size: Option<usize>, f: impl FnOnce() -> R) -> R {
347        self.calls_total.increment(1);
348
349        // Record duration only for large values to prevent the performance hit of clock syscall
350        // on small operations
351        if value_size.is_some_and(|size| size > LARGE_VALUE_THRESHOLD_BYTES) {
352            let start = Instant::now();
353            let result = f();
354            self.large_value_duration_seconds.record(start.elapsed());
355            result
356        } else {
357            f()
358        }
359    }
360}