reth_db/implementation/mdbx/tx.rs

1//! Transaction wrapper for libmdbx-sys.
2
3use super::cursor::Cursor;
4use crate::{
5    metrics::{DatabaseEnvMetrics, Operation, TransactionMode, TransactionOutcome},
6    tables::utils::decode_one,
7    DatabaseError,
8};
9use reth_db_api::{
10    table::{Compress, DupSort, Encode, Table, TableImporter},
11    transaction::{DbTx, DbTxMut},
12};
13use reth_libmdbx::{ffi::MDBX_dbi, CommitLatency, Transaction, TransactionKind, WriteFlags, RW};
14use reth_storage_errors::db::{DatabaseWriteError, DatabaseWriteOperation};
15use reth_tracing::tracing::{debug, trace, warn};
16use std::{
17    backtrace::Backtrace,
18    marker::PhantomData,
19    sync::{
20        atomic::{AtomicBool, Ordering},
21        Arc,
22    },
23    time::{Duration, Instant},
24};
25
/// Default threshold (one minute) after which an open read transaction is considered
/// long-lived and a warning with its backtrace is logged.
const LONG_TRANSACTION_DURATION: Duration = Duration::from_millis(60_000);
28
/// Wrapper for the libmdbx transaction.
///
/// Optionally carries a metrics handler that reports the opening and closing of the wrapped
/// transaction to the environment metrics.
#[derive(Debug)]
pub struct Tx<K: TransactionKind> {
    /// Libmdbx-sys transaction.
    ///
    /// Declared before `metrics_handler` on purpose: struct fields are dropped in declaration
    /// order, so this transaction is dropped before the metrics handler's own `Drop` runs.
    pub inner: Transaction<K>,

