reth_beacon_consensus/engine/hooks/
prune.rs

1//! Prune hook for the engine implementation.
2
3use crate::{
4    engine::hooks::{EngineHook, EngineHookContext, EngineHookError, EngineHookEvent},
5    hooks::EngineHookDBAccessLevel,
6};
7use alloy_primitives::BlockNumber;
8use futures::FutureExt;
9use metrics::Counter;
10use reth_errors::{RethError, RethResult};
11use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter};
12use reth_prune::{Pruner, PrunerError, PrunerWithResult};
13use reth_tasks::TaskSpawner;
14use std::{
15    fmt::{self, Debug},
16    task::{ready, Context, Poll},
17};
18use tokio::sync::oneshot;
19
20/// Manages pruning under the control of the engine.
21///
22/// This type controls the [Pruner].
23pub struct PruneHook<PF: DatabaseProviderFactory> {
24    /// The current state of the pruner.
25    pruner_state: PrunerState<PF>,
26    /// The type that can spawn the pruner task.
27    pruner_task_spawner: Box<dyn TaskSpawner>,
28    metrics: Metrics,
29}
30
31impl<PF> fmt::Debug for PruneHook<PF>
32where
33    PF: DatabaseProviderFactory<ProviderRW: fmt::Debug> + fmt::Debug,
34{
35    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36        f.debug_struct("PruneHook")
37            .field("pruner_state", &self.pruner_state)
38            .field("metrics", &self.metrics)
39            .finish()
40    }
41}
42
43impl<PF: DatabaseProviderFactory> PruneHook<PF> {
44    /// Create a new instance
45    pub fn new(
46        pruner: Pruner<PF::ProviderRW, PF>,
47        pruner_task_spawner: Box<dyn TaskSpawner>,
48    ) -> Self {
49        Self {
50            pruner_state: PrunerState::Idle(Some(pruner)),
51            pruner_task_spawner,
52            metrics: Metrics::default(),
53        }
54    }
55
56    /// Advances the pruner state.
57    ///
58    /// This checks for the result in the channel, or returns pending if the pruner is idle.
59    fn poll_pruner(&mut self, cx: &mut Context<'_>) -> Poll<RethResult<EngineHookEvent>> {
60        let result = match self.pruner_state {
61            PrunerState::Idle(_) => return Poll::Pending,
62            PrunerState::Running(ref mut fut) => {
63                ready!(fut.poll_unpin(cx))
64            }
65        };
66
67        let event = match result {
68            Ok((pruner, result)) => {
69                self.pruner_state = PrunerState::Idle(Some(pruner));
70
71                match result {
72                    Ok(_) => EngineHookEvent::Finished(Ok(())),
73                    Err(err) => EngineHookEvent::Finished(Err(err.into())),
74                }
75            }
76            Err(_) => {
77                // failed to receive the pruner
78                EngineHookEvent::Finished(Err(EngineHookError::ChannelClosed))
79            }
80        };
81
82        Poll::Ready(Ok(event))
83    }
84}
85
86impl<PF> PruneHook<PF>
87where
88    PF: DatabaseProviderFactory<ProviderRW: PruneCheckpointReader + PruneCheckpointWriter>
89        + 'static,
90{
91    /// This will try to spawn the pruner if it is idle:
92    /// 1. Check if pruning is needed through [`Pruner::is_pruning_needed`].
93    ///
94    /// 2.1. If pruning is needed, pass tip block number to the [`Pruner::run`] and spawn it in a
95    ///      separate task. Set pruner state to [`PrunerState::Running`].
96    /// 2.2. If pruning is not needed, set pruner state back to [`PrunerState::Idle`].
97    ///
98    /// If pruner is already running, do nothing.
99    fn try_spawn_pruner(&mut self, tip_block_number: BlockNumber) -> Option<EngineHookEvent> {
100        match &mut self.pruner_state {
101            PrunerState::Idle(pruner) => {
102                let mut pruner = pruner.take()?;
103
104                // Check tip for pruning
105                if pruner.is_pruning_needed(tip_block_number) {
106                    let (tx, rx) = oneshot::channel();
107                    self.pruner_task_spawner.spawn_critical_blocking(
108                        "pruner task",
109                        Box::pin(async move {
110                            let result = pruner.run(tip_block_number);
111                            let _ = tx.send((pruner, result));
112                        }),
113                    );
114                    self.metrics.runs_total.increment(1);
115                    self.pruner_state = PrunerState::Running(rx);
116
117                    Some(EngineHookEvent::Started)
118                } else {
119                    self.pruner_state = PrunerState::Idle(Some(pruner));
120                    Some(EngineHookEvent::NotReady)
121                }
122            }
123            PrunerState::Running(_) => None,
124        }
125    }
126}
127
128impl<PF> EngineHook for PruneHook<PF>
129where
130    PF: DatabaseProviderFactory<ProviderRW: PruneCheckpointReader + PruneCheckpointWriter>
131        + 'static,
132{
133    fn name(&self) -> &'static str {
134        "Prune"
135    }
136
137    fn poll(
138        &mut self,
139        cx: &mut Context<'_>,
140        ctx: EngineHookContext,
141    ) -> Poll<RethResult<EngineHookEvent>> {
142        // Try to spawn a pruner
143        match self.try_spawn_pruner(ctx.tip_block_number) {
144            Some(EngineHookEvent::NotReady) => return Poll::Pending,
145            Some(event) => return Poll::Ready(Ok(event)),
146            None => (),
147        }
148
149        // Poll pruner and check its status
150        self.poll_pruner(cx)
151    }
152
153    fn db_access_level(&self) -> EngineHookDBAccessLevel {
154        EngineHookDBAccessLevel::ReadWrite
155    }
156}
157
158/// The possible pruner states within the sync controller.
159///
160/// [`PrunerState::Idle`] means that the pruner is currently idle.
161/// [`PrunerState::Running`] means that the pruner is currently running.
162///
163/// NOTE: The differentiation between these two states is important, because when the pruner is
164/// running, it acquires the write lock over the database. This means that we cannot forward to the
165/// blockchain tree any messages that would result in database writes, since it would result in a
166/// deadlock.
167enum PrunerState<PF: DatabaseProviderFactory> {
168    /// Pruner is idle.
169    Idle(Option<Pruner<PF::ProviderRW, PF>>),
170    /// Pruner is running and waiting for a response
171    Running(oneshot::Receiver<PrunerWithResult<PF::ProviderRW, PF>>),
172}
173
174impl<PF> fmt::Debug for PrunerState<PF>
175where
176    PF: DatabaseProviderFactory<ProviderRW: Debug> + Debug,
177{
178    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
179        match self {
180            Self::Idle(f0) => f.debug_tuple("Idle").field(&f0).finish(),
181            Self::Running(f0) => f.debug_tuple("Running").field(&f0).finish(),
182        }
183    }
184}
185
186#[derive(reth_metrics::Metrics)]
187#[metrics(scope = "consensus.engine.prune")]
188struct Metrics {
189    /// The number of times the pruner was run.
190    runs_total: Counter,
191}
192
193impl From<PrunerError> for EngineHookError {
194    fn from(err: PrunerError) -> Self {
195        match err {
196            PrunerError::PruneSegment(_) | PrunerError::InconsistentData(_) => {
197                Self::Internal(Box::new(err))
198            }
199            PrunerError::Database(err) => RethError::Database(err).into(),
200            PrunerError::Provider(err) => RethError::Provider(err).into(),
201        }
202    }
203}