reth_storage_api/
database_provider.rs1use reth_db_api::{
2 common::KeyValue,
3 cursor::DbCursorRO,
4 database::Database,
5 table::Table,
6 transaction::{DbTx, DbTxMut},
7 DatabaseError,
8};
9use reth_prune_types::PruneModes;
10use reth_storage_errors::provider::ProviderResult;
11use std::ops::{Bound, RangeBounds};
12
13pub trait DBProvider: Send + Sync + Sized + 'static {
15 type Tx: DbTx;
17
18 fn tx_ref(&self) -> &Self::Tx;
20
21 fn tx_mut(&mut self) -> &mut Self::Tx;
23
24 fn into_tx(self) -> Self::Tx;
26
27 fn disable_long_read_transaction_safety(mut self) -> Self {
34 self.tx_mut().disable_long_read_transaction_safety();
35 self
36 }
37
38 fn commit(self) -> ProviderResult<bool> {
40 Ok(self.into_tx().commit()?)
41 }
42
43 fn prune_modes_ref(&self) -> &PruneModes;
45
46 fn table<T: Table>(&self) -> Result<Vec<KeyValue<T>>, DatabaseError>
48 where
49 T::Key: Default + Ord,
50 {
51 self.tx_ref()
52 .cursor_read::<T>()?
53 .walk(Some(T::Key::default()))?
54 .collect::<Result<Vec<_>, DatabaseError>>()
55 }
56
57 #[inline]
59 fn get<T: Table>(
60 &self,
61 range: impl RangeBounds<T::Key>,
62 ) -> Result<Vec<KeyValue<T>>, DatabaseError> {
63 self.tx_ref().cursor_read::<T>()?.walk_range(range)?.collect::<Result<Vec<_>, _>>()
64 }
65
66 fn cursor_read_collect<T: Table<Key = u64>>(
70 &self,
71 range: impl RangeBounds<T::Key>,
72 ) -> ProviderResult<Vec<T::Value>> {
73 let capacity = match range_size_hint(&range) {
74 Some(0) | None => return Ok(Vec::new()),
75 Some(capacity) => capacity,
76 };
77 let mut cursor = self.tx_ref().cursor_read::<T>()?;
78 self.cursor_collect_with_capacity(&mut cursor, range, capacity)
79 }
80
81 fn cursor_collect<T: Table<Key = u64>>(
83 &self,
84 cursor: &mut impl DbCursorRO<T>,
85 range: impl RangeBounds<T::Key>,
86 ) -> ProviderResult<Vec<T::Value>> {
87 let capacity = range_size_hint(&range).unwrap_or(0);
88 self.cursor_collect_with_capacity(cursor, range, capacity)
89 }
90
91 fn cursor_collect_with_capacity<T: Table<Key = u64>>(
94 &self,
95 cursor: &mut impl DbCursorRO<T>,
96 range: impl RangeBounds<T::Key>,
97 capacity: usize,
98 ) -> ProviderResult<Vec<T::Value>> {
99 let mut items = Vec::with_capacity(capacity);
100 for entry in cursor.walk_range(range)? {
101 items.push(entry?.1);
102 }
103 Ok(items)
104 }
105
106 #[inline]
108 fn remove<T: Table>(&self, range: impl RangeBounds<T::Key>) -> Result<usize, DatabaseError>
109 where
110 Self::Tx: DbTxMut,
111 {
112 let mut entries = 0;
113 let mut cursor_write = self.tx_ref().cursor_write::<T>()?;
114 let mut walker = cursor_write.walk_range(range)?;
115 while walker.next().transpose()?.is_some() {
116 walker.delete_current()?;
117 entries += 1;
118 }
119 Ok(entries)
120 }
121
122 #[inline]
124 fn take<T: Table>(
125 &self,
126 range: impl RangeBounds<T::Key>,
127 ) -> Result<Vec<KeyValue<T>>, DatabaseError>
128 where
129 Self::Tx: DbTxMut,
130 {
131 let mut cursor_write = self.tx_ref().cursor_write::<T>()?;
132 let mut walker = cursor_write.walk_range(range)?;
133 let mut items = Vec::new();
134 while let Some(i) = walker.next().transpose()? {
135 walker.delete_current()?;
136 items.push(i)
137 }
138 Ok(items)
139 }
140}
141
142#[auto_impl::auto_impl(&, Arc)]
144pub trait DatabaseProviderFactory: Send + Sync {
145 type DB: Database;
147
148 type Provider: DBProvider<Tx = <Self::DB as Database>::TX>;
150
151 type ProviderRW: DBProvider<Tx = <Self::DB as Database>::TXMut>;
153
154 fn database_provider_ro(&self) -> ProviderResult<Self::Provider>;
156
157 fn database_provider_rw(&self) -> ProviderResult<Self::ProviderRW>;
159}
160
/// Returns how many `u64` keys `range` spans, for use as an allocation hint.
///
/// Yields `None` when no count can be derived: the end is unbounded, an exclusive start of
/// `u64::MAX` has no successor, or the range is inverted (its exclusive end lies below its
/// start). A range whose exclusive end equals its start yields `Some(0)`.
///
/// Counts that cannot be represented saturate instead of wrapping, so a non-empty range is
/// never reported as empty.
fn range_size_hint(range: &impl RangeBounds<u64>) -> Option<usize> {
    // First key that lies inside the range.
    let start = match range.start_bound() {
        Bound::Included(&start) => start,
        Bound::Excluded(&start) => start.checked_add(1)?,
        Bound::Unbounded => 0,
    };
    let size = match range.end_bound() {
        Bound::Included(&end) => match end.checked_add(1) {
            Some(exclusive_end) => exclusive_end.checked_sub(start)?,
            // `end == u64::MAX`: there is no exclusive end to subtract from, so count from
            // the inclusive end instead. `start <= u64::MAX` means the subtraction cannot
            // underflow; only the full `0..=u64::MAX` range saturates.
            None => (end - start).saturating_add(1),
        },
        Bound::Excluded(&end) => end.checked_sub(start)?,
        Bound::Unbounded => return None,
    };
    // `usize` may be narrower than `u64`; clamp rather than truncate.
    Some(usize::try_from(size).unwrap_or(usize::MAX))
}