reth_transaction_pool/blobstore/
disk.rs

//! A simple disk store for blobs

use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
use alloy_eips::{
    eip4844::{BlobAndProofV1, BlobAndProofV2},
    eip7594::BlobTransactionSidecarVariant,
};
use alloy_primitives::{TxHash, B256};
use parking_lot::{Mutex, RwLock};
use schnellru::{ByLength, LruMap};
use std::{collections::HashSet, fmt, fs, io, path::PathBuf, sync::Arc};
use tracing::{debug, trace};

/// How many [`BlobTransactionSidecarVariant`] to cache in memory.
pub const DEFAULT_MAX_CACHED_BLOBS: u32 = 100;

/// A blob store that stores blob data on disk.
///
/// The type uses deferred deletion, meaning that blobs are not immediately deleted from disk, but
/// it's expected that the maintenance task will call [`BlobStore::cleanup`] to remove the deleted
/// blobs from disk.
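///
/// # Example
///
/// A minimal usage sketch of the deferred-deletion workflow (the crate path is assumed and
/// `tx_hash`/`sidecar` are placeholders, hence `ignore`):
///
/// ```ignore
/// use reth_transaction_pool::blobstore::{BlobStore, DiskFileBlobStore};
///
/// let store = DiskFileBlobStore::open("/tmp/blobs", Default::default())?;
/// store.insert(tx_hash, sidecar)?; // cached in memory and written to disk
/// store.delete(tx_hash)?; // only queues the blob for deletion
/// let stat = store.cleanup(); // actually removes the queued files
/// assert_eq!(stat.delete_succeed, 1);
/// ```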
#[derive(Clone, Debug)]
pub struct DiskFileBlobStore {
    inner: Arc<DiskFileBlobStoreInner>,
}

impl DiskFileBlobStore {
    /// Opens and initializes a new disk file blob store according to the given options.
    pub fn open(
        blob_dir: impl Into<PathBuf>,
        opts: DiskFileBlobStoreConfig,
    ) -> Result<Self, DiskFileBlobStoreError> {
        let blob_dir = blob_dir.into();
        let DiskFileBlobStoreConfig { max_cached_entries, .. } = opts;
        let inner = DiskFileBlobStoreInner::new(blob_dir, max_cached_entries);

        // initialize the blob store
        inner.delete_all()?;
        inner.create_blob_dir()?;

        Ok(Self { inner: Arc::new(inner) })
    }

    #[cfg(test)]
    fn is_cached(&self, tx: &B256) -> bool {
        self.inner.blob_cache.lock().get(tx).is_some()
    }

    #[cfg(test)]
    fn clear_cache(&self) {
        self.inner.blob_cache.lock().clear()
    }
}

impl BlobStore for DiskFileBlobStore {
    fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
        self.inner.insert_one(tx, data)
    }

    fn insert_all(
        &self,
        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
    ) -> Result<(), BlobStoreError> {
        if txs.is_empty() {
            return Ok(())
        }
        self.inner.insert_many(txs)
    }

    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
        if self.inner.contains(tx)? {
            self.inner.txs_to_delete.write().insert(tx);
        }
        Ok(())
    }

    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
        let txs = self.inner.retain_existing(txs)?;
        self.inner.txs_to_delete.write().extend(txs);
        Ok(())
    }

    fn cleanup(&self) -> BlobStoreCleanupStat {
        let txs_to_delete = std::mem::take(&mut *self.inner.txs_to_delete.write());
        let mut stat = BlobStoreCleanupStat::default();
        let mut subsize = 0;
        debug!(target:"txpool::blob", num_blobs=%txs_to_delete.len(), "Removing blobs from disk");
        for tx in txs_to_delete {
            let path = self.inner.blob_disk_file(tx);
            let filesize = fs::metadata(&path).map_or(0, |meta| meta.len());
            match fs::remove_file(&path) {
                Ok(_) => {
                    stat.delete_succeed += 1;
                    subsize += filesize;
                }
                Err(e) => {
                    stat.delete_failed += 1;
                    let err = DiskFileBlobStoreError::DeleteFile(tx, path, e);
                    debug!(target:"txpool::blob", %err);
                }
            };
        }
        self.inner.size_tracker.sub_size(subsize as usize);
        self.inner.size_tracker.sub_len(stat.delete_succeed);
        stat
    }

    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        self.inner.get_one(tx)
    }

    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        self.inner.contains(tx)
    }

    fn get_all(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_all(txs)
    }

    fn get_exact(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_exact(txs)
    }

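    // Serves `engine_getBlobsV1`-style lookups: scan the in-memory sidecar cache first, then
    // resolve still-missing versioned hashes through the versioned-hash -> tx-hash index and
    // read those sidecars from disk.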
    fn get_by_versioned_hashes_v1(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
        // the response must always be the same len as the request, misses must be None
        let mut result = vec![None; versioned_hashes.len()];

        // first scan all cached full sidecars
        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
            if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
                for (hash_idx, match_result) in
                    blob_sidecar.match_versioned_hashes(versioned_hashes)
                {
                    result[hash_idx] = Some(match_result);
                }
            }

            // return early if all blobs are found.
            if result.iter().all(|blob| blob.is_some()) {
                return Ok(result);
            }
        }

        // not all versioned hashes were found, try to look up a matching tx

        let mut missing_tx_hashes = Vec::new();

        {
            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
            for (idx, _) in
                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
            {
                // this is safe because the result vec has the same len
                let versioned_hash = versioned_hashes[idx];
                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
                    missing_tx_hashes.push(tx_hash);
                }
            }
        }

        // if we have missing blobs, try to read them from disk and try again
        if !missing_tx_hashes.is_empty() {
            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
            for (_, blob_sidecar) in blobs_from_disk {
                if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
                    for (hash_idx, match_result) in
                        blob_sidecar.match_versioned_hashes(versioned_hashes)
                    {
                        if result[hash_idx].is_none() {
                            result[hash_idx] = Some(match_result);
                        }
                    }
                }
            }
        }

        Ok(result)
    }

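    // Same lookup flow as the v1 variant above, but the response is all-or-nothing: if any
    // requested hash is still missing at the end, the whole response is `None`.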
    fn get_by_versioned_hashes_v2(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
        // we must return the blobs in order but we don't necessarily find them in the requested
        // order
        let mut result = vec![None; versioned_hashes.len()];

        // first scan all cached full sidecars
        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
            if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
                for (hash_idx, match_result) in
                    blob_sidecar.match_versioned_hashes(versioned_hashes)
                {
                    result[hash_idx] = Some(match_result);
                }
            }

            // return early if all blobs are found.
            if result.iter().all(|blob| blob.is_some()) {
                return Ok(Some(result.into_iter().map(Option::unwrap).collect()))
            }
        }

        // not all versioned hashes were found, try to look up a matching tx
        let mut missing_tx_hashes = Vec::new();

        {
            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
            for (idx, _) in
                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
            {
                // this is safe because the result vec has the same len
                let versioned_hash = versioned_hashes[idx];
                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
                    missing_tx_hashes.push(tx_hash);
                }
            }
        }

        // if we have missing blobs, try to read them from disk and try again
        if !missing_tx_hashes.is_empty() {
            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
            for (_, blob_sidecar) in blobs_from_disk {
                if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
                    for (hash_idx, match_result) in
                        blob_sidecar.match_versioned_hashes(versioned_hashes)
                    {
                        if result[hash_idx].is_none() {
                            result[hash_idx] = Some(match_result);
                        }
                    }
                }
            }
        }

        // only return the blobs if we found all requested versioned hashes
        if result.iter().all(|blob| blob.is_some()) {
            Ok(Some(result.into_iter().map(Option::unwrap).collect()))
        } else {
            Ok(None)
        }
    }

    fn data_size_hint(&self) -> Option<usize> {
        Some(self.inner.size_tracker.data_size())
    }

    fn blobs_len(&self) -> usize {
        self.inner.size_tracker.blobs_len()
    }
}

struct DiskFileBlobStoreInner {
    blob_dir: PathBuf,
    blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecarVariant>, ByLength>>,
    size_tracker: BlobStoreSize,
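    /// Synchronizes reads and writes of blob files on disk.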
    file_lock: RwLock<()>,
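    /// Transaction hashes queued for deferred deletion; drained by [`BlobStore::cleanup`].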
    txs_to_delete: RwLock<HashSet<B256>>,
    /// Tracks known versioned hashes and the transaction they exist in
    ///
    /// Note: It is possible that one blob can appear in multiple transactions but this only tracks
    /// the most recent one.
    versioned_hashes_to_txhash: Mutex<LruMap<B256, B256>>,
}

impl DiskFileBlobStoreInner {
    /// Creates a new empty disk file blob store with the given maximum length of the blob cache.
    fn new(blob_dir: PathBuf, max_length: u32) -> Self {
        Self {
            blob_dir,
            blob_cache: Mutex::new(LruMap::new(ByLength::new(max_length))),
            size_tracker: Default::default(),
            file_lock: Default::default(),
            txs_to_delete: Default::default(),
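            // size the versioned-hash index for several hashes per cached transaction; the
            // factor of 6 presumably mirrors the EIP-4844 limit of 6 blobs per transaction
            // (an inference, not documented here)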
            versioned_hashes_to_txhash: Mutex::new(LruMap::new(ByLength::new(max_length * 6))),
        }
    }

    /// Creates the directory where blobs will be stored on disk.
    fn create_blob_dir(&self) -> Result<(), DiskFileBlobStoreError> {
        debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Creating blob store");
        fs::create_dir_all(&self.blob_dir)
            .map_err(|e| DiskFileBlobStoreError::Open(self.blob_dir.clone(), e))
    }

    /// Deletes the entire blob store.
    fn delete_all(&self) -> Result<(), DiskFileBlobStoreError> {
        match fs::remove_dir_all(&self.blob_dir) {
            Ok(_) => {
                debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Removed blob store directory");
            }
            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
            Err(err) => return Err(DiskFileBlobStoreError::Open(self.blob_dir.clone(), err)),
        }
        Ok(())
    }

    /// Ensures the blob is in the blob cache and written to the disk.
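    ///
    /// The sidecar is RLP-field-encoded and written to its own file named after the
    /// transaction hash, and its versioned hashes are indexed for lookups by hash.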
    fn insert_one(
        &self,
        tx: B256,
        data: BlobTransactionSidecarVariant,
    ) -> Result<(), BlobStoreError> {
        let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
        data.rlp_encode_fields(&mut buf);

        {
            // cache the versioned hashes to tx hash
            let mut map = self.versioned_hashes_to_txhash.lock();
            match &data {
                BlobTransactionSidecarVariant::Eip4844(data) => {
                    data.versioned_hashes().for_each(|hash| {
                        map.insert(hash, tx);
                    });
                }
                BlobTransactionSidecarVariant::Eip7594(data) => {
                    data.versioned_hashes().for_each(|hash| {
                        map.insert(hash, tx);
                    });
                }
            }
        }

        self.blob_cache.lock().insert(tx, Arc::new(data));

        let size = self.write_one_encoded(tx, &buf)?;

        self.size_tracker.add_size(size);
        self.size_tracker.inc_len(1);
        Ok(())
    }

    /// Ensures blobs are in the blob cache and written to the disk.
    fn insert_many(
        &self,
        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
    ) -> Result<(), BlobStoreError> {
        let raw = txs
            .iter()
            .map(|(tx, data)| {
                let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
                data.rlp_encode_fields(&mut buf);
                (self.blob_disk_file(*tx), buf)
            })
            .collect::<Vec<_>>();

        {
            // cache versioned hashes to tx hash
            let mut map = self.versioned_hashes_to_txhash.lock();
            for (tx, data) in &txs {
                match data {
                    BlobTransactionSidecarVariant::Eip4844(data) => {
                        data.versioned_hashes().for_each(|hash| {
                            map.insert(hash, *tx);
                        });
                    }
                    BlobTransactionSidecarVariant::Eip7594(data) => {
                        data.versioned_hashes().for_each(|hash| {
                            map.insert(hash, *tx);
                        });
                    }
                }
            }
        }

        {
            // cache blobs
            let mut cache = self.blob_cache.lock();
            for (tx, data) in txs {
                cache.insert(tx, Arc::new(data));
            }
        }

        let mut add = 0;
        let mut num = 0;
        {
            let _lock = self.file_lock.write();
            for (path, data) in raw {
                if path.exists() {
                    debug!(target:"txpool::blob", ?path, "Blob already exists");
                } else if let Err(err) = fs::write(&path, &data) {
                    debug!(target:"txpool::blob", %err, ?path, "Failed to write blob file");
                } else {
                    add += data.len();
                    num += 1;
                }
            }
        }
        self.size_tracker.add_size(add);
        self.size_tracker.inc_len(num);

        Ok(())
    }

    /// Returns true if the blob for the given transaction hash is in the blob cache or on disk.
    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        if self.blob_cache.lock().get(&tx).is_some() {
            return Ok(true)
        }
        // we only check if the file exists and assume it's valid
        Ok(self.blob_disk_file(tx).is_file())
    }

    /// Returns the subset of the given transactions whose blobs are in the cache or on disk.
    fn retain_existing(&self, txs: Vec<B256>) -> Result<Vec<B256>, BlobStoreError> {
        let (in_cache, not_in_cache): (Vec<B256>, Vec<B256>) = {
            let mut cache = self.blob_cache.lock();
            txs.into_iter().partition(|tx| cache.get(tx).is_some())
        };

        let mut existing = in_cache;
        for tx in not_in_cache {
            if self.blob_disk_file(tx).is_file() {
                existing.push(tx);
            }
        }

        Ok(existing)
    }

    /// Retrieves the blob for the given transaction hash from the blob cache or disk.
    fn get_one(
        &self,
        tx: B256,
    ) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        if let Some(blob) = self.blob_cache.lock().get(&tx) {
            return Ok(Some(blob.clone()))
        }
        let blob = self.read_one(tx)?;

        if let Some(blob) = &blob {
            let blob_arc = Arc::new(blob.clone());
            self.blob_cache.lock().insert(tx, blob_arc.clone());
            return Ok(Some(blob_arc))
        }

        Ok(None)
    }

    /// Returns the path to the blob file for the given transaction hash.
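    ///
    /// The file lives directly under `blob_dir` and is named after the hex-encoded
    /// transaction hash.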
    #[inline]
    fn blob_disk_file(&self, tx: B256) -> PathBuf {
        self.blob_dir.join(format!("{tx:x}"))
    }

    /// Retrieves the blob data for the given transaction hash.
    #[inline]
    fn read_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecarVariant>, BlobStoreError> {
        let path = self.blob_disk_file(tx);
        let data = {
            let _lock = self.file_lock.read();
            match fs::read(&path) {
                Ok(data) => data,
                Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
                Err(e) => {
                    return Err(BlobStoreError::Other(Box::new(DiskFileBlobStoreError::ReadFile(
                        tx, path, e,
                    ))))
                }
            }
        };
        BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
            .map(Some)
            .map_err(BlobStoreError::DecodeError)
    }

    /// Returns decoded blobs read from disk.
    ///
    /// Only returns sidecars that were found and successfully decoded.
    fn read_many_decoded(&self, txs: Vec<TxHash>) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
        self.read_many_raw(txs)
            .into_iter()
            .filter_map(|(tx, data)| {
                BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
                    .map(|sidecar| (tx, sidecar))
                    .ok()
            })
            .collect()
    }

    /// Retrieves the raw blob data for the given transaction hashes.
    ///
    /// Only returns the blobs that were found on file.
    #[inline]
    fn read_many_raw(&self, txs: Vec<TxHash>) -> Vec<(TxHash, Vec<u8>)> {
        let mut res = Vec::with_capacity(txs.len());
        let _lock = self.file_lock.read();
        for tx in txs {
            let path = self.blob_disk_file(tx);
            match fs::read(&path) {
                Ok(data) => {
                    res.push((tx, data));
                }
                Err(err) => {
                    debug!(target:"txpool::blob", %err, ?tx, "Failed to read blob file");
                }
            };
        }
        res
    }

    /// Writes the blob data for the given transaction hash to the disk.
    #[inline]
    fn write_one_encoded(&self, tx: B256, data: &[u8]) -> Result<usize, DiskFileBlobStoreError> {
        trace!(target:"txpool::blob", "[{:?}] writing blob file", tx);
        let mut add = 0;
        let path = self.blob_disk_file(tx);
        {
            let _lock = self.file_lock.write();
            if !path.exists() {
                fs::write(&path, data)
                    .map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
                add = data.len();
            }
        }
        Ok(add)
    }

    /// Retrieves blobs for the given transaction hashes from the blob cache or disk.
    ///
    /// This will not return an error if there are missing blobs. Therefore, the result may be a
    /// subset of the request or an empty vector if none of the blobs were found.
    #[inline]
    fn get_all(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
        let mut res = Vec::with_capacity(txs.len());
        let mut cache_miss = Vec::new();
        {
            let mut cache = self.blob_cache.lock();
            for tx in txs {
                if let Some(blob) = cache.get(&tx) {
                    res.push((tx, blob.clone()));
                } else {
                    cache_miss.push(tx)
                }
            }
        }
        if cache_miss.is_empty() {
            return Ok(res)
        }
        let from_disk = self.read_many_decoded(cache_miss);
        if from_disk.is_empty() {
            return Ok(res)
        }
        let mut cache = self.blob_cache.lock();
        for (tx, data) in from_disk {
            let arc = Arc::new(data);
            cache.insert(tx, arc.clone());
            res.push((tx, arc));
        }

        Ok(res)
    }

    /// Retrieves blobs for the given transaction hashes from the blob cache or disk.
    ///
    /// Returns an error if there are any missing blobs.
    #[inline]
    fn get_exact(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        txs.into_iter()
            .map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
            .collect()
    }
}

impl fmt::Debug for DiskFileBlobStoreInner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("DiskFileBlobStoreInner")
            .field("blob_dir", &self.blob_dir)
            .field("cached_blobs", &self.blob_cache.try_lock().map(|lock| lock.len()))
            .field("txs_to_delete", &self.txs_to_delete.try_read())
            .finish()
    }
}

/// Errors that can occur when interacting with a disk file blob store.
#[derive(Debug, thiserror::Error)]
pub enum DiskFileBlobStoreError {
    /// Thrown during [`DiskFileBlobStore::open`] if the blob store directory cannot be opened.
    #[error("failed to open blobstore at {0}: {1}")]
    Open(PathBuf, io::Error),
    /// Failure while reading a blob file.
    #[error("[{0}] failed to read blob file at {1}: {2}")]
    ReadFile(TxHash, PathBuf, io::Error),
    /// Failure while writing a blob file.
    #[error("[{0}] failed to write blob file at {1}: {2}")]
    WriteFile(TxHash, PathBuf, io::Error),
    /// Failure while deleting a blob file.
    #[error("[{0}] failed to delete blob file at {1}: {2}")]
    DeleteFile(TxHash, PathBuf, io::Error),
}

impl From<DiskFileBlobStoreError> for BlobStoreError {
    fn from(value: DiskFileBlobStoreError) -> Self {
        Self::Other(Box::new(value))
    }
}

/// Configuration for a disk file blob store.
#[derive(Debug, Clone)]
pub struct DiskFileBlobStoreConfig {
    /// The maximum number of blobs to keep in the in-memory blob cache.
    pub max_cached_entries: u32,
    /// How to open the blob store.
    pub open: OpenDiskFileBlobStore,
}

impl Default for DiskFileBlobStoreConfig {
    fn default() -> Self {
        Self { max_cached_entries: DEFAULT_MAX_CACHED_BLOBS, open: Default::default() }
    }
}

impl DiskFileBlobStoreConfig {
    /// Sets the maximum number of blobs to keep in the in-memory blob cache.
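    ///
    /// A minimal sketch (the crate path is assumed):
    ///
    /// ```ignore
    /// use reth_transaction_pool::blobstore::DiskFileBlobStoreConfig;
    ///
    /// let config = DiskFileBlobStoreConfig::default().with_max_cached_entries(256);
    /// assert_eq!(config.max_cached_entries, 256);
    /// ```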
    pub const fn with_max_cached_entries(mut self, max_cached_entries: u32) -> Self {
        self.max_cached_entries = max_cached_entries;
        self
    }
}

/// How to open a disk file blob store.
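///
/// Note: [`DiskFileBlobStore::open`] as defined in this module always wipes and recreates the
/// blob directory, so this setting is currently not consulted there.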
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OpenDiskFileBlobStore {
    /// Clear everything in the blob store.
    #[default]
    Clear,
    /// Keep the existing blob store and index.
    ReIndex,
}

#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::BlobTransactionSidecar;
    use alloy_eips::eip7594::BlobTransactionSidecarVariant;
    use std::sync::atomic::Ordering;

    fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let store = DiskFileBlobStore::open(dir.path(), Default::default()).unwrap();
        (store, dir)
    }

    fn rng_blobs(num: usize) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
        let mut rng = rand::rng();
        (0..num)
            .map(|_| {
                let tx = TxHash::random_with(&mut rng);
                let blob = BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![],
                });
                (tx, blob)
            })
            .collect()
    }

    #[test]
    fn disk_insert_all_get_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(10);
        let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        // all cached
        for (tx, blob) in &blobs {
            assert!(store.is_cached(tx));
            let b = store.get(*tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
            assert_eq!(b, *blob);
        }

        let all = store.get_all(all_hashes.clone()).unwrap();
        for (tx, blob) in all {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))), "missing blob {tx:?}");
        }

        assert!(store.contains(all_hashes[0]).unwrap());
        store.delete_all(all_hashes.clone()).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&all_hashes[0]));
        store.clear_cache();
        store.cleanup();

        assert!(store.get(blobs[0].0).unwrap().is_none());

        let all = store.get_all(all_hashes.clone()).unwrap();
        assert!(all.is_empty());

        assert!(!store.contains(all_hashes[0]).unwrap());
        assert!(store.get_exact(all_hashes).is_err());

        assert_eq!(store.data_size_hint(), Some(0));
        assert_eq!(store.inner.size_tracker.num_blobs.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn disk_insert_and_retrieve() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob.clone()).unwrap();

        assert!(store.is_cached(&tx));
        let retrieved_blob = store.get(tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
        assert_eq!(retrieved_blob, blob);
    }

    #[test]
    fn disk_delete_blob() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob).unwrap();
        assert!(store.is_cached(&tx));

        store.delete(tx).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&tx));
        store.cleanup();

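        // the blob file is gone from disk, but the cache was never cleared, so `get` still
        // returns the cached sidecar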
        let result = store.get(tx).unwrap();
        assert_eq!(
            result,
            Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                blobs: vec![],
                commitments: vec![],
                proofs: vec![]
            })))
        );
    }

    #[test]
    fn disk_insert_all_and_delete_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(5);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        for (tx, _) in &blobs {
            assert!(store.is_cached(tx));
        }

        store.delete_all(txs.clone()).unwrap();
        store.cleanup();

        for tx in txs {
            let result = store.get(tx).unwrap();
            assert_eq!(
                result,
                Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![]
                })))
            );
        }
    }

    #[test]
    fn disk_get_all_blobs() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_all(txs.clone()).unwrap();
        for (tx, blob) in retrieved_blobs {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))));
        }

        store.delete_all(txs).unwrap();
        store.cleanup();
    }

    #[test]
    fn disk_get_exact_blobs_success() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_exact(txs).unwrap();
        for (retrieved_blob, (_, original_blob)) in retrieved_blobs.into_iter().zip(blobs) {
            assert_eq!(Arc::unwrap_or_clone(retrieved_blob), original_blob);
        }
    }

    #[test]
    fn disk_get_exact_blobs_failure() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(2);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        // Try to get a blob that was never inserted
        let missing_tx = TxHash::random();
        let result = store.get_exact(vec![txs[0], missing_tx]);
        assert!(result.is_err());
    }

    #[test]
    fn disk_data_size_hint() {
        let (store, _dir) = tmp_store();
        assert_eq!(store.data_size_hint(), Some(0));

        let blobs = rng_blobs(2);
        store.insert_all(blobs).unwrap();
        assert!(store.data_size_hint().unwrap() > 0);
    }

    #[test]
    fn disk_cleanup_stat() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        store.delete_all(txs).unwrap();
        let stat = store.cleanup();
        assert_eq!(stat.delete_succeed, 3);
        assert_eq!(stat.delete_failed, 0);
    }
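
    // Added sketch: exercises the versioned-hash lookup paths with hashes that cannot match
    // anything in the store; per the implementation above, v1 answers miss-for-miss while v2
    // is all-or-nothing.
    #[test]
    fn disk_versioned_hashes_not_found() {
        let (store, _dir) = tmp_store();

        let hashes = vec![B256::random(), B256::random()];

        // v1 keeps the response aligned with the request: one `None` per miss
        let v1 = store.get_by_versioned_hashes_v1(&hashes).unwrap();
        assert_eq!(v1.len(), hashes.len());
        assert!(v1.iter().all(|blob| blob.is_none()));

        // v2 returns `None` unless every requested hash was found
        assert!(store.get_by_versioned_hashes_v2(&hashes).unwrap().is_none());
    }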
854    }
855}