reth_storage_api/
database_provider.rs

1use reth_db_api::{
2    common::KeyValue,
3    cursor::DbCursorRO,
4    database::Database,
5    table::Table,
6    transaction::{DbTx, DbTxMut},
7    DatabaseError,
8};
9use reth_prune_types::PruneModes;
10use reth_storage_errors::provider::ProviderResult;
11use std::ops::{Bound, RangeBounds};
12
13/// Database provider.
14pub trait DBProvider: Send + Sync + Sized + 'static {
15    /// Underlying database transaction held by the provider.
16    type Tx: DbTx;
17
18    /// Returns a reference to the underlying transaction.
19    fn tx_ref(&self) -> &Self::Tx;
20
21    /// Returns a mutable reference to the underlying transaction.
22    fn tx_mut(&mut self) -> &mut Self::Tx;
23
24    /// Consumes the provider and returns the underlying transaction.
25    fn into_tx(self) -> Self::Tx;
26
27    /// Disables long-lived read transaction safety guarantees for leaks prevention and
28    /// observability improvements.
29    ///
30    /// CAUTION: In most of the cases, you want the safety guarantees for long read transactions
31    /// enabled. Use this only if you're sure that no write transaction is open in parallel, meaning
32    /// that Reth as a node is offline and not progressing.
33    fn disable_long_read_transaction_safety(mut self) -> Self {
34        self.tx_mut().disable_long_read_transaction_safety();
35        self
36    }
37
38    /// Commit database transaction
39    fn commit(self) -> ProviderResult<bool> {
40        Ok(self.into_tx().commit()?)
41    }
42
43    /// Returns a reference to prune modes.
44    fn prune_modes_ref(&self) -> &PruneModes;
45
46    /// Return full table as Vec
47    fn table<T: Table>(&self) -> Result<Vec<KeyValue<T>>, DatabaseError>
48    where
49        T::Key: Default + Ord,
50    {
51        self.tx_ref()
52            .cursor_read::<T>()?
53            .walk(Some(T::Key::default()))?
54            .collect::<Result<Vec<_>, DatabaseError>>()
55    }
56
57    /// Return a list of entries from the table, based on the given range.
58    #[inline]
59    fn get<T: Table>(
60        &self,
61        range: impl RangeBounds<T::Key>,
62    ) -> Result<Vec<KeyValue<T>>, DatabaseError> {
63        self.tx_ref().cursor_read::<T>()?.walk_range(range)?.collect::<Result<Vec<_>, _>>()
64    }
65
66    /// Iterates over read only values in the given table and collects them into a vector.
67    ///
68    /// Early-returns if the range is empty, without opening a cursor transaction.
69    fn cursor_read_collect<T: Table<Key = u64>>(
70        &self,
71        range: impl RangeBounds<T::Key>,
72    ) -> ProviderResult<Vec<T::Value>> {
73        let capacity = match range_size_hint(&range) {
74            Some(0) | None => return Ok(Vec::new()),
75            Some(capacity) => capacity,
76        };
77        let mut cursor = self.tx_ref().cursor_read::<T>()?;
78        self.cursor_collect_with_capacity(&mut cursor, range, capacity)
79    }
80
81    /// Iterates over read only values in the given table and collects them into a vector.
82    fn cursor_collect<T: Table<Key = u64>>(
83        &self,
84        cursor: &mut impl DbCursorRO<T>,
85        range: impl RangeBounds<T::Key>,
86    ) -> ProviderResult<Vec<T::Value>> {
87        let capacity = range_size_hint(&range).unwrap_or(0);
88        self.cursor_collect_with_capacity(cursor, range, capacity)
89    }
90
91    /// Iterates over read only values in the given table and collects them into a vector with
92    /// capacity.
93    fn cursor_collect_with_capacity<T: Table<Key = u64>>(
94        &self,
95        cursor: &mut impl DbCursorRO<T>,
96        range: impl RangeBounds<T::Key>,
97        capacity: usize,
98    ) -> ProviderResult<Vec<T::Value>> {
99        let mut items = Vec::with_capacity(capacity);
100        for entry in cursor.walk_range(range)? {
101            items.push(entry?.1);
102        }
103        Ok(items)
104    }
105
106    /// Remove list of entries from the table. Returns the number of entries removed.
107    #[inline]
108    fn remove<T: Table>(&self, range: impl RangeBounds<T::Key>) -> Result<usize, DatabaseError>
109    where
110        Self::Tx: DbTxMut,
111    {
112        let mut entries = 0;
113        let mut cursor_write = self.tx_ref().cursor_write::<T>()?;
114        let mut walker = cursor_write.walk_range(range)?;
115        while walker.next().transpose()?.is_some() {
116            walker.delete_current()?;
117            entries += 1;
118        }
119        Ok(entries)
120    }
121
122    /// Return a list of entries from the table, and remove them, based on the given range.
123    #[inline]
124    fn take<T: Table>(
125        &self,
126        range: impl RangeBounds<T::Key>,
127    ) -> Result<Vec<KeyValue<T>>, DatabaseError>
128    where
129        Self::Tx: DbTxMut,
130    {
131        let mut cursor_write = self.tx_ref().cursor_write::<T>()?;
132        let mut walker = cursor_write.walk_range(range)?;
133        let mut items = Vec::new();
134        while let Some(i) = walker.next().transpose()? {
135            walker.delete_current()?;
136            items.push(i)
137        }
138        Ok(items)
139    }
140}
141
142/// Database provider factory.
143#[auto_impl::auto_impl(&, Arc)]
144pub trait DatabaseProviderFactory: Send + Sync {
145    /// Database this factory produces providers for.
146    type DB: Database;
147
148    /// Provider type returned by the factory.
149    type Provider: DBProvider<Tx = <Self::DB as Database>::TX>;
150
151    /// Read-write provider type returned by the factory.
152    type ProviderRW: DBProvider<Tx = <Self::DB as Database>::TXMut>;
153
154    /// Create new read-only database provider.
155    fn database_provider_ro(&self) -> ProviderResult<Self::Provider>;
156
157    /// Create new read-write database provider.
158    fn database_provider_rw(&self) -> ProviderResult<Self::ProviderRW>;
159}
160
161fn range_size_hint(range: &impl RangeBounds<u64>) -> Option<usize> {
162    let start = match range.start_bound().cloned() {
163        Bound::Included(start) => start,
164        Bound::Excluded(start) => start.checked_add(1)?,
165        Bound::Unbounded => 0,
166    };
167    let end = match range.end_bound().cloned() {
168        Bound::Included(end) => end.saturating_add(1),
169        Bound::Excluded(end) => end,
170        Bound::Unbounded => return None,
171    };
172    end.checked_sub(start).map(|x| x as _)
173}