reth_transaction_pool/blobstore/
disk.rs

1//! A simple diskstore for blobs
2
3use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
4use alloy_eips::eip4844::{BlobAndProofV1, BlobTransactionSidecar};
5use alloy_primitives::{TxHash, B256};
6use parking_lot::{Mutex, RwLock};
7use schnellru::{ByLength, LruMap};
8use std::{collections::HashSet, fmt, fs, io, path::PathBuf, sync::Arc};
9use tracing::{debug, trace};
10
/// Default capacity of the in-memory cache, counted in [`BlobTransactionSidecar`] entries.
///
/// Used as `max_cached_entries` by [`DiskFileBlobStoreConfig::default`].
pub const DEFAULT_MAX_CACHED_BLOBS: u32 = 100;
13
/// A blob store that stores blob data on disk.
///
/// Every sidecar is written to its own file inside a single directory, with an in-memory LRU
/// cache in front of the files.
///
/// The type uses deferred deletion: [`BlobStore::delete`] and [`BlobStore::delete_all`] only
/// mark blobs for removal, and the files stay on disk until the maintenance task calls
/// [`BlobStore::cleanup`].
///
/// Clones share the same underlying state.
#[derive(Clone, Debug)]
pub struct DiskFileBlobStore {
    /// Shared state: blob directory, cache, size tracker and pending deletions.
    inner: Arc<DiskFileBlobStoreInner>,
}
23
24impl DiskFileBlobStore {
25    /// Opens and initializes a new disk file blob store according to the given options.
26    pub fn open(
27        blob_dir: impl Into<PathBuf>,
28        opts: DiskFileBlobStoreConfig,
29    ) -> Result<Self, DiskFileBlobStoreError> {
30        let blob_dir = blob_dir.into();
31        let DiskFileBlobStoreConfig { max_cached_entries, .. } = opts;
32        let inner = DiskFileBlobStoreInner::new(blob_dir, max_cached_entries);
33
34        // initialize the blob store
35        inner.delete_all()?;
36        inner.create_blob_dir()?;
37
38        Ok(Self { inner: Arc::new(inner) })
39    }
40
41    #[cfg(test)]
42    fn is_cached(&self, tx: &B256) -> bool {
43        self.inner.blob_cache.lock().get(tx).is_some()
44    }
45
46    #[cfg(test)]
47    fn clear_cache(&self) {
48        self.inner.blob_cache.lock().clear()
49    }
50}
51
52impl BlobStore for DiskFileBlobStore {
    /// Stores the sidecar of `tx` in the in-memory cache and writes it to disk.
    fn insert(&self, tx: B256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError> {
        self.inner.insert_one(tx, data)
    }
56
57    fn insert_all(&self, txs: Vec<(B256, BlobTransactionSidecar)>) -> Result<(), BlobStoreError> {
58        if txs.is_empty() {
59            return Ok(())
60        }
61        self.inner.insert_many(txs)
62    }
63
64    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
65        if self.inner.contains(tx)? {
66            self.inner.txs_to_delete.write().insert(tx);
67        }
68        Ok(())
69    }
70
71    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
72        let txs = self.inner.retain_existing(txs)?;
73        self.inner.txs_to_delete.write().extend(txs);
74        Ok(())
75    }
76
77    fn cleanup(&self) -> BlobStoreCleanupStat {
78        let txs_to_delete = std::mem::take(&mut *self.inner.txs_to_delete.write());
79        let mut stat = BlobStoreCleanupStat::default();
80        let mut subsize = 0;
81        debug!(target:"txpool::blob", num_blobs=%txs_to_delete.len(), "Removing blobs from disk");
82        for tx in txs_to_delete {
83            let path = self.inner.blob_disk_file(tx);
84            let filesize = fs::metadata(&path).map_or(0, |meta| meta.len());
85            match fs::remove_file(&path) {
86                Ok(_) => {
87                    stat.delete_succeed += 1;
88                    subsize += filesize;
89                }
90                Err(e) => {
91                    stat.delete_failed += 1;
92                    let err = DiskFileBlobStoreError::DeleteFile(tx, path, e);
93                    debug!(target:"txpool::blob", %err);
94                }
95            };
96        }
97        self.inner.size_tracker.sub_size(subsize as usize);
98        self.inner.size_tracker.sub_len(stat.delete_succeed);
99        stat
100    }
101
    /// Returns the sidecar of `tx` from the cache or, on a cache miss, from disk.
    ///
    /// Returns `Ok(None)` if no blob file exists for `tx`.
    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
        self.inner.get_one(tx)
    }
