1#![doc(
11 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
12 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
13 issue_tracker_base_url = "https://github.com/SeismicSystems/seismic-reth/issues/"
14)]
15#![cfg_attr(not(test), warn(unused_crate_dependencies))]
16#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
17
18mod implementation;
19pub mod lockfile;
20#[cfg(feature = "mdbx")]
21mod metrics;
22pub mod static_file;
23pub mod tables;
24#[cfg(feature = "mdbx")]
25mod utils;
26pub mod version;
27
28#[cfg(feature = "mdbx")]
29pub mod mdbx;
30
31pub use reth_storage_errors::db::{DatabaseError, DatabaseWriteOperation};
32pub use tables::*;
33#[cfg(feature = "mdbx")]
34pub use utils::is_database_empty;
35
36#[cfg(feature = "mdbx")]
37pub use mdbx::{create_db, init_db, open_db, open_db_read_only, DatabaseEnv, DatabaseEnvKind};
38
39pub use models::ClientVersion;
40pub use reth_db_api::*;
41
42#[cfg(any(test, feature = "test-utils"))]
44pub mod test_utils {
45 use super::*;
46 use crate::mdbx::DatabaseArguments;
47 use parking_lot::RwLock;
48 use reth_db_api::{
49 database::Database,
50 database_metrics::{DatabaseMetadata, DatabaseMetadataValue, DatabaseMetrics},
51 models::ClientVersion,
52 };
53 use reth_fs_util;
54 use reth_libmdbx::MaxReadTransactionDuration;
55 use std::{
56 fmt::Formatter,
57 path::{Path, PathBuf},
58 sync::Arc,
59 };
60 use tempfile::TempDir;
61
62 pub const ERROR_DB_OPEN: &str = "Not able to open the database file.";
64 pub const ERROR_DB_CREATION: &str = "Not able to create the database file.";
66 pub const ERROR_STATIC_FILES_CREATION: &str = "Not able to create the static file path.";
68 pub const ERROR_TABLE_CREATION: &str = "Not able to create tables in the database.";
70 pub const ERROR_TEMPDIR: &str = "Not able to create a temporary directory.";
72
73 pub struct TempDatabase<DB> {
75 db: Option<DB>,
76 path: PathBuf,
77 pre_tx_hook: RwLock<Box<dyn Fn() + Send + Sync>>,
79 post_tx_hook: RwLock<Box<dyn Fn() + Send + Sync>>,
81 }
82
83 impl<DB: std::fmt::Debug> std::fmt::Debug for TempDatabase<DB> {
84 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
85 f.debug_struct("TempDatabase").field("db", &self.db).field("path", &self.path).finish()
86 }
87 }
88
89 impl<DB> Drop for TempDatabase<DB> {
90 fn drop(&mut self) {
91 if let Some(db) = self.db.take() {
92 drop(db);
93 let _ = reth_fs_util::remove_dir_all(&self.path);
94 }
95 }
96 }
97
98 impl<DB> TempDatabase<DB> {
99 pub fn new(db: DB, path: PathBuf) -> Self {
101 Self {
102 db: Some(db),
103 path,
104 pre_tx_hook: RwLock::new(Box::new(|| ())),
105 post_tx_hook: RwLock::new(Box::new(|| ())),
106 }
107 }
108
109 pub fn db(&self) -> &DB {
111 self.db.as_ref().unwrap()
112 }
113
114 pub fn path(&self) -> &Path {
116 &self.path
117 }
118
119 pub fn into_inner_db(mut self) -> DB {
121 self.db.take().unwrap() }
123
124 pub fn set_pre_transaction_hook(&self, hook: Box<dyn Fn() + Send + Sync>) {
126 let mut db_hook = self.pre_tx_hook.write();
127 *db_hook = hook;
128 }
129
130 pub fn set_post_transaction_hook(&self, hook: Box<dyn Fn() + Send + Sync>) {
132 let mut db_hook = self.post_tx_hook.write();
133 *db_hook = hook;
134 }
135 }
136
137 impl<DB: Database> Database for TempDatabase<DB> {
138 type TX = <DB as Database>::TX;
139 type TXMut = <DB as Database>::TXMut;
140 fn tx(&self) -> Result<Self::TX, DatabaseError> {
141 self.pre_tx_hook.read()();
142 let tx = self.db().tx()?;
143 self.post_tx_hook.read()();
144 Ok(tx)
145 }
146
147 fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
148 self.db().tx_mut()
149 }
150 }
151
152 impl<DB: DatabaseMetrics> DatabaseMetrics for TempDatabase<DB> {
153 fn report_metrics(&self) {
154 self.db().report_metrics()
155 }
156 }
157
158 impl<DB: DatabaseMetadata> DatabaseMetadata for TempDatabase<DB> {
159 fn metadata(&self) -> DatabaseMetadataValue {
160 self.db().metadata()
161 }
162 }
163
164 pub fn create_test_static_files_dir() -> (TempDir, PathBuf) {
166 let temp_dir = TempDir::with_prefix("reth-test-static-").expect(ERROR_TEMPDIR);
167 let path = temp_dir.path().to_path_buf();
168 (temp_dir, path)
169 }
170
171 pub fn tempdir_path() -> PathBuf {
173 let builder = tempfile::Builder::new().prefix("reth-test-").rand_bytes(8).tempdir();
174 builder.expect(ERROR_TEMPDIR).into_path()
175 }
176
177 pub fn create_test_rw_db() -> Arc<TempDatabase<DatabaseEnv>> {
179 let path = tempdir_path();
180 let emsg = format!("{ERROR_DB_CREATION}: {path:?}");
181
182 let db = init_db(
183 &path,
184 DatabaseArguments::new(ClientVersion::default())
185 .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)),
186 )
187 .expect(&emsg);
188
189 Arc::new(TempDatabase::new(db, path))
190 }
191
192 pub fn create_test_rw_db_with_path<P: AsRef<Path>>(path: P) -> Arc<TempDatabase<DatabaseEnv>> {
194 let path = path.as_ref().to_path_buf();
195 let db = init_db(
196 path.as_path(),
197 DatabaseArguments::new(ClientVersion::default())
198 .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)),
199 )
200 .expect(ERROR_DB_CREATION);
201 Arc::new(TempDatabase::new(db, path))
202 }
203
204 pub fn create_test_ro_db() -> Arc<TempDatabase<DatabaseEnv>> {
206 let args = DatabaseArguments::new(ClientVersion::default())
207 .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded));
208
209 let path = tempdir_path();
210 {
211 init_db(path.as_path(), args.clone()).expect(ERROR_DB_CREATION);
212 }
213 let db = open_db_read_only(path.as_path(), args).expect(ERROR_DB_OPEN);
214 Arc::new(TempDatabase::new(db, path))
215 }
216}
217
218#[cfg(test)]
219mod tests {
220 use crate::{
221 init_db,
222 mdbx::DatabaseArguments,
223 open_db, tables,
224 version::{db_version_file_path, DatabaseVersionError},
225 };
226 use assert_matches::assert_matches;
227 use reth_db_api::{
228 cursor::DbCursorRO, database::Database, models::ClientVersion, transaction::DbTx,
229 };
230 use reth_libmdbx::MaxReadTransactionDuration;
231 use std::time::Duration;
232 use tempfile::tempdir;
233
234 #[test]
235 fn db_version() {
236 let path = tempdir().unwrap();
237
238 let args = DatabaseArguments::new(ClientVersion::default())
239 .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded));
240
241 {
243 let db = init_db(&path, args.clone());
244 assert_matches!(db, Ok(_));
245 }
246
247 {
249 let db = init_db(&path, args.clone());
250 assert_matches!(db, Ok(_));
251 }
252
253 {
255 reth_fs_util::write(path.path().join(db_version_file_path(&path)), "invalid-version")
256 .unwrap();
257 let db = init_db(&path, args.clone());
258 assert!(db.is_err());
259 assert_matches!(
260 db.unwrap_err().downcast_ref::<DatabaseVersionError>(),
261 Some(DatabaseVersionError::MalformedFile)
262 )
263 }
264
265 {
267 reth_fs_util::write(path.path().join(db_version_file_path(&path)), "0").unwrap();
268 let db = init_db(&path, args);
269 assert!(db.is_err());
270 assert_matches!(
271 db.unwrap_err().downcast_ref::<DatabaseVersionError>(),
272 Some(DatabaseVersionError::VersionMismatch { version: 0 })
273 )
274 }
275 }
276
277 #[test]
278 fn db_client_version() {
279 let path = tempdir().unwrap();
280
281 {
283 let db = init_db(&path, DatabaseArguments::new(ClientVersion::default())).unwrap();
284 let tx = db.tx().unwrap();
285 let mut cursor = tx.cursor_read::<tables::VersionHistory>().unwrap();
286 assert_matches!(cursor.first(), Ok(None));
287 }
288
289 let first_version = ClientVersion { version: String::from("v1"), ..Default::default() };
291 {
292 let db = init_db(&path, DatabaseArguments::new(first_version.clone())).unwrap();
293 let tx = db.tx().unwrap();
294 let mut cursor = tx.cursor_read::<tables::VersionHistory>().unwrap();
295 assert_eq!(
296 cursor
297 .walk_range(..)
298 .unwrap()
299 .map(|x| x.map(|(_, v)| v))
300 .collect::<Result<Vec<_>, _>>()
301 .unwrap(),
302 vec![first_version.clone()]
303 );
304 }
305
306 {
308 let db = init_db(&path, DatabaseArguments::new(first_version.clone())).unwrap();
309 let tx = db.tx().unwrap();
310 let mut cursor = tx.cursor_read::<tables::VersionHistory>().unwrap();
311 assert_eq!(
312 cursor
313 .walk_range(..)
314 .unwrap()
315 .map(|x| x.map(|(_, v)| v))
316 .collect::<Result<Vec<_>, _>>()
317 .unwrap(),
318 vec![first_version.clone()]
319 );
320 }
321
322 std::thread::sleep(Duration::from_secs(1));
324 let second_version = ClientVersion { version: String::from("v2"), ..Default::default() };
325 {
326 let db = init_db(&path, DatabaseArguments::new(second_version.clone())).unwrap();
327 let tx = db.tx().unwrap();
328 let mut cursor = tx.cursor_read::<tables::VersionHistory>().unwrap();
329 assert_eq!(
330 cursor
331 .walk_range(..)
332 .unwrap()
333 .map(|x| x.map(|(_, v)| v))
334 .collect::<Result<Vec<_>, _>>()
335 .unwrap(),
336 vec![first_version.clone(), second_version.clone()]
337 );
338 }
339
340 std::thread::sleep(Duration::from_secs(1));
342 let third_version = ClientVersion { version: String::from("v3"), ..Default::default() };
343 {
344 let db = open_db(path.path(), DatabaseArguments::new(third_version.clone())).unwrap();
345 let tx = db.tx().unwrap();
346 let mut cursor = tx.cursor_read::<tables::VersionHistory>().unwrap();
347 assert_eq!(
348 cursor
349 .walk_range(..)
350 .unwrap()
351 .map(|x| x.map(|(_, v)| v))
352 .collect::<Result<Vec<_>, _>>()
353 .unwrap(),
354 vec![first_version, second_version, third_version]
355 );
356 }
357 }
358}