use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
use alloy_eips::{
    eip4844::{BlobAndProofV1, BlobAndProofV2},
    eip7594::BlobTransactionSidecarVariant,
};
use alloy_primitives::{TxHash, B256};
use parking_lot::{Mutex, RwLock};
use schnellru::{ByLength, LruMap};
use std::{collections::HashSet, fmt, fs, io, path::PathBuf, sync::Arc};
use tracing::{debug, trace};

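/// How many blob sidecars to keep in the in-memory LRU cache by default.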
pub const DEFAULT_MAX_CACHED_BLOBS: u32 = 100;

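/// A blob store that persists blob sidecars as individual files on disk.
///
/// Deletions are deferred: [`BlobStore::delete`] only marks a transaction's blobs for
/// deletion, and the files are removed from disk on the next [`BlobStore::cleanup`] call.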
#[derive(Clone, Debug)]
pub struct DiskFileBlobStore {
    inner: Arc<DiskFileBlobStoreInner>,
}

impl DiskFileBlobStore {
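    /// Opens and initializes a new disk file blob store with the given options.
    ///
    /// Note: this clears any existing blob files in the target directory before
    /// recreating it, so the store always starts empty.
    ///
    /// A short usage sketch (illustrative; the path is arbitrary and error
    /// handling is elided):
    ///
    /// ```ignore
    /// let store = DiskFileBlobStore::open("/tmp/reth-blobs", DiskFileBlobStoreConfig::default())?;
    /// ```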
    pub fn open(
        blob_dir: impl Into<PathBuf>,
        opts: DiskFileBlobStoreConfig,
    ) -> Result<Self, DiskFileBlobStoreError> {
        let blob_dir = blob_dir.into();
        let DiskFileBlobStoreConfig { max_cached_entries, .. } = opts;
        let inner = DiskFileBlobStoreInner::new(blob_dir, max_cached_entries);

        // Clear any existing blob files before recreating the directory.
        inner.delete_all()?;
        inner.create_blob_dir()?;

        Ok(Self { inner: Arc::new(inner) })
    }

    /// Returns true if the sidecar for the given transaction is in the in-memory cache.
    #[cfg(test)]
    fn is_cached(&self, tx: &B256) -> bool {
        self.inner.blob_cache.lock().get(tx).is_some()
    }

    /// Clears the in-memory blob cache.
    #[cfg(test)]
    fn clear_cache(&self) {
        self.inner.blob_cache.lock().clear()
    }
}

impl BlobStore for DiskFileBlobStore {
    fn insert(&self, tx: B256, data: BlobTransactionSidecarVariant) -> Result<(), BlobStoreError> {
        self.inner.insert_one(tx, data)
    }

    fn insert_all(
        &self,
        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
    ) -> Result<(), BlobStoreError> {
        if txs.is_empty() {
            return Ok(())
        }
        self.inner.insert_many(txs)
    }

    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
        if self.inner.contains(tx)? {
            self.inner.txs_to_delete.write().insert(tx);
        }
        Ok(())
    }

    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
        let txs = self.inner.retain_existing(txs)?;
        self.inner.txs_to_delete.write().extend(txs);
        Ok(())
    }

    fn cleanup(&self) -> BlobStoreCleanupStat {
        let txs_to_delete = std::mem::take(&mut *self.inner.txs_to_delete.write());
        let mut stat = BlobStoreCleanupStat::default();
        let mut subsize = 0;
        debug!(target:"txpool::blob", num_blobs=%txs_to_delete.len(), "Removing blobs from disk");
        for tx in txs_to_delete {
            let path = self.inner.blob_disk_file(tx);
            let filesize = fs::metadata(&path).map_or(0, |meta| meta.len());
            match fs::remove_file(&path) {
                Ok(_) => {
                    stat.delete_succeed += 1;
                    subsize += filesize;
                }
                Err(e) => {
                    stat.delete_failed += 1;
                    let err = DiskFileBlobStoreError::DeleteFile(tx, path, e);
                    debug!(target:"txpool::blob", %err);
                }
            };
        }
        self.inner.size_tracker.sub_size(subsize as usize);
        self.inner.size_tracker.sub_len(stat.delete_succeed);
        stat
    }

    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        self.inner.get_one(tx)
    }

    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        self.inner.contains(tx)
    }

    fn get_all(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_all(txs)
    }

    fn get_exact(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_exact(txs)
    }

    fn get_by_versioned_hashes_v1(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
        // The response must have the same length as the request; misses stay `None`.
        let mut result = vec![None; versioned_hashes.len()];

        // First scan all cached EIP-4844 sidecars for matching versioned hashes.
        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
            if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
                for (hash_idx, match_result) in
                    blob_sidecar.match_versioned_hashes(versioned_hashes)
                {
                    result[hash_idx] = Some(match_result);
                }
            }

            // Return early if all requested blobs have been found.
            if result.iter().all(|blob| blob.is_some()) {
                return Ok(result);
            }
        }

        // Look up the transaction hashes for the still-missing versioned hashes.
        let mut missing_tx_hashes = Vec::new();

        {
            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
            for (idx, _) in
                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
            {
                let versioned_hash = versioned_hashes[idx];
                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
                    missing_tx_hashes.push(tx_hash);
                }
            }
        }

        // Fetch the sidecars for those transactions from disk and fill in the gaps.
        if !missing_tx_hashes.is_empty() {
            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
            for (_, blob_sidecar) in blobs_from_disk {
                if let Some(blob_sidecar) = blob_sidecar.as_eip4844() {
                    for (hash_idx, match_result) in
                        blob_sidecar.match_versioned_hashes(versioned_hashes)
                    {
                        if result[hash_idx].is_none() {
                            result[hash_idx] = Some(match_result);
                        }
                    }
                }
            }
        }

        Ok(result)
    }

    fn get_by_versioned_hashes_v2(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
        // Tracks matches per requested hash; the V2 response is all-or-nothing.
        let mut result = vec![None; versioned_hashes.len()];

        // First scan all cached EIP-7594 sidecars for matching versioned hashes.
        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
            if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
                for (hash_idx, match_result) in
                    blob_sidecar.match_versioned_hashes(versioned_hashes)
                {
                    result[hash_idx] = Some(match_result);
                }
            }

            // Return early if all requested blobs have been found.
            if result.iter().all(|blob| blob.is_some()) {
                return Ok(Some(result.into_iter().map(Option::unwrap).collect()))
            }
        }

        // Look up the transaction hashes for the still-missing versioned hashes.
        let mut missing_tx_hashes = Vec::new();

        {
            let mut versioned_to_txhashes = self.inner.versioned_hashes_to_txhash.lock();
            for (idx, _) in
                result.iter().enumerate().filter(|(_, blob_and_proof)| blob_and_proof.is_none())
            {
                let versioned_hash = versioned_hashes[idx];
                if let Some(tx_hash) = versioned_to_txhashes.get(&versioned_hash).copied() {
                    missing_tx_hashes.push(tx_hash);
                }
            }
        }

        // Fetch the sidecars for those transactions from disk and fill in the gaps.
        if !missing_tx_hashes.is_empty() {
            let blobs_from_disk = self.inner.read_many_decoded(missing_tx_hashes);
            for (_, blob_sidecar) in blobs_from_disk {
                if let Some(blob_sidecar) = blob_sidecar.as_eip7594() {
                    for (hash_idx, match_result) in
                        blob_sidecar.match_versioned_hashes(versioned_hashes)
                    {
                        if result[hash_idx].is_none() {
                            result[hash_idx] = Some(match_result);
                        }
                    }
                }
            }
        }

        // Only return a response if every requested versioned hash was found.
        if result.iter().all(|blob| blob.is_some()) {
            Ok(Some(result.into_iter().map(Option::unwrap).collect()))
        } else {
            Ok(None)
        }
    }

    fn data_size_hint(&self) -> Option<usize> {
        Some(self.inner.size_tracker.data_size())
    }

    fn blobs_len(&self) -> usize {
        self.inner.size_tracker.blobs_len()
    }
}

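/// Holds the shared state of the disk file blob store.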
struct DiskFileBlobStoreInner {
    /// The directory where blob files are stored.
    blob_dir: PathBuf,
    /// In-memory LRU cache of decoded blob sidecars.
    blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecarVariant>, ByLength>>,
    /// Tracks the total size and number of blobs stored on disk.
    size_tracker: BlobStoreSize,
    /// Guards access to the blob files on disk.
    file_lock: RwLock<()>,
    /// Transactions marked for deletion, removed from disk on [`BlobStore::cleanup`].
    txs_to_delete: RwLock<HashSet<B256>>,
    /// Maps versioned blob hashes to the hash of the transaction they belong to.
    versioned_hashes_to_txhash: Mutex<LruMap<B256, B256>>,
}

impl DiskFileBlobStoreInner {
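    /// Creates a new instance for the given blob directory and cache capacity.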
    fn new(blob_dir: PathBuf, max_length: u32) -> Self {
        Self {
            blob_dir,
            blob_cache: Mutex::new(LruMap::new(ByLength::new(max_length))),
            size_tracker: Default::default(),
            file_lock: Default::default(),
            txs_to_delete: Default::default(),
            // Sized for several versioned hashes per transaction (EIP-4844 allows up
            // to six blobs per transaction under its original limits).
            versioned_hashes_to_txhash: Mutex::new(LruMap::new(ByLength::new(max_length * 6))),
        }
    }

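    /// Creates the directory where blob files will be stored, if it does not exist.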
    fn create_blob_dir(&self) -> Result<(), DiskFileBlobStoreError> {
        debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Creating blob store");
        fs::create_dir_all(&self.blob_dir)
            .map_err(|e| DiskFileBlobStoreError::Open(self.blob_dir.clone(), e))
    }

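    /// Deletes the entire blob store directory, ignoring a directory that does not exist.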
    fn delete_all(&self) -> Result<(), DiskFileBlobStoreError> {
        match fs::remove_dir_all(&self.blob_dir) {
            Ok(_) => {
                debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Removed blob store directory");
            }
            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
            Err(err) => return Err(DiskFileBlobStoreError::Open(self.blob_dir.clone(), err)),
        }
        Ok(())
    }

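    /// Inserts a single sidecar: indexes its versioned hashes, caches it, and writes
    /// the RLP-encoded payload to disk.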
    fn insert_one(
        &self,
        tx: B256,
        data: BlobTransactionSidecarVariant,
    ) -> Result<(), BlobStoreError> {
        let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
        data.rlp_encode_fields(&mut buf);

        // Index every versioned hash of the sidecar by the transaction hash.
        {
            let mut map = self.versioned_hashes_to_txhash.lock();
            match &data {
                BlobTransactionSidecarVariant::Eip4844(data) => {
                    data.versioned_hashes().for_each(|hash| {
                        map.insert(hash, tx);
                    });
                }
                BlobTransactionSidecarVariant::Eip7594(data) => {
                    data.versioned_hashes().for_each(|hash| {
                        map.insert(hash, tx);
                    });
                }
            }
        }

        self.blob_cache.lock().insert(tx, Arc::new(data));

        let size = self.write_one_encoded(tx, &buf)?;

        self.size_tracker.add_size(size);
        self.size_tracker.inc_len(1);
        Ok(())
    }

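    /// Inserts multiple sidecars, batching the encoding, the versioned-hash indexing,
    /// the caching, and the file writes (all writes happen under a single file lock).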
    fn insert_many(
        &self,
        txs: Vec<(B256, BlobTransactionSidecarVariant)>,
    ) -> Result<(), BlobStoreError> {
        // Pre-encode all sidecars and compute their target paths.
        let raw = txs
            .iter()
            .map(|(tx, data)| {
                let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
                data.rlp_encode_fields(&mut buf);
                (self.blob_disk_file(*tx), buf)
            })
            .collect::<Vec<_>>();

        // Index every versioned hash by its transaction hash.
        {
            let mut map = self.versioned_hashes_to_txhash.lock();
            for (tx, data) in &txs {
                match data {
                    BlobTransactionSidecarVariant::Eip4844(data) => {
                        data.versioned_hashes().for_each(|hash| {
                            map.insert(hash, *tx);
                        });
                    }
                    BlobTransactionSidecarVariant::Eip7594(data) => {
                        data.versioned_hashes().for_each(|hash| {
                            map.insert(hash, *tx);
                        });
                    }
                }
            }
        }

        {
            let mut cache = self.blob_cache.lock();
            for (tx, data) in txs {
                cache.insert(tx, Arc::new(data));
            }
        }

        let mut add = 0;
        let mut num = 0;
        {
            let _lock = self.file_lock.write();
            for (path, data) in raw {
                if path.exists() {
                    debug!(target:"txpool::blob", ?path, "Blob already exists");
                } else if let Err(err) = fs::write(&path, &data) {
                    debug!(target:"txpool::blob", %err, ?path, "Failed to write blob file");
                } else {
                    add += data.len();
                    num += 1;
                }
            }
        }
        self.size_tracker.add_size(add);
        self.size_tracker.inc_len(num);

        Ok(())
    }

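    /// Returns true if the sidecar for this transaction is in the cache or on disk.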
    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        if self.blob_cache.lock().get(&tx).is_some() {
            return Ok(true)
        }
        // We only check if the file exists and assume it's valid.
        Ok(self.blob_disk_file(tx).is_file())
    }

    /// Returns the subset of the given transactions that have a stored sidecar,
    /// either in the cache or on disk.
    fn retain_existing(&self, txs: Vec<B256>) -> Result<Vec<B256>, BlobStoreError> {
        let (in_cache, not_in_cache): (Vec<B256>, Vec<B256>) = {
            let mut cache = self.blob_cache.lock();
            txs.into_iter().partition(|tx| cache.get(tx).is_some())
        };

        let mut existing = in_cache;
        for tx in not_in_cache {
            if self.blob_disk_file(tx).is_file() {
                existing.push(tx);
            }
        }

        Ok(existing)
    }

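    /// Retrieves the sidecar for the given transaction, falling back to disk and
    /// populating the cache on a miss.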
    fn get_one(
        &self,
        tx: B256,
    ) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        if let Some(blob) = self.blob_cache.lock().get(&tx) {
            return Ok(Some(blob.clone()))
        }
        let blob = self.read_one(tx)?;

        if let Some(blob) = &blob {
            let blob_arc = Arc::new(blob.clone());
            self.blob_cache.lock().insert(tx, blob_arc.clone());
            return Ok(Some(blob_arc))
        }

        Ok(None)
    }

    /// Returns the path to the blob file for the given transaction.
    #[inline]
    fn blob_disk_file(&self, tx: B256) -> PathBuf {
        self.blob_dir.join(format!("{tx:x}"))
    }

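    /// Reads and decodes the blob file for the given transaction, if it exists.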
    #[inline]
    fn read_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecarVariant>, BlobStoreError> {
        let path = self.blob_disk_file(tx);
        let data = {
            let _lock = self.file_lock.read();
            match fs::read(&path) {
                Ok(data) => data,
                Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
                Err(e) => {
                    return Err(BlobStoreError::Other(Box::new(DiskFileBlobStoreError::ReadFile(
                        tx, path, e,
                    ))))
                }
            }
        };
        BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
            .map(Some)
            .map_err(BlobStoreError::DecodeError)
    }

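    /// Reads all given blob files from disk and decodes them, silently skipping
    /// entries that are missing or fail to decode.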
    fn read_many_decoded(&self, txs: Vec<TxHash>) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
        self.read_many_raw(txs)
            .into_iter()
            .filter_map(|(tx, data)| {
                BlobTransactionSidecarVariant::rlp_decode_fields(&mut data.as_slice())
                    .map(|sidecar| (tx, sidecar))
                    .ok()
            })
            .collect()
    }

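    /// Reads the raw blob files for the given transactions from disk.
    ///
    /// Read failures are logged and the corresponding entries skipped.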
    #[inline]
    fn read_many_raw(&self, txs: Vec<TxHash>) -> Vec<(TxHash, Vec<u8>)> {
        let mut res = Vec::with_capacity(txs.len());
        let _lock = self.file_lock.read();
        for tx in txs {
            let path = self.blob_disk_file(tx);
            match fs::read(&path) {
                Ok(data) => {
                    res.push((tx, data));
                }
                Err(err) => {
                    debug!(target:"txpool::blob", %err, ?tx, "Failed to read blob file");
                }
            };
        }
        res
    }

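    /// Writes the already-encoded sidecar to disk, returning the number of bytes
    /// written (zero if the file already existed).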
    #[inline]
    fn write_one_encoded(&self, tx: B256, data: &[u8]) -> Result<usize, DiskFileBlobStoreError> {
        trace!(target:"txpool::blob", "[{:?}] writing blob file", tx);
        let mut add = 0;
        let path = self.blob_disk_file(tx);
        {
            let _lock = self.file_lock.write();
            if !path.exists() {
                fs::write(&path, data)
                    .map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
                add = data.len();
            }
        }
        Ok(add)
    }

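    /// Retrieves the sidecars for the given transactions, serving cache hits first and
    /// loading misses from disk into the cache. Transactions without a sidecar are omitted.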
    #[inline]
    fn get_all(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
        let mut res = Vec::with_capacity(txs.len());
        let mut cache_miss = Vec::new();
        {
            let mut cache = self.blob_cache.lock();
            for tx in txs {
                if let Some(blob) = cache.get(&tx) {
                    res.push((tx, blob.clone()));
                } else {
                    cache_miss.push(tx)
                }
            }
        }
        if cache_miss.is_empty() {
            return Ok(res)
        }
        let from_disk = self.read_many_decoded(cache_miss);
        if from_disk.is_empty() {
            return Ok(res)
        }
        let mut cache = self.blob_cache.lock();
        for (tx, data) in from_disk {
            let arc = Arc::new(data);
            cache.insert(tx, arc.clone());
            res.push((tx, arc));
        }

        Ok(res)
    }

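    /// Retrieves the sidecars for all given transactions, in the same order, failing
    /// with [`BlobStoreError::MissingSidecar`] if any of them is unavailable.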
    #[inline]
    fn get_exact(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
        txs.into_iter()
            .map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
            .collect()
    }
}

impl fmt::Debug for DiskFileBlobStoreInner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("DiskFileBlobStoreInner")
            .field("blob_dir", &self.blob_dir)
            .field("cached_blobs", &self.blob_cache.try_lock().map(|lock| lock.len()))
            .field("txs_to_delete", &self.txs_to_delete.try_read())
            .finish()
    }
}

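/// Errors that can occur when interacting with a [`DiskFileBlobStore`].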
#[derive(Debug, thiserror::Error)]
pub enum DiskFileBlobStoreError {
    /// Failed to open or create the blob store directory.
    #[error("failed to open blobstore at {0}: {1}")]
    Open(PathBuf, io::Error),
    /// Failed to read a blob file.
    #[error("[{0}] failed to read blob file at {1}: {2}")]
    ReadFile(TxHash, PathBuf, io::Error),
    /// Failed to write a blob file.
    #[error("[{0}] failed to write blob file at {1}: {2}")]
    WriteFile(TxHash, PathBuf, io::Error),
    /// Failed to delete a blob file.
    #[error("[{0}] failed to delete blob file at {1}: {2}")]
    DeleteFile(TxHash, PathBuf, io::Error),
}

impl From<DiskFileBlobStoreError> for BlobStoreError {
    fn from(value: DiskFileBlobStoreError) -> Self {
        Self::Other(Box::new(value))
    }
}

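/// Configuration for a [`DiskFileBlobStore`].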
#[derive(Debug, Clone)]
pub struct DiskFileBlobStoreConfig {
    /// The maximum number of blob sidecars to keep in the in-memory cache.
    pub max_cached_entries: u32,
    /// How to treat existing blob files when opening the store.
    pub open: OpenDiskFileBlobStore,
}

impl Default for DiskFileBlobStoreConfig {
    fn default() -> Self {
        Self { max_cached_entries: DEFAULT_MAX_CACHED_BLOBS, open: Default::default() }
    }
}

impl DiskFileBlobStoreConfig {
    /// Sets the maximum number of blob sidecars to keep in the in-memory cache.
    pub const fn with_max_cached_entries(mut self, max_cached_entries: u32) -> Self {
        self.max_cached_entries = max_cached_entries;
        self
    }
}

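/// How to handle existing blob files when opening a [`DiskFileBlobStore`].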
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OpenDiskFileBlobStore {
    /// Clear all existing blob files on open. Note that [`DiskFileBlobStore::open`]
    /// as implemented above always clears the directory.
    #[default]
    Clear,
    /// Keep the existing blob files and re-index them on open.
    ReIndex,
}

#[cfg(test)]
mod tests {
    use alloy_consensus::BlobTransactionSidecar;
    use alloy_eips::eip7594::BlobTransactionSidecarVariant;

    use super::*;
    use std::sync::atomic::Ordering;

    /// Creates a blob store in a fresh temporary directory.
    fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let store = DiskFileBlobStore::open(dir.path(), Default::default()).unwrap();
        (store, dir)
    }

    /// Generates `num` random transaction hashes paired with empty EIP-4844 sidecars.
    fn rng_blobs(num: usize) -> Vec<(TxHash, BlobTransactionSidecarVariant)> {
        let mut rng = rand::rng();
        (0..num)
            .map(|_| {
                let tx = TxHash::random_with(&mut rng);
                let blob = BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![],
                });
                (tx, blob)
            })
            .collect()
    }

    #[test]
    fn disk_insert_all_get_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(10);
        let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        // All inserted blobs should be cached and retrievable individually.
        for (tx, blob) in &blobs {
            assert!(store.is_cached(tx));
            let b = store.get(*tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
            assert_eq!(b, *blob);
        }

        let all = store.get_all(all_hashes.clone()).unwrap();
        for (tx, blob) in all {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))), "missing blob {tx:?}");
        }

        assert!(store.contains(all_hashes[0]).unwrap());
        store.delete_all(all_hashes.clone()).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&all_hashes[0]));
        store.clear_cache();
        store.cleanup();

        assert!(store.get(blobs[0].0).unwrap().is_none());

        let all = store.get_all(all_hashes.clone()).unwrap();
        assert!(all.is_empty());

        assert!(!store.contains(all_hashes[0]).unwrap());
        assert!(store.get_exact(all_hashes).is_err());

        assert_eq!(store.data_size_hint(), Some(0));
        assert_eq!(store.inner.size_tracker.num_blobs.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn disk_insert_and_retrieve() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob.clone()).unwrap();

        assert!(store.is_cached(&tx));
        let retrieved_blob = store.get(tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
        assert_eq!(retrieved_blob, blob);
    }

    #[test]
    fn disk_delete_blob() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob).unwrap();
        assert!(store.is_cached(&tx));

        store.delete(tx).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&tx));
        store.cleanup();

        // Cleanup only removes the file on disk; the sidecar is still served from
        // the in-memory cache.
        let result = store.get(tx).unwrap();
        assert_eq!(
            result,
            Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                blobs: vec![],
                commitments: vec![],
                proofs: vec![]
            })))
        );
    }

    #[test]
    fn disk_insert_all_and_delete_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(5);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        for (tx, _) in &blobs {
            assert!(store.is_cached(tx));
        }

        store.delete_all(txs.clone()).unwrap();
        store.cleanup();

        // As above, the entries survive in the cache even though the files are gone.
        for tx in txs {
            let result = store.get(tx).unwrap();
            assert_eq!(
                result,
                Some(Arc::new(BlobTransactionSidecarVariant::Eip4844(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![]
                })))
            );
        }
    }

    #[test]
    fn disk_get_all_blobs() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_all(txs.clone()).unwrap();
        for (tx, blob) in retrieved_blobs {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))));
        }

        store.delete_all(txs).unwrap();
        store.cleanup();
    }

    #[test]
    fn disk_get_exact_blobs_success() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_exact(txs).unwrap();
        for (retrieved_blob, (_, original_blob)) in retrieved_blobs.into_iter().zip(blobs) {
            assert_eq!(Arc::unwrap_or_clone(retrieved_blob), original_blob);
        }
    }

    #[test]
    fn disk_get_exact_blobs_failure() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(2);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        // A hash that was never inserted must make get_exact fail.
        let missing_tx = TxHash::random();
        let result = store.get_exact(vec![txs[0], missing_tx]);
        assert!(result.is_err());
    }

    #[test]
    fn disk_data_size_hint() {
        let (store, _dir) = tmp_store();
        assert_eq!(store.data_size_hint(), Some(0));

        let blobs = rng_blobs(2);
        store.insert_all(blobs).unwrap();
        assert!(store.data_size_hint().unwrap() > 0);
    }

    #[test]
    fn disk_cleanup_stat() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        store.delete_all(txs).unwrap();
        let stat = store.cleanup();
        assert_eq!(stat.delete_succeed, 3);
        assert_eq!(stat.delete_failed, 0);
    }
}