use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStoreSize};
use alloy_eips::eip4844::{BlobAndProofV1, BlobTransactionSidecar};
use alloy_primitives::{TxHash, B256};
use parking_lot::{Mutex, RwLock};
use schnellru::{ByLength, LruMap};
use std::{collections::HashSet, fmt, fs, io, path::PathBuf, sync::Arc};
use tracing::{debug, trace};

/// Default maximum number of blob sidecars kept in the in-memory LRU cache.
pub const DEFAULT_MAX_CACHED_BLOBS: u32 = 100;

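/// A blob store that persists blob transaction sidecars as files on disk,
/// fronted by an in-memory LRU cache of decoded sidecars.
///
/// A minimal usage sketch (illustrative only: the import path, directory, and
/// bindings are assumptions, not taken from this file):
///
/// ```ignore
/// use reth_transaction_pool::blobstore::{BlobStore, DiskFileBlobStore};
///
/// // Opening wipes any previously persisted blobs and recreates the directory.
/// let store = DiskFileBlobStore::open("/tmp/blobstore", Default::default())?;
/// store.insert(tx_hash, sidecar)?;
/// assert!(store.contains(tx_hash)?);
/// let blob = store.get(tx_hash)?; // served from cache, or read back from disk
/// ```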
#[derive(Clone, Debug)]
pub struct DiskFileBlobStore {
    inner: Arc<DiskFileBlobStoreInner>,
}

impl DiskFileBlobStore {
    /// Opens the blob store at the given directory.
    ///
    /// Note: this wipes any previously persisted blobs before recreating the
    /// (empty) blob directory.
    pub fn open(
        blob_dir: impl Into<PathBuf>,
        opts: DiskFileBlobStoreConfig,
    ) -> Result<Self, DiskFileBlobStoreError> {
        let blob_dir = blob_dir.into();
        let DiskFileBlobStoreConfig { max_cached_entries, .. } = opts;
        let inner = DiskFileBlobStoreInner::new(blob_dir, max_cached_entries);

        // Clear any leftover blob files from a previous run, then recreate the directory.
        inner.delete_all()?;
        inner.create_blob_dir()?;

        Ok(Self { inner: Arc::new(inner) })
    }

    /// Returns whether the given transaction's sidecar is currently in the cache.
    #[cfg(test)]
    fn is_cached(&self, tx: &B256) -> bool {
        self.inner.blob_cache.lock().get(tx).is_some()
    }

    /// Clears the in-memory blob cache.
    #[cfg(test)]
    fn clear_cache(&self) {
        self.inner.blob_cache.lock().clear()
    }
}

impl BlobStore for DiskFileBlobStore {
    fn insert(&self, tx: B256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError> {
        self.inner.insert_one(tx, data)
    }

    fn insert_all(&self, txs: Vec<(B256, BlobTransactionSidecar)>) -> Result<(), BlobStoreError> {
        if txs.is_empty() {
            return Ok(())
        }
        self.inner.insert_many(txs)
    }

    fn delete(&self, tx: B256) -> Result<(), BlobStoreError> {
        // Deletion is two-phase: the hash is only queued here, and the file is
        // removed from disk on the next `cleanup` call.
        if self.inner.contains(tx)? {
            self.inner.txs_to_delete.write().insert(tx);
        }
        Ok(())
    }

    fn delete_all(&self, txs: Vec<B256>) -> Result<(), BlobStoreError> {
        let txs = self.inner.retain_existing(txs)?;
        self.inner.txs_to_delete.write().extend(txs);
        Ok(())
    }

    fn cleanup(&self) -> BlobStoreCleanupStat {
        let txs_to_delete = std::mem::take(&mut *self.inner.txs_to_delete.write());
        let mut stat = BlobStoreCleanupStat::default();
        let mut subsize = 0;
        debug!(target:"txpool::blob", num_blobs=%txs_to_delete.len(), "Removing blobs from disk");
        for tx in txs_to_delete {
            let path = self.inner.blob_disk_file(tx);
            let filesize = fs::metadata(&path).map_or(0, |meta| meta.len());
            match fs::remove_file(&path) {
                Ok(_) => {
                    stat.delete_succeed += 1;
                    subsize += filesize;
                }
                Err(e) => {
                    stat.delete_failed += 1;
                    let err = DiskFileBlobStoreError::DeleteFile(tx, path, e);
                    debug!(target:"txpool::blob", %err);
                }
            };
        }
        self.inner.size_tracker.sub_size(subsize as usize);
        self.inner.size_tracker.sub_len(stat.delete_succeed);
        stat
    }

    fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
        self.inner.get_one(tx)
    }

    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        self.inner.contains(tx)
    }

    fn get_all(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_all(txs)
    }

    fn get_exact(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
        if txs.is_empty() {
            return Ok(Vec::new())
        }
        self.inner.get_exact(txs)
    }

    fn get_by_versioned_hashes(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
        // Best-effort lookup: only the in-memory cache is scanned; sidecars that
        // exist solely on disk are not considered.
        let mut result = vec![None; versioned_hashes.len()];
        for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() {
            for (i, blob_versioned_hash) in blob_sidecar.versioned_hashes().enumerate() {
                for (j, target_versioned_hash) in versioned_hashes.iter().enumerate() {
                    if blob_versioned_hash == *target_versioned_hash {
                        result[j].get_or_insert_with(|| BlobAndProofV1 {
                            blob: Box::new(blob_sidecar.blobs[i]),
                            proof: blob_sidecar.proofs[i],
                        });
                    }
                }
            }

            // Return early once every requested hash has been resolved.
            if result.iter().all(|blob| blob.is_some()) {
                break;
            }
        }
        Ok(result)
    }

    fn data_size_hint(&self) -> Option<usize> {
        Some(self.inner.size_tracker.data_size())
    }

    fn blobs_len(&self) -> usize {
        self.inner.size_tracker.blobs_len()
    }
}

struct DiskFileBlobStoreInner {
    blob_dir: PathBuf,
    blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecar>, ByLength>>,
    size_tracker: BlobStoreSize,
    file_lock: RwLock<()>,
    txs_to_delete: RwLock<HashSet<B256>>,
}

impl DiskFileBlobStoreInner {
    fn new(blob_dir: PathBuf, max_length: u32) -> Self {
        Self {
            blob_dir,
            blob_cache: Mutex::new(LruMap::new(ByLength::new(max_length))),
            size_tracker: Default::default(),
            file_lock: Default::default(),
            txs_to_delete: Default::default(),
        }
    }

    /// Creates the directory where blob files are stored on disk.
    fn create_blob_dir(&self) -> Result<(), DiskFileBlobStoreError> {
        debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Creating blob store");
        fs::create_dir_all(&self.blob_dir)
            .map_err(|e| DiskFileBlobStoreError::Open(self.blob_dir.clone(), e))
    }

    /// Deletes the entire blob store directory, treating a missing directory as
    /// success.
    fn delete_all(&self) -> Result<(), DiskFileBlobStoreError> {
        match fs::remove_dir_all(&self.blob_dir) {
            Ok(_) => {
                debug!(target:"txpool::blob", blob_dir = ?self.blob_dir, "Removed blob store directory");
            }
            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
            Err(err) => return Err(DiskFileBlobStoreError::Open(self.blob_dir.clone(), err)),
        }
        Ok(())
    }

    /// Caches the sidecar and writes its RLP-encoded fields to disk.
    fn insert_one(&self, tx: B256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError> {
        let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
        data.rlp_encode_fields(&mut buf);
        self.blob_cache.lock().insert(tx, Arc::new(data));
        let size = self.write_one_encoded(tx, &buf)?;

        self.size_tracker.add_size(size);
        self.size_tracker.inc_len(1);
        Ok(())
    }

    /// Caches and persists a batch of sidecars, counting only newly written
    /// files in the size accounting.
    fn insert_many(&self, txs: Vec<(B256, BlobTransactionSidecar)>) -> Result<(), BlobStoreError> {
        let raw = txs
            .iter()
            .map(|(tx, data)| {
                let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
                data.rlp_encode_fields(&mut buf);
                (self.blob_disk_file(*tx), buf)
            })
            .collect::<Vec<_>>();

        {
            let mut cache = self.blob_cache.lock();
            for (tx, data) in txs {
                cache.insert(tx, Arc::new(data));
            }
        }
        let mut add = 0;
        let mut num = 0;
        {
            let _lock = self.file_lock.write();
            for (path, data) in raw {
                if path.exists() {
                    debug!(target:"txpool::blob", ?path, "Blob already exists");
                } else if let Err(err) = fs::write(&path, &data) {
                    debug!(target:"txpool::blob", %err, ?path, "Failed to write blob file");
                } else {
                    add += data.len();
                    num += 1;
                }
            }
        }
        self.size_tracker.add_size(add);
        self.size_tracker.inc_len(num);

        Ok(())
    }

    /// Returns whether the sidecar is available, checking the cache before
    /// falling back to a file-existence check on disk.
    fn contains(&self, tx: B256) -> Result<bool, BlobStoreError> {
        if self.blob_cache.lock().get(&tx).is_some() {
            return Ok(true)
        }
        // Fall back to checking whether the blob file exists on disk.
        Ok(self.blob_disk_file(tx).is_file())
    }

    /// Retains only the transaction hashes whose sidecars exist in the cache or
    /// on disk.
    fn retain_existing(&self, txs: Vec<B256>) -> Result<Vec<B256>, BlobStoreError> {
        let (in_cache, not_in_cache): (Vec<B256>, Vec<B256>) = {
            let mut cache = self.blob_cache.lock();
            txs.into_iter().partition(|tx| cache.get(tx).is_some())
        };

        let mut existing = in_cache;
        for tx in not_in_cache {
            if self.blob_disk_file(tx).is_file() {
                existing.push(tx);
            }
        }

        Ok(existing)
    }

    /// Retrieves a single sidecar, populating the cache on a disk hit.
    fn get_one(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
        if let Some(blob) = self.blob_cache.lock().get(&tx) {
            return Ok(Some(blob.clone()))
        }
        let blob = self.read_one(tx)?;

        if let Some(blob) = &blob {
            let blob_arc = Arc::new(blob.clone());
            self.blob_cache.lock().insert(tx, blob_arc.clone());
            return Ok(Some(blob_arc))
        }

        Ok(None)
    }

    /// Returns the path to the blob file for the given transaction hash
    /// (the lowercase hex of the hash, without a `0x` prefix).
    #[inline]
    fn blob_disk_file(&self, tx: B256) -> PathBuf {
        self.blob_dir.join(format!("{tx:x}"))
    }

    /// Reads and decodes the sidecar for the given transaction from disk,
    /// returning `Ok(None)` if the blob file does not exist.
    #[inline]
    fn read_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
        let path = self.blob_disk_file(tx);
        let data = {
            let _lock = self.file_lock.read();
            match fs::read(&path) {
                Ok(data) => data,
                Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
                Err(e) => {
                    return Err(BlobStoreError::Other(Box::new(DiskFileBlobStoreError::ReadFile(
                        tx, path, e,
                    ))))
                }
            }
        };
        BlobTransactionSidecar::rlp_decode_fields(&mut data.as_slice())
            .map(Some)
            .map_err(BlobStoreError::DecodeError)
    }

    /// Reads and decodes all the given blob files, silently skipping entries
    /// that fail to decode.
    fn read_many_decoded(&self, txs: Vec<TxHash>) -> Vec<(TxHash, BlobTransactionSidecar)> {
        self.read_many_raw(txs)
            .into_iter()
            .filter_map(|(tx, data)| {
                BlobTransactionSidecar::rlp_decode_fields(&mut data.as_slice())
                    .map(|sidecar| (tx, sidecar))
                    .ok()
            })
            .collect()
    }

    /// Reads the raw contents of all the given blob files, logging and skipping
    /// files that cannot be read.
    #[inline]
    fn read_many_raw(&self, txs: Vec<TxHash>) -> Vec<(TxHash, Vec<u8>)> {
        let mut res = Vec::with_capacity(txs.len());
        let _lock = self.file_lock.read();
        for tx in txs {
            let path = self.blob_disk_file(tx);
            match fs::read(&path) {
                Ok(data) => {
                    res.push((tx, data));
                }
                Err(err) => {
                    debug!(target:"txpool::blob", %err, ?tx, "Failed to read blob file");
                }
            };
        }
        res
    }

    /// Writes the already-encoded sidecar to disk, returning the number of bytes
    /// written (zero if the file already existed).
    #[inline]
    fn write_one_encoded(&self, tx: B256, data: &[u8]) -> Result<usize, DiskFileBlobStoreError> {
        trace!(target:"txpool::blob", "[{:?}] writing blob file", tx);
        let mut add = 0;
        let path = self.blob_disk_file(tx);
        {
            let _lock = self.file_lock.write();
            if !path.exists() {
                fs::write(&path, data)
                    .map_err(|e| DiskFileBlobStoreError::WriteFile(tx, path, e))?;
                add = data.len();
            }
        }
        Ok(add)
    }

    /// Retrieves all the given sidecars from the cache or disk; hashes that
    /// cannot be found are omitted from the result.
    #[inline]
    fn get_all(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
        let mut res = Vec::with_capacity(txs.len());
        let mut cache_miss = Vec::new();
        {
            let mut cache = self.blob_cache.lock();
            for tx in txs {
                if let Some(blob) = cache.get(&tx) {
                    res.push((tx, blob.clone()));
                } else {
                    cache_miss.push(tx)
                }
            }
        }
        if cache_miss.is_empty() {
            return Ok(res)
        }
        let from_disk = self.read_many_decoded(cache_miss);
        if from_disk.is_empty() {
            return Ok(res)
        }
        let mut cache = self.blob_cache.lock();
        for (tx, data) in from_disk {
            let arc = Arc::new(data);
            cache.insert(tx, arc.clone());
            res.push((tx, arc));
        }

        Ok(res)
    }

    /// Retrieves sidecars for exactly the given transaction hashes, erroring if
    /// any sidecar is missing.
    #[inline]
    fn get_exact(
        &self,
        txs: Vec<B256>,
    ) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
        txs.into_iter()
            .map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
            .collect()
    }
}

impl fmt::Debug for DiskFileBlobStoreInner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("DiskFileBlobStoreInner")
            .field("blob_dir", &self.blob_dir)
            .field("cached_blobs", &self.blob_cache.try_lock().map(|lock| lock.len()))
            .field("txs_to_delete", &self.txs_to_delete.try_read())
            .finish()
    }
}

/// Errors that can occur when interacting with a [`DiskFileBlobStore`].
#[derive(Debug, thiserror::Error)]
pub enum DiskFileBlobStoreError {
    /// Failed to open the blob store directory.
    #[error("failed to open blobstore at {0}: {1}")]
    Open(PathBuf, io::Error),
    /// Failed to read a blob file.
    #[error("[{0}] failed to read blob file at {1}: {2}")]
    ReadFile(TxHash, PathBuf, io::Error),
    /// Failed to write a blob file.
    #[error("[{0}] failed to write blob file at {1}: {2}")]
    WriteFile(TxHash, PathBuf, io::Error),
    /// Failed to delete a blob file.
    #[error("[{0}] failed to delete blob file at {1}: {2}")]
    DeleteFile(TxHash, PathBuf, io::Error),
}

impl From<DiskFileBlobStoreError> for BlobStoreError {
    fn from(value: DiskFileBlobStoreError) -> Self {
        Self::Other(Box::new(value))
    }
}

/// Configuration for a [`DiskFileBlobStore`].
#[derive(Debug, Clone)]
pub struct DiskFileBlobStoreConfig {
    /// The maximum number of blob sidecars to keep in the in-memory LRU cache.
    pub max_cached_entries: u32,
    /// How to treat blob files that already exist on disk when the store is opened.
    pub open: OpenDiskFileBlobStore,
}

impl Default for DiskFileBlobStoreConfig {
    fn default() -> Self {
        Self { max_cached_entries: DEFAULT_MAX_CACHED_BLOBS, open: Default::default() }
    }
}

impl DiskFileBlobStoreConfig {
    /// Sets the maximum number of blob sidecars to keep in the cache.
    pub const fn with_max_cached_entries(mut self, max_cached_entries: u32) -> Self {
        self.max_cached_entries = max_cached_entries;
        self
    }
}

/// How to open an existing blob store on disk.
///
/// Note that [`DiskFileBlobStore::open`] currently always clears the store,
/// regardless of this setting.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OpenDiskFileBlobStore {
    /// Clear all existing blob files on open.
    #[default]
    Clear,
    /// Keep the existing blob files and re-index them.
    ReIndex,
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;

    fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let store = DiskFileBlobStore::open(dir.path(), Default::default()).unwrap();
        (store, dir)
    }

    fn rng_blobs(num: usize) -> Vec<(TxHash, BlobTransactionSidecar)> {
        let mut rng = rand::thread_rng();
        (0..num)
            .map(|_| {
                let tx = TxHash::random_with(&mut rng);
                let blob =
                    BlobTransactionSidecar { blobs: vec![], commitments: vec![], proofs: vec![] };
                (tx, blob)
            })
            .collect()
    }

    #[test]
    fn disk_insert_all_get_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(10);
        let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        // All sidecars should be cached and retrievable.
        for (tx, blob) in &blobs {
            assert!(store.is_cached(tx));
            let b = store.get(*tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
            assert_eq!(b, *blob);
        }

        let all = store.get_all(all_hashes.clone()).unwrap();
        for (tx, blob) in all {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))), "missing blob {tx:?}");
        }

        assert!(store.contains(all_hashes[0]).unwrap());
        store.delete_all(all_hashes.clone()).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&all_hashes[0]));
        store.clear_cache();
        store.cleanup();

        assert!(store.get(blobs[0].0).unwrap().is_none());

        let all = store.get_all(all_hashes.clone()).unwrap();
        assert!(all.is_empty());

        assert!(!store.contains(all_hashes[0]).unwrap());
        assert!(store.get_exact(all_hashes).is_err());

        assert_eq!(store.data_size_hint(), Some(0));
        assert_eq!(store.inner.size_tracker.num_blobs.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn disk_insert_and_retrieve() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob.clone()).unwrap();

        assert!(store.is_cached(&tx));
        let retrieved_blob = store.get(tx).unwrap().map(Arc::unwrap_or_clone).unwrap();
        assert_eq!(retrieved_blob, blob);
    }
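
    // Added example (not part of the original tests): demonstrates the on-disk
    // layout used by `blob_disk_file`, one file per sidecar named after the
    // lowercase hex of the transaction hash, without a `0x` prefix.
    #[test]
    fn disk_blob_file_layout() {
        let (store, dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob).unwrap();

        // The sidecar should be persisted at `<blob_dir>/<tx hash as hex>`.
        assert!(dir.path().join(format!("{tx:x}")).is_file());
    }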

    #[test]
    fn disk_delete_blob() {
        let (store, _dir) = tmp_store();

        let (tx, blob) = rng_blobs(1).into_iter().next().unwrap();
        store.insert(tx, blob).unwrap();
        assert!(store.is_cached(&tx));

        store.delete(tx).unwrap();
        assert!(store.inner.txs_to_delete.read().contains(&tx));
        store.cleanup();

        // Cleanup only removes the file on disk; the sidecar is still served from
        // the in-memory cache.
        let result = store.get(tx).unwrap();
        assert_eq!(
            result,
            Some(Arc::new(BlobTransactionSidecar {
                blobs: vec![],
                commitments: vec![],
                proofs: vec![]
            }))
        );
    }

    #[test]
    fn disk_insert_all_and_delete_all() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(5);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        for (tx, _) in &blobs {
            assert!(store.is_cached(tx));
        }

        store.delete_all(txs.clone()).unwrap();
        store.cleanup();

        // As above, the sidecars remain in the cache after their files are deleted.
        for tx in txs {
            let result = store.get(tx).unwrap();
            assert_eq!(
                result,
                Some(Arc::new(BlobTransactionSidecar {
                    blobs: vec![],
                    commitments: vec![],
                    proofs: vec![]
                }))
            );
        }
    }

    #[test]
    fn disk_get_all_blobs() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_all(txs.clone()).unwrap();
        for (tx, blob) in retrieved_blobs {
            assert!(blobs.contains(&(tx, Arc::unwrap_or_clone(blob))));
        }

        store.delete_all(txs).unwrap();
        store.cleanup();
    }

    #[test]
    fn disk_get_exact_blobs_success() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs.clone()).unwrap();

        let retrieved_blobs = store.get_exact(txs).unwrap();
        for (retrieved_blob, (_, original_blob)) in retrieved_blobs.into_iter().zip(blobs) {
            assert_eq!(Arc::unwrap_or_clone(retrieved_blob), original_blob);
        }
    }

    #[test]
    fn disk_get_exact_blobs_failure() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(2);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        // Request a blob that was never inserted alongside an existing one.
        let missing_tx = TxHash::random();
        let result = store.get_exact(vec![txs[0], missing_tx]);
        assert!(result.is_err());
    }
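
    // Added example (not part of the original tests): `get_by_versioned_hashes`
    // only scans the in-memory cache, and the empty sidecars built by `rng_blobs`
    // carry no commitments, so no requested hash can match and every slot stays
    // `None`.
    #[test]
    fn disk_get_by_versioned_hashes_no_match() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(2);
        store.insert_all(blobs).unwrap();

        let result = store.get_by_versioned_hashes(&[B256::random()]).unwrap();
        assert_eq!(result.len(), 1);
        assert!(result[0].is_none());
    }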

    #[test]
    fn disk_data_size_hint() {
        let (store, _dir) = tmp_store();
        assert_eq!(store.data_size_hint(), Some(0));

        let blobs = rng_blobs(2);
        store.insert_all(blobs).unwrap();
        assert!(store.data_size_hint().unwrap() > 0);
    }

    #[test]
    fn disk_cleanup_stat() {
        let (store, _dir) = tmp_store();

        let blobs = rng_blobs(3);
        let txs = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
        store.insert_all(blobs).unwrap();

        store.delete_all(txs).unwrap();
        let stat = store.cleanup();
        assert_eq!(stat.delete_succeed, 3);
        assert_eq!(stat.delete_failed, 0);
    }
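
    // Added example (not part of the original tests): the builder-style setter
    // on `DiskFileBlobStoreConfig` caps the LRU cache size used by the store.
    #[test]
    fn config_with_max_cached_entries() {
        let config = DiskFileBlobStoreConfig::default().with_max_cached_entries(42);
        assert_eq!(config.max_cached_entries, 42);
        assert_eq!(config.open, OpenDiskFileBlobStore::Clear);
    }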
}