reth_db/implementation/mdbx/
tx.rs1use super::cursor::Cursor;
4use crate::{
5 metrics::{DatabaseEnvMetrics, Operation, TransactionMode, TransactionOutcome},
6 tables::utils::decode_one,
7 DatabaseError,
8};
9use reth_db_api::{
10 table::{Compress, DupSort, Encode, Table, TableImporter},
11 transaction::{DbTx, DbTxMut},
12};
13use reth_libmdbx::{ffi::MDBX_dbi, CommitLatency, Transaction, TransactionKind, WriteFlags, RW};
14use reth_storage_errors::db::{DatabaseWriteError, DatabaseWriteOperation};
15use reth_tracing::tracing::{debug, trace, warn};
16use std::{
17 backtrace::Backtrace,
18 marker::PhantomData,
19 sync::{
20 atomic::{AtomicBool, Ordering},
21 Arc,
22 },
23 time::{Duration, Instant},
24};
25
/// Open duration after which a read transaction is considered long-lived.
///
/// This is the default `long_transaction_duration` given to every `MetricsHandler`. Once a
/// read-only transaction has been open for at least this long, a warning with a backtrace is
/// logged the next time the handler is consulted.
const LONG_TRANSACTION_DURATION: Duration = Duration::from_secs(60);
28
29#[derive(Debug)]
31pub struct Tx<K: TransactionKind> {
32 pub inner: Transaction<K>,
34
35 metrics_handler: Option<MetricsHandler<K>>,
40}
41
42impl<K: TransactionKind> Tx<K> {
43 #[inline]
45 pub const fn new(inner: Transaction<K>) -> Self {
46 Self::new_inner(inner, None)
47 }
48
49 #[inline]
51 #[track_caller]
52 pub(crate) fn new_with_metrics(
53 inner: Transaction<K>,
54 env_metrics: Option<Arc<DatabaseEnvMetrics>>,
55 ) -> reth_libmdbx::Result<Self> {
56 let metrics_handler = env_metrics
57 .map(|env_metrics| {
58 let handler = MetricsHandler::<K>::new(inner.id()?, env_metrics);
59 handler.env_metrics.record_opened_transaction(handler.transaction_mode());
60 handler.log_transaction_opened();
61 Ok(handler)
62 })
63 .transpose()?;
64 Ok(Self::new_inner(inner, metrics_handler))
65 }
66
67 #[inline]
68 const fn new_inner(inner: Transaction<K>, metrics_handler: Option<MetricsHandler<K>>) -> Self {
69 Self { inner, metrics_handler }
70 }
71
72 pub fn id(&self) -> reth_libmdbx::Result<u64> {
74 self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| Ok(handler.txn_id))
75 }
76
77 pub fn get_dbi<T: Table>(&self) -> Result<MDBX_dbi, DatabaseError> {
79 self.inner
80 .open_db(Some(T::NAME))
81 .map(|db| db.dbi())
82 .map_err(|e| DatabaseError::Open(e.into()))
83 }
84
85 pub fn new_cursor<T: Table>(&self) -> Result<Cursor<K, T>, DatabaseError> {
87 let inner = self
88 .inner
89 .cursor_with_dbi(self.get_dbi::<T>()?)
90 .map_err(|e| DatabaseError::InitCursor(e.into()))?;
91
92 Ok(Cursor::new_with_metrics(
93 inner,
94 self.metrics_handler.as_ref().map(|h| h.env_metrics.clone()),
95 ))
96 }
97
98 fn execute_with_close_transaction_metric<R>(
103 mut self,
104 outcome: TransactionOutcome,
105 f: impl FnOnce(Self) -> (R, Option<CommitLatency>),
106 ) -> R {
107 let run = |tx| {
108 let start = Instant::now();
109 let (result, commit_latency) = f(tx);
110 let total_duration = start.elapsed();
111
112 if outcome.is_commit() {
113 debug!(
114 target: "storage::db::mdbx",
115 ?total_duration,
116 ?commit_latency,
117 is_read_only = K::IS_READ_ONLY,
118 "Commit"
119 );
120 }
121
122 (result, commit_latency, total_duration)
123 };
124
125 if let Some(mut metrics_handler) = self.metrics_handler.take() {
126 metrics_handler.close_recorded = true;
127 metrics_handler.log_backtrace_on_long_read_transaction();
128
129 let (result, commit_latency, close_duration) = run(self);
130 let open_duration = metrics_handler.start.elapsed();
131 metrics_handler.env_metrics.record_closed_transaction(
132 metrics_handler.transaction_mode(),
133 outcome,
134 open_duration,
135 Some(close_duration),
136 commit_latency,
137 );
138
139 result
140 } else {
141 run(self).0
142 }
143 }
144
145 fn execute_with_operation_metric<T: Table, R>(
150 &self,
151 operation: Operation,
152 value_size: Option<usize>,
153 f: impl FnOnce(&Transaction<K>) -> R,
154 ) -> R {
155 if let Some(metrics_handler) = &self.metrics_handler {
156 metrics_handler.log_backtrace_on_long_read_transaction();
157 metrics_handler
158 .env_metrics
159 .record_operation(T::NAME, operation, value_size, || f(&self.inner))
160 } else {
161 f(&self.inner)
162 }
163 }
164}
165
166#[derive(Debug)]
167struct MetricsHandler<K: TransactionKind> {
168 txn_id: u64,
170 start: Instant,
172 long_transaction_duration: Duration,
174 close_recorded: bool,
177 record_backtrace: bool,
180 backtrace_recorded: AtomicBool,
183 env_metrics: Arc<DatabaseEnvMetrics>,
185 #[cfg(debug_assertions)]
188 open_backtrace: Backtrace,
189 _marker: PhantomData<K>,
190}
191
192impl<K: TransactionKind> MetricsHandler<K> {
193 fn new(txn_id: u64, env_metrics: Arc<DatabaseEnvMetrics>) -> Self {
194 Self {
195 txn_id,
196 start: Instant::now(),
197 long_transaction_duration: LONG_TRANSACTION_DURATION,
198 close_recorded: false,
199 record_backtrace: true,
200 backtrace_recorded: AtomicBool::new(false),
201 #[cfg(debug_assertions)]
202 open_backtrace: Backtrace::force_capture(),
203 env_metrics,
204 _marker: PhantomData,
205 }
206 }
207
208 const fn transaction_mode(&self) -> TransactionMode {
209 if K::IS_READ_ONLY {
210 TransactionMode::ReadOnly
211 } else {
212 TransactionMode::ReadWrite
213 }
214 }
215
216 #[track_caller]
218 fn log_transaction_opened(&self) {
219 trace!(
220 target: "storage::db::mdbx",
221 caller = %core::panic::Location::caller(),
222 id = %self.txn_id,
223 mode = %self.transaction_mode().as_str(),
224 "Transaction opened",
225 );
226 }
227
228 fn log_backtrace_on_long_read_transaction(&self) {
235 if self.record_backtrace &&
236 !self.backtrace_recorded.load(Ordering::Relaxed) &&
237 self.transaction_mode().is_read_only()
238 {
239 let open_duration = self.start.elapsed();
240 if open_duration >= self.long_transaction_duration {
241 self.backtrace_recorded.store(true, Ordering::Relaxed);
242 #[cfg(debug_assertions)]
243 let message = format!(
244 "The database read transaction has been open for too long. Open backtrace:\n{}\n\nCurrent backtrace:\n{}",
245 self.open_backtrace,
246 Backtrace::force_capture()
247 );
248 #[cfg(not(debug_assertions))]
249 let message = format!(
250 "The database read transaction has been open for too long. Backtrace:\n{}",
251 Backtrace::force_capture()
252 );
253 warn!(
254 target: "storage::db::mdbx",
255 ?open_duration,
256 %self.txn_id,
257 "{message}"
258 );
259 }
260 }
261 }
262}
263
264impl<K: TransactionKind> Drop for MetricsHandler<K> {
265 fn drop(&mut self) {
266 if !self.close_recorded {
267 self.log_backtrace_on_long_read_transaction();
268 self.env_metrics.record_closed_transaction(
269 self.transaction_mode(),
270 TransactionOutcome::Drop,
271 self.start.elapsed(),
272 None,
273 None,
274 );
275 }
276 }
277}
278
279impl TableImporter for Tx<RW> {}
280
281impl<K: TransactionKind> DbTx for Tx<K> {
282 type Cursor<T: Table> = Cursor<K, T>;
283 type DupCursor<T: DupSort> = Cursor<K, T>;
284
285 fn get<T: Table>(&self, key: T::Key) -> Result<Option<<T as Table>::Value>, DatabaseError> {
286 self.get_by_encoded_key::<T>(&key.encode())
287 }
288
289 fn get_by_encoded_key<T: Table>(
290 &self,
291 key: &<T::Key as Encode>::Encoded,
292 ) -> Result<Option<T::Value>, DatabaseError> {
293 self.execute_with_operation_metric::<T, _>(Operation::Get, None, |tx| {
294 tx.get(self.get_dbi::<T>()?, key.as_ref())
295 .map_err(|e| DatabaseError::Read(e.into()))?
296 .map(decode_one::<T>)
297 .transpose()
298 })
299 }
300
301 fn commit(self) -> Result<bool, DatabaseError> {
302 self.execute_with_close_transaction_metric(TransactionOutcome::Commit, |this| {
303 match this.inner.commit().map_err(|e| DatabaseError::Commit(e.into())) {
304 Ok((v, latency)) => (Ok(v), Some(latency)),
305 Err(e) => (Err(e), None),
306 }
307 })
308 }
309
310 fn abort(self) {
311 self.execute_with_close_transaction_metric(TransactionOutcome::Abort, |this| {
312 (drop(this.inner), None)
313 })
314 }
315
316 fn cursor_read<T: Table>(&self) -> Result<Self::Cursor<T>, DatabaseError> {
318 self.new_cursor()
319 }
320
321 fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<T>, DatabaseError> {
323 self.new_cursor()
324 }
325
326 fn entries<T: Table>(&self) -> Result<usize, DatabaseError> {
328 Ok(self
329 .inner
330 .db_stat_with_dbi(self.get_dbi::<T>()?)
331 .map_err(|e| DatabaseError::Stats(e.into()))?
332 .entries())
333 }
334
335 fn disable_long_read_transaction_safety(&mut self) {
338 if let Some(metrics_handler) = self.metrics_handler.as_mut() {
339 metrics_handler.record_backtrace = false;
340 }
341
342 self.inner.disable_timeout();
343 }
344}
345
346impl DbTxMut for Tx<RW> {
347 type CursorMut<T: Table> = Cursor<RW, T>;
348 type DupCursorMut<T: DupSort> = Cursor<RW, T>;
349
350 fn put<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
351 let key = key.encode();
352 let value = value.compress();
353 self.execute_with_operation_metric::<T, _>(
354 Operation::Put,
355 Some(value.as_ref().len()),
356 |tx| {
357 tx.put(self.get_dbi::<T>()?, key.as_ref(), value, WriteFlags::UPSERT).map_err(|e| {
358 DatabaseWriteError {
359 info: e.into(),
360 operation: DatabaseWriteOperation::Put,
361 table_name: T::NAME,
362 key: key.into(),
363 }
364 .into()
365 })
366 },
367 )
368 }
369
370 fn delete<T: Table>(
371 &self,
372 key: T::Key,
373 value: Option<T::Value>,
374 ) -> Result<bool, DatabaseError> {
375 let mut data = None;
376
377 let value = value.map(Compress::compress);
378 if let Some(value) = &value {
379 data = Some(value.as_ref());
380 };
381
382 self.execute_with_operation_metric::<T, _>(Operation::Delete, None, |tx| {
383 tx.del(self.get_dbi::<T>()?, key.encode(), data)
384 .map_err(|e| DatabaseError::Delete(e.into()))
385 })
386 }
387
388 fn clear<T: Table>(&self) -> Result<(), DatabaseError> {
389 self.inner.clear_db(self.get_dbi::<T>()?).map_err(|e| DatabaseError::Delete(e.into()))?;
390
391 Ok(())
392 }
393
394 fn cursor_write<T: Table>(&self) -> Result<Self::CursorMut<T>, DatabaseError> {
395 self.new_cursor()
396 }
397
398 fn cursor_dup_write<T: DupSort>(&self) -> Result<Self::DupCursorMut<T>, DatabaseError> {
399 self.new_cursor()
400 }
401}
402
#[cfg(test)]
mod tests {
    use crate::{mdbx::DatabaseArguments, tables, DatabaseEnv, DatabaseEnvKind};
    use reth_db_api::{database::Database, models::ClientVersion, transaction::DbTx};
    use reth_libmdbx::MaxReadTransactionDuration;
    use reth_storage_errors::db::DatabaseError;
    use std::{sync::atomic::Ordering, thread::sleep, time::Duration};
    use tempfile::tempdir;

