reth_engine_tree/backfill.rs

//! It is expected that the node has two sync modes:
//!
//!  - Backfill sync: Sync to a certain block height in stages, e.g. download data from p2p then
//!    execute that range.
//!  - Live sync: In this mode the node keeps up with the latest tip and listens for new
//!    requests from the consensus client.
//!
//! These modes are mutually exclusive and the node can only be in one mode at a time.
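//!
//! A minimal, illustrative sketch of driving a [`BackfillSync`] implementation (assumes a
//! constructed [`Pipeline`], a [`TaskSpawner`], and a non-zero `tip` hash are available; not
//! compiled as a doctest):
//!
//! ```ignore
//! let mut backfill = PipelineSync::new(pipeline, Box::new(task_spawner));
//! // Queue a backfill run towards `tip`; it is spawned on the next poll.
//! backfill.on_action(BackfillAction::Start(PipelineTarget::Sync(tip)));
//! // Drive the sync until the run finishes (or the task is dropped).
//! let result = loop {
//!     match std::future::poll_fn(|cx| backfill.poll(cx)).await {
//!         BackfillEvent::Started(target) => tracing::trace!(?target, "backfill started"),
//!         BackfillEvent::Finished(res) => break res,
//!         BackfillEvent::TaskDropped(err) => panic!("backfill task dropped: {err}"),
//!     }
//! };
//! ```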

use futures::FutureExt;
use reth_provider::providers::ProviderNodeTypes;
use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineTarget, PipelineWithResult};
use reth_tasks::TaskSpawner;
use std::task::{ready, Context, Poll};
use tokio::sync::oneshot;
use tracing::trace;

/// Represents the state of the backfill synchronization process.
#[derive(Debug, PartialEq, Eq, Default)]
pub enum BackfillSyncState {
    /// The node is not performing any backfill synchronization.
    /// This is the initial or default state.
    #[default]
    Idle,
    /// A backfill synchronization has been requested or planned, but processing has not started
    /// yet.
    Pending,
    /// The node is actively engaged in backfill synchronization.
    Active,
}

impl BackfillSyncState {
    /// Returns true if the state is idle.
    pub const fn is_idle(&self) -> bool {
        matches!(self, Self::Idle)
    }

    /// Returns true if the state is pending.
    pub const fn is_pending(&self) -> bool {
        matches!(self, Self::Pending)
    }

    /// Returns true if the state is active.
    pub const fn is_active(&self) -> bool {
        matches!(self, Self::Active)
    }
}

/// Backfill sync mode functionality.
pub trait BackfillSync: Send + Sync {
    /// Performs a backfill action.
    fn on_action(&mut self, action: BackfillAction);

    /// Polls the pipeline for completion.
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<BackfillEvent>;
}

/// The backfill actions that can be performed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BackfillAction {
    /// Start backfilling with the given target.
    Start(PipelineTarget),
}

/// The events that can be emitted on backfill sync.
#[derive(Debug)]
pub enum BackfillEvent {
    /// Backfill sync started.
    Started(PipelineTarget),
    /// Backfill sync finished.
    ///
    /// If this is returned, backfill sync is idle.
    Finished(Result<ControlFlow, PipelineError>),
    /// The sync task was dropped after it was started and its result could not be received
    /// because the channel was closed. This would indicate a panicked task.
    TaskDropped(String),
}

/// A [`BackfillSync`] implementation that drives the [`Pipeline`].
#[derive(Debug)]
pub struct PipelineSync<N: ProviderNodeTypes> {
    /// The type that can spawn the pipeline task.
    pipeline_task_spawner: Box<dyn TaskSpawner>,
    /// The current state of the pipeline.
    /// The pipeline is used for large ranges.
    pipeline_state: PipelineState<N>,
    /// Pending target block for the pipeline to sync to.
    pending_pipeline_target: Option<PipelineTarget>,
}

impl<N: ProviderNodeTypes> PipelineSync<N> {
    /// Create a new instance.
    pub fn new(pipeline: Pipeline<N>, pipeline_task_spawner: Box<dyn TaskSpawner>) -> Self {
        Self {
            pipeline_task_spawner,
            pipeline_state: PipelineState::Idle(Some(pipeline)),
            pending_pipeline_target: None,
        }
    }

    /// Returns `true` if a pipeline target is queued and will be triggered on the next `poll`.
    #[allow(dead_code)]
    const fn is_pipeline_sync_pending(&self) -> bool {
        self.pending_pipeline_target.is_some() && self.pipeline_state.is_idle()
    }

    /// Returns `true` if the pipeline is idle.
    const fn is_pipeline_idle(&self) -> bool {
        self.pipeline_state.is_idle()
    }

    /// Returns `true` if the pipeline is active.
    const fn is_pipeline_active(&self) -> bool {
        !self.is_pipeline_idle()
    }

    /// Sets a new target to sync the pipeline to.
    ///
    /// The target is ignored if it is the zero hash.
    fn set_pipeline_sync_target(&mut self, target: PipelineTarget) {
        if target.sync_target().is_some_and(|target| target.is_zero()) {
            trace!(
                target: "consensus::engine::sync",
                "Pipeline target cannot be zero hash."
            );
            // precaution to never sync to the zero hash
            return
        }
        self.pending_pipeline_target = Some(target);
    }

    /// This will spawn the pipeline if it is idle and a target is set.
    fn try_spawn_pipeline(&mut self) -> Option<BackfillEvent> {
        match &mut self.pipeline_state {
            PipelineState::Idle(pipeline) => {
                let target = self.pending_pipeline_target.take()?;
                let (tx, rx) = oneshot::channel();

                let pipeline = pipeline.take().expect("exists");
                self.pipeline_task_spawner.spawn_critical_blocking(
                    "pipeline task",
                    Box::pin(async move {
                        let result = pipeline.run_as_fut(Some(target)).await;
                        let _ = tx.send(result);
                    }),
                );
                self.pipeline_state = PipelineState::Running(rx);

                Some(BackfillEvent::Started(target))
            }
            PipelineState::Running(_) => None,
        }
    }

    /// Advances the pipeline state.
    ///
    /// This checks for the result in the channel, or returns pending if the pipeline is idle.
    fn poll_pipeline(&mut self, cx: &mut Context<'_>) -> Poll<BackfillEvent> {
        let res = match self.pipeline_state {
            PipelineState::Idle(_) => return Poll::Pending,
            PipelineState::Running(ref mut fut) => {
                ready!(fut.poll_unpin(cx))
            }
        };
        let ev = match res {
            Ok((pipeline, result)) => {
                self.pipeline_state = PipelineState::Idle(Some(pipeline));
                BackfillEvent::Finished(result)
            }
            Err(why) => {
                // failed to receive the pipeline
                BackfillEvent::TaskDropped(why.to_string())
            }
        };
        Poll::Ready(ev)
    }
}

impl<N: ProviderNodeTypes> BackfillSync for PipelineSync<N> {
    fn on_action(&mut self, action: BackfillAction) {
        match action {
            BackfillAction::Start(target) => self.set_pipeline_sync_target(target),
        }
    }

    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<BackfillEvent> {
        // try to spawn a pipeline if a target is set
        if let Some(event) = self.try_spawn_pipeline() {
            return Poll::Ready(event)
        }

        // make sure we poll the pipeline if it's active, and return any ready pipeline events
        if self.is_pipeline_active() {
            // advance the pipeline
            if let Poll::Ready(event) = self.poll_pipeline(cx) {
                return Poll::Ready(event)
            }
        }

        Poll::Pending
    }
}

/// The possible pipeline states within the sync controller.
///
/// [`PipelineState::Idle`] means that the pipeline is currently idle.
/// [`PipelineState::Running`] means that the pipeline is currently running.
///
/// NOTE: The differentiation between these two states is important, because when the pipeline is
/// running, it acquires the write lock over the database. This means that we cannot forward to the
/// blockchain tree any messages that would result in database writes, since it would result in a
/// deadlock.
#[derive(Debug)]
enum PipelineState<N: ProviderNodeTypes> {
    /// Pipeline is idle.
    Idle(Option<Pipeline<N>>),
    /// Pipeline is running and waiting for a response.
    Running(oneshot::Receiver<PipelineWithResult<N>>),
}

impl<N: ProviderNodeTypes> PipelineState<N> {
    /// Returns `true` if the state matches idle.
    const fn is_idle(&self) -> bool {
        matches!(self, Self::Idle(_))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{insert_headers_into_client, TestPipelineBuilder};
    use alloy_consensus::Header;
    use alloy_eips::eip1559::ETHEREUM_BLOCK_GAS_LIMIT;
    use alloy_primitives::{BlockNumber, B256};
    use assert_matches::assert_matches;
    use futures::poll;
    use reth_chainspec::{ChainSpecBuilder, MAINNET};
    use reth_network_p2p::test_utils::TestFullBlockClient;
    use reth_primitives::SealedHeader;
    use reth_provider::test_utils::MockNodeTypesWithDB;
    use reth_stages::ExecOutput;
    use reth_stages_api::StageCheckpoint;
    use reth_tasks::TokioTaskExecutor;
    use std::{collections::VecDeque, future::poll_fn, sync::Arc};

    struct TestHarness {
        pipeline_sync: PipelineSync<MockNodeTypesWithDB>,
        tip: B256,
    }

    impl TestHarness {
        fn new(total_blocks: usize, pipeline_done_after: u64) -> Self {
            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(MAINNET.genesis.clone())
                    .paris_activated()
                    .build(),
            );

            // force the pipeline to be "done" after `pipeline_done_after` blocks
            let pipeline = TestPipelineBuilder::new()
                .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
                    checkpoint: StageCheckpoint::new(BlockNumber::from(pipeline_done_after)),
                    done: true,
                })]))
                .build(chain_spec);

            let pipeline_sync = PipelineSync::new(pipeline, Box::<TokioTaskExecutor>::default());
            let client = TestFullBlockClient::default();
            let header = Header {
                base_fee_per_gas: Some(7),
                gas_limit: ETHEREUM_BLOCK_GAS_LIMIT,
                ..Default::default()
            };
            let header = SealedHeader::seal(header);
            insert_headers_into_client(&client, header, 0..total_blocks);

            let tip = client.highest_block().expect("there should be blocks here").hash();

            Self { pipeline_sync, tip }
        }
    }

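    // Added sanity check (illustrative, not part of the original test suite): the default
    // backfill state is idle and each predicate matches only its own variant.
    #[test]
    fn backfill_sync_state_predicates() {
        assert!(BackfillSyncState::default().is_idle());
        assert!(BackfillSyncState::Pending.is_pending());
        assert!(BackfillSyncState::Active.is_active());
        assert!(!BackfillSyncState::Active.is_idle());
    }
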
    #[tokio::test]
    async fn pipeline_started_and_finished() {
        const TOTAL_BLOCKS: usize = 10;
        const PIPELINE_DONE_AFTER: u64 = 5;
        let TestHarness { mut pipeline_sync, tip } =
            TestHarness::new(TOTAL_BLOCKS, PIPELINE_DONE_AFTER);

        let sync_future = poll_fn(|cx| pipeline_sync.poll(cx));
        let next_event = poll!(sync_future);

        // sync target not set, pipeline not started
        assert_matches!(next_event, Poll::Pending);

        pipeline_sync.on_action(BackfillAction::Start(PipelineTarget::Sync(tip)));

        let sync_future = poll_fn(|cx| pipeline_sync.poll(cx));
        let next_event = poll!(sync_future);

        // sync target set, pipeline started
        assert_matches!(next_event, Poll::Ready(BackfillEvent::Started(target)) => {
            assert_eq!(target.sync_target().unwrap(), tip);
        });

        // the next event should be the pipeline finishing in a good state
        let sync_future = poll_fn(|cx| pipeline_sync.poll(cx));
        let next_ready = sync_future.await;
        assert_matches!(next_ready, BackfillEvent::Finished(result) => {
            assert_matches!(result, Ok(control_flow) => assert_eq!(control_flow, ControlFlow::Continue { block_number: PIPELINE_DONE_AFTER }));
        });
    }
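
    // Added sketch: a zero-hash sync target is rejected in `set_pipeline_sync_target`, so the
    // pipeline is never spawned and polling stays pending. Assumes the same `TestHarness` setup
    // as the test above.
    #[tokio::test]
    async fn pipeline_not_started_for_zero_hash_target() {
        const TOTAL_BLOCKS: usize = 10;
        const PIPELINE_DONE_AFTER: u64 = 5;
        let TestHarness { mut pipeline_sync, .. } =
            TestHarness::new(TOTAL_BLOCKS, PIPELINE_DONE_AFTER);

        // request a backfill run towards the zero hash
        pipeline_sync.on_action(BackfillAction::Start(PipelineTarget::Sync(B256::ZERO)));

        // the target is dropped as a precaution, so no `Started` event is emitted
        let sync_future = poll_fn(|cx| pipeline_sync.poll(cx));
        let next_event = poll!(sync_future);
        assert_matches!(next_event, Poll::Pending);
    }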
}