reth_engine_tree/
backfill.rs1use futures::FutureExt;
11use reth_provider::providers::ProviderNodeTypes;
12use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineTarget, PipelineWithResult};
13use reth_tasks::TaskSpawner;
14use std::task::{ready, Context, Poll};
15use tokio::sync::oneshot;
16use tracing::trace;
17
/// Coarse-grained state tag for a backfill sync.
///
/// The enum carries no data, so besides the comparison traits it also derives [`Clone`] and
/// [`Copy`]; callers can keep a copy of the state and pass it around by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BackfillSyncState {
    /// Idle state. This is the value produced by [`Default`].
    #[default]
    Idle,
    /// Pending state.
    ///
    /// NOTE(review): presumably a backfill has been requested but is not running yet — confirm
    /// against the code that sets this state.
    Pending,
    /// Active state.
    ///
    /// NOTE(review): presumably a backfill is currently running — confirm against the code that
    /// sets this state.
    Active,
}

impl BackfillSyncState {
    /// Returns `true` if the state is [`BackfillSyncState::Idle`].
    pub const fn is_idle(&self) -> bool {
        matches!(self, Self::Idle)
    }

    /// Returns `true` if the state is [`BackfillSyncState::Pending`].
    pub const fn is_pending(&self) -> bool {
        matches!(self, Self::Pending)
    }

