1use crate::metrics::PersistenceMetrics;
2use alloy_consensus::BlockHeader;
3use alloy_eips::BlockNumHash;
4use reth_chain_state::ExecutedBlock;
5use reth_errors::ProviderError;
6use reth_primitives::{EthPrimitives, NodePrimitives};
7use reth_provider::{
8 providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader,
9 ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StaticFileProviderFactory,
10};
11use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
12use reth_stages_api::{MetricEvent, MetricEventsSender};
13use std::{
14 sync::mpsc::{Receiver, SendError, Sender},
15 time::Instant,
16};
17use thiserror::Error;
18use tokio::sync::oneshot;
19use tracing::{debug, error};
20
21#[derive(Debug)]
29pub struct PersistenceService<N>
30where
31 N: ProviderNodeTypes,
32{
33 provider: ProviderFactory<N>,
35 incoming: Receiver<PersistenceAction<N::Primitives>>,
37 pruner: PrunerWithFactory<ProviderFactory<N>>,
39 metrics: PersistenceMetrics,
41 sync_metrics_tx: MetricEventsSender,
43}
44
45impl<N> PersistenceService<N>
46where
47 N: ProviderNodeTypes,
48{
49 pub fn new(
51 provider: ProviderFactory<N>,
52 incoming: Receiver<PersistenceAction<N::Primitives>>,
53 pruner: PrunerWithFactory<ProviderFactory<N>>,
54 sync_metrics_tx: MetricEventsSender,
55 ) -> Self {
56 Self { provider, incoming, pruner, metrics: PersistenceMetrics::default(), sync_metrics_tx }
57 }
58
59 fn prune_before(&mut self, block_num: u64) -> Result<PrunerOutput, PrunerError> {
62 debug!(target: "engine::persistence", ?block_num, "Running pruner");
63 let start_time = Instant::now();
64 let result = self.pruner.run(block_num);
66 self.metrics.prune_before_duration_seconds.record(start_time.elapsed());
67 result
68 }
69}
70
71impl<N> PersistenceService<N>
72where
73 N: ProviderNodeTypes,
74{
75 pub fn run(mut self) -> Result<(), PersistenceError> {
78 while let Ok(action) = self.incoming.recv() {
80 match action {
81 PersistenceAction::RemoveBlocksAbove(new_tip_num, sender) => {
82 let result = self.on_remove_blocks_above(new_tip_num)?;
83 let _ =
85 self.sync_metrics_tx.send(MetricEvent::SyncHeight { height: new_tip_num });
86 let _ = sender.send(result);
88 }
89 PersistenceAction::SaveBlocks(blocks, sender) => {
90 let result = self.on_save_blocks(blocks)?;
91 let result_number = result.map(|r| r.number);
92
93 let _ = sender.send(result);
95
96 if let Some(block_number) = result_number {
97 let _ = self
99 .sync_metrics_tx
100 .send(MetricEvent::SyncHeight { height: block_number });
101
102 if self.pruner.is_pruning_needed(block_number) {
103 let _ = self.prune_before(block_number)?;
105 }
106 }
107 }
108 PersistenceAction::SaveFinalizedBlock(finalized_block) => {
109 let provider = self.provider.database_provider_rw()?;
110 provider.save_finalized_block_number(finalized_block)?;
111 provider.commit()?;
112 }
113 PersistenceAction::SaveSafeBlock(safe_block) => {
114 let provider = self.provider.database_provider_rw()?;
115 provider.save_safe_block_number(safe_block)?;
116 provider.commit()?;
117 }
118 }
119 }
120 Ok(())
121 }
122
123 fn on_remove_blocks_above(
124 &self,
125 new_tip_num: u64,
126 ) -> Result<Option<BlockNumHash>, PersistenceError> {
127 debug!(target: "engine::persistence", ?new_tip_num, "Removing blocks");
128 let start_time = Instant::now();
129 let provider_rw = self.provider.database_provider_rw()?;
130 let sf_provider = self.provider.static_file_provider();
131
132 let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
133 UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)?;
134 UnifiedStorageWriter::commit_unwind(provider_rw)?;
135
136 debug!(target: "engine::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
137 self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
138 Ok(new_tip_hash.map(|hash| BlockNumHash { hash, number: new_tip_num }))
139 }
140
141 fn on_save_blocks(
142 &self,
143 blocks: Vec<ExecutedBlock<N::Primitives>>,
144 ) -> Result<Option<BlockNumHash>, PersistenceError> {
145 debug!(target: "engine::persistence", first=?blocks.first().map(|b| b.block.num_hash()), last=?blocks.last().map(|b| b.block.num_hash()), "Saving range of blocks");
146 let start_time = Instant::now();
147 let last_block_hash_num = blocks.last().map(|block| BlockNumHash {
148 hash: block.block().hash(),
149 number: block.block().header().number(),
150 });
151
152 if last_block_hash_num.is_some() {
153 let provider_rw = self.provider.database_provider_rw()?;
154 let static_file_provider = self.provider.static_file_provider();
155
156 UnifiedStorageWriter::from(&provider_rw, &static_file_provider).save_blocks(blocks)?;
157 UnifiedStorageWriter::commit(provider_rw)?;
158 }
159 self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
160 Ok(last_block_hash_num)
161 }
162}
163
164#[derive(Debug, Error)]
166pub enum PersistenceError {
167 #[error(transparent)]
169 PrunerError(#[from] PrunerError),
170
171 #[error(transparent)]
173 ProviderError(#[from] ProviderError),
174}
175
176#[derive(Debug)]
178pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
179 SaveBlocks(Vec<ExecutedBlock<N>>, oneshot::Sender<Option<BlockNumHash>>),
185
186 RemoveBlocksAbove(u64, oneshot::Sender<Option<BlockNumHash>>),
191
192 SaveFinalizedBlock(u64),
194
195 SaveSafeBlock(u64),
197}
198
199#[derive(Debug, Clone)]
201pub struct PersistenceHandle<N: NodePrimitives = EthPrimitives> {
202 sender: Sender<PersistenceAction<N>>,
204}
205
206impl<T: NodePrimitives> PersistenceHandle<T> {
207 pub const fn new(sender: Sender<PersistenceAction<T>>) -> Self {
209 Self { sender }
210 }
211
212 pub fn spawn_service<N>(
214 provider_factory: ProviderFactory<N>,
215 pruner: PrunerWithFactory<ProviderFactory<N>>,
216 sync_metrics_tx: MetricEventsSender,
217 ) -> PersistenceHandle<N::Primitives>
218 where
219 N: ProviderNodeTypes,
220 {
221 let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();
223
224 let persistence_handle = PersistenceHandle::new(db_service_tx);
226
227 let db_service =
229 PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
230 std::thread::Builder::new()
231 .name("Persistence Service".to_string())
232 .spawn(|| {
233 if let Err(err) = db_service.run() {
234 error!(target: "engine::persistence", ?err, "Persistence service failed");
235 }
236 })
237 .unwrap();
238
239 persistence_handle
240 }
241
242 pub fn send_action(
245 &self,
246 action: PersistenceAction<T>,
247 ) -> Result<(), SendError<PersistenceAction<T>>> {
248 self.sender.send(action)
249 }
250
251 pub fn save_blocks(
260 &self,
261 blocks: Vec<ExecutedBlock<T>>,
262 tx: oneshot::Sender<Option<BlockNumHash>>,
263 ) -> Result<(), SendError<PersistenceAction<T>>> {
264 self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
265 }
266
267 pub fn save_finalized_block_number(
269 &self,
270 finalized_block: u64,
271 ) -> Result<(), SendError<PersistenceAction<T>>> {
272 self.send_action(PersistenceAction::SaveFinalizedBlock(finalized_block))
273 }
274
275 pub fn save_safe_block_number(
277 &self,
278 safe_block: u64,
279 ) -> Result<(), SendError<PersistenceAction<T>>> {
280 self.send_action(PersistenceAction::SaveSafeBlock(safe_block))
281 }
282
283 pub fn remove_blocks_above(
289 &self,
290 block_num: u64,
291 tx: oneshot::Sender<Option<BlockNumHash>>,
292 ) -> Result<(), SendError<PersistenceAction<T>>> {
293 self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
294 }
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::B256;
    use reth_chain_state::test_utils::TestBlockBuilder;
    use reth_exex_types::FinishedExExHeight;
    use reth_provider::test_utils::create_test_provider_factory;
    use reth_prune::Pruner;
    use tokio::sync::mpsc::unbounded_channel;

    /// Spawns a persistence service backed by a test provider factory and a
    /// pruner with no segments, returning the handle to it.
    fn default_persistence_handle() -> PersistenceHandle<EthPrimitives> {
        let factory = create_test_provider_factory();

        let (_finished_exex_height_tx, finished_exex_height_rx) =
            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);

        let pruner =
            Pruner::new_with_factory(factory.clone(), vec![], 5, 0, None, finished_exex_height_rx);

        let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
        PersistenceHandle::<EthPrimitives>::spawn_service(factory, pruner, sync_metrics_tx)
    }

    /// Saving an empty list yields `None`.
    #[tokio::test]
    async fn test_save_blocks_empty() {
        reth_tracing::init_test_tracing();
        let handle = default_persistence_handle();

        let (tx, rx) = oneshot::channel();
        handle.save_blocks(Vec::new(), tx).unwrap();

        let reply = rx.await.unwrap();
        assert_eq!(reply, None);
    }

    /// Saving one block reports that block's hash.
    #[tokio::test]
    async fn test_save_blocks_single_block() {
        reth_tracing::init_test_tracing();
        let handle = default_persistence_handle();

        let mut builder = TestBlockBuilder::default();
        let executed = builder.get_executed_block_with_number(0, B256::random());
        let expected_hash = executed.block().hash();

        let (tx, rx) = oneshot::channel();
        handle.save_blocks(vec![executed], tx).unwrap();

        let reply = tokio::time::timeout(std::time::Duration::from_secs(10), rx)
            .await
            .expect("test timed out")
            .expect("channel closed unexpectedly")
            .expect("no hash returned");

        assert_eq!(expected_hash, reply.hash);
    }

    /// Saving several blocks in one call reports the hash of the last one.
    #[tokio::test]
    async fn test_save_blocks_multiple_blocks() {
        reth_tracing::init_test_tracing();
        let handle = default_persistence_handle();

        let mut builder = TestBlockBuilder::default();
        let blocks: Vec<_> = builder.get_executed_blocks(0..5).collect();
        let expected_hash = blocks.last().unwrap().block().hash();

        let (tx, rx) = oneshot::channel();
        handle.save_blocks(blocks, tx).unwrap();

        let reply = rx.await.unwrap().unwrap();
        assert_eq!(expected_hash, reply.hash);
    }

    /// Consecutive save calls each report the hash of their own last block.
    #[tokio::test]
    async fn test_save_blocks_multiple_calls() {
        reth_tracing::init_test_tracing();
        let handle = default_persistence_handle();

        let mut builder = TestBlockBuilder::default();
        for range in [0..1, 1..2, 2..4, 4..5] {
            let blocks: Vec<_> = builder.get_executed_blocks(range).collect();
            let expected_hash = blocks.last().unwrap().block().hash();

            let (tx, rx) = oneshot::channel();
            handle.save_blocks(blocks, tx).unwrap();

            let reply = rx.await.unwrap().unwrap();
            assert_eq!(expected_hash, reply.hash);
        }
    }
}