reth_libmdbx/
txn_manager.rs

1use crate::{
2    environment::EnvPtr,
3    error::{mdbx_result, Result},
4    CommitLatency,
5};
6use std::{
7    ptr,
8    sync::{
9        mpsc::{sync_channel, Receiver, SyncSender},
10        Arc,
11    },
12};
13
/// Thin, copyable wrapper around a raw `*mut ffi::MDBX_txn` transaction handle.
///
/// The wrapper lets the handle travel inside [`TxnManagerMessage`] to the thread spawned by
/// `TxnManager::start_message_listener`, which passes the inner pointer to the MDBX FFI calls.
#[derive(Copy, Clone, Debug)]
pub(crate) struct TxnPtr(pub(crate) *mut ffi::MDBX_txn);
// SAFETY: NOTE(review): raw pointers are `!Send` by default; this impl asserts that moving the
// handle to another thread is sound. Presumably this relies on MDBX allowing the transaction to
// be driven from the manager thread — confirm against the MDBX threading rules.
unsafe impl Send for TxnPtr {}
// SAFETY: NOTE(review): asserts that shared references to the handle may cross threads. Nothing
// in this type dereferences the pointer; soundness depends on how callers use it — confirm.
unsafe impl Sync for TxnPtr {}
18
/// Requests handled by the thread spawned in `TxnManager::start_message_listener`.
///
/// Every variant carries a `sender` on which the listener reports the outcome of the
/// corresponding FFI call.
pub(crate) enum TxnManagerMessage {
    /// Begin a transaction with `ffi::mdbx_txn_begin_ex`, passing `parent` and `flags` through.
    /// On success the new transaction handle is sent back.
    Begin { parent: TxnPtr, flags: ffi::MDBX_txn_flags_t, sender: SyncSender<Result<TxnPtr>> },
    /// Abort `tx` with `ffi::mdbx_txn_abort`; the return code converted by `mdbx_result` is sent
    /// back.
    Abort { tx: TxnPtr, sender: SyncSender<Result<bool>> },
    /// Commit `tx` with `ffi::mdbx_txn_commit_ex`; the return code converted by `mdbx_result` is
    /// sent back together with the measured [`CommitLatency`].
    Commit { tx: TxnPtr, sender: SyncSender<Result<(bool, CommitLatency)>> },
}
24
/// Manages transactions by doing two things:
/// - Opening, aborting, and committing transactions using [`TxnManager::send_message`] with the
///   corresponding [`TxnManagerMessage`]
/// - Timing out long-lived read transactions (if the `read-tx-timeouts` feature is enabled and
///   the manager was created with `TxnManager::new_with_max_read_transaction_duration`)
#[derive(Debug)]
pub(crate) struct TxnManager {
    /// Sending half of the channel whose receiving half is owned by the message listener thread.
    sender: SyncSender<TxnManagerMessage>,
    /// Registry of active read transactions; `None` when no maximum read transaction duration
    /// was configured.
    #[cfg(feature = "read-tx-timeouts")]
    read_transactions: Option<Arc<read_transactions::ReadTransactions>>,
}
36
37impl TxnManager {
38    pub(crate) fn new(env: EnvPtr) -> Self {
39        let (tx, rx) = sync_channel(0);
40        let txn_manager = Self {
41            sender: tx,
42            #[cfg(feature = "read-tx-timeouts")]
43            read_transactions: None,
44        };
45
46        txn_manager.start_message_listener(env, rx);
47
48        txn_manager
49    }
50
51    /// Spawns a new [`std::thread`] that listens to incoming [`TxnManagerMessage`] messages,
52    /// executes an FFI function, and returns the result on the provided channel.
53    ///
54    /// - [`TxnManagerMessage::Begin`] opens a new transaction with [`ffi::mdbx_txn_begin_ex`]
55    /// - [`TxnManagerMessage::Abort`] aborts a transaction with [`ffi::mdbx_txn_abort`]
56    /// - [`TxnManagerMessage::Commit`] commits a transaction with [`ffi::mdbx_txn_commit_ex`]
57    fn start_message_listener(&self, env: EnvPtr, rx: Receiver<TxnManagerMessage>) {
58        let task = move || {
59            #[allow(clippy::redundant_locals)]
60            let env = env;
61            loop {
62                match rx.recv() {
63                    Ok(msg) => match msg {
64                        TxnManagerMessage::Begin { parent, flags, sender } => {
65                            let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
66                            let res = mdbx_result(unsafe {
67                                ffi::mdbx_txn_begin_ex(
68                                    env.0,
69                                    parent.0,
70                                    flags,
71                                    &mut txn,
72                                    ptr::null_mut(),
73                                )
74                            })
75                            .map(|_| TxnPtr(txn));
76                            sender.send(res).unwrap();
77                        }
78                        TxnManagerMessage::Abort { tx, sender } => {
79                            sender.send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) })).unwrap();
80                        }
81                        TxnManagerMessage::Commit { tx, sender } => {
82                            sender
83                                .send({
84                                    let mut latency = CommitLatency::new();
85                                    mdbx_result(unsafe {
86                                        ffi::mdbx_txn_commit_ex(tx.0, latency.mdb_commit_latency())
87                                    })
88                                    .map(|v| (v, latency))
89                                })
90                                .unwrap();
91                        }
92                    },
93                    Err(_) => return,
94                }
95            }
96        };
97        std::thread::Builder::new().name("mbdx-rs-txn-manager".to_string()).spawn(task).unwrap();
98    }
99
100    pub(crate) fn send_message(&self, message: TxnManagerMessage) {
101        self.sender.send(message).unwrap()
102    }
103}
104
105#[cfg(feature = "read-tx-timeouts")]
106mod read_transactions {
107    use crate::{
108        environment::EnvPtr, error::mdbx_result, transaction::TransactionPtr,
109        txn_manager::TxnManager,
110    };
111    use dashmap::{DashMap, DashSet};
112    use std::{
113        sync::{mpsc::sync_channel, Arc},
114        time::{Duration, Instant},
115    };
116    use tracing::{error, trace, warn};
117
    /// Upper bound on how long the monitor thread sleeps between two scans of the active read
    /// transactions.
    const READ_TRANSACTIONS_CHECK_INTERVAL: Duration = Duration::from_secs(5);
119
120    impl TxnManager {
121        /// Returns a new instance for which the maximum duration that a read transaction can be
122        /// open is set.
123        pub(crate) fn new_with_max_read_transaction_duration(
124            env: EnvPtr,
125            duration: Duration,
126        ) -> Self {
127            let read_transactions = Arc::new(ReadTransactions::new(duration));
128            read_transactions.clone().start_monitor();
129
130            let (tx, rx) = sync_channel(0);
131
132            let txn_manager = Self { sender: tx, read_transactions: Some(read_transactions) };
133
134            txn_manager.start_message_listener(env, rx);
135
136            txn_manager
137        }
138
139        /// Adds a new transaction to the list of active read transactions.
140        pub(crate) fn add_active_read_transaction(
141            &self,
142            ptr: *mut ffi::MDBX_txn,
143            tx: TransactionPtr,
144        ) {
145            if let Some(read_transactions) = &self.read_transactions {
146                read_transactions.add_active(ptr, tx);
147            }
148        }
149
150        /// Removes a transaction from the list of active read transactions.
151        pub(crate) fn remove_active_read_transaction(
152            &self,
153            ptr: *mut ffi::MDBX_txn,
154        ) -> Option<(usize, (TransactionPtr, Instant))> {
155            self.read_transactions.as_ref()?.remove_active(ptr)
156        }
157
158        /// Returns the number of timed out transactions that were not aborted by the user yet.
159        pub(crate) fn timed_out_not_aborted_read_transactions(&self) -> Option<usize> {
160            self.read_transactions
161                .as_ref()
162                .map(|read_transactions| read_transactions.timed_out_not_aborted())
163        }
164    }
165
    /// Bookkeeping of read transactions, shared between [`TxnManager`] and the monitor thread
    /// started by [`ReadTransactions::start_monitor`].
    #[derive(Debug, Default)]
    pub(super) struct ReadTransactions {
        /// Maximum duration that a read transaction can be open until the
        /// [`ReadTransactions::start_monitor`] thread times it out.
        max_duration: Duration,
        /// List of currently active read transactions.
        ///
        /// The key is the transaction pointer's address cast to `usize`. A raw pointer key would
        /// make the map `!Send`/`!Sync`, while this struct is shared with the monitor thread
        /// through an `Arc`. The value holds the transaction handle and the time the transaction
        /// was registered.
        active: DashMap<usize, (TransactionPtr, Instant)>,
        /// Addresses of timed out transactions that were not aborted by the user yet, hence have
        /// a dangling read transaction pointer.
        timed_out_not_aborted: DashSet<usize>,
    }
180
181    impl ReadTransactions {
182        pub(super) fn new(max_duration: Duration) -> Self {
183            Self { max_duration, ..Default::default() }
184        }
185
186        /// Adds a new transaction to the list of active read transactions.
187        pub(super) fn add_active(&self, ptr: *mut ffi::MDBX_txn, tx: TransactionPtr) {
188            let _ = self.active.insert(ptr as usize, (tx, Instant::now()));
189        }
190
191        /// Removes a transaction from the list of active read transactions.
192        pub(super) fn remove_active(
193            &self,
194            ptr: *mut ffi::MDBX_txn,
195        ) -> Option<(usize, (TransactionPtr, Instant))> {
196            self.timed_out_not_aborted.remove(&(ptr as usize));
197            self.active.remove(&(ptr as usize))
198        }
199
200        /// Returns the number of timed out transactions that were not aborted by the user yet.
201        pub(super) fn timed_out_not_aborted(&self) -> usize {
202            self.timed_out_not_aborted.len()
203        }
204
205        /// Spawns a new [`std::thread`] that monitors the list of active read transactions and
206        /// timeouts those that are open for longer than `ReadTransactions.max_duration`.
207        pub(super) fn start_monitor(self: Arc<Self>) {
208            let task = move || {
209                let mut timed_out_active = Vec::new();
210
211                loop {
212                    let now = Instant::now();
213                    let mut max_active_transaction_duration = None;
214
215                    // Iterate through active read transactions and time out those that's open for
216                    // longer than `self.max_duration`.
217                    for entry in &self.active {
218                        let (tx, start) = entry.value();
219                        let duration = now - *start;
220
221                        if duration > self.max_duration {
222                            let result = tx.txn_execute_fail_on_timeout(|txn_ptr| {
223                                // Time out the transaction.
224                                //
225                                // We use `mdbx_txn_reset` instead of `mdbx_txn_abort` here to
226                                // prevent MDBX from reusing the pointer of the aborted
227                                // transaction for new read-only transactions. This is
228                                // important because we store the pointer in the `active` list
229                                // and assume that it is unique.
230                                //
231                                // See https://erthink.github.io/libmdbx/group__c__transactions.html#gae9f34737fe60b0ba538d5a09b6a25c8d for more info.
232                                let result = mdbx_result(unsafe { ffi::mdbx_txn_reset(txn_ptr) });
233                                if result.is_ok() {
234                                    tx.set_timed_out();
235                                }
236                                (txn_ptr, duration, result)
237                            });
238
239                            match result {
240                                Ok((txn_ptr, duration, error)) => {
241                                    // Add the transaction to `timed_out_active`. We can't remove it
242                                    // instantly from the list of active transactions, because we
243                                    // iterate through it.
244                                    timed_out_active.push((txn_ptr, duration, error));
245                                }
246                                Err(err) => {
247                                    error!(target: "libmdbx", %err, "Failed to abort the long-lived read transaction")
248                                }
249                            }
250                        } else {
251                            max_active_transaction_duration = Some(
252                                duration.max(max_active_transaction_duration.unwrap_or_default()),
253                            );
254                        }
255                    }
256
257                    // Walk through timed out transactions, and delete them from the list of active
258                    // transactions.
259                    for (ptr, open_duration, err) in timed_out_active.iter().copied() {
260                        // Try deleting the transaction from the list of active transactions.
261                        let was_in_active = self.remove_active(ptr).is_some();
262                        if let Err(err) = err {
263                            if was_in_active {
264                                // If the transaction was in the list of active transactions,
265                                // then user didn't abort it and we failed to do so.
266                                error!(target: "libmdbx", %err, ?open_duration, "Failed to time out the long-lived read transaction");
267                            }
268                        } else {
269                            // Happy path, the transaction has been timed out by us with no errors.
270                            warn!(target: "libmdbx", ?open_duration, "Long-lived read transaction has been timed out");
271                            // Add transaction to the list of timed out transactions that were not
272                            // aborted by the user yet.
273                            self.timed_out_not_aborted.insert(ptr as usize);
274                        }
275                    }
276
277                    // Clear the list of timed out transactions, but not de-allocate the reserved
278                    // capacity to save on further pushes.
279                    timed_out_active.clear();
280
281                    if !self.active.is_empty() {
282                        trace!(
283                            target: "libmdbx",
284                            elapsed = ?now.elapsed(),
285                            active = ?self.active.iter().map(|entry| {
286                                let (tx, start) = entry.value();
287                                (tx.clone(), start.elapsed())
288                            }).collect::<Vec<_>>(),
289                            "Read transactions"
290                        );
291                    }
292
293                    // Sleep not more than `READ_TRANSACTIONS_CHECK_INTERVAL`, but at least until
294                    // the closest deadline of an active read transaction
295                    let sleep_duration = READ_TRANSACTIONS_CHECK_INTERVAL.min(
296                        self.max_duration - max_active_transaction_duration.unwrap_or_default(),
297                    );
298                    trace!(target: "libmdbx", ?sleep_duration, elapsed = ?now.elapsed(), "Putting transaction monitor to sleep");
299                    std::thread::sleep(sleep_duration);
300                }
301            };
302            std::thread::Builder::new()
303                .name("mdbx-rs-read-tx-timeouts".to_string())
304                .spawn(task)
305                .unwrap();
306        }
307    }
308
309    #[cfg(test)]
310    mod tests {
311        use crate::{
312            txn_manager::read_transactions::READ_TRANSACTIONS_CHECK_INTERVAL, Environment, Error,
313            MaxReadTransactionDuration,
314        };
315        use std::{thread::sleep, time::Duration};
316        use tempfile::tempdir;
317
        /// With a maximum duration configured, read transactions are tracked while open,
        /// untracked once dropped or committed, and timed out by the monitor when they stay open
        /// for too long.
        #[test]
        fn txn_manager_read_transactions_duration_set() {
            const MAX_DURATION: Duration = Duration::from_secs(1);

            let dir = tempdir().unwrap();
            let env = Environment::builder()
                .set_max_read_transaction_duration(MaxReadTransactionDuration::Set(MAX_DURATION))
                .open(dir.path())
                .unwrap();

            let read_transactions = env.txn_manager().read_transactions.as_ref().unwrap();

            // Create a read-only transaction, successfully use it, close it by dropping.
            {
                let tx = env.begin_ro_txn().unwrap();
                let tx_ptr = tx.txn() as usize;
                assert!(read_transactions.active.contains_key(&tx_ptr));

                tx.open_db(None).unwrap();
                drop(tx);

                assert!(!read_transactions.active.contains_key(&tx_ptr));
            }

            // Create a read-only transaction, successfully use it, close it by committing.
            {
                let tx = env.begin_ro_txn().unwrap();
                let tx_ptr = tx.txn() as usize;
                assert!(read_transactions.active.contains_key(&tx_ptr));

                tx.open_db(None).unwrap();
                tx.commit().unwrap();

                assert!(!read_transactions.active.contains_key(&tx_ptr));
            }

            {
                // Create a read-only transaction and observe it's in the list of active
                // transactions.
                let tx = env.begin_ro_txn().unwrap();
                let tx_ptr = tx.txn() as usize;
                assert!(read_transactions.active.contains_key(&tx_ptr));

                // Wait until the transaction is timed out by the manager. Sleeping for the limit
                // plus one full check interval leaves room for the monitor's next scan.
                sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL);

                // Ensure that the transaction is not in the list of active transactions anymore,
                // and is in the list of timed out but not aborted transactions.
                assert!(!read_transactions.active.contains_key(&tx_ptr));
                assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));

                // Use the timed out transaction and observe the `Error::ReadTransactionTimeout`.
                // Failed use must not change either list.
                assert_eq!(tx.open_db(None).err(), Some(Error::ReadTransactionTimeout));
                assert!(!read_transactions.active.contains_key(&tx_ptr));
                assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));

                assert_eq!(tx.id().err(), Some(Error::ReadTransactionTimeout));
                assert!(!read_transactions.active.contains_key(&tx_ptr));
                assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));

                // Ensure that the transaction pointer is not reused when opening a new read-only
                // transaction.
                let new_tx = env.begin_ro_txn().unwrap();
                let new_tx_ptr = new_tx.txn() as usize;
                assert!(read_transactions.active.contains_key(&new_tx_ptr));
                assert_ne!(tx_ptr, new_tx_ptr);

                // Drop the transaction and ensure that it's not in the list of timed out but not
                // aborted transactions anymore.
                drop(tx);
                assert!(!read_transactions.timed_out_not_aborted.contains(&tx_ptr));
            }
        }
391
392        #[test]
393        fn txn_manager_read_transactions_duration_unbounded() {
394            let dir = tempdir().unwrap();
395            let env = Environment::builder()
396                .set_max_read_transaction_duration(MaxReadTransactionDuration::Unbounded)
397                .open(dir.path())
398                .unwrap();
399
400            assert!(env.txn_manager().read_transactions.is_none());
401
402            let tx = env.begin_ro_txn().unwrap();
403            sleep(READ_TRANSACTIONS_CHECK_INTERVAL);
404            assert!(tx.commit().is_ok())
405        }
406    }
407}