    /// Handler for metrics with its own [Drop] implementation for cases when the transaction isn't
    /// closed by [`Tx::commit`] or [`Tx::abort`], but we still need to report it in the metrics.
    ///
    /// If [Some], then metrics are reported.
    metrics_handler: Option<MetricsHandler<K>>,
}
41
42impl<K: TransactionKind> Tx<K> {
43    /// Creates new `Tx` object with a `RO` or `RW` transaction.
44    #[inline]
45    pub const fn new(inner: Transaction<K>) -> Self {
46        Self::new_inner(inner, None)
47    }
48
49    /// Creates new `Tx` object with a `RO` or `RW` transaction and optionally enables metrics.
50    #[inline]
51    #[track_caller]
52    pub(crate) fn new_with_metrics(
53        inner: Transaction<K>,
54        env_metrics: Option<Arc<DatabaseEnvMetrics>>,
55    ) -> reth_libmdbx::Result<Self> {
56        let metrics_handler = env_metrics
57            .map(|env_metrics| {
58                let handler = MetricsHandler::<K>::new(inner.id()?, env_metrics);
59                handler.env_metrics.record_opened_transaction(handler.transaction_mode());
60                handler.log_transaction_opened();
61                Ok(handler)
62            })
63            .transpose()?;
64        Ok(Self::new_inner(inner, metrics_handler))
65    }
66
67    #[inline]
68    const fn new_inner(inner: Transaction<K>, metrics_handler: Option<MetricsHandler<K>>) -> Self {
69        Self { inner, metrics_handler }
70    }
71
72    /// Gets this transaction ID.
73    pub fn id(&self) -> reth_libmdbx::Result<u64> {
74        self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| Ok(handler.txn_id))
75    }
76
77    /// Gets a table database handle if it exists, otherwise creates it.
78    pub fn get_dbi<T: Table>(&self) -> Result<MDBX_dbi, DatabaseError> {
79        self.inner
80            .open_db(Some(T::NAME))
81            .map(|db| db.dbi())
82            .map_err(|e| DatabaseError::Open(e.into()))
83    }
84
85    /// Create db Cursor
86    pub fn new_cursor<T: Table>(&self) -> Result<Cursor<K, T>, DatabaseError> {
87        let inner = self
88            .inner
89            .cursor_with_dbi(self.get_dbi::<T>()?)
90            .map_err(|e| DatabaseError::InitCursor(e.into()))?;
91
92        Ok(Cursor::new_with_metrics(
93            inner,
94            self.metrics_handler.as_ref().map(|h| h.env_metrics.clone()),
95        ))
96    }
97
98    /// If `self.metrics_handler == Some(_)`, measure the time it takes to execute the closure and
99    /// record a metric with the provided transaction outcome.
100    ///
101    /// Otherwise, just execute the closure.
102    fn execute_with_close_transaction_metric<R>(
103        mut self,
104        outcome: TransactionOutcome,
105        f: impl FnOnce(Self) -> (R, Option<CommitLatency>),
106    ) -> R {
107        let run = |tx| {
108            let start = Instant::now();
109            let (result, commit_latency) = f(tx);
110            let total_duration = start.elapsed();
111
112            if outcome.is_commit() {
113                debug!(
114                    target: "storage::db::mdbx",
115                    ?total_duration,
116                    ?commit_latency,
117                    is_read_only = K::IS_READ_ONLY,
118                    "Commit"
119                );
120            }
121
122            (result, commit_latency, total_duration)
123        };
124
125        if let Some(mut metrics_handler) = self.metrics_handler.take() {
126            metrics_handler.close_recorded = true;
127            metrics_handler.log_backtrace_on_long_read_transaction();
128
129            let (result, commit_latency, close_duration) = run(self);
130            let open_duration = metrics_handler.start.elapsed();
131            metrics_handler.env_metrics.record_closed_transaction(
132                metrics_handler.transaction_mode(),
133                outcome,
134                open_duration,
135                Some(close_duration),
136                commit_latency,
137            );
138
139            result
140        } else {
141            run(self).0
142        }
143    }
144
145    /// If `self.metrics_handler == Some(_)`, measure the time it takes to execute the closure and
146    /// record a metric with the provided operation.
147    ///
148    /// Otherwise, just execute the closure.
149    fn execute_with_operation_metric<T: Table, R>(
150        &self,
151        operation: Operation,
152        value_size: Option<usize>,
153        f: impl FnOnce(&Transaction<K>) -> R,
154    ) -> R {
155        if let Some(metrics_handler) = &self.metrics_handler {
156            metrics_handler.log_backtrace_on_long_read_transaction();
157            metrics_handler
158                .env_metrics
159                .record_operation(T::NAME, operation, value_size, || f(&self.inner))
160        } else {
161            f(&self.inner)
162        }
163    }
164}
165
/// Per-transaction metrics state.
///
/// Its `Drop` implementation records the transaction's close in the environment metrics unless
/// that has already been done. It also warns about read transactions that stay open too long.
#[derive(Debug)]
struct MetricsHandler<K: TransactionKind> {
    /// Cached internal transaction ID provided by libmdbx.
    txn_id: u64,
    /// The time when transaction has started.
    start: Instant,
    /// Duration after which we emit the log about long-lived database transactions.
    long_transaction_duration: Duration,
    /// If `true`, the metric about transaction closing has already been recorded and we don't need
    /// to do anything on [`Drop::drop`].
    close_recorded: bool,
    /// If `true`, the backtrace of transaction will be recorded and logged.
    /// See [`MetricsHandler::log_backtrace_on_long_read_transaction`].
    record_backtrace: bool,
    /// If `true`, the backtrace of transaction has already been recorded and logged.
    /// An atomic so it can be flipped through `&self`.
    /// See [`MetricsHandler::log_backtrace_on_long_read_transaction`].
    backtrace_recorded: AtomicBool,
    /// Shared database environment metrics.
    env_metrics: Arc<DatabaseEnvMetrics>,
    /// Backtrace of the location where the transaction has been opened. Reported only with debug
    /// assertions, because capturing the backtrace on every transaction opening is expensive.
    #[cfg(debug_assertions)]
    open_backtrace: Backtrace,
    /// Ties the handler to the transaction kind `K` without storing a value of that type.
    _marker: PhantomData<K>,
}
191
192impl<K: TransactionKind> MetricsHandler<K> {
193    fn new(txn_id: u64, env_metrics: Arc<DatabaseEnvMetrics>) -> Self {
194        Self {
195            txn_id,
196            start: Instant::now(),
197            long_transaction_duration: LONG_TRANSACTION_DURATION,
198            close_recorded: false,
199            record_backtrace: true,
200            backtrace_recorded: AtomicBool::new(false),
201            #[cfg(debug_assertions)]
202            open_backtrace: Backtrace::force_capture(),
203            env_metrics,
204            _marker: PhantomData,
205        }
206    }
207
208    const fn transaction_mode(&self) -> TransactionMode {
209        if K::IS_READ_ONLY {
210            TransactionMode::ReadOnly
211        } else {
212            TransactionMode::ReadWrite
213        }
214    }
215
216    /// Logs the caller location and ID of the transaction that was opened.
217    #[track_caller]
218    fn log_transaction_opened(&self) {
219        trace!(
220            target: "storage::db::mdbx",
221            caller = %core::panic::Location::caller(),
222            id = %self.txn_id,
223            mode = %self.transaction_mode().as_str(),
224            "Transaction opened",
225        );
226    }
227
228    /// Logs the backtrace of current call if the duration that the read transaction has been open
229    /// is more than [`LONG_TRANSACTION_DURATION`] and `record_backtrace == true`.
230    /// The backtrace is recorded and logged just once, guaranteed by `backtrace_recorded` atomic.
231    ///
232    /// NOTE: Backtrace is recorded using [`Backtrace::force_capture`], so `RUST_BACKTRACE` env var
233    /// is not needed.
234    fn log_backtrace_on_long_read_transaction(&self) {
235        if self.record_backtrace &&
236            !self.backtrace_recorded.load(Ordering::Relaxed) &&
237            self.transaction_mode().is_read_only()
238        {
239            let open_duration = self.start.elapsed();
240            if open_duration >= self.long_transaction_duration {
241                self.backtrace_recorded.store(true, Ordering::Relaxed);
242                #[cfg(debug_assertions)]
243                let message = format!(
244                   "The database read transaction has been open for too long. Open backtrace:\n{}\n\nCurrent backtrace:\n{}",
245                   self.open_backtrace,
246                   Backtrace::force_capture()
247                );
248                #[cfg(not(debug_assertions))]
249                let message = format!(
250                    "The database read transaction has been open for too long. Backtrace:\n{}",
251                    Backtrace::force_capture()
252                );
253                warn!(
254                    target: "storage::db::mdbx",
255                    ?open_duration,
256                    %self.txn_id,
257                    "{message}"
258                );
259            }
260        }
261    }
262}
263
264impl<K: TransactionKind> Drop for MetricsHandler<K> {
265    fn drop(&mut self) {
266        if !self.close_recorded {
267            self.log_backtrace_on_long_read_transaction();
268            self.env_metrics.record_closed_transaction(
269                self.transaction_mode(),
270                TransactionOutcome::Drop,
271                self.start.elapsed(),
272                None,
273                None,
274            );
275        }
276    }
277}
278
/// Table importing is implemented only for read-write transactions and uses the trait's default
/// method implementations (the impl body is empty).
impl TableImporter for Tx<RW> {}
280
281impl<K: TransactionKind> DbTx for Tx<K> {
282    type Cursor<T: Table> = Cursor<K, T>;
283    type DupCursor<T: DupSort> = Cursor<K, T>;
284
285    fn get<T: Table>(&self, key: T::Key) -> Result<Option<<T as Table>::Value>, DatabaseError> {
286        self.get_by_encoded_key::<T>(&key.encode())
287    }
288
289    fn get_by_encoded_key<T: Table>(
290        &self,
291        key: &<T::Key as Encode>::Encoded,
292    ) -> Result<Option<T::Value>, DatabaseError> {
293        self.execute_with_operation_metric::<T, _>(Operation::Get, None, |tx| {
294            tx.get(self.get_dbi::<T>()?, key.as_ref())
295                .map_err(|e| DatabaseError::Read(e.into()))?
296                .map(decode_one::<T>)
297                .transpose()
298        })
299    }
300
301    fn commit(self) -> Result<bool, DatabaseError> {
302        self.execute_with_close_transaction_metric(TransactionOutcome::Commit, |this| {
303            match this.inner.commit().map_err(|e| DatabaseError::Commit(e.into())) {
304                Ok((v, latency)) => (Ok(v), Some(latency)),
305                Err(e) => (Err(e), None),
306            }
307        })
308    }
309
310    fn abort(self) {
311        self.execute_with_close_transaction_metric(TransactionOutcome::Abort, |this| {
312            (drop(this.inner), None)
313        })
314    }
315
316    // Iterate over read only values in database.
317    fn cursor_read<T: Table>(&self) -> Result<Self::Cursor<T>, DatabaseError> {
318        self.new_cursor()
319    }
320
321    /// Iterate over read only values in database.
322    fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<T>, DatabaseError> {
323        self.new_cursor()
324    }
325
326    /// Returns number of entries in the table using cheap DB stats invocation.
327    fn entries<T: Table>(&self) -> Result<usize, DatabaseError> {
328        Ok(self
329            .inner
330            .db_stat_with_dbi(self.get_dbi::<T>()?)
331            .map_err(|e| DatabaseError::Stats(e.into()))?
332            .entries())
333    }
334
335    /// Disables long-lived read transaction safety guarantees, such as backtrace recording and
336    /// timeout.
337    fn disable_long_read_transaction_safety(&mut self) {
338        if let Some(metrics_handler) = self.metrics_handler.as_mut() {
339            metrics_handler.record_backtrace = false;
340        }
341
342        self.inner.disable_timeout();
343    }
344}
345
346impl DbTxMut for Tx<RW> {
347    type CursorMut<T: Table> = Cursor<RW, T>;
348    type DupCursorMut<T: DupSort> = Cursor<RW, T>;
349
350    fn put<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
351        let key = key.encode();
352        let value = value.compress();
353        self.execute_with_operation_metric::<T, _>(
354            Operation::Put,
355            Some(value.as_ref().len()),
356            |tx| {
357                tx.put(self.get_dbi::<T>()?, key.as_ref(), value, WriteFlags::UPSERT).map_err(|e| {
358                    DatabaseWriteError {
359                        info: e.into(),
360                        operation: DatabaseWriteOperation::Put,
361                        table_name: T::NAME,
362                        key: key.into(),
363                    }
364                    .into()
365                })
366            },
367        )
368    }
369
370    fn delete<T: Table>(
371        &self,
372        key: T::Key,
373        value: Option<T::Value>,
374    ) -> Result<bool, DatabaseError> {
375        let mut data = None;
376
377        let value = value.map(Compress::compress);
378        if let Some(value) = &value {
379            data = Some(value.as_ref());
380        };
381
382        self.execute_with_operation_metric::<T, _>(Operation::Delete, None, |tx| {
383            tx.del(self.get_dbi::<T>()?, key.encode(), data)
384                .map_err(|e| DatabaseError::Delete(e.into()))
385        })
386    }
387
388    fn clear<T: Table>(&self) -> Result<(), DatabaseError> {
389        self.inner.clear_db(self.get_dbi::<T>()?).map_err(|e| DatabaseError::Delete(e.into()))?;
390
391        Ok(())
392    }
393
394    fn cursor_write<T: Table>(&self) -> Result<Self::CursorMut<T>, DatabaseError> {
395        self.new_cursor()
396    }
397
398    fn cursor_dup_write<T: DupSort>(&self) -> Result<Self::DupCursorMut<T>, DatabaseError> {
399        self.new_cursor()
400    }
401}
402
#[cfg(test)]
mod tests {
    use crate::{mdbx::DatabaseArguments, tables, DatabaseEnv, DatabaseEnvKind};
    use reth_db_api::{database::Database, models::ClientVersion, transaction::DbTx};
    use reth_libmdbx::MaxReadTransactionDuration;
    use reth_storage_errors::db::DatabaseError;
    use std::{sync::atomic::Ordering, thread::sleep, time::Duration};
    use tempfile::tempdir;