105
    /// Returns `true` if the blob of `tx` is in the cache or has a file on disk.
    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        self.inner.contains(tx)
    }
109
110    fn get_all(
111        &self,
112        txs: Vec<B256>,
113    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
114        if txs.is_empty() {
115            return Ok(Vec::new())
116        }
117        self.inner.get_all(txs)
118    }
119
120    fn get_exact(
121        &self,
122        txs: Vec<B256>,
123    ) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
124        if txs.is_empty() {
125            return Ok(Vec::new())
126        }
127        self.inner.get_exact(txs)
128    }
129
130    fn get_by_versioned_hashes(
131        &self,
132        versioned_hashes: &[B256],
133    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
134        let mut result = vec![None; versioned_hashes.len()];
135        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
136            for (i, blob_versioned_hash) in blob_sidecar.versioned_hashes().enumerate() {
137                for (j, target_versioned_hash) in versioned_hashes.iter().enumerate() {
138                    if blob_versioned_hash == *target_versioned_hash {
139                        result[j].get_or_insert_with(|| BlobAndProofV1 {
140                            blob: Box::new(blob_sidecar.blobs[i]),
141                            proof: blob_sidecar.proofs[i],
142                        });
143                    }
144                }
145            }
146
147            // Return early if all blobs are found.
148            if result.iter().all(|blob| blob.is_some()) {
149                break;
150            }
151        }
152        Ok(result)
153    }
154
    /// Returns the data size currently recorded by the size tracker.
    ///
    /// This is the tracker's bookkeeping value, not a fresh measurement of the directory.
    fn data_size_hint(&self) -> Option<usize> {
        Some(self.inner.size_tracker.data_size())
    }
158
    /// Returns the number of blobs currently recorded by the size tracker.
    fn blobs_len(&self) -> usize {
        self.inner.size_tracker.blobs_len()
    }
