reth_transaction_pool/blobstore/
mod.rs1use alloy_eips::{
4 eip4844::{BlobAndProofV1, BlobAndProofV2},
5 eip7594::BlobTransactionSidecarVariant,
6};
7use alloy_primitives::B256;
8pub use disk::{DiskFileBlobStore, DiskFileBlobStoreConfig, OpenDiskFileBlobStore};
9pub use mem::InMemoryBlobStore;
10pub use noop::NoopBlobStore;
11use std::{
12 fmt,
13 sync::{
14 atomic::{AtomicUsize, Ordering},
15 Arc,
16 },
17};
18pub use tracker::{BlobStoreCanonTracker, BlobStoreUpdates};
19
20pub mod disk;
21mod mem;
22mod noop;
23mod tracker;
24
25pub trait BlobStore: fmt::Debug + Send + Sync + 'static {
32 fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError>;
34
35 fn insert_all(
37 &self,
38 txs: Vec<(B256, BlobTransactionSidecarVariant)>,
39 ) -> Result<(), BlobStoreError>;
40
41 fn delete(&self, tx: B256) -> Result<(), BlobStoreError>;
43
44 fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError>;
46
47 fn cleanup(&self) -> BlobStoreCleanupStat;
53
54 fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError>;
56
57 fn contains(&self, tx: B256) -> Result<bool, BlobStoreError>;
59
60 fn get_all(
67 &self,
68 txs: Vec<B256>,
69 ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError>;
70
71 fn get_exact(
76 &self,
77 txs: Vec<B256>,
78 ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError>;
79
80 fn get_by_versioned_hashes_v1(
82 &self,
83 versioned_hashes: &[B256],
84 ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError>;
85
86 fn get_by_versioned_hashes_v2(
97 &self,
98 versioned_hashes: &[B256],
99 ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError>;
100
101 fn data_size_hint(&self) -> Option<usize>;
103
104 fn blobs_len(&self) -> usize;
106}
107
108#[derive(Debug, thiserror::Error)]
110pub enum BlobStoreError {
111 #[error("blob sidecar not found for transaction {0:?}")]
113 MissingSidecar(B256),
114 #[error("failed to decode blob data: {0}")]
116 DecodeError(#[from] alloy_rlp::Error),
117 #[error(transparent)]
119 Other(Box<dyn core::error::Error + Send + Sync>),
120}
121
122#[derive(Debug, Default)]
124pub(crate) struct BlobStoreSize {
125 data_size: AtomicUsize,
126 num_blobs: AtomicUsize,
127}
128
129impl BlobStoreSize {
130 #[inline]
131 pub(crate) fn add_size(&self, add: usize) {
132 self.data_size.fetch_add(add, Ordering::Relaxed);
133 }
134
135 #[inline]
136 pub(crate) fn sub_size(&self, sub: usize) {
137 let _ = self.data_size.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
138 Some(current.saturating_sub(sub))
139 });
140 }
141
142 #[inline]
143 pub(crate) fn update_len(&self, len: usize) {
144 self.num_blobs.store(len, Ordering::Relaxed);
145 }
146
147 #[inline]
148 pub(crate) fn inc_len(&self, add: usize) {
149 self.num_blobs.fetch_add(add, Ordering::Relaxed);
150 }
151
152 #[inline]
153 pub(crate) fn sub_len(&self, sub: usize) {
154 let _ = self.num_blobs.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
155 Some(current.saturating_sub(sub))
156 });
157 }
158
159 #[inline]
160 pub(crate) fn data_size(&self) -> usize {
161 self.data_size.load(Ordering::Relaxed)
162 }
163
164 #[inline]
165 pub(crate) fn blobs_len(&self) -> usize {
166 self.num_blobs.load(Ordering::Relaxed)
167 }
168}
169
170impl PartialEq for BlobStoreSize {
171 fn eq(&self, other: &Self) -> bool {
172 self.data_size.load(Ordering::Relaxed) == other.data_size.load(Ordering::Relaxed) &&
173 self.num_blobs.load(Ordering::Relaxed) == other.num_blobs.load(Ordering::Relaxed)
174 }
175}
176
177#[derive(Debug, Clone, Default, PartialEq, Eq)]
179pub struct BlobStoreCleanupStat {
180 pub delete_succeed: usize,
182 pub delete_failed: usize,
184}
185
186#[cfg(test)]
187mod tests {
188 use super::*;
189
190 #[expect(dead_code)]
191 struct DynStore {
192 store: Box<dyn BlobStore>,
193 }
194}