reth_transaction_pool/blobstore/
mem.rs

1use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
2use alloy_eips::{
3    eip4844::{BlobAndProofV1, BlobAndProofV2},
4    eip7594::BlobTransactionSidecarVariant,
5};
6use alloy_primitives::B256;
7use parking_lot::RwLock;
8use std::{collections::HashMap, sync::Arc};
9
10/// An in-memory blob store.
11#[derive(Clone, Debug, Default, PartialEq)]
12pub struct InMemoryBlobStore {
13    inner: Arc<InMemoryBlobStoreInner>,
14}
15
16#[derive(Debug, Default)]
17struct InMemoryBlobStoreInner {
18    /// Storage for all blob data.
19    store: RwLock<HashMap<B256, Arc<BlobTransactionSidecarVariant>>>,
20    size_tracker: BlobStoreSize,
21}
22
23impl PartialEq for InMemoryBlobStoreInner {
24    fn eq(&self, other: &Self) -> bool {
25        self.store.read().eq(&other.store.read())
26    }
27}
28
29impl BlobStore for InMemoryBlobStore {
30    fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
31        let mut store = self.inner.store.write();
32        self.inner.size_tracker.add_size(insert_size(&mut store, tx, data));
33        self.inner.size_tracker.update_len(store.len());
34        Ok(())
35    }
36
37    fn insert_all(
38        &self,
39        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
40    ) -> Result<(), BlobStoreError> {
41        if txs.is_empty() {
42            return Ok(())
43        }
44        let mut store = self.inner.store.write();
45        let mut total_add = 0;
46        for (tx, data) in txs {
47            let add = insert_size(&mut store, tx, data);
48            total_add += add;
49        }
50        self.inner.size_tracker.add_size(total_add);
51        self.inner.size_tracker.update_len(store.len());
52        Ok(())
53    }
54
55    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
56        let mut store = self.inner.store.write();
57        let sub = remove_size(&mut store, &tx);
58        self.inner.size_tracker.sub_size(sub);
59        self.inner.size_tracker.update_len(store.len());
60        Ok(())
61    }
62
63    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
64        if txs.is_empty() {
65            return Ok(())
66        }
67        let mut store = self.inner.store.write();
68        let mut total_sub = 0;
69        for tx in txs {
70            total_sub += remove_size(&mut store, &tx);
71        }
72        self.inner.size_tracker.sub_size(total_sub);
73        self.inner.size_tracker.update_len(store.len());
74        Ok(())
75    }
76
77    fn cleanup(&self) -> BlobStoreCleanupStat {
78        BlobStoreCleanupStat::default()
79    }
80
81    // Retrieves the decoded blob data for the given transaction hash.
82    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
83        Ok(self.inner.store.read().get(&tx).cloned())
84    }
85
86    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
87        Ok(self.inner.store.read().contains_key(&tx))
88    }
89
90    fn get_all(
91        &self,
92        txs: Vec<B256>,
93    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
94        let store = self.inner.store.read();
95        Ok(txs.into_iter().filter_map(|tx| store.get(&tx).map(|item| (tx, item.clone()))).collect())
96    }
97
98    fn get_exact(
99        &self,
100        txs: Vec<B256>,
101    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
102        let store = self.inner.store.read();
103        Ok(txs.into_iter().filter_map(|tx| store.get(&tx).cloned()).collect())
104    }
105
106    fn get_by_versioned_hashes_v1(
107        &self,
108        versioned_hashes: &[B256],
109    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
110        let mut result = vec![None; versioned_hashes.len()];
111        for (_tx_hash, blob_sidecar) in self.inner.store.read().iter() {
112            if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
113                for (hash_idx, match_result) in
114                    blob_sidecar.match_versioned_hashes(versioned_hashes)
115                {
116                    result[hash_idx] = Some(match_result);
117                }
118            }
119
120            // Return early if all blobs are found.
121            if result.iter().all(|blob| blob.is_some()) {
122                break;
123            }
124        }
125        Ok(result)
126    }
127
128    fn get_by_versioned_hashes_v2(
129        &self,
130        versioned_hashes: &[B256],
131    ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
132        let mut result = vec![None; versioned_hashes.len()];
133        for (_tx_hash, blob_sidecar) in self.inner.store.read().iter() {
134            if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
135                for (hash_idx, match_result) in
136                    blob_sidecar.match_versioned_hashes(versioned_hashes)
137                {
138                    result[hash_idx] = Some(match_result);
139                }
140            }
141
142            if result.iter().all(|blob| blob.is_some()) {
143                break;
144            }
145        }
146        if result.iter().all(|blob| blob.is_some()) {
147            Ok(Some(result.into_iter().map(Option::unwrap).collect()))
148        } else {
149            Ok(None)
150        }
151    }
152
153    fn data_size_hint(&self) -> Option<usize> {
154        Some(self.inner.size_tracker.data_size())
155    }
156
157    fn blobs_len(&self) -> usize {
158        self.inner.size_tracker.blobs_len()
159    }
160}
161
162/// Removes the given blob from the store and returns the size of the blob that was removed.
163#[inline]
164fn remove_size(store: &mut HashMap<B256, Arc<BlobTransactionSidecarVariant>>, tx: &B256) -> usize {
165    store.remove(tx).map(|rem| rem.size()).unwrap_or_default()
166}
167
168/// Inserts the given blob into the store and returns the size of the blob that was added.
169///
170/// We don't need to handle the size updates for replacements because transactions are unique.
171#[inline]
172fn insert_size(
173    store: &mut HashMap<B256, Arc<BlobTransactionSidecarVariant>>,
174    tx: B256,
175    blob: BlobTransactionSidecarVariant,
176) -> usize {
177    let add = blob.size();
178    store.insert(tx, Arc::new(blob));
179    add
180}