reth_provider/providers/static_file/
writer.rs1use super::{
2 manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
3};
4use crate::providers::static_file::metrics::StaticFileProviderOperation;
5use alloy_consensus::BlockHeader;
6use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
7use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
8use reth_codecs::Compact;
9use reth_db_api::models::CompactU256;
10use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
11use reth_node_types::NodePrimitives;
12use reth_primitives::{
13 static_file::{SegmentHeader, SegmentRangeInclusive},
14 StaticFileSegment,
15};
16use reth_storage_errors::provider::{ProviderError, ProviderResult};
17use std::{
18 borrow::Borrow,
19 fmt::Debug,
20 path::{Path, PathBuf},
21 sync::{Arc, Weak},
22 time::Instant,
23};
24use tracing::debug;
25
26#[derive(Debug)]
31pub(crate) struct StaticFileWriters<N> {
32 headers: RwLock<Option<StaticFileProviderRW<N>>>,
33 transactions: RwLock<Option<StaticFileProviderRW<N>>>,
34 receipts: RwLock<Option<StaticFileProviderRW<N>>>,
35}
36
37impl<N> Default for StaticFileWriters<N> {
38 fn default() -> Self {
39 Self {
40 headers: Default::default(),
41 transactions: Default::default(),
42 receipts: Default::default(),
43 }
44 }
45}
46
47impl<N: NodePrimitives> StaticFileWriters<N> {
48 pub(crate) fn get_or_create(
49 &self,
50 segment: StaticFileSegment,
51 create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
52 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
53 let mut write_guard = match segment {
54 StaticFileSegment::Headers => self.headers.write(),
55 StaticFileSegment::Transactions => self.transactions.write(),
56 StaticFileSegment::Receipts => self.receipts.write(),
57 };
58
59 if write_guard.is_none() {
60 *write_guard = Some(create_fn()?);
61 }
62
63 Ok(StaticFileProviderRWRefMut(write_guard))
64 }
65
66 pub(crate) fn commit(&self) -> ProviderResult<()> {
67 for writer_lock in [&self.headers, &self.transactions, &self.receipts] {
68 let mut writer = writer_lock.write();
69 if let Some(writer) = writer.as_mut() {
70 writer.commit()?;
71 }
72 }
73 Ok(())
74 }
75}
76
77#[derive(Debug)]
79pub struct StaticFileProviderRWRefMut<'a, N>(
80 pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
81);
82
83impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
84 fn deref_mut(&mut self) -> &mut Self::Target {
85 self.0.as_mut().expect("static file writer provider should be init")
87 }
88}
89
90impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
91 type Target = StaticFileProviderRW<N>;
92
93 fn deref(&self) -> &Self::Target {
94 self.0.as_ref().expect("static file writer provider should be init")
96 }
97}
98
99#[derive(Debug)]
100pub struct StaticFileProviderRW<N> {
102 reader: Weak<StaticFileProviderInner<N>>,
106 writer: NippyJarWriter<SegmentHeader>,
108 data_path: PathBuf,
110 buf: Vec<u8>,
112 metrics: Option<Arc<StaticFileProviderMetrics>>,
114 prune_on_commit: Option<(u64, Option<BlockNumber>)>,
117}
118
119impl<N: NodePrimitives> StaticFileProviderRW<N> {
120 pub fn new(
125 segment: StaticFileSegment,
126 block: BlockNumber,
127 reader: Weak<StaticFileProviderInner<N>>,
128 metrics: Option<Arc<StaticFileProviderMetrics>>,
129 ) -> ProviderResult<Self> {
130 let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
131 let mut writer = Self {
132 writer,
133 data_path,
134 buf: Vec::with_capacity(100),
135 reader,
136 metrics,
137 prune_on_commit: None,
138 };
139
140 writer.ensure_end_range_consistency()?;
141
142 Ok(writer)
143 }
144
145 fn open(
146 segment: StaticFileSegment,
147 block: u64,
148 reader: Weak<StaticFileProviderInner<N>>,
149 metrics: Option<Arc<StaticFileProviderMetrics>>,
150 ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
151 let start = Instant::now();
152
153 let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);
154
155 let block_range = static_file_provider.find_fixed_range(block);
156 let (jar, path) = match static_file_provider.get_segment_provider_from_block(
157 segment,
158 block_range.start(),
159 None,
160 ) {
161 Ok(provider) => (
162 NippyJar::load(provider.data_path())
163 .map_err(|e| ProviderError::NippyJar(e.to_string()))?,
164 provider.data_path().into(),
165 ),
166 Err(ProviderError::MissingStaticFileBlock(_, _)) => {
167 let path = static_file_provider.directory().join(segment.filename(&block_range));
168 (create_jar(segment, &path, block_range), path)
169 }
170 Err(err) => return Err(err),
171 };
172
173 let result = match NippyJarWriter::new(jar) {
174 Ok(writer) => Ok((writer, path)),
175 Err(NippyJarError::FrozenJar) => {
176 Err(ProviderError::FinalizedStaticFile(segment, block))
178 }
179 Err(e) => Err(ProviderError::NippyJar(e.to_string())),
180 }?;
181
182 if let Some(metrics) = &metrics {
183 metrics.record_segment_operation(
184 segment,
185 StaticFileProviderOperation::OpenWriter,
186 Some(start.elapsed()),
187 );
188 }
189
190 Ok(result)
191 }
192
193 fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
202 let expected_rows = if self.user_header().segment().is_headers() {
204 self.user_header().block_len().unwrap_or_default()
205 } else {
206 self.user_header().tx_len().unwrap_or_default()
207 };
208 let pruned_rows = expected_rows - self.writer.rows() as u64;
209 if pruned_rows > 0 {
210 self.user_header_mut().prune(pruned_rows);
211 }
212
213 self.writer.commit().map_err(|error| ProviderError::NippyJar(error.to_string()))?;
214
215 self.update_index()?;
217 Ok(())
218 }
219
220 pub fn commit(&mut self) -> ProviderResult<()> {
222 let start = Instant::now();
223
224 if let Some((to_delete, last_block_number)) = self.prune_on_commit.take() {
226 match self.writer.user_header().segment() {
227 StaticFileSegment::Headers => self.prune_header_data(to_delete)?,
228 StaticFileSegment::Transactions => self
229 .prune_transaction_data(to_delete, last_block_number.expect("should exist"))?,
230 StaticFileSegment::Receipts => {
231 self.prune_receipt_data(to_delete, last_block_number.expect("should exist"))?
232 }
233 }
234 }
235
236 if self.writer.is_dirty() {
237 self.writer.commit().map_err(|e| ProviderError::NippyJar(e.to_string()))?;
239
240 if let Some(metrics) = &self.metrics {
241 metrics.record_segment_operation(
242 self.writer.user_header().segment(),
243 StaticFileProviderOperation::CommitWriter,
244 Some(start.elapsed()),
245 );
246 }
247
248 debug!(
249 target: "provider::static_file",
250 segment = ?self.writer.user_header().segment(),
251 path = ?self.data_path,
252 duration = ?start.elapsed(),
253 "Commit"
254 );
255
256 self.update_index()?;
257 }
258
259 Ok(())
260 }
261
/// Commits through [`NippyJarWriter::commit_without_sync_all`] instead of the regular
/// commit.
///
/// Unlike `commit`, this neither executes a queued prune nor checks whether the writer is
/// dirty. Only available with the `test-utils` feature.
///
/// # Errors
///
/// Returns an error if the jar commit or the index update fails.
#[cfg(feature = "test-utils")]
pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
    let started_at = Instant::now();

    let committed = self.writer.commit_without_sync_all();
    committed.map_err(|e| ProviderError::NippyJar(e.to_string()))?;

    if let Some(metrics) = self.metrics.as_ref() {
        metrics.record_segment_operation(
            self.writer.user_header().segment(),
            StaticFileProviderOperation::CommitWriter,
            Some(started_at.elapsed()),
        );
    }

    debug!(
        target: "provider::static_file",
        segment = ?self.writer.user_header().segment(),
        path = ?self.data_path,
        duration = ?started_at.elapsed(),
        "Commit"
    );

    self.update_index()
}
294
295 fn update_index(&self) -> ProviderResult<()> {
297 let segment_max_block = self
306 .writer
307 .user_header()
308 .block_range()
309 .as_ref()
310 .map(|block_range| block_range.end())
311 .or_else(|| {
312 (self.writer.user_header().expected_block_start() > 0)
313 .then(|| self.writer.user_header().expected_block_start() - 1)
314 });
315
316 self.reader().update_index(self.writer.user_header().segment(), segment_max_block)
317 }
318
319 pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
324 let segment = self.writer.user_header().segment();
325
326 self.check_next_block_number(expected_block_number)?;
327
328 let start = Instant::now();
329 if let Some(last_block) = self.writer.user_header().block_end() {
330 if last_block == self.writer.user_header().expected_block_end() {
332 self.commit()?;
334
335 let (writer, data_path) =
337 Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
338 self.writer = writer;
339 self.data_path = data_path;
340
341 *self.writer.user_header_mut() = SegmentHeader::new(
342 self.reader().find_fixed_range(last_block + 1),
343 None,
344 None,
345 segment,
346 );
347 }
348 }
349
350 self.writer.user_header_mut().increment_block();
351 if let Some(metrics) = &self.metrics {
352 metrics.record_segment_operation(
353 segment,
354 StaticFileProviderOperation::IncrementBlock,
355 Some(start.elapsed()),
356 );
357 }
358
359 Ok(())
360 }
361
362 fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
365 let next_static_file_block = self
369 .writer
370 .user_header()
371 .block_end()
372 .map(|b| b + 1)
373 .unwrap_or_else(|| self.writer.user_header().expected_block_start());
374
375 if expected_block_number != next_static_file_block {
376 return Err(ProviderError::UnexpectedStaticFileBlockNumber(
377 self.writer.user_header().segment(),
378 expected_block_number,
379 next_static_file_block,
380 ))
381 }
382 Ok(())
383 }
384
385 fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
393 let mut remaining_rows = num_rows;
394 let segment = self.writer.user_header().segment();
395 while remaining_rows > 0 {
396 let len = match segment {
397 StaticFileSegment::Headers => {
398 self.writer.user_header().block_len().unwrap_or_default()
399 }
400 StaticFileSegment::Transactions | StaticFileSegment::Receipts => {
401 self.writer.user_header().tx_len().unwrap_or_default()
402 }
403 };
404
405 if remaining_rows >= len {
406 let block_start = self.writer.user_header().expected_block_start();
409
410 if block_start != 0 &&
416 (segment.is_headers() || last_block.is_some_and(|b| b < block_start))
417 {
418 self.delete_current_and_open_previous()?;
419 } else {
420 self.writer.user_header_mut().prune(len);
422 self.writer
423 .prune_rows(len as usize)
424 .map_err(|e| ProviderError::NippyJar(e.to_string()))?;
425 break
426 }
427
428 remaining_rows -= len;
429 } else {
430 self.writer.user_header_mut().prune(remaining_rows);
432
433 self.writer
435 .prune_rows(remaining_rows as usize)
436 .map_err(|e| ProviderError::NippyJar(e.to_string()))?;
437 remaining_rows = 0;
438 }
439 }
440
441 if let Some(last_block) = last_block {
443 let mut expected_block_start = self.writer.user_header().expected_block_start();
444
445 if num_rows == 0 {
446 while last_block < expected_block_start {
450 self.delete_current_and_open_previous()?;
451 expected_block_start = self.writer.user_header().expected_block_start();
452 }
453 }
454 self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
455 }
456
457 self.commit()?;
459
460 Ok(())
461 }
462
463 fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
466 let current_path = self.data_path.clone();
467 let (previous_writer, data_path) = Self::open(
468 self.user_header().segment(),
469 self.writer.user_header().expected_block_start() - 1,
470 self.reader.clone(),
471 self.metrics.clone(),
472 )?;
473 self.writer = previous_writer;
474 self.writer.set_dirty();
475 self.data_path = data_path;
476 NippyJar::<SegmentHeader>::load(¤t_path)
477 .map_err(|e| ProviderError::NippyJar(e.to_string()))?
478 .delete()
479 .map_err(|e| ProviderError::NippyJar(e.to_string()))?;
480 Ok(())
481 }
482
483 fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
485 self.buf.clear();
486 column.to_compact(&mut self.buf);
487
488 self.writer
489 .append_column(Some(Ok(&self.buf)))
490 .map_err(|e| ProviderError::NippyJar(e.to_string()))?;
491 Ok(())
492 }
493
494 fn append_with_tx_number<V: Compact>(
498 &mut self,
499 tx_num: TxNumber,
500 value: V,
501 ) -> ProviderResult<()> {
502 if let Some(range) = self.writer.user_header().tx_range() {
503 let next_tx = range.end() + 1;
504 if next_tx != tx_num {
505 return Err(ProviderError::UnexpectedStaticFileTxNumber(
506 self.writer.user_header().segment(),
507 tx_num,
508 next_tx,
509 ))
510 }
511 self.writer.user_header_mut().increment_tx();
512 } else {
513 self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
514 }
515
516 self.append_column(value)?;
517
518 Ok(())
519 }
520
521 pub fn append_header(
528 &mut self,
529 header: &N::BlockHeader,
530 total_difficulty: U256,
531 hash: &BlockHash,
532 ) -> ProviderResult<()>
533 where
534 N::BlockHeader: Compact,
535 {
536 let start = Instant::now();
537 self.ensure_no_queued_prune()?;
538
539 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
540
541 self.increment_block(header.number())?;
542
543 self.append_column(header)?;
544 self.append_column(CompactU256::from(total_difficulty))?;
545 self.append_column(hash)?;
546
547 if let Some(metrics) = &self.metrics {
548 metrics.record_segment_operation(
549 StaticFileSegment::Headers,
550 StaticFileProviderOperation::Append,
551 Some(start.elapsed()),
552 );
553 }
554
555 Ok(())
556 }
557
558 pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
565 where
566 N::SignedTx: Compact,
567 {
568 let start = Instant::now();
569 self.ensure_no_queued_prune()?;
570
571 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
572 self.append_with_tx_number(tx_num, tx)?;
573
574 if let Some(metrics) = &self.metrics {
575 metrics.record_segment_operation(
576 StaticFileSegment::Transactions,
577 StaticFileProviderOperation::Append,
578 Some(start.elapsed()),
579 );
580 }
581
582 Ok(())
583 }
584
585 pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
592 where
593 N::Receipt: Compact,
594 {
595 let start = Instant::now();
596 self.ensure_no_queued_prune()?;
597
598 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
599 self.append_with_tx_number(tx_num, receipt)?;
600
601 if let Some(metrics) = &self.metrics {
602 metrics.record_segment_operation(
603 StaticFileSegment::Receipts,
604 StaticFileProviderOperation::Append,
605 Some(start.elapsed()),
606 );
607 }
608
609 Ok(())
610 }
611
612 pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<Option<TxNumber>>
616 where
617 I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
618 R: Borrow<N::Receipt>,
619 N::Receipt: Compact,
620 {
621 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
622
623 let mut receipts_iter = receipts.into_iter().peekable();
624 if receipts_iter.peek().is_none() {
626 return Ok(None);
627 }
628
629 let start = Instant::now();
630 self.ensure_no_queued_prune()?;
631
632 let mut tx_number = 0;
634 let mut count: u64 = 0;
635
636 for receipt_result in receipts_iter {
637 let (tx_num, receipt) = receipt_result?;
638 self.append_with_tx_number(tx_num, receipt.borrow())?;
639 tx_number = tx_num;
640 count += 1;
641 }
642
643 if let Some(metrics) = &self.metrics {
644 metrics.record_segment_operations(
645 StaticFileSegment::Receipts,
646 StaticFileProviderOperation::Append,
647 count,
648 Some(start.elapsed()),
649 );
650 }
651
652 Ok(Some(tx_number))
653 }
654
655 pub fn prune_transactions(
659 &mut self,
660 to_delete: u64,
661 last_block: BlockNumber,
662 ) -> ProviderResult<()> {
663 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
664 self.queue_prune(to_delete, Some(last_block))
665 }
666
667 pub fn prune_receipts(
671 &mut self,
672 to_delete: u64,
673 last_block: BlockNumber,
674 ) -> ProviderResult<()> {
675 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
676 self.queue_prune(to_delete, Some(last_block))
677 }
678
679 pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
681 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
682 self.queue_prune(to_delete, None)
683 }
684
685 fn queue_prune(
690 &mut self,
691 to_delete: u64,
692 last_block: Option<BlockNumber>,
693 ) -> ProviderResult<()> {
694 self.ensure_no_queued_prune()?;
695 self.prune_on_commit = Some((to_delete, last_block));
696 Ok(())
697 }
698
699 fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
701 if self.prune_on_commit.is_some() {
702 return Err(ProviderError::NippyJar(
703 "Pruning should be committed before appending or pruning more data".to_string(),
704 ))
705 }
706 Ok(())
707 }
708
709 fn prune_transaction_data(
711 &mut self,
712 to_delete: u64,
713 last_block: BlockNumber,
714 ) -> ProviderResult<()> {
715 let start = Instant::now();
716
717 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
718
719 self.truncate(to_delete, Some(last_block))?;
720
721 if let Some(metrics) = &self.metrics {
722 metrics.record_segment_operation(
723 StaticFileSegment::Transactions,
724 StaticFileProviderOperation::Prune,
725 Some(start.elapsed()),
726 );
727 }
728
729 Ok(())
730 }
731
732 fn prune_receipt_data(
734 &mut self,
735 to_delete: u64,
736 last_block: BlockNumber,
737 ) -> ProviderResult<()> {
738 let start = Instant::now();
739
740 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
741
742 self.truncate(to_delete, Some(last_block))?;
743
744 if let Some(metrics) = &self.metrics {
745 metrics.record_segment_operation(
746 StaticFileSegment::Receipts,
747 StaticFileProviderOperation::Prune,
748 Some(start.elapsed()),
749 );
750 }
751
752 Ok(())
753 }
754
755 fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
757 let start = Instant::now();
758
759 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
760
761 self.truncate(to_delete, None)?;
762
763 if let Some(metrics) = &self.metrics {
764 metrics.record_segment_operation(
765 StaticFileSegment::Headers,
766 StaticFileProviderOperation::Prune,
767 Some(start.elapsed()),
768 );
769 }
770
771 Ok(())
772 }
773
774 fn reader(&self) -> StaticFileProvider<N> {
775 Self::upgrade_provider_to_strong_reference(&self.reader)
776 }
777
778 fn upgrade_provider_to_strong_reference(
787 provider: &Weak<StaticFileProviderInner<N>>,
788 ) -> StaticFileProvider<N> {
789 provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
790 }
791
792 pub const fn user_header(&self) -> &SegmentHeader {
794 self.writer.user_header()
795 }
796
797 pub fn user_header_mut(&mut self) -> &mut SegmentHeader {
799 self.writer.user_header_mut()
800 }
801
/// Overwrites the block range of the segment header with `block_range`.
///
/// Only available in tests or with the `test-utils` feature.
#[cfg(any(test, feature = "test-utils"))]
pub fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
    let (first_block, last_block) = block_range.into_inner();
    self.writer.user_header_mut().set_block_range(first_block, last_block)
}
807
/// Exposes the underlying [`NippyJarWriter`] for direct manipulation.
///
/// Only available in tests or with the `test-utils` feature.
#[cfg(any(test, feature = "test-utils"))]
pub fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
    let jar_writer = &mut self.writer;
    jar_writer
}
813}
814
815fn create_jar(
816 segment: StaticFileSegment,
817 path: &Path,
818 expected_block_range: SegmentRangeInclusive,
819) -> NippyJar<SegmentHeader> {
820 let mut jar = NippyJar::new(
821 segment.columns(),
822 path,
823 SegmentHeader::new(expected_block_range, None, None, segment),
824 );
825
826 if segment.is_headers() {
829 jar = jar.with_lz4();
830 }
831
832 jar
833}