    /// Read transaction timeout configured on the test environment. It is also used as the
    /// long-transaction threshold of the metrics handler.
    const MAX_DURATION: Duration = Duration::from_secs(1);

    /// Opens a read-write environment at `path` with metrics enabled and the maximum read
    /// transaction duration set to [`MAX_DURATION`].
    fn open_env(path: &std::path::Path) -> DatabaseEnv {
        let args = DatabaseArguments::new(ClientVersion::default())
            .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Set(
                MAX_DURATION,
            )));
        DatabaseEnv::open(path, DatabaseEnvKind::RW, args).unwrap().with_metrics()
    }

    #[test]
    fn long_read_transaction_safety_disabled() {
        let dir = tempdir().unwrap();
        let db = open_env(dir.path());

        let mut tx = db.tx().unwrap();
        tx.metrics_handler.as_mut().unwrap().long_transaction_duration = MAX_DURATION;
        tx.disable_long_read_transaction_safety();
        // Give the `TxnManager` some time to time out the transaction.
        sleep(MAX_DURATION + Duration::from_millis(100));

        // Transaction has not timed out: the lookup fails only because the table is missing.
        assert_eq!(
            tx.get::<tables::Transactions>(0),
            Err(DatabaseError::Open(reth_libmdbx::Error::NotFound.into()))
        );
        // Backtrace is not recorded.
        let handler = tx.metrics_handler.unwrap();
        assert!(!handler.backtrace_recorded.load(Ordering::Relaxed));
    }

    #[test]
    fn long_read_transaction_safety_enabled() {
        let dir = tempdir().unwrap();
        let db = open_env(dir.path());

        let mut tx = db.tx().unwrap();
        tx.metrics_handler.as_mut().unwrap().long_transaction_duration = MAX_DURATION;
        // Give the `TxnManager` some time to time out the transaction.
        sleep(MAX_DURATION + Duration::from_millis(100));

        // Transaction has timed out.
        assert_eq!(
            tx.get::<tables::Transactions>(0),
            Err(DatabaseError::Open(reth_libmdbx::Error::ReadTransactionTimeout.into()))
        );
        // Backtrace is recorded.
        let handler = tx.metrics_handler.unwrap();
        assert!(handler.backtrace_recorded.load(Ordering::Relaxed));
    }
}