reth_transaction_pool/blobstore/
mem.rs1use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
2use alloy_eips::{
3 eip4844::{BlobAndProofV1, BlobAndProofV2},
4 eip7594::BlobTransactionSidecarVariant,
5};
6use alloy_primitives::B256;
7use parking_lot::RwLock;
8use std::{collections::HashMap, sync::Arc};
9
10#[derive(Clone, Debug, Default, PartialEq)]
12pub struct InMemoryBlobStore {
13 inner: Arc<InMemoryBlobStoreInner>,
14}
15
16#[derive(Debug, Default)]
17struct InMemoryBlobStoreInner {
18 store: RwLock<HashMap<B256, Arc<BlobTransactionSidecarVariant>>>,
20 size_tracker: BlobStoreSize,
21}
22
23impl PartialEq for InMemoryBlobStoreInner {
24 fn eq(&self, other: &Self) -> bool {
25 self.store.read().eq(&other.store.read())
26 }
27}
28
29impl BlobStore for InMemoryBlobStore {
30 fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
31 let mut store = self.inner.store.write();
32 self.inner.size_tracker.add_size(insert_size(&mut store, tx, data));
33 self.inner.size_tracker.update_len(store.len());
34 Ok(())
35 }
36
37 fn insert_all(
38 &self,
39 txs: Vec<(B256, BlobTransactionSidecarVariant)>,
40 ) -> Result<(), BlobStoreError> {
41 if txs.is_empty() {
42 return Ok(())
43 }
44 let mut store = self.inner.store.write();
45 let mut total_add = 0;
46 for (tx, data) in txs {
47 let add = insert_size(&mut store, tx, data);
48 total_add += add;
49 }
50 self.inner.size_tracker.add_size(total_add);
51 self.inner.size_tracker.update_len(store.len());
52 Ok(())
53 }
54
55 fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
56 let mut store = self.inner.store.write();
57 let sub = remove_size(&mut store, &tx);
58 self.inner.size_tracker.sub_size(sub);
59 self.inner.size_tracker.update_len(store.len());
60 Ok(())
61 }
62
63 fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
64 if txs.is_empty() {
65 return Ok(())
66 }
67 let mut store = self.inner.store.write();
68 let mut total_sub = 0;
69 for tx in txs {
70 total_sub += remove_size(&mut store, &tx);
71 }
72 self.inner.size_tracker.sub_size(total_sub);
73 self.inner.size_tracker.update_len(store.len());
74 Ok(())
75 }
76
77 fn cleanup(&self) -> BlobStoreCleanupStat {
78 BlobStoreCleanupStat::default()
79 }
80
81 fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
83 Ok(self.inner.store.read().get(&tx).cloned())
84 }
85
86 fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
87 Ok(self.inner.store.read().contains_key(&tx))
88 }
89
90 fn get_all(
91 &self,
92 txs: Vec<B256>,
93 ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
94 let store = self.inner.store.read();
95 Ok(txs.into_iter().filter_map(|tx| store.get(&tx).map(|item| (tx, item.clone()))).collect())
96 }
97
98 fn get_exact(
99 &self,
100 txs: Vec<B256>,
101 ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
102 let store = self.inner.store.read();
103 Ok(txs.into_iter().filter_map(|tx| store.get(&tx).cloned()).collect())
104 }
105
106 fn get_by_versioned_hashes_v1(
107 &self,
108 versioned_hashes: &[B256],
109 ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
110 let mut result = vec![None; versioned_hashes.len()];
111 for (_tx_hash, blob_sidecar) in self.inner.store.read().iter() {
112 if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
113 for (hash_idx, match_result) in
114 blob_sidecar.match_versioned_hashes(versioned_hashes)
115 {
116 result[hash_idx] = Some(match_result);
117 }
118 }
119
120 if result.iter().all(|blob| blob.is_some()) {
122 break;
123 }
124 }
125 Ok(result)
126 }
127
128 fn get_by_versioned_hashes_v2(
129 &self,
130 versioned_hashes: &[B256],
131 ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
132 let mut result = vec![None; versioned_hashes.len()];
133 for (_tx_hash, blob_sidecar) in self.inner.store.read().iter() {
134 if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
135 for (hash_idx, match_result) in
136 blob_sidecar.match_versioned_hashes(versioned_hashes)
137 {
138 result[hash_idx] = Some(match_result);
139 }
140 }
141
142 if result.iter().all(|blob| blob.is_some()) {
143 break;
144 }
145 }
146 if result.iter().all(|blob| blob.is_some()) {
147 Ok(Some(result.into_iter().map(Option::unwrap).collect()))
148 } else {
149 Ok(None)
150 }
151 }
152
153 fn data_size_hint(&self) -> Option<usize> {
154 Some(self.inner.size_tracker.data_size())
155 }
156
157 fn blobs_len(&self) -> usize {
158 self.inner.size_tracker.blobs_len()
159 }
160}
161
162#[inline]
164fn remove_size(store: &mut HashMap<B256, Arc<BlobTransactionSidecarVariant>>, tx: &B256) -> usize {
165 store.remove(tx).map(|rem| rem.size()).unwrap_or_default()
166}
167
168#[inline]
172fn insert_size(
173 store: &mut HashMap<B256, Arc<BlobTransactionSidecarVariant>>,
174 tx: B256,
175 blob: BlobTransactionSidecarVariant,
176) -> usize {
177 let add = blob.size();
178 store.insert(tx, Arc::new(blob));
179 add
180}