1use crate::{
2 environment::EnvPtr,
3 error::{mdbx_result, Result},
4 CommitLatency,
5};
6use std::{
7 ptr,
8 sync::{
9 mpsc::{sync_channel, Receiver, SyncSender},
10 Arc,
11 },
12};
13
14#[derive(Copy, Clone, Debug)]
15pub(crate) struct TxnPtr(pub(crate) *mut ffi::MDBX_txn);
16unsafe impl Send for TxnPtr {}
17unsafe impl Sync for TxnPtr {}
18
19pub(crate) enum TxnManagerMessage {
20 Begin { parent: TxnPtr, flags: ffi::MDBX_txn_flags_t, sender: SyncSender<Result<TxnPtr>> },
21 Abort { tx: TxnPtr, sender: SyncSender<Result<bool>> },
22 Commit { tx: TxnPtr, sender: SyncSender<Result<(bool, CommitLatency)>> },
23}
24
25#[derive(Debug)]
31pub(crate) struct TxnManager {
32 sender: SyncSender<TxnManagerMessage>,
33 #[cfg(feature = "read-tx-timeouts")]
34 read_transactions: Option<Arc<read_transactions::ReadTransactions>>,
35}
36
37impl TxnManager {
38 pub(crate) fn new(env: EnvPtr) -> Self {
39 let (tx, rx) = sync_channel(0);
40 let txn_manager = Self {
41 sender: tx,
42 #[cfg(feature = "read-tx-timeouts")]
43 read_transactions: None,
44 };
45
46 txn_manager.start_message_listener(env, rx);
47
48 txn_manager
49 }
50
51 fn start_message_listener(&self, env: EnvPtr, rx: Receiver<TxnManagerMessage>) {
58 let task = move || {
59 #[allow(clippy::redundant_locals)]
60 let env = env;
61 loop {
62 match rx.recv() {
63 Ok(msg) => match msg {
64 TxnManagerMessage::Begin { parent, flags, sender } => {
65 let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
66 let res = mdbx_result(unsafe {
67 ffi::mdbx_txn_begin_ex(
68 env.0,
69 parent.0,
70 flags,
71 &mut txn,
72 ptr::null_mut(),
73 )
74 })
75 .map(|_| TxnPtr(txn));
76 sender.send(res).unwrap();
77 }
78 TxnManagerMessage::Abort { tx, sender } => {
79 sender.send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) })).unwrap();
80 }
81 TxnManagerMessage::Commit { tx, sender } => {
82 sender
83 .send({
84 let mut latency = CommitLatency::new();
85 mdbx_result(unsafe {
86 ffi::mdbx_txn_commit_ex(tx.0, latency.mdb_commit_latency())
87 })
88 .map(|v| (v, latency))
89 })
90 .unwrap();
91 }
92 },
93 Err(_) => return,
94 }
95 }
96 };
97 std::thread::Builder::new().name("mbdx-rs-txn-manager".to_string()).spawn(task).unwrap();
98 }
99
100 pub(crate) fn send_message(&self, message: TxnManagerMessage) {
101 self.sender.send(message).unwrap()
102 }
103}
104
105#[cfg(feature = "read-tx-timeouts")]
106mod read_transactions {
107 use crate::{
108 environment::EnvPtr, error::mdbx_result, transaction::TransactionPtr,
109 txn_manager::TxnManager,
110 };
111 use dashmap::{DashMap, DashSet};
112 use std::{
113 sync::{mpsc::sync_channel, Arc},
114 time::{Duration, Instant},
115 };
116 use tracing::{error, trace, warn};
117
118 const READ_TRANSACTIONS_CHECK_INTERVAL: Duration = Duration::from_secs(5);
119
120 impl TxnManager {
121 pub(crate) fn new_with_max_read_transaction_duration(
124 env: EnvPtr,
125 duration: Duration,
126 ) -> Self {
127 let read_transactions = Arc::new(ReadTransactions::new(duration));
128 read_transactions.clone().start_monitor();
129
130 let (tx, rx) = sync_channel(0);
131
132 let txn_manager = Self { sender: tx, read_transactions: Some(read_transactions) };
133
134 txn_manager.start_message_listener(env, rx);
135
136 txn_manager
137 }
138
139 pub(crate) fn add_active_read_transaction(
141 &self,
142 ptr: *mut ffi::MDBX_txn,
143 tx: TransactionPtr,
144 ) {
145 if let Some(read_transactions) = &self.read_transactions {
146 read_transactions.add_active(ptr, tx);
147 }
148 }
149
150 pub(crate) fn remove_active_read_transaction(
152 &self,
153 ptr: *mut ffi::MDBX_txn,
154 ) -> Option<(usize, (TransactionPtr, Instant))> {
155 self.read_transactions.as_ref()?.remove_active(ptr)
156 }
157
158 pub(crate) fn timed_out_not_aborted_read_transactions(&self) -> Option<usize> {
160 self.read_transactions
161 .as_ref()
162 .map(|read_transactions| read_transactions.timed_out_not_aborted())
163 }
164 }
165
166 #[derive(Debug, Default)]
167 pub(super) struct ReadTransactions {
168 max_duration: Duration,
171 active: DashMap<usize, (TransactionPtr, Instant)>,
176 timed_out_not_aborted: DashSet<usize>,
179 }
180
181 impl ReadTransactions {
182 pub(super) fn new(max_duration: Duration) -> Self {
183 Self { max_duration, ..Default::default() }
184 }
185
186 pub(super) fn add_active(&self, ptr: *mut ffi::MDBX_txn, tx: TransactionPtr) {
188 let _ = self.active.insert(ptr as usize, (tx, Instant::now()));
189 }
190
191 pub(super) fn remove_active(
193 &self,
194 ptr: *mut ffi::MDBX_txn,
195 ) -> Option<(usize, (TransactionPtr, Instant))> {
196 self.timed_out_not_aborted.remove(&(ptr as usize));
197 self.active.remove(&(ptr as usize))
198 }
199
200 pub(super) fn timed_out_not_aborted(&self) -> usize {
202 self.timed_out_not_aborted.len()
203 }
204
205 pub(super) fn start_monitor(self: Arc<Self>) {
208 let task = move || {
209 let mut timed_out_active = Vec::new();
210
211 loop {
212 let now = Instant::now();
213 let mut max_active_transaction_duration = None;
214
215 for entry in &self.active {
218 let (tx, start) = entry.value();
219 let duration = now - *start;
220
221 if duration > self.max_duration {
222 let result = tx.txn_execute_fail_on_timeout(|txn_ptr| {
223 let result = mdbx_result(unsafe { ffi::mdbx_txn_reset(txn_ptr) });
233 if result.is_ok() {
234 tx.set_timed_out();
235 }
236 (txn_ptr, duration, result)
237 });
238
239 match result {
240 Ok((txn_ptr, duration, error)) => {
241 timed_out_active.push((txn_ptr, duration, error));
245 }
246 Err(err) => {
247 error!(target: "libmdbx", %err, "Failed to abort the long-lived read transaction")
248 }
249 }
250 } else {
251 max_active_transaction_duration = Some(
252 duration.max(max_active_transaction_duration.unwrap_or_default()),
253 );
254 }
255 }
256
257 for (ptr, open_duration, err) in timed_out_active.iter().copied() {
260 let was_in_active = self.remove_active(ptr).is_some();
262 if let Err(err) = err {
263 if was_in_active {
264 error!(target: "libmdbx", %err, ?open_duration, "Failed to time out the long-lived read transaction");
267 }
268 } else {
269 warn!(target: "libmdbx", ?open_duration, "Long-lived read transaction has been timed out");
271 self.timed_out_not_aborted.insert(ptr as usize);
274 }
275 }
276
277 timed_out_active.clear();
280
281 if !self.active.is_empty() {
282 trace!(
283 target: "libmdbx",
284 elapsed = ?now.elapsed(),
285 active = ?self.active.iter().map(|entry| {
286 let (tx, start) = entry.value();
287 (tx.clone(), start.elapsed())
288 }).collect::<Vec<_>>(),
289 "Read transactions"
290 );
291 }
292
293 let sleep_duration = READ_TRANSACTIONS_CHECK_INTERVAL.min(
296 self.max_duration - max_active_transaction_duration.unwrap_or_default(),
297 );
298 trace!(target: "libmdbx", ?sleep_duration, elapsed = ?now.elapsed(), "Putting transaction monitor to sleep");
299 std::thread::sleep(sleep_duration);
300 }
301 };
302 std::thread::Builder::new()
303 .name("mdbx-rs-read-tx-timeouts".to_string())
304 .spawn(task)
305 .unwrap();
306 }
307 }
308
309 #[cfg(test)]
310 mod tests {
311 use crate::{
312 txn_manager::read_transactions::READ_TRANSACTIONS_CHECK_INTERVAL, Environment, Error,
313 MaxReadTransactionDuration,
314 };
315 use std::{thread::sleep, time::Duration};
316 use tempfile::tempdir;
317
318 #[test]
319 fn txn_manager_read_transactions_duration_set() {
320 const MAX_DURATION: Duration = Duration::from_secs(1);
321
322 let dir = tempdir().unwrap();
323 let env = Environment::builder()
324 .set_max_read_transaction_duration(MaxReadTransactionDuration::Set(MAX_DURATION))
325 .open(dir.path())
326 .unwrap();
327
328 let read_transactions = env.txn_manager().read_transactions.as_ref().unwrap();
329
330 {
332 let tx = env.begin_ro_txn().unwrap();
333 let tx_ptr = tx.txn() as usize;
334 assert!(read_transactions.active.contains_key(&tx_ptr));
335
336 tx.open_db(None).unwrap();
337 drop(tx);
338
339 assert!(!read_transactions.active.contains_key(&tx_ptr));
340 }
341
342 {
344 let tx = env.begin_ro_txn().unwrap();
345 let tx_ptr = tx.txn() as usize;
346 assert!(read_transactions.active.contains_key(&tx_ptr));
347
348 tx.open_db(None).unwrap();
349 tx.commit().unwrap();
350
351 assert!(!read_transactions.active.contains_key(&tx_ptr));
352 }
353
354 {
355 let tx = env.begin_ro_txn().unwrap();
358 let tx_ptr = tx.txn() as usize;
359 assert!(read_transactions.active.contains_key(&tx_ptr));
360
361 sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL);
363
364 assert!(!read_transactions.active.contains_key(&tx_ptr));
367 assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));
368
369 assert_eq!(tx.open_db(None).err(), Some(Error::ReadTransactionTimeout));
371 assert!(!read_transactions.active.contains_key(&tx_ptr));
372 assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));
373
374 assert_eq!(tx.id().err(), Some(Error::ReadTransactionTimeout));
375 assert!(!read_transactions.active.contains_key(&tx_ptr));
376 assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));
377
378 let new_tx = env.begin_ro_txn().unwrap();
381 let new_tx_ptr = new_tx.txn() as usize;
382 assert!(read_transactions.active.contains_key(&new_tx_ptr));
383 assert_ne!(tx_ptr, new_tx_ptr);
384
385 drop(tx);
388 assert!(!read_transactions.timed_out_not_aborted.contains(&tx_ptr));
389 }
390 }
391
392 #[test]
393 fn txn_manager_read_transactions_duration_unbounded() {
394 let dir = tempdir().unwrap();
395 let env = Environment::builder()
396 .set_max_read_transaction_duration(MaxReadTransactionDuration::Unbounded)
397 .open(dir.path())
398 .unwrap();
399
400 assert!(env.txn_manager().read_transactions.is_none());
401
402 let tx = env.begin_ro_txn().unwrap();
403 sleep(READ_TRANSACTIONS_CHECK_INTERVAL);
404 assert!(tx.commit().is_ok())
405 }
406 }
407}