reth_provider/providers/static_file/writer.rs

use super::{
    manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
};
use crate::providers::static_file::metrics::StaticFileProviderOperation;
use alloy_consensus::BlockHeader;
use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
use reth_codecs::Compact;
use reth_db_api::models::CompactU256;
use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
use reth_node_types::NodePrimitives;
use reth_primitives::{
    static_file::{SegmentHeader, SegmentRangeInclusive},
    StaticFileSegment,
};
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use std::{
    borrow::Borrow,
    fmt::Debug,
    path::{Path, PathBuf},
    sync::{Arc, Weak},
    time::Instant,
};
use tracing::debug;

/// Static file writers for every known [`StaticFileSegment`].
///
/// WARNING: Trying to use more than one writer for the same segment type **will result in a
/// deadlock**.
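///
/// # Example
///
/// A minimal usage sketch (not a doctest), assuming `writers` is a `StaticFileWriters<N>` and
/// `open_writer` is a hypothetical closure that builds a [`StaticFileProviderRW`] for the
/// requested segment:
///
/// ```ignore
/// // Lazily create (or reuse) the single writer for the Headers segment.
/// let mut writer = writers.get_or_create(StaticFileSegment::Headers, open_writer)?;
/// // ... append data through `writer` ...
/// // Drop the guard before asking for the same segment again, otherwise the per-segment
/// // `RwLock` write guard is still held and the next call deadlocks.
/// drop(writer);
/// ```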
#[derive(Debug)]
pub(crate) struct StaticFileWriters<N> {
    headers: RwLock<Option<StaticFileProviderRW<N>>>,
    transactions: RwLock<Option<StaticFileProviderRW<N>>>,
    receipts: RwLock<Option<StaticFileProviderRW<N>>>,
}

impl<N> Default for StaticFileWriters<N> {
    fn default() -> Self {
        Self {
            headers: Default::default(),
            transactions: Default::default(),
            receipts: Default::default(),
        }
    }
}

impl<N: NodePrimitives> StaticFileWriters<N> {
    pub(crate) fn get_or_create(
        &self,
        segment: StaticFileSegment,
        create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
        let mut write_guard = match segment {
            StaticFileSegment::Headers => self.headers.write(),
            StaticFileSegment::Transactions => self.transactions.write(),
            StaticFileSegment::Receipts => self.receipts.write(),
        };

        if write_guard.is_none() {
            *write_guard = Some(create_fn()?);
        }

        Ok(StaticFileProviderRWRefMut(write_guard))
    }

    pub(crate) fn commit(&self) -> ProviderResult<()> {
        for writer_lock in [&self.headers, &self.transactions, &self.receipts] {
            let mut writer = writer_lock.write();
            if let Some(writer) = writer.as_mut() {
                writer.commit()?;
            }
        }
        Ok(())
    }
}

/// Mutable reference to a [`StaticFileProviderRW`] behind a [`RwLockWriteGuard`].
#[derive(Debug)]
pub struct StaticFileProviderRWRefMut<'a, N>(
    pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
);

impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        // This is always created by [`StaticFileWriters::get_or_create`]
        self.0.as_mut().expect("static file writer provider should be init")
    }
}

impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
    type Target = StaticFileProviderRW<N>;

    fn deref(&self) -> &Self::Target {
        // This is always created by [`StaticFileWriters::get_or_create`]
        self.0.as_ref().expect("static file writer provider should be init")
    }
}

#[derive(Debug)]
/// Extends `StaticFileProvider` with writing capabilities.
pub struct StaticFileProviderRW<N> {
    /// Reference back to the provider. We need [`Weak`] here because [`StaticFileProviderRW`] is
    /// stored in a [`dashmap::DashMap`] inside the parent [`StaticFileProvider`], which is an
    /// [`Arc`]. If we were to use an [`Arc`] here, we would create a reference cycle.
    reader: Weak<StaticFileProviderInner<N>>,
    /// A [`NippyJarWriter`] instance.
    writer: NippyJarWriter<SegmentHeader>,
    /// Path to opened file.
    data_path: PathBuf,
    /// Reusable buffer for encoding appended data.
    buf: Vec<u8>,
    /// Metrics.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// Pruning to apply on commit: the number of rows to delete and, if the segment is
    /// transaction based, the last block the prune ends at.
    prune_on_commit: Option<(u64, Option<BlockNumber>)>,
}

impl<N: NodePrimitives> StaticFileProviderRW<N> {
    /// Creates a new [`StaticFileProviderRW`] for a [`StaticFileSegment`].
    ///
    /// Before use, transaction based segments should ensure the block end range is the expected
    /// one, and heal if not. For more, see `Self::ensure_end_range_consistency`.
    pub fn new(
        segment: StaticFileSegment,
        block: BlockNumber,
        reader: Weak<StaticFileProviderInner<N>>,
        metrics: Option<Arc<StaticFileProviderMetrics>>,
    ) -> ProviderResult<Self> {
        let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
        let mut writer = Self {
            writer,
            data_path,
            buf: Vec::with_capacity(100),
            reader,
            metrics,
            prune_on_commit: None,
        };

        writer.ensure_end_range_consistency()?;

        Ok(writer)
    }

    fn open(
        segment: StaticFileSegment,
        block: u64,
        reader: Weak<StaticFileProviderInner<N>>,
        metrics: Option<Arc<StaticFileProviderMetrics>>,
    ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
        let start = Instant::now();

        let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);

        let block_range = static_file_provider.find_fixed_range(block);
        let (jar, path) = match static_file_provider.get_segment_provider_from_block(
            segment,
            block_range.start(),
            None,
        ) {
            Ok(provider) => (
                NippyJar::load(provider.data_path())
                    .map_err(|e| ProviderError::NippyJar(e.to_string()))?,
                provider.data_path().into(),
            ),
            Err(ProviderError::MissingStaticFileBlock(_, _)) => {
                let path = static_file_provider.directory().join(segment.filename(&block_range));
                (create_jar(segment, &path, block_range), path)
            }
            Err(err) => return Err(err),
        };

        let result = match NippyJarWriter::new(jar) {
            Ok(writer) => Ok((writer, path)),
            Err(NippyJarError::FrozenJar) => {
                // This static file has been frozen, so it can no longer be written to
                Err(ProviderError::FinalizedStaticFile(segment, block))
            }
            Err(e) => Err(ProviderError::NippyJar(e.to_string())),
        }?;

        if let Some(metrics) = &metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::OpenWriter,
                Some(start.elapsed()),
            );
        }

        Ok(result)
    }

    /// If file-level healing happens, we need to update the end range on the
    /// [`SegmentHeader`].
    ///
    /// However, for transaction based segments, the block end range has to be found and healed
    /// externally.
    ///
    /// Check [`reth_nippy_jar::NippyJarChecker`] &
    /// [`NippyJarWriter`] for more on healing.
    fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
        // If we have lost rows (in this run or previous), we need to update the [`SegmentHeader`].
        let expected_rows = if self.user_header().segment().is_headers() {
            self.user_header().block_len().unwrap_or_default()
        } else {
            self.user_header().tx_len().unwrap_or_default()
        };
        let pruned_rows = expected_rows - self.writer.rows() as u64;
        if pruned_rows > 0 {
            self.user_header_mut().prune(pruned_rows);
        }

        self.writer.commit().map_err(|error| ProviderError::NippyJar(error.to_string()))?;

        // Updates the [`StaticFileProvider`] index.
        self.update_index()?;
        Ok(())
    }

    /// Commits configuration changes to disk and updates the reader index with the new changes.
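    ///
    /// # Example
    ///
    /// A minimal sketch (not a doctest), assuming `writer` is a [`StaticFileProviderRW`] for the
    /// Headers segment and `header`, `td` and `hash` are already available:
    ///
    /// ```ignore
    /// writer.append_header(&header, td, &hash)?;
    /// // Any queued pruning is applied first, then offsets and the `SegmentHeader` are flushed
    /// // to disk and the reader index is refreshed.
    /// writer.commit()?;
    /// ```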
    pub fn commit(&mut self) -> ProviderResult<()> {
        let start = Instant::now();

        // Truncates the data file if instructed to.
        if let Some((to_delete, last_block_number)) = self.prune_on_commit.take() {
            match self.writer.user_header().segment() {
                StaticFileSegment::Headers => self.prune_header_data(to_delete)?,
                StaticFileSegment::Transactions => self
                    .prune_transaction_data(to_delete, last_block_number.expect("should exist"))?,
                StaticFileSegment::Receipts => {
                    self.prune_receipt_data(to_delete, last_block_number.expect("should exist"))?
                }
            }
        }

        if self.writer.is_dirty() {
            // Commits offsets and new user_header to disk
            self.writer.commit().map_err(|e| ProviderError::NippyJar(e.to_string()))?;

            if let Some(metrics) = &self.metrics {
                metrics.record_segment_operation(
                    self.writer.user_header().segment(),
                    StaticFileProviderOperation::CommitWriter,
                    Some(start.elapsed()),
                );
            }

            debug!(
                target: "provider::static_file",
                segment = ?self.writer.user_header().segment(),
                path = ?self.data_path,
                duration = ?start.elapsed(),
                "Commit"
            );

            self.update_index()?;
        }

        Ok(())
    }

    /// Commits configuration changes to disk and updates the reader index with the new changes.
    ///
    /// CAUTION: does not call `sync_all` on the files.
    #[cfg(feature = "test-utils")]
    pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
        let start = Instant::now();

        // Commits offsets and new user_header to disk
        self.writer
            .commit_without_sync_all()
            .map_err(|e| ProviderError::NippyJar(e.to_string()))?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                self.writer.user_header().segment(),
                StaticFileProviderOperation::CommitWriter,
                Some(start.elapsed()),
            );
        }

        debug!(
            target: "provider::static_file",
            segment = ?self.writer.user_header().segment(),
            path = ?self.data_path,
            duration = ?start.elapsed(),
            "Commit"
        );

        self.update_index()?;

        Ok(())
    }

    /// Updates the `self.reader` internal index.
    fn update_index(&self) -> ProviderResult<()> {
        // We find the maximum block of the segment by checking this writer's last block.
        //
        // However if there's no block range (because there's no data), we try to calculate it by
        // subtracting 1 from the expected block start, resulting in the last block of the
        // previous file.
        //
        // If that expected block start is 0, then there is no block data in static files at all.
        let segment_max_block = self
            .writer
            .user_header()
            .block_range()
            .as_ref()
            .map(|block_range| block_range.end())
            .or_else(|| {
                (self.writer.user_header().expected_block_start() > 0)
                    .then(|| self.writer.user_header().expected_block_start() - 1)
            });

        self.reader().update_index(self.writer.user_header().segment(), segment_max_block)
    }

    /// Increments the [`SegmentHeader`] end block. It will commit the current static file and
    /// create the next one if we are past the end range.
    ///
    /// Returns an error if `expected_block_number` does not match the next block number tracked
    /// by the static file.
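    ///
    /// # Example
    ///
    /// A minimal sketch (not a doctest), assuming `writer` currently tracks blocks up to 99 for
    /// the Transactions segment:
    ///
    /// ```ignore
    /// // Register block 100 on the segment header, even if it contains no transactions.
    /// writer.increment_block(100)?;
    /// // Passing any other number here would return
    /// // `ProviderError::UnexpectedStaticFileBlockNumber`.
    /// ```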
    pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
        let segment = self.writer.user_header().segment();

        self.check_next_block_number(expected_block_number)?;

        let start = Instant::now();
        if let Some(last_block) = self.writer.user_header().block_end() {
            // We have finished the previous static file and must freeze it
            if last_block == self.writer.user_header().expected_block_end() {
                // Commits offsets and new user_header to disk
                self.commit()?;

                // Opens the new static file
                let (writer, data_path) =
                    Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
                self.writer = writer;
                self.data_path = data_path;

                *self.writer.user_header_mut() = SegmentHeader::new(
                    self.reader().find_fixed_range(last_block + 1),
                    None,
                    None,
                    segment,
                );
            }
        }

        self.writer.user_header_mut().increment_block();
        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::IncrementBlock,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Verifies if the incoming block number matches the next expected block number
    /// for a static file. This ensures data continuity when adding new blocks.
    fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
        // The next static file block number can be found by checking the one after block_end.
        // However, if this is a new file without any data yet, its block range will be `None`.
        // In that case, the next block is found on `expected_block_start`.
        let next_static_file_block = self
            .writer
            .user_header()
            .block_end()
            .map(|b| b + 1)
            .unwrap_or_else(|| self.writer.user_header().expected_block_start());

        if expected_block_number != next_static_file_block {
            return Err(ProviderError::UnexpectedStaticFileBlockNumber(
                self.writer.user_header().segment(),
                expected_block_number,
                next_static_file_block,
            ))
        }
        Ok(())
    }

    /// Truncates a number of rows from disk. It deletes and loads an older static file if the
    /// truncation goes past the start of the current block range.
    ///
    /// **`last_block`** should be passed only with transaction based segments.
    ///
    /// # Note
    /// Commits to the configuration file at the end.
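    ///
    /// Illustrative example (hypothetical numbers): if the current file holds 3 rows and
    /// `num_rows` is 5, the whole current file is deleted (provided it is not the first file
    /// and, for transaction-based segments, `last_block` falls before its block range), the
    /// previous file is opened, and the remaining 2 rows are pruned from it.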
    fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
        let mut remaining_rows = num_rows;
        let segment = self.writer.user_header().segment();
        while remaining_rows > 0 {
            let len = match segment {
                StaticFileSegment::Headers => {
                    self.writer.user_header().block_len().unwrap_or_default()
                }
                StaticFileSegment::Transactions | StaticFileSegment::Receipts => {
                    self.writer.user_header().tx_len().unwrap_or_default()
                }
            };

            if remaining_rows >= len {
                // If there are more rows to delete than this static file contains, then just
                // delete the whole file and go to the next static file
                let block_start = self.writer.user_header().expected_block_start();

                // We only delete the file if it's NOT the first static file AND:
                // * it's a Header segment  OR
                // * it's a tx-based segment AND `last_block` is lower than the first block of this
                //   file's block range. Otherwise, having no rows simply means that this block
                //   range has no transactions, but the file should remain.
                if block_start != 0 &&
                    (segment.is_headers() || last_block.is_some_and(|b| b < block_start))
                {
                    self.delete_current_and_open_previous()?;
                } else {
                    // Update `SegmentHeader`
                    self.writer.user_header_mut().prune(len);
                    self.writer
                        .prune_rows(len as usize)
                        .map_err(|e| ProviderError::NippyJar(e.to_string()))?;
                    break
                }

                remaining_rows -= len;
            } else {
                // Update `SegmentHeader`
                self.writer.user_header_mut().prune(remaining_rows);

                // Truncate data
                self.writer
                    .prune_rows(remaining_rows as usize)
                    .map_err(|e| ProviderError::NippyJar(e.to_string()))?;
                remaining_rows = 0;
            }
        }

        // Only applies to transaction-based segments (Transactions and Receipts)
        if let Some(last_block) = last_block {
            let mut expected_block_start = self.writer.user_header().expected_block_start();

            if num_rows == 0 {
                // Edge case for when we are unwinding a chain of empty blocks that goes across
                // files, and therefore, the only reference point to know which file
                // we are supposed to be at is `last_block`.
                while last_block < expected_block_start {
                    self.delete_current_and_open_previous()?;
                    expected_block_start = self.writer.user_header().expected_block_start();
                }
            }
            self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
        }

        // Commits new changes to disk.
        self.commit()?;

        Ok(())
    }

    /// Deletes the current static file and re-opens this writer on the previous static
    /// file.
    fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
        let current_path = self.data_path.clone();
        let (previous_writer, data_path) = Self::open(
            self.user_header().segment(),
            self.writer.user_header().expected_block_start() - 1,
            self.reader.clone(),
            self.metrics.clone(),
        )?;
        self.writer = previous_writer;
        self.writer.set_dirty();
        self.data_path = data_path;
        NippyJar::<SegmentHeader>::load(&current_path)
            .map_err(|e| ProviderError::NippyJar(e.to_string()))?
            .delete()
            .map_err(|e| ProviderError::NippyJar(e.to_string()))?;
        Ok(())
    }

    /// Appends column to static file.
    fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
        self.buf.clear();
        column.to_compact(&mut self.buf);

        self.writer
            .append_column(Some(Ok(&self.buf)))
            .map_err(|e| ProviderError::NippyJar(e.to_string()))?;
        Ok(())
    }

    /// Appends to a tx number-based static file.
    ///
    /// Returns an error if `tx_num` does not follow the last transaction number in the file.
    fn append_with_tx_number<V: Compact>(
        &mut self,
        tx_num: TxNumber,
        value: V,
    ) -> ProviderResult<()> {
        if let Some(range) = self.writer.user_header().tx_range() {
            let next_tx = range.end() + 1;
            if next_tx != tx_num {
                return Err(ProviderError::UnexpectedStaticFileTxNumber(
                    self.writer.user_header().segment(),
                    tx_num,
                    next_tx,
                ))
            }
            self.writer.user_header_mut().increment_tx();
        } else {
            self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
        }

        self.append_column(value)?;

        Ok(())
    }

    /// Appends a header to the static file.
    ///
    /// It **CALLS** `increment_block()`, since the number of headers is equal to the number of
    /// blocks.
    ///
    /// Returns an error if the header's block number does not match the next expected block.
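    ///
    /// # Example
    ///
    /// A minimal sketch (not a doctest), assuming `N::BlockHeader` is `alloy_consensus::Header`
    /// and `header` is the header of the next expected block:
    ///
    /// ```ignore
    /// let hash = header.hash_slow();
    /// // `U256::ZERO` stands in for the real total difficulty here.
    /// writer.append_header(&header, U256::ZERO, &hash)?;
    /// ```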
    pub fn append_header(
        &mut self,
        header: &N::BlockHeader,
        total_difficulty: U256,
        hash: &BlockHash,
    ) -> ProviderResult<()>
    where
        N::BlockHeader: Compact,
    {
        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);

        self.increment_block(header.number())?;

        self.append_column(header)?;
        self.append_column(CompactU256::from(total_difficulty))?;
        self.append_column(hash)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Headers,
                StaticFileProviderOperation::Append,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Appends a transaction to the static file.
    ///
    /// It **DOES NOT CALL** `increment_block()`; that should be handled elsewhere, since empty
    /// blocks would never trigger this call.
    ///
    /// Returns an error if `tx_num` does not follow the last transaction number in the file.
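    ///
    /// # Example
    ///
    /// A minimal sketch (not a doctest), assuming `block_number`, `tx_num` and `tx` describe the
    /// next block and its first transaction:
    ///
    /// ```ignore
    /// // The block must be registered separately, since this call never does it.
    /// writer.increment_block(block_number)?;
    /// writer.append_transaction(tx_num, &tx)?;
    /// ```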
    pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
    where
        N::SignedTx: Compact,
    {
        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
        self.append_with_tx_number(tx_num, tx)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Transactions,
                StaticFileProviderOperation::Append,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Appends a receipt to the static file.
    ///
    /// It **DOES NOT** call `increment_block()`; that should be handled elsewhere, since empty
    /// blocks would never trigger this call.
    ///
    /// Returns an error if `tx_num` does not follow the last transaction number in the file.
    pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
    where
        N::Receipt: Compact,
    {
        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
        self.append_with_tx_number(tx_num, receipt)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Receipts,
                StaticFileProviderOperation::Append,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Appends multiple receipts to the static file.
    ///
    /// Returns the current [`TxNumber`] as seen in the static file, if any.
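    ///
    /// # Example
    ///
    /// A minimal sketch (not a doctest), assuming `receipts` is a `Vec<(TxNumber, N::Receipt)>`
    /// for blocks that were already registered via `increment_block`:
    ///
    /// ```ignore
    /// let last_tx = writer.append_receipts(receipts.into_iter().map(Ok::<_, ProviderError>))?;
    /// // `last_tx` is `None` when the iterator was empty.
    /// ```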
    pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<Option<TxNumber>>
    where
        I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
        R: Borrow<N::Receipt>,
        N::Receipt: Compact,
    {
        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);

        let mut receipts_iter = receipts.into_iter().peekable();
        // If receipts are empty, we can simply return None
        if receipts_iter.peek().is_none() {
            return Ok(None);
        }

        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        // At this point receipts contains at least one receipt, so this would be overwritten.
        let mut tx_number = 0;
        let mut count: u64 = 0;

        for receipt_result in receipts_iter {
            let (tx_num, receipt) = receipt_result?;
            self.append_with_tx_number(tx_num, receipt.borrow())?;
            tx_number = tx_num;
            count += 1;
        }

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operations(
                StaticFileSegment::Receipts,
                StaticFileProviderOperation::Append,
                count,
                Some(start.elapsed()),
            );
        }

        Ok(Some(tx_number))
    }

    /// Adds an instruction to prune `to_delete` transactions during commit.
    ///
    /// Note: `last_block` refers to the block the unwind ends at.
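    ///
    /// # Example
    ///
    /// A minimal sketch (not a doctest): unwinding the last 10 transactions so that the segment
    /// ends at block 100. The rows are only removed from disk once `commit` runs.
    ///
    /// ```ignore
    /// writer.prune_transactions(10, 100)?;
    /// writer.commit()?;
    /// ```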
    pub fn prune_transactions(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
        self.queue_prune(to_delete, Some(last_block))
    }

    /// Adds an instruction to prune `to_delete` receipts during commit.
    ///
    /// Note: `last_block` refers to the block the unwind ends at.
    pub fn prune_receipts(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
        self.queue_prune(to_delete, Some(last_block))
    }

    /// Adds an instruction to prune `to_delete` headers during commit.
    pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
        self.queue_prune(to_delete, None)
    }

    /// Adds an instruction to prune `to_delete` elements during commit.
    ///
    /// Note: `last_block` refers to the block the unwind ends at, if dealing with
    /// transaction-based data.
    fn queue_prune(
        &mut self,
        to_delete: u64,
        last_block: Option<BlockNumber>,
    ) -> ProviderResult<()> {
        self.ensure_no_queued_prune()?;
        self.prune_on_commit = Some((to_delete, last_block));
        Ok(())
    }

    /// Returns an error if there is a queued pruning instruction that has not been applied yet.
    fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
        if self.prune_on_commit.is_some() {
            return Err(ProviderError::NippyJar(
                "Pruning should be committed before appending or pruning more data".to_string(),
            ))
        }
        Ok(())
    }

    /// Removes the last `to_delete` transactions from the data file.
    fn prune_transaction_data(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        let start = Instant::now();

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);

        self.truncate(to_delete, Some(last_block))?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Transactions,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Prunes the last `to_delete` receipts from the data file.
    fn prune_receipt_data(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        let start = Instant::now();

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);

        self.truncate(to_delete, Some(last_block))?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Receipts,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Prunes the last `to_delete` headers from the data file.
    fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
        let start = Instant::now();

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);

        self.truncate(to_delete, None)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Headers,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    fn reader(&self) -> StaticFileProvider<N> {
        Self::upgrade_provider_to_strong_reference(&self.reader)
    }

    /// Upgrades a weak reference of [`StaticFileProviderInner`] to a strong reference
    /// [`StaticFileProvider`].
    ///
    /// # Panics
    ///
    /// Panics if the parent [`StaticFileProvider`] is fully dropped while the child writer is still
    /// active. In reality, it's impossible to detach the [`StaticFileProviderRW`] from the
    /// [`StaticFileProvider`].
    fn upgrade_provider_to_strong_reference(
        provider: &Weak<StaticFileProviderInner<N>>,
    ) -> StaticFileProvider<N> {
        provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
    }

    /// Helper function to access [`SegmentHeader`].
    pub const fn user_header(&self) -> &SegmentHeader {
        self.writer.user_header()
    }

    /// Helper function to access a mutable reference to [`SegmentHeader`].
    pub fn user_header_mut(&mut self) -> &mut SegmentHeader {
        self.writer.user_header_mut()
    }

    /// Helper function to override block range for testing.
    #[cfg(any(test, feature = "test-utils"))]
    pub fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
        self.writer.user_header_mut().set_block_range(*block_range.start(), *block_range.end())
    }

    /// Helper function to access the inner [`NippyJarWriter`] for testing.
    #[cfg(any(test, feature = "test-utils"))]
    pub fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
        &mut self.writer
    }
}

fn create_jar(
    segment: StaticFileSegment,
    path: &Path,
    expected_block_range: SegmentRangeInclusive,
) -> NippyJar<SegmentHeader> {
    let mut jar = NippyJar::new(
        segment.columns(),
        path,
        SegmentHeader::new(expected_block_range, None, None, segment),
    );

    // Transaction and Receipt already have the compression scheme used natively in their
    // encoding (zstd-dictionary).
    if segment.is_headers() {
        jar = jar.with_lz4();
    }

    jar
}