    /// With the long-read-transaction safety disabled, a read transaction that outlives the
    /// configured maximum duration must neither time out nor record a backtrace.
    #[test]
    fn long_read_transaction_safety_disabled() {
        const MAX_DURATION: Duration = Duration::from_secs(1);

        let tmp = tempdir().unwrap();
        let read_limit = Some(MaxReadTransactionDuration::Set(MAX_DURATION));
        let db_args = DatabaseArguments::new(ClientVersion::default())
            .with_max_read_transaction_duration(read_limit);
        let db =
            DatabaseEnv::open(tmp.path(), DatabaseEnvKind::RW, db_args).unwrap().with_metrics();

        let mut tx = db.tx().unwrap();
        // Lower the warning threshold to the test duration so the check would fire if enabled.
        tx.metrics_handler.as_mut().unwrap().long_transaction_duration = MAX_DURATION;
        tx.disable_long_read_transaction_safety();
        // Stay open for longer than both the timeout and the warning threshold.
        sleep(MAX_DURATION + Duration::from_millis(100));

        // The lookup fails with `NotFound` rather than `ReadTransactionTimeout`, i.e. the
        // transaction itself is still usable.
        let lookup = tx.get::<tables::Transactions>(0);
        assert_eq!(lookup, Err(DatabaseError::Open(reth_libmdbx::Error::NotFound.into())));
        // No backtrace was recorded because the safety was disabled.
        assert!(!tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed));
    }

    /// With the long-read-transaction safety left enabled, a read transaction that outlives the
    /// configured maximum duration must time out and record a backtrace.
    #[test]
    fn long_read_transaction_safety_enabled() {
        const MAX_DURATION: Duration = Duration::from_secs(1);

        let tmp = tempdir().unwrap();
        let read_limit = Some(MaxReadTransactionDuration::Set(MAX_DURATION));
        let db_args = DatabaseArguments::new(ClientVersion::default())
            .with_max_read_transaction_duration(read_limit);
        let db =
            DatabaseEnv::open(tmp.path(), DatabaseEnvKind::RW, db_args).unwrap().with_metrics();

        let mut tx = db.tx().unwrap();
        // Lower the warning threshold to the test duration so the check fires after the sleep.
        tx.metrics_handler.as_mut().unwrap().long_transaction_duration = MAX_DURATION;
        // Stay open for longer than both the timeout and the warning threshold.
        sleep(MAX_DURATION + Duration::from_millis(100));

        // The transaction has timed out, so the lookup reports `ReadTransactionTimeout`.
        let lookup = tx.get::<tables::Transactions>(0);
        assert_eq!(
            lookup,
            Err(DatabaseError::Open(reth_libmdbx::Error::ReadTransactionTimeout.into()))
        );
        // The long-transaction backtrace was recorded during the lookup.
        assert!(tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed));
    }
}