// reth_engine_tree/persistence.rs

1use crate::metrics::PersistenceMetrics;
2use alloy_consensus::BlockHeader;
3use alloy_eips::BlockNumHash;
4use reth_chain_state::ExecutedBlock;
5use reth_errors::ProviderError;
6use reth_primitives::{EthPrimitives, NodePrimitives};
7use reth_provider::{
8    providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader,
9    ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StaticFileProviderFactory,
10};
11use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
12use reth_stages_api::{MetricEvent, MetricEventsSender};
13use std::{
14    sync::mpsc::{Receiver, SendError, Sender},
15    time::Instant,
16};
17use thiserror::Error;
18use tokio::sync::oneshot;
19use tracing::{debug, error};
20
21/// Writes parts of reth's in memory tree state to the database and static files.
22///
23/// This is meant to be a spawned service that listens for various incoming persistence operations,
24/// performing those actions on disk, and returning the result in a channel.
25///
26/// This should be spawned in its own thread with [`std::thread::spawn`], since this performs
27/// blocking I/O operations in an endless loop.
28#[derive(Debug)]
29pub struct PersistenceService<N>
30where
31    N: ProviderNodeTypes,
32{
33    /// The provider factory to use
34    provider: ProviderFactory<N>,
35    /// Incoming requests
36    incoming: Receiver<PersistenceAction<N::Primitives>>,
37    /// The pruner
38    pruner: PrunerWithFactory<ProviderFactory<N>>,
39    /// metrics
40    metrics: PersistenceMetrics,
41    /// Sender for sync metrics - we only submit sync metrics for persisted blocks
42    sync_metrics_tx: MetricEventsSender,
43}
44
45impl<N> PersistenceService<N>
46where
47    N: ProviderNodeTypes,
48{
49    /// Create a new persistence service
50    pub fn new(
51        provider: ProviderFactory<N>,
52        incoming: Receiver<PersistenceAction<N::Primitives>>,
53        pruner: PrunerWithFactory<ProviderFactory<N>>,
54        sync_metrics_tx: MetricEventsSender,
55    ) -> Self {
56        Self { provider, incoming, pruner, metrics: PersistenceMetrics::default(), sync_metrics_tx }
57    }
58
59    /// Prunes block data before the given block hash according to the configured prune
60    /// configuration.
61    fn prune_before(&mut self, block_num: u64) -> Result<PrunerOutput, PrunerError> {
62        debug!(target: "engine::persistence", ?block_num, "Running pruner");
63        let start_time = Instant::now();
64        // TODO: doing this properly depends on pruner segment changes
65        let result = self.pruner.run(block_num);
66        self.metrics.prune_before_duration_seconds.record(start_time.elapsed());
67        result
68    }
69}
70
71impl<N> PersistenceService<N>
72where
73    N: ProviderNodeTypes,
74{
75    /// This is the main loop, that will listen to database events and perform the requested
76    /// database actions
77    pub fn run(mut self) -> Result<(), PersistenceError> {
78        // If the receiver errors then senders have disconnected, so the loop should then end.
79        while let Ok(action) = self.incoming.recv() {
80            match action {
81                PersistenceAction::RemoveBlocksAbove(new_tip_num, sender) => {
82                    let result = self.on_remove_blocks_above(new_tip_num)?;
83                    // send new sync metrics based on removed blocks
84                    let _ =
85                        self.sync_metrics_tx.send(MetricEvent::SyncHeight { height: new_tip_num });
86                    // we ignore the error because the caller may or may not care about the result
87                    let _ = sender.send(result);
88                }
89                PersistenceAction::SaveBlocks(blocks, sender) => {
90                    let result = self.on_save_blocks(blocks)?;
91                    let result_number = result.map(|r| r.number);
92
93                    // we ignore the error because the caller may or may not care about the result
94                    let _ = sender.send(result);
95
96                    if let Some(block_number) = result_number {
97                        // send new sync metrics based on saved blocks
98                        let _ = self
99                            .sync_metrics_tx
100                            .send(MetricEvent::SyncHeight { height: block_number });
101
102                        if self.pruner.is_pruning_needed(block_number) {
103                            // We log `PrunerOutput` inside the `Pruner`
104                            let _ = self.prune_before(block_number)?;
105                        }
106                    }
107                }
108                PersistenceAction::SaveFinalizedBlock(finalized_block) => {
109                    let provider = self.provider.database_provider_rw()?;
110                    provider.save_finalized_block_number(finalized_block)?;
111                    provider.commit()?;
112                }
113                PersistenceAction::SaveSafeBlock(safe_block) => {
114                    let provider = self.provider.database_provider_rw()?;
115                    provider.save_safe_block_number(safe_block)?;
116                    provider.commit()?;
117                }
118            }
119        }
120        Ok(())
121    }
122
123    fn on_remove_blocks_above(
124        &self,
125        new_tip_num: u64,
126    ) -> Result<Option<BlockNumHash>, PersistenceError> {
127        debug!(target: "engine::persistence", ?new_tip_num, "Removing blocks");
128        let start_time = Instant::now();
129        let provider_rw = self.provider.database_provider_rw()?;
130        let sf_provider = self.provider.static_file_provider();
131
132        let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
133        UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)?;
134        UnifiedStorageWriter::commit_unwind(provider_rw)?;
135
136        debug!(target: "engine::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
137        self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
138        Ok(new_tip_hash.map(|hash| BlockNumHash { hash, number: new_tip_num }))
139    }
140
141    fn on_save_blocks(
142        &self,
143        blocks: Vec<ExecutedBlock<N::Primitives>>,
144    ) -> Result<Option<BlockNumHash>, PersistenceError> {
145        debug!(target: "engine::persistence", first=?blocks.first().map(|b| b.block.num_hash()), last=?blocks.last().map(|b| b.block.num_hash()), "Saving range of blocks");
146        let start_time = Instant::now();
147        let last_block_hash_num = blocks.last().map(|block| BlockNumHash {
148            hash: block.block().hash(),
149            number: block.block().header().number(),
150        });
151
152        if last_block_hash_num.is_some() {
153            let provider_rw = self.provider.database_provider_rw()?;
154            let static_file_provider = self.provider.static_file_provider();
155
156            UnifiedStorageWriter::from(&provider_rw, &static_file_provider).save_blocks(blocks)?;
157            UnifiedStorageWriter::commit(provider_rw)?;
158        }
159        self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
160        Ok(last_block_hash_num)
161    }
162}
163
164/// One of the errors that can happen when using the persistence service.
165#[derive(Debug, Error)]
166pub enum PersistenceError {
167    /// A pruner error
168    #[error(transparent)]
169    PrunerError(#[from] PrunerError),
170
171    /// A provider error
172    #[error(transparent)]
173    ProviderError(#[from] ProviderError),
174}
175
176/// A signal to the persistence service that part of the tree state can be persisted.
177#[derive(Debug)]
178pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
179    /// The section of tree state that should be persisted. These blocks are expected in order of
180    /// increasing block number.
181    ///
182    /// First, header, transaction, and receipt-related data should be written to static files.
183    /// Then the execution history-related data will be written to the database.
184    SaveBlocks(Vec<ExecutedBlock<N>>, oneshot::Sender<Option<BlockNumHash>>),
185
186    /// Removes block data above the given block number from the database.
187    ///
188    /// This will first update checkpoints from the database, then remove actual block data from
189    /// static files.
190    RemoveBlocksAbove(u64, oneshot::Sender<Option<BlockNumHash>>),
191
192    /// Update the persisted finalized block on disk
193    SaveFinalizedBlock(u64),
194
195    /// Update the persisted safe block on disk
196    SaveSafeBlock(u64),
197}
198
199/// A handle to the persistence service
200#[derive(Debug, Clone)]
201pub struct PersistenceHandle<N: NodePrimitives = EthPrimitives> {
202    /// The channel used to communicate with the persistence service
203    sender: Sender<PersistenceAction<N>>,
204}
205
206impl<T: NodePrimitives> PersistenceHandle<T> {
207    /// Create a new [`PersistenceHandle`] from a [`Sender<PersistenceAction>`].
208    pub const fn new(sender: Sender<PersistenceAction<T>>) -> Self {
209        Self { sender }
210    }
211
212    /// Create a new [`PersistenceHandle`], and spawn the persistence service.
213    pub fn spawn_service<N>(
214        provider_factory: ProviderFactory<N>,
215        pruner: PrunerWithFactory<ProviderFactory<N>>,
216        sync_metrics_tx: MetricEventsSender,
217    ) -> PersistenceHandle<N::Primitives>
218    where
219        N: ProviderNodeTypes,
220    {
221        // create the initial channels
222        let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();
223
224        // construct persistence handle
225        let persistence_handle = PersistenceHandle::new(db_service_tx);
226
227        // spawn the persistence service
228        let db_service =
229            PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
230        std::thread::Builder::new()
231            .name("Persistence Service".to_string())
232            .spawn(|| {
233                if let Err(err) = db_service.run() {
234                    error!(target: "engine::persistence", ?err, "Persistence service failed");
235                }
236            })
237            .unwrap();
238
239        persistence_handle
240    }
241
242    /// Sends a specific [`PersistenceAction`] in the contained channel. The caller is responsible
243    /// for creating any channels for the given action.
244    pub fn send_action(
245        &self,
246        action: PersistenceAction<T>,
247    ) -> Result<(), SendError<PersistenceAction<T>>> {
248        self.sender.send(action)
249    }
250
251    /// Tells the persistence service to save a certain list of finalized blocks. The blocks are
252    /// assumed to be ordered by block number.
253    ///
254    /// This returns the latest hash that has been saved, allowing removal of that block and any
255    /// previous blocks from in-memory data structures. This value is returned in the receiver end
256    /// of the sender argument.
257    ///
258    /// If there are no blocks to persist, then `None` is sent in the sender.
259    pub fn save_blocks(
260        &self,
261        blocks: Vec<ExecutedBlock<T>>,
262        tx: oneshot::Sender<Option<BlockNumHash>>,
263    ) -> Result<(), SendError<PersistenceAction<T>>> {
264        self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
265    }
266
267    /// Persists the finalized block number on disk.
268    pub fn save_finalized_block_number(
269        &self,
270        finalized_block: u64,
271    ) -> Result<(), SendError<PersistenceAction<T>>> {
272        self.send_action(PersistenceAction::SaveFinalizedBlock(finalized_block))
273    }
274
275    /// Persists the finalized block number on disk.
276    pub fn save_safe_block_number(
277        &self,
278        safe_block: u64,
279    ) -> Result<(), SendError<PersistenceAction<T>>> {
280        self.send_action(PersistenceAction::SaveSafeBlock(safe_block))
281    }
282
283    /// Tells the persistence service to remove blocks above a certain block number. The removed
284    /// blocks are returned by the service.
285    ///
286    /// When the operation completes, the new tip hash is returned in the receiver end of the sender
287    /// argument.
288    pub fn remove_blocks_above(
289        &self,
290        block_num: u64,
291        tx: oneshot::Sender<Option<BlockNumHash>>,
292    ) -> Result<(), SendError<PersistenceAction<T>>> {
293        self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
294    }
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::B256;
    use reth_chain_state::test_utils::TestBlockBuilder;
    use reth_exex_types::FinishedExExHeight;
    use reth_provider::test_utils::create_test_provider_factory;
    use reth_prune::Pruner;
    use std::time::Duration;
    use tokio::sync::mpsc::unbounded_channel;

    /// Upper bound on how long a test waits for the persistence service to reply.
    const REPLY_TIMEOUT: Duration = Duration::from_secs(10);

    fn default_persistence_handle() -> PersistenceHandle<EthPrimitives> {
        let provider = create_test_provider_factory();

        let (_finished_exex_height_tx, finished_exex_height_rx) =
            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);

        let pruner =
            Pruner::new_with_factory(provider.clone(), vec![], 5, 0, None, finished_exex_height_rx);

        let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
        PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx)
    }

    /// The reply expected for a saved batch: number and hash of its last block, or `None` for an
    /// empty batch.
    fn expected_num_hash(blocks: &[ExecutedBlock<EthPrimitives>]) -> Option<BlockNumHash> {
        blocks.last().map(|block| BlockNumHash {
            hash: block.block().hash(),
            number: block.block().header().number(),
        })
    }

    /// Awaits the service's reply. The test fails if the reply takes longer than
    /// [`REPLY_TIMEOUT`] or if the service drops the sender without replying.
    async fn recv_reply(rx: oneshot::Receiver<Option<BlockNumHash>>) -> Option<BlockNumHash> {
        tokio::time::timeout(REPLY_TIMEOUT, rx)
            .await
            .expect("test timed out")
            .expect("channel closed unexpectedly")
    }

    #[tokio::test]
    async fn test_save_blocks_empty() {
        reth_tracing::init_test_tracing();
        let persistence_handle = default_persistence_handle();

        let blocks = vec![];
        let (tx, rx) = oneshot::channel();

        persistence_handle.save_blocks(blocks, tx).unwrap();

        assert_eq!(recv_reply(rx).await, None);
    }

    #[tokio::test]
    async fn test_save_blocks_single_block() {
        reth_tracing::init_test_tracing();
        let persistence_handle = default_persistence_handle();
        let block_number = 0;
        let mut test_block_builder = TestBlockBuilder::default();
        let executed =
            test_block_builder.get_executed_block_with_number(block_number, B256::random());

        let blocks = vec![executed];
        let expected = expected_num_hash(&blocks);
        let (tx, rx) = oneshot::channel();

        persistence_handle.save_blocks(blocks, tx).unwrap();

        let actual = recv_reply(rx).await;
        assert!(actual.is_some(), "no hash returned");
        assert_eq!(actual, expected);
    }

    #[tokio::test]
    async fn test_save_blocks_multiple_blocks() {
        reth_tracing::init_test_tracing();
        let persistence_handle = default_persistence_handle();

        let mut test_block_builder = TestBlockBuilder::default();
        let blocks = test_block_builder.get_executed_blocks(0..5).collect::<Vec<_>>();
        let expected = expected_num_hash(&blocks);
        let (tx, rx) = oneshot::channel();

        persistence_handle.save_blocks(blocks, tx).unwrap();

        let actual = recv_reply(rx).await;
        assert!(actual.is_some(), "no hash returned");
        assert_eq!(actual, expected);
    }

    #[tokio::test]
    async fn test_save_blocks_multiple_calls() {
        reth_tracing::init_test_tracing();
        let persistence_handle = default_persistence_handle();

        let ranges = [0..1, 1..2, 2..4, 4..5];
        let mut test_block_builder = TestBlockBuilder::default();
        for range in ranges {
            let blocks = test_block_builder.get_executed_blocks(range).collect::<Vec<_>>();
            let expected = expected_num_hash(&blocks);
            let (tx, rx) = oneshot::channel();

            persistence_handle.save_blocks(blocks, tx).unwrap();

            let actual = recv_reply(rx).await;
            assert!(actual.is_some(), "no hash returned");
            assert_eq!(actual, expected);
        }
    }
}