162}
163
/// Shared state behind [`DiskFileBlobStore`].
struct DiskFileBlobStoreInner {
    /// Directory that holds one file per stored sidecar, named after the transaction hash.
    blob_dir: PathBuf,
    /// In-memory LRU cache of sidecars, keyed by transaction hash and bounded by entry count.
    blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecar>, ByLength>>,
    /// Bookkeeping for the bytes and number of blobs written to disk.
    size_tracker: BlobStoreSize,
    /// Taken shared while reading blob files and exclusively while writing them.
    file_lock: RwLock<()>,
    /// Transactions whose blob files are removed by the next cleanup run.
    txs_to_delete: RwLock<HashSet<B256>>,
}
171
172impl DiskFileBlobStoreInner {
173    /// Creates a new empty disk file blob store with the given maximum length of the blob cache.
174    fn new(blob_dir: PathBuf, max_length: u32) -> Self {
175        Self {
176            blob_dir,
177            blob_cache: Mutex::new(LruMap::new(ByLength::new(max_length))),
178            size_tracker: Default::default(),
179            file_lock: Default::default(),
180            txs_to_delete: Default::default(),
181        }
182    }
183
184    /// Creates the directory where blobs will be stored on disk.
185    fn create_blob_dir(&self) -> Result<(), DiskFileBlobStoreError> {
186        debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Creating blob store");
187        fs::create_dir_all(&self.blob_dir)
188            .map_err(|e| DiskFileBlobStoreError::Open(self.blob_dir.clone(), e))
189    }
190
191    /// Deletes the entire blob store.
192    fn delete_all(&self) -> Result<(), DiskFileBlobStoreError> {
193        match fs::remove_dir_all(&self.blob_dir) {
194            Ok(_) => {
195                debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Removed blob store directory");
196            }
197            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
198            Err(err) => return Err(DiskFileBlobStoreError::Open(self.blob_dir.clone(), err)),
199        }
200        Ok(())
201    }
202
203    /// Ensures blob is in the blob cache and written to the disk.
204    fn insert_one(&self, tx: B256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError> {
205        let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
206        data.rlp_encode_fields(&mut buf);
207        self.blob_cache.lock().insert(tx, Arc::new(data));
208        let size = self.write_one_encoded(tx, &buf)?;
209
210        self.size_tracker.add_size(size);
211        self.size_tracker.inc_len(1);
212        Ok(())
213    }
214
215    /// Ensures blobs are in the blob cache and written to the disk.
216    fn insert_many(&self, txs: Vec<(B256, BlobTransactionSidecar)>) -> Result<(), BlobStoreError> {
217        let raw = txs
218            .iter()
219            .map(|(tx, data)| {
220                let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
221                data.rlp_encode_fields(&mut buf);
222                (self.blob_disk_file(*tx), buf)
223            })
224            .collect::<Vec<_>>();
225
226        {
227            let mut cache = self.blob_cache.lock();
228            for (tx, data) in txs {
229                cache.insert(tx, Arc::new(data));
230            }
231        }
232        let mut add = 0;
233        let mut num = 0;
234        {
235            let _lock = self.file_lock.write();
236            for (path, data) in raw {
237                if path.exists() {
238                    debug!(target:"txpool::blob", ?path, "Blob already exists");
239                } else if let Err(err) = fs::write(&path, &data) {
240                    debug!(target:"txpool::blob", %err, ?path, "Failed to write blob file");
241                } else {
242                    add += data.len();
243                    num += 1;
244                }
245            }
246        }
247        self.size_tracker.add_size(add);
248        self.size_tracker.inc_len(num);
249
250        Ok(())
251    }
252
253    /// Returns true if the blob for the given transaction hash is in the blob cache or on disk.
254    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
255        if self.blob_cache.lock().get(&tx).is_some() {
256            return Ok(true)
257        }
258        // we only check if the file exists and assume it's valid
259        Ok(self.blob_disk_file(tx).is_file())
260    }
261
262    /// Returns all the blob transactions which are in the cache or on the disk.
263    fn retain_existing(&self, txs: Vec<B256>) -> Result<Vec<B256>, BlobStoreError> {
264        let (in_cache, not_in_cache): (Vec<B256>, Vec<B256>) = {
265            let mut cache = self.blob_cache.lock();
266            txs.into_iter().partition(|tx| cache.get(tx).is_some())
267        };
268
269        let mut existing = in_cache;
270        for tx in not_in_cache {
271            if self.blob_disk_file(tx).is_file() {
272                existing.push(tx);
273            }
274        }
275
276        Ok(existing)
277    }
278
279    /// Retrieves the blob for the given transaction hash from the blob cache or disk.
280    fn get_one(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
281        if let Some(blob) = self.blob_cache.lock().get(&tx) {
282            return Ok(Some(blob.clone()))
283        }
284        let blob = self.read_one(tx)?;
285
286        if let Some(blob) = &blob {
287            let blob_arc = Arc::new(blob.clone());
288            self.blob_cache.lock().insert(tx, blob_arc.clone());
289            return Ok(Some(blob_arc))
290        }
291
292        Ok(None)
293    }
294
295    /// Returns the path to the blob file for the given transaction hash.
296    #[inline]
297    fn blob_disk_file(&self, tx: B256) -> PathBuf {
298        self.blob_dir.join(format!("{tx:x}"))
299    }
300
301    /// Retrieves the blob data for the given transaction hash.
302    #[inline]
303    fn read_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
304        let path = self.blob_disk_file(tx);
305        let data = {
306            let _lock = self.file_lock.read();
307            match fs::read(&path) {
308                Ok(data) => data,
309                Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
310                Err(e) => {
311                    return Err(BlobStoreError::Other(Box::new(DiskFileBlobStoreError::ReadFile(
312                        tx, path, e,
313                    ))))
314                }
315            }
316        };
317        BlobTransactionSidecar::rlp_decode_fields(&mut data.as_slice())
318            .map(Some)
319            .map_err(BlobStoreError::DecodeError)
320    }
321
322    /// Returns decoded blobs read from disk.
323    fn read_many_decoded(&self, txs: Vec<TxHash>) -> Vec<(TxHash, BlobTransactionSidecar)> {
324        self.read_many_raw(txs)
325            .into_iter()
326            .filter_map(|(tx, data)| {
327                BlobTransactionSidecar::rlp_decode_fields(&mut data.as_slice())
328                    .map(|sidecar| (tx, sidecar))
329                    .ok()
330            })
331            .collect()
332    }
333
334    /// Retrieves the raw blob data for the given transaction hashes.
335    ///
336    /// Only returns the blobs that were found on file.
337    #[inline]
338    fn read_many_raw(&self, txs: Vec<TxHash>) -> Vec<(TxHash, Vec<u8>)> {
339        let mut res = Vec::with_capacity(txs.len());
340        let _lock = self.file_lock.read();
341        for tx in txs {
342            let path = self.blob_disk_file(tx);
343            match fs::read(&path) {
344                Ok(data) => {
345                    res.push((tx, data));
346                }
347                Err(err) => {
348                    debug!(target:"txpool::blob", %err, ?tx, "Failed to read blob file");
349                }
350            };
351        }
352        res
353    }
354
355    /// Writes the blob data for the given transaction hash to the disk.
356    #[inline]
357    fn write_one_encoded(&self, tx: B256, data: &[u8]) -> Result<usize, DiskFileBlobStoreError> {
358        trace!(target:"txpool::blob", "[{:?}] writing blob file", tx);
359        let mut add = 0;
360        let path = self.blob_disk_file(tx);
361        {
362            let _lock = self.file_lock.write();
363            if !path.exists() {
364                fs::write(&path, data)
365                    .map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
366                add = data.len();
367            }
368        }
369        Ok(add)
370    }
371
372    /// Retrieves blobs for the given transaction hashes from the blob cache or disk.
373    ///
374    /// This will not return an error if there are missing blobs. Therefore, the result may be a
375    /// subset of the request or an empty vector if none of the blobs were found.
376    #[inline]
377    fn get_all(
378        &self,
379        txs: Vec<B256>,
380    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
381        let mut res = Vec::with_capacity(txs.len());
382        let mut cache_miss = Vec::new();
383        {
384            let mut cache = self.blob_cache.lock();
385            for tx in txs {
386                if let Some(blob) = cache.get(&tx) {
387                    res.push((tx, blob.clone()));
388                } else {
389                    cache_miss.push(tx)
390                }
391            }
392        }
393        if cache_miss.is_empty() {
394            return Ok(res)
395        }
396        let from_disk = self.read_many_decoded(cache_miss);
397        if from_disk.is_empty() {
398            return Ok(res)
399        }
400        let mut cache = self.blob_cache.lock();
401        for (tx, data) in from_disk {
402            let arc = Arc::new(data.clone());
403            cache.insert(tx, arc.clone());
404            res.push((tx, arc.clone()));
405        }
406
407        Ok(res)
408    }
409
410    /// Retrieves blobs for the given transaction hashes from the blob cache or disk.
411    ///
412    /// Returns an error if there are any missing blobs.
413    #[inline]
414    fn get_exact(
415        &self,
416        txs: Vec<B256>,
417    ) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
418        txs.into_iter()
419            .map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
420            .collect()
421    }
422}
423
424impl fmt::Debug for DiskFileBlobStoreInner {
425    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
426        f.debug_struct("DiskFileBlobStoreInner")
427            .field("blob_dir", &self.blob_dir)
428            .field("cached_blobs", &self.blob_cache.try_lock().map(|lock| lock.len()))
429            .field("txs_to_delete", &self.txs_to_delete.try_read())
430            .finish()
431    }
432}
433
/// Errors that can occur when interacting with a disk file blob store.
///
/// Every variant carries the affected path and the underlying I/O error; the file-level
/// variants additionally carry the transaction hash the file belongs to.
#[derive(Debug, thiserror::Error)]
pub enum DiskFileBlobStoreError {
    /// The blob store directory could not be removed or created, e.g. during
    /// [`DiskFileBlobStore::open`].
    #[error("failed to open blobstore at {0}: {1}")]
    Open(PathBuf, io::Error),
    /// Failure while reading a blob file.
    #[error("[{0}] failed to read blob file at {1}: {2}")]
    ReadFile(TxHash, PathBuf, io::Error),
    /// Failure while writing a blob file.
    #[error("[{0}] failed to write blob file at {1}: {2}")]
    WriteFile(TxHash, PathBuf, io::Error),
    /// Failure while deleting a blob file.
    #[error("[{0}] failed to delete blob file at {1}: {2}")]
    DeleteFile(TxHash, PathBuf, io::Error),
}
454
/// Lets disk store errors be propagated with `?` from functions returning [`BlobStoreError`].
impl From<DiskFileBlobStoreError> for BlobStoreError {
    /// Boxes the disk error into the catch-all [`BlobStoreError::Other`] variant.
    fn from(value: DiskFileBlobStoreError) -> Self {
        Self::Other(Box::new(value))
    }
}
460
/// Configuration for a disk file blob store.
#[derive(Debug, Clone)]
pub struct DiskFileBlobStoreConfig {
    /// The maximum number of blobs to keep in the in memory blob cache.
    pub max_cached_entries: u32,
    /// How to open the blob store.
    ///
    /// NOTE(review): [`DiskFileBlobStore::open`] does not read this field at the moment and
    /// always clears the directory.
    pub open: OpenDiskFileBlobStore,
}
469
470impl Default for DiskFileBlobStoreConfig {
471    fn default() -> Self {
472        Self { max_cached_entries: DEFAULT_MAX_CACHED_BLOBS, open: Default::default() }
473    }
474}
475
476impl DiskFileBlobStoreConfig {
477    /// Set maximum number of blobs to keep in the in memory blob cache.
478    pub const fn with_max_cached_entries(mut self, max_cached_entries: u32) -> Self {
479        self.max_cached_entries = max_cached_entries;
480        self
481    }
482}
483
/// How to open a disk file blob store.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OpenDiskFileBlobStore {
    /// Clear everything in the blob store. This is the default.
    #[default]
    Clear,
    /// Keep the existing blob store and index.
    ReIndex,
}
493
494#[cfg(test)]
495mod tests {
496    use super::*;
497    use std::sync::atomic::Ordering;
498
499    fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
500        let dir = tempfile::tempdir().unwrap();
501        let store = DiskFileBlobStore::open(dir.path(), Default::default()).unwrap();
502        (store, dir)
503    }
504
505    fn rng_blobs(num: usize) -> Vec<(TxHash, BlobTransactionSidecar)> {
506        let mut rng = rand::thread_rng();
507        (0..num)
508            .map(|_| {
509                let tx = TxHash::random_with(&mut rng);
510                let blob =
511                    BlobTransactionSidecar { blobs: vec![], commitments: vec![], proofs: vec![] };
512                (tx, blob)
513            })
514            .collect()
515    }
516
    /// Full round trip: batch insert, lookups, deferred delete, cleanup and size accounting.
    #[test]
    fn disk_insert_all_get_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(10);
        let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        // all cached
        for (tx, blob) in &blobs {
            assert!(store.is_cached(tx));
            let b = store.get(*tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
            assert_eq!(b, *blob);
        }

        // every returned entry must be one of the inserted blobs
        let all = store.get_all(all_hashes.clone()).unwrap();
        for (tx, blob) in all {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))), "missing blob {tx:?}");
        }

        // deletion is deferred: the hashes are only marked until `cleanup` runs
        assert!(store.contains(all_hashes[0]).unwrap());
        store.delete_all(all_hashes.clone()).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&all_hashes[0]));
        // the cache is emptied explicitly so the lookups below have to hit the disk
        store.clear_cache();
        store.cleanup();

        assert!(store.get(blobs[0].0).unwrap().is_none());

        let all = store.get_all(all_hashes.clone()).unwrap();
        assert!(all.is_empty());

        assert!(!store.contains(all_hashes[0]).unwrap());
        assert!(store.get_exact(all_hashes).is_err());

        // the size tracker is back to zero once all files are removed
        assert_eq!(store.data_size_hint(), Some(0));
        assert_eq!(store.inner.size_tracker.num_blobs.load(Ordering::Relaxed), 0);
    }
