reth_beacon_consensus/engine/hooks/
prune.rs1use crate::{
4 engine::hooks::{EngineHook, EngineHookContext, EngineHookError, EngineHookEvent},
5 hooks::EngineHookDBAccessLevel,
6};
7use alloy_primitives::BlockNumber;
8use futures::FutureExt;
9use metrics::Counter;
10use reth_errors::{RethError, RethResult};
11use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter};
12use reth_prune::{Pruner, PrunerError, PrunerWithResult};
13use reth_tasks::TaskSpawner;
14use std::{
15 fmt::{self, Debug},
16 task::{ready, Context, Poll},
17};
18use tokio::sync::oneshot;
19
20pub struct PruneHook<PF: DatabaseProviderFactory> {
24 pruner_state: PrunerState<PF>,
26 pruner_task_spawner: Box<dyn TaskSpawner>,
28 metrics: Metrics,
29}
30
31impl<PF> fmt::Debug for PruneHook<PF>
32where
33 PF: DatabaseProviderFactory<ProviderRW: fmt::Debug> + fmt::Debug,
34{
35 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36 f.debug_struct("PruneHook")
37 .field("pruner_state", &self.pruner_state)
38 .field("metrics", &self.metrics)
39 .finish()
40 }
41}
42
43impl<PF: DatabaseProviderFactory> PruneHook<PF> {
44 pub fn new(
46 pruner: Pruner<PF::ProviderRW, PF>,
47 pruner_task_spawner: Box<dyn TaskSpawner>,
48 ) -> Self {
49 Self {
50 pruner_state: PrunerState::Idle(Some(pruner)),
51 pruner_task_spawner,
52 metrics: Metrics::default(),
53 }
54 }
55
56 fn poll_pruner(&mut self, cx: &mut Context<'_>) -> Poll<RethResult<EngineHookEvent>> {
60 let result = match self.pruner_state {
61 PrunerState::Idle(_) => return Poll::Pending,
62 PrunerState::Running(ref mut fut) => {
63 ready!(fut.poll_unpin(cx))
64 }
65 };
66
67 let event = match result {
68 Ok((pruner, result)) => {
69 self.pruner_state = PrunerState::Idle(Some(pruner));
70
71 match result {
72 Ok(_) => EngineHookEvent::Finished(Ok(())),
73 Err(err) => EngineHookEvent::Finished(Err(err.into())),
74 }
75 }
76 Err(_) => {
77 EngineHookEvent::Finished(Err(EngineHookError::ChannelClosed))
79 }
80 };
81
82 Poll::Ready(Ok(event))
83 }
84}
85
86impl<PF> PruneHook<PF>
87where
88 PF: DatabaseProviderFactory<ProviderRW: PruneCheckpointReader + PruneCheckpointWriter>
89 + 'static,
90{
91 fn try_spawn_pruner(&mut self, tip_block_number: BlockNumber) -> Option<EngineHookEvent> {
100 match &mut self.pruner_state {
101 PrunerState::Idle(pruner) => {
102 let mut pruner = pruner.take()?;
103
104 if pruner.is_pruning_needed(tip_block_number) {
106 let (tx, rx) = oneshot::channel();
107 self.pruner_task_spawner.spawn_critical_blocking(
108 "pruner task",
109 Box::pin(async move {
110 let result = pruner.run(tip_block_number);
111 let _ = tx.send((pruner, result));
112 }),
113 );
114 self.metrics.runs_total.increment(1);
115 self.pruner_state = PrunerState::Running(rx);
116
117 Some(EngineHookEvent::Started)
118 } else {
119 self.pruner_state = PrunerState::Idle(Some(pruner));
120 Some(EngineHookEvent::NotReady)
121 }
122 }
123 PrunerState::Running(_) => None,
124 }
125 }
126}
127
128impl<PF> EngineHook for PruneHook<PF>
129where
130 PF: DatabaseProviderFactory<ProviderRW: PruneCheckpointReader + PruneCheckpointWriter>
131 + 'static,
132{
133 fn name(&self) -> &'static str {
134 "Prune"
135 }
136
137 fn poll(
138 &mut self,
139 cx: &mut Context<'_>,
140 ctx: EngineHookContext,
141 ) -> Poll<RethResult<EngineHookEvent>> {
142 match self.try_spawn_pruner(ctx.tip_block_number) {
144 Some(EngineHookEvent::NotReady) => return Poll::Pending,
145 Some(event) => return Poll::Ready(Ok(event)),
146 None => (),
147 }
148
149 self.poll_pruner(cx)
151 }
152
153 fn db_access_level(&self) -> EngineHookDBAccessLevel {
154 EngineHookDBAccessLevel::ReadWrite
155 }
156}
157
158enum PrunerState<PF: DatabaseProviderFactory> {
168 Idle(Option<Pruner<PF::ProviderRW, PF>>),
170 Running(oneshot::Receiver<PrunerWithResult<PF::ProviderRW, PF>>),
172}
173
174impl<PF> fmt::Debug for PrunerState<PF>
175where
176 PF: DatabaseProviderFactory<ProviderRW: Debug> + Debug,
177{
178 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
179 match self {
180 Self::Idle(f0) => f.debug_tuple("Idle").field(&f0).finish(),
181 Self::Running(f0) => f.debug_tuple("Running").field(&f0).finish(),
182 }
183 }
184}
185
186#[derive(reth_metrics::Metrics)]
187#[metrics(scope = "consensus.engine.prune")]
188struct Metrics {
189 runs_total: Counter,
191}
192
193impl From<PrunerError> for EngineHookError {
194 fn from(err: PrunerError) -> Self {
195 match err {
196 PrunerError::PruneSegment(_) | PrunerError::InconsistentData(_) => {
197 Self::Internal(Box::new(err))
198 }
199 PrunerError::Database(err) => RethError::Database(err).into(),
200 PrunerError::Provider(err) => RethError::Provider(err).into(),
201 }
202 }
203}