reth_db/
lib.rs

1//! MDBX implementation for reth's database abstraction layer.
2//!
3//! This crate is an implementation of [`reth-db-api`] for MDBX, as well as a few other common
4//! database types.
5//!
6//! # Overview
7//!
8//! An overview of the current data model of reth can be found in the [`mod@tables`] module.
9
10#![doc(
11    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
12    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
13    issue_tracker_base_url = "https://github.com/SeismicSystems/seismic-reth/issues/"
14)]
15#![cfg_attr(not(test), warn(unused_crate_dependencies))]
16#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
17
18mod implementation;
19pub mod lockfile;
20#[cfg(feature = "mdbx")]
21mod metrics;
22pub mod static_file;
23pub mod tables;
24#[cfg(feature = "mdbx")]
25mod utils;
26pub mod version;
27
28#[cfg(feature = "mdbx")]
29pub mod mdbx;
30
31pub use reth_storage_errors::db::{DatabaseError, DatabaseWriteOperation};
32pub use tables::*;
33#[cfg(feature = "mdbx")]
34pub use utils::is_database_empty;
35
36#[cfg(feature = "mdbx")]
37pub use mdbx::{create_db, init_db, open_db, open_db_read_only, DatabaseEnv, DatabaseEnvKind};
38
39pub use models::ClientVersion;
40pub use reth_db_api::*;
41
42/// Collection of database test utilities
43#[cfg(any(test, feature = "test-utils"))]
44pub mod test_utils {
45    use super::*;
46    use crate::mdbx::DatabaseArguments;
47    use parking_lot::RwLock;
48    use reth_db_api::{
49        database::Database,
50        database_metrics::{DatabaseMetadata, DatabaseMetadataValue, DatabaseMetrics},
51        models::ClientVersion,
52    };
53    use reth_fs_util;
54    use reth_libmdbx::MaxReadTransactionDuration;
55    use std::{
56        fmt::Formatter,
57        path::{Path, PathBuf},
58        sync::Arc,
59    };
60    use tempfile::TempDir;
61
    /// Error message for a failed database open
    pub const ERROR_DB_OPEN: &str = "Not able to open the database file.";
    /// Error message for a failed database creation
    pub const ERROR_DB_CREATION: &str = "Not able to create the database file.";
    /// Error message for a failed static files directory creation
    pub const ERROR_STATIC_FILES_CREATION: &str = "Not able to create the static file path.";
    /// Error message for a failed table creation
    pub const ERROR_TABLE_CREATION: &str = "Not able to create tables in the database.";
    /// Error message for a failed temporary directory creation
    pub const ERROR_TEMPDIR: &str = "Not able to create a temporary directory.";
72
    /// A database wrapper that deletes the database directory when dropped.
    ///
    /// The directory at `path` is removed by the `Drop` impl only while the wrapper still owns
    /// the database; after [`TempDatabase::into_inner_db`] has taken the database out, nothing
    /// is deleted.
    pub struct TempDatabase<DB> {
        /// The wrapped database. `None` only after it has been taken out by
        /// [`TempDatabase::into_inner_db`] or by the `Drop` impl.
        db: Option<DB>,
        /// Directory that holds the database files; removed on drop.
        path: PathBuf,
        /// Executed right before a database transaction is created.
        ///
        /// In this file only `Database::tx` invokes the hook; `Database::tx_mut` does not.
        pre_tx_hook: RwLock<Box<dyn Fn() + Send + Sync>>,
        /// Executed right after a database transaction is created.
        ///
        /// In this file only `Database::tx` invokes the hook, and only when the transaction was
        /// created successfully.
        post_tx_hook: RwLock<Box<dyn Fn() + Send + Sync>>,
    }
82
83    impl<DB: std::fmt::Debug> std::fmt::Debug for TempDatabase<DB> {
84        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
85            f.debug_struct("TempDatabase").field("db", &self.db).field("path", &self.path).finish()
86        }
87    }
88
89    impl<DB> Drop for TempDatabase<DB> {
90        fn drop(&mut self) {
91            if let Some(db) = self.db.take() {
92                drop(db);
93                let _ = reth_fs_util::remove_dir_all(&self.path);
94            }
95        }
96    }
97
98    impl<DB> TempDatabase<DB> {
99        /// Create new [`TempDatabase`] instance.
100        pub fn new(db: DB, path: PathBuf) -> Self {
101            Self {
102                db: Some(db),
103                path,
104                pre_tx_hook: RwLock::new(Box::new(|| ())),
105                post_tx_hook: RwLock::new(Box::new(|| ())),
106            }
107        }
108
109        /// Returns the reference to inner db.
110        pub fn db(&self) -> &DB {
111            self.db.as_ref().unwrap()
112        }
113
114        /// Returns the path to the database.
115        pub fn path(&self) -> &Path {
116            &self.path
117        }
118
119        /// Convert temp database into inner.
120        pub fn into_inner_db(mut self) -> DB {
121            self.db.take().unwrap() // take out db to avoid clean path in drop fn
122        }
123
124        /// Sets [`TempDatabase`] new pre transaction creation hook.
125        pub fn set_pre_transaction_hook(&self, hook: Box<dyn Fn() + Send + Sync>) {
126            let mut db_hook = self.pre_tx_hook.write();
127            *db_hook = hook;
128        }
129
130        /// Sets [`TempDatabase`] new post transaction creation hook.
131        pub fn set_post_transaction_hook(&self, hook: Box<dyn Fn() + Send + Sync>) {
132            let mut db_hook = self.post_tx_hook.write();
133            *db_hook = hook;
134        }
135    }
136
137    impl<DB: Database> Database for TempDatabase<DB> {
138        type TX = <DB as Database>::TX;
139        type TXMut = <DB as Database>::TXMut;
140        fn tx(&self) -> Result<Self::TX, DatabaseError> {
141            self.pre_tx_hook.read()();
142            let tx = self.db().tx()?;
143            self.post_tx_hook.read()();
144            Ok(tx)
145        }
146
147        fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
148            self.db().tx_mut()
149        }
150    }
151
152    impl<DB: DatabaseMetrics> DatabaseMetrics for TempDatabase<DB> {
153        fn report_metrics(&self) {
154            self.db().report_metrics()
155        }
156    }
157
158    impl<DB: DatabaseMetadata> DatabaseMetadata for TempDatabase<DB> {
159        fn metadata(&self) -> DatabaseMetadataValue {
160            self.db().metadata()
161        }
162    }
163
164    /// Create `static_files` path for testing
165    pub fn create_test_static_files_dir() -> (TempDir, PathBuf) {
166        let temp_dir = TempDir::with_prefix("reth-test-static-").expect(ERROR_TEMPDIR);
167        let path = temp_dir.path().to_path_buf();
168        (temp_dir, path)
169    }
170
171    /// Get a temporary directory path to use for the database
172    pub fn tempdir_path() -> PathBuf {
173        let builder = tempfile::Builder::new().prefix("reth-test-").rand_bytes(8).tempdir();
174        builder.expect(ERROR_TEMPDIR).into_path()
175    }
176
177    /// Create read/write database for testing
178    pub fn create_test_rw_db() -> Arc<TempDatabase<DatabaseEnv>> {
179        let path = tempdir_path();
180        let emsg = format!("{ERROR_DB_CREATION}: {path:?}");
181
182        let db = init_db(
183            &path,
184            DatabaseArguments::new(ClientVersion::default())
185                .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)),
186        )
187        .expect(&emsg);
188
189        Arc::new(TempDatabase::new(db, path))
190    }
191
192    /// Create read/write database for testing
193    pub fn create_test_rw_db_with_path<P: AsRef<Path>>(path: P) -> Arc<TempDatabase<DatabaseEnv>> {
194        let path = path.as_ref().to_path_buf();
195        let db = init_db(
196            path.as_path(),
197            DatabaseArguments::new(ClientVersion::default())
198                .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)),
199        )
200        .expect(ERROR_DB_CREATION);
201        Arc::new(TempDatabase::new(db, path))
202    }
203
204    /// Create read only database for testing
205    pub fn create_test_ro_db() -> Arc<TempDatabase<DatabaseEnv>> {
206        let args = DatabaseArguments::new(ClientVersion::default())
207            .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded));
208
209        let path = tempdir_path();
210        {
211            init_db(path.as_path(), args.clone()).expect(ERROR_DB_CREATION);
212        }
213        let db = open_db_read_only(path.as_path(), args).expect(ERROR_DB_OPEN);
214        Arc::new(TempDatabase::new(db, path))
215    }
216}
217
218#[cfg(test)]
219mod tests {
220    use crate::{
221        init_db,
222        mdbx::DatabaseArguments,
223        open_db, tables,
224        version::{db_version_file_path, DatabaseVersionError},
225    };
226    use assert_matches::assert_matches;
227    use reth_db_api::{
228        cursor::DbCursorRO, database::Database, models::ClientVersion, transaction::DbTx,
229    };
230    use reth_libmdbx::MaxReadTransactionDuration;
231    use std::time::Duration;
232    use tempfile::tempdir;
233
234    #[test]
235    fn db_version() {
236        let path = tempdir().unwrap();
237
238        let args = DatabaseArguments::new(ClientVersion::default())
239            .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded));
240
241        // Database is empty
242        {
243            let db = init_db(&path, args.clone());
244            assert_matches!(db, Ok(_));
245        }
246
247        // Database is not empty, current version is the same as in the file
248        {
249            let db = init_db(&path, args.clone());
250            assert_matches!(db, Ok(_));
251        }
252
253        // Database is not empty, version file is malformed
254        {
255            reth_fs_util::write(path.path().join(db_version_file_path(&path)), "invalid-version")
256                .unwrap();
257            let db = init_db(&path, args.clone());
258            assert!(db.is_err());
259            assert_matches!(
260                db.unwrap_err().downcast_ref::<DatabaseVersionError>(),
261                Some(DatabaseVersionError::MalformedFile)
262            )
263        }
264
265        // Database is not empty, version file contains not matching version
266        {
267            reth_fs_util::write(path.path().join(db_version_file_path(&path)), "0").unwrap();
268            let db = init_db(&path, args);
269            assert!(db.is_err());
270            assert_matches!(
271                db.unwrap_err().downcast_ref::<DatabaseVersionError>(),
272                Some(DatabaseVersionError::VersionMismatch { version: 0 })
273            )
274        }
275    }
276
277    #[test]
278    fn db_client_version() {
279        let path = tempdir().unwrap();
280
281        // Empty client version is not recorded
282        {
283            let db = init_db(&path, DatabaseArguments::new(ClientVersion::default())).unwrap();
284            let tx = db.tx().unwrap();
285            let mut cursor = tx.cursor_read::<tables::VersionHistory>().unwrap();
286            assert_matches!(cursor.first(), Ok(None));
287        }
288
289        // Client version is recorded
290        let first_version = ClientVersion { version: String::from("v1"), ..Default::default() };
291        {
292            let db = init_db(&path, DatabaseArguments::new(first_version.clone())).unwrap();
293            let tx = db.tx().unwrap();
294            let mut cursor = tx.cursor_read::<tables::VersionHistory>().unwrap();
295            assert_eq!(
296                cursor
297                    .walk_range(..)
298                    .unwrap()
299                    .map(|x| x.map(|(_, v)| v))
300                    .collect::<Result<Vec<_>, _>>()
301                    .unwrap(),
302                vec![first_version.clone()]
303            );
304        }
305
306        // Same client version is not duplicated.
307        {
308            let db = init_db(&path, DatabaseArguments::new(first_version.clone())).unwrap();
309            let tx = db.tx().unwrap();
310            let mut cursor = tx.cursor_read::<tables::VersionHistory>().unwrap();
311            assert_eq!(
312                cursor
313                    .walk_range(..)
314                    .unwrap()
315                    .map(|x| x.map(|(_, v)| v))
316                    .collect::<Result<Vec<_>, _>>()
317                    .unwrap(),
318                vec![first_version.clone()]
319            );
320        }
321
322        // Different client version is recorded
323        std::thread::sleep(Duration::from_secs(1));
324        let second_version = ClientVersion { version: String::from("v2"), ..Default::default() };
325        {
326            let db = init_db(&path, DatabaseArguments::new(second_version.clone())).unwrap();
327            let tx = db.tx().unwrap();
328            let mut cursor = tx.cursor_read::<tables::VersionHistory>().unwrap();
329            assert_eq!(
330                cursor
331                    .walk_range(..)
332                    .unwrap()
333                    .map(|x| x.map(|(_, v)| v))
334                    .collect::<Result<Vec<_>, _>>()
335                    .unwrap(),
336                vec![first_version.clone(), second_version.clone()]
337            );
338        }
339
340        // Different client version is recorded on db open.
341        std::thread::sleep(Duration::from_secs(1));
342        let third_version = ClientVersion { version: String::from("v3"), ..Default::default() };
343        {
344            let db = open_db(path.path(), DatabaseArguments::new(third_version.clone())).unwrap();
345            let tx = db.tx().unwrap();
346            let mut cursor = tx.cursor_read::<tables::VersionHistory>().unwrap();
347            assert_eq!(
348                cursor
349                    .walk_range(..)
350                    .unwrap()
351                    .map(|x| x.map(|(_, v)| v))
352                    .collect::<Result<Vec<_>, _>>()
353                    .unwrap(),
354                vec![first_version, second_version, third_version]
355            );
356        }
357    }
358}