554
555    #[test]
556    fn disk_insert_and_retrieve() {
557        let (store, _dir) = tmp_store();
558
559        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
560        store.insert(tx, blob.clone()).unwrap();
561
562        assert!(store.is_cached(&tx));
563        let retrieved_blob = store.get(tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
564        assert_eq!(retrieved_blob, blob);
565    }
566
567    #[test]
568    fn disk_delete_blob() {
569        let (store, _dir) = tmp_store();
570
571        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
572        store.insert(tx, blob).unwrap();
573        assert!(store.is_cached(&tx));
574
575        store.delete(tx).unwrap();
576        assert!(store.inner.txs_to_delete.read().contains(&tx));
577        store.cleanup();
578
579        let result = store.get(tx).unwrap();
580        assert_eq!(
581            result,
582            Some(Arc::new(BlobTransactionSidecar {
583                blobs: vec![],
584                commitments: vec![],
585                proofs: vec![]
586            }))
587        );
588    }
589
590    #[test]
591    fn disk_insert_all_and_delete_all() {
592        let (store, _dir) = tmp_store();
593
594        let blobs = rng_blobs(5);
595        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
596        store.insert_all(blobs.clone()).unwrap();
597
598        for (tx, _) in &blobs {
599            assert!(store.is_cached(tx));
600        }
601
602        store.delete_all(txs.clone()).unwrap();
603        store.cleanup();
604
605        for tx in txs {
606            let result = store.get(tx).unwrap();
607            assert_eq!(
608                result,
609                Some(Arc::new(BlobTransactionSidecar {
610                    blobs: vec![],
611                    commitments: vec![],
612                    proofs: vec![]
613                }))
614            );
615        }
616    }
617
618    #[test]
619    fn disk_get_all_blobs() {
620        let (store, _dir) = tmp_store();
621
622        let blobs = rng_blobs(3);
623        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
624        store.insert_all(blobs.clone()).unwrap();
625
626        let retrieved_blobs = store.get_all(txs.clone()).unwrap();
627        for (tx, blob) in retrieved_blobs {
628            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))));
629        }
630
631        store.delete_all(txs).unwrap();
632        store.cleanup();
633    }
634
635    #[test]
636    fn disk_get_exact_blobs_success() {
637        let (store, _dir) = tmp_store();
638
639        let blobs = rng_blobs(3);
640        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
641        store.insert_all(blobs.clone()).unwrap();
642
643        let retrieved_blobs = store.get_exact(txs).unwrap();
644        for (retrieved_blob, (_, original_blob)) in retrieved_blobs.into_iter().zip(blobs) {
645            assert_eq!(Arc::unwrap_or_clone(retrieved_blob), original_blob);
646        }
647    }
648
649    #[test]
650    fn disk_get_exact_blobs_failure() {
651        let (store, _dir) = tmp_store();
652
653        let blobs = rng_blobs(2);
654        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
655        store.insert_all(blobs).unwrap();
656
657        // Try to get a blob that was never inserted
658        let missing_tx = TxHash::random();
659        let result = store.get_exact(vec![txs[0], missing_tx]);
660        assert!(result.is_err());
661    }
662
663    #[test]
664    fn disk_data_size_hint() {
665        let (store, _dir) = tmp_store();
666        assert_eq!(store.data_size_hint(), Some(0));
667
668        let blobs = rng_blobs(2);
669        store.insert_all(blobs).unwrap();
670        assert!(store.data_size_hint().unwrap() > 0);
671    }
672
673    #[test]
674    fn disk_cleanup_stat() {
675        let (store, _dir) = tmp_store();
676
677        let blobs = rng_blobs(3);
678        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
679        store.insert_all(blobs).unwrap();
680
681        store.delete_all(txs).unwrap();
682        let stat = store.cleanup();
683        assert_eq!(stat.delete_succeed, 3);
684        assert_eq!(stat.delete_failed, 0);
685    }
686}