    /// Returns `true` if the state is [`BackfillSyncState::Active`].
    pub const fn is_active(&self) -> bool {
        matches!(self, Self::Active)
    }
}
48
49pub trait BackfillSync: Send + Sync {
51 fn on_action(&mut self, action: BackfillAction);
53
54 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<BackfillEvent>;
56}
57
58#[derive(Debug, Clone, PartialEq, Eq)]
60pub enum BackfillAction {
61 Start(PipelineTarget),
63}
64
65#[derive(Debug)]
67pub enum BackfillEvent {
68 Started(PipelineTarget),
70 Finished(Result<ControlFlow, PipelineError>),
74 TaskDropped(String),
77}
78
79#[derive(Debug)]
81pub struct PipelineSync<N: ProviderNodeTypes> {
82 pipeline_task_spawner: Box<dyn TaskSpawner>,
84 pipeline_state: PipelineState<N>,
87 pending_pipeline_target: Option<PipelineTarget>,
89}
90
91impl<N: ProviderNodeTypes> PipelineSync<N> {
92 pub fn new(pipeline: Pipeline<N>, pipeline_task_spawner: Box<dyn TaskSpawner>) -> Self {
94 Self {
95 pipeline_task_spawner,
96 pipeline_state: PipelineState::Idle(Some(pipeline)),
97 pending_pipeline_target: None,
98 }
99 }
100
101 #[allow(dead_code)]
103 const fn is_pipeline_sync_pending(&self) -> bool {
104 self.pending_pipeline_target.is_some() && self.pipeline_state.is_idle()
105 }
106
107 const fn is_pipeline_idle(&self) -> bool {
109 self.pipeline_state.is_idle()
110 }
111
112 const fn is_pipeline_active(&self) -> bool {
114 !self.is_pipeline_idle()
115 }
116
117 fn set_pipeline_sync_target(&mut self, target: PipelineTarget) {
121 if target.sync_target().is_some_and(|target| target.is_zero()) {
122 trace!(
123 target: "consensus::engine::sync",
124 "Pipeline target cannot be zero hash."
125 );
126 return
128 }
129 self.pending_pipeline_target = Some(target);
130 }
131
132 fn try_spawn_pipeline(&mut self) -> Option<BackfillEvent> {
135 match &mut self.pipeline_state {
136 PipelineState::Idle(pipeline) => {
137 let target = self.pending_pipeline_target.take()?;
138 let (tx, rx) = oneshot::channel();
139
140 let pipeline = pipeline.take().expect("exists");
141 self.pipeline_task_spawner.spawn_critical_blocking(
142 "pipeline task",
143 Box::pin(async move {
144 let result = pipeline.run_as_fut(Some(target)).await;
145 let _ = tx.send(result);
146 }),
147 );
148 self.pipeline_state = PipelineState::Running(rx);
149
150 Some(BackfillEvent::Started(target))
151 }
152 PipelineState::Running(_) => None,
153 }
154 }
155
156 fn poll_pipeline(&mut self, cx: &mut Context<'_>) -> Poll<BackfillEvent> {
160 let res = match self.pipeline_state {
161 PipelineState::Idle(_) => return Poll::Pending,
162 PipelineState::Running(ref mut fut) => {
163 ready!(fut.poll_unpin(cx))
164 }
165 };
166 let ev = match res {
167 Ok((pipeline, result)) => {
168 self.pipeline_state = PipelineState::Idle(Some(pipeline));
169 BackfillEvent::Finished(result)
170 }
171 Err(why) => {
172 BackfillEvent::TaskDropped(why.to_string())
174 }
175 };
176 Poll::Ready(ev)
177 }
178}
179
180impl<N: ProviderNodeTypes> BackfillSync for PipelineSync<N> {
181 fn on_action(&mut self, event: BackfillAction) {
182 match event {
183 BackfillAction::Start(target) => self.set_pipeline_sync_target(target),
184 }
185 }
186
187 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<BackfillEvent> {
188 if let Some(event) = self.try_spawn_pipeline() {
190 return Poll::Ready(event)
191 }
192
193 if self.is_pipeline_active() {
195 if let Poll::Ready(event) = self.poll_pipeline(cx) {
197 return Poll::Ready(event)
198 }
199 }
200
201 Poll::Pending
202 }
203}
204
205#[derive(Debug)]
215enum PipelineState<N: ProviderNodeTypes> {
216 Idle(Option<Pipeline<N>>),
218 Running(oneshot::Receiver<PipelineWithResult<N>>),
220}
221
222impl<N: ProviderNodeTypes> PipelineState<N> {
223 const fn is_idle(&self) -> bool {
225 matches!(self, Self::Idle(_))
226 }
227}
228
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{insert_headers_into_client, TestPipelineBuilder};
    use alloy_consensus::Header;
    use alloy_eips::eip1559::ETHEREUM_BLOCK_GAS_LIMIT;
    use alloy_primitives::{BlockNumber, B256};
    use assert_matches::assert_matches;
    use futures::poll;
    use reth_chainspec::{ChainSpecBuilder, MAINNET};
    use reth_network_p2p::test_utils::TestFullBlockClient;
    use reth_primitives::SealedHeader;
    use reth_provider::test_utils::MockNodeTypesWithDB;
    use reth_stages::ExecOutput;
    use reth_stages_api::StageCheckpoint;
    use reth_tasks::TokioTaskExecutor;
    use std::{collections::VecDeque, future::poll_fn, sync::Arc};

    /// Fixture bundling the [`PipelineSync`] under test with the hash used as sync target.
    struct TestHarness {
        pipeline_sync: PipelineSync<MockNodeTypesWithDB>,
        tip: B256,
    }

    impl TestHarness {
        /// Builds a test pipeline whose single scripted output is a finished run with a
        /// checkpoint at `pipeline_done_after`, and a test client filled with headers for
        /// `0..total_blocks`, whose highest block hash becomes `tip`.
        fn new(total_blocks: usize, pipeline_done_after: u64) -> Self {
            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(MAINNET.genesis.clone())
                    .paris_activated()
                    .build(),
            );

            // The only scripted pipeline output: done, with the requested checkpoint.
            let exec_output = ExecOutput {
                checkpoint: StageCheckpoint::new(BlockNumber::from(pipeline_done_after)),
                done: true,
            };
            let pipeline = TestPipelineBuilder::new()
                .with_pipeline_exec_outputs(VecDeque::from([Ok(exec_output)]))
                .build(chain_spec);

            let client = TestFullBlockClient::default();
            let sealed_header = SealedHeader::seal(Header {
                base_fee_per_gas: Some(7),
                gas_limit: ETHEREUM_BLOCK_GAS_LIMIT,
                ..Default::default()
            });
            insert_headers_into_client(&client, sealed_header, 0..total_blocks);
            let tip = client.highest_block().expect("there should be blocks here").hash();

            let pipeline_sync = PipelineSync::new(pipeline, Box::<TokioTaskExecutor>::default());

            Self { pipeline_sync, tip }
        }
    }

    #[tokio::test]
    async fn pipeline_started_and_finished() {
        const TOTAL_BLOCKS: usize = 10;
        const PIPELINE_DONE_AFTER: u64 = 5;
        let TestHarness { mut pipeline_sync, tip } =
            TestHarness::new(TOTAL_BLOCKS, PIPELINE_DONE_AFTER);

        // Without a target there is nothing to do yet.
        let first_poll = poll!(poll_fn(|cx| pipeline_sync.poll(cx)));
        assert_matches!(first_poll, Poll::Pending);

        // Submitting a target makes the next poll spawn the pipeline and report the start.
        pipeline_sync.on_action(BackfillAction::Start(PipelineTarget::Sync(tip)));

        let second_poll = poll!(poll_fn(|cx| pipeline_sync.poll(cx)));
        assert_matches!(second_poll, Poll::Ready(BackfillEvent::Started(target)) => {
            assert_eq!(target.sync_target().unwrap(), tip);
        });

        // Awaiting the sync yields the finished event with the scripted checkpoint.
        let expected = ControlFlow::Continue { block_number: PIPELINE_DONE_AFTER };
        let finished = poll_fn(|cx| pipeline_sync.poll(cx)).await;
        assert_matches!(finished, BackfillEvent::Finished(Ok(control_flow)) => {
            assert_eq!(control_flow, expected);
        });
    }
}