1use crate::{
4 engine::metrics::EngineSyncMetrics, BeaconConsensusEngineEvent,
5 ConsensusEngineLiveSyncProgress, EthBeaconConsensus,
6};
7use alloy_consensus::Header;
8use alloy_primitives::{BlockNumber, B256};
9use futures::FutureExt;
10use reth_network_p2p::{
11 full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient},
12 BlockClient,
13};
14use reth_node_types::{BodyTy, HeaderTy};
15use reth_primitives::{BlockBody, EthPrimitives, NodePrimitives, SealedBlock};
16use reth_provider::providers::ProviderNodeTypes;
17use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineTarget, PipelineWithResult};
18use reth_tasks::TaskSpawner;
19use reth_tokio_util::EventSender;
20use std::{
21 cmp::{Ordering, Reverse},
22 collections::{binary_heap::PeekMut, BinaryHeap},
23 sync::Arc,
24 task::{ready, Context, Poll},
25};
26use tokio::sync::oneshot;
27use tracing::trace;
28
29pub(crate) struct EngineSyncController<N, Client>
37where
38 N: ProviderNodeTypes,
39 Client: BlockClient,
40{
41 full_block_client: FullBlockClient<Client>,
43 pipeline_task_spawner: Box<dyn TaskSpawner>,
45 pipeline_state: PipelineState<N>,
48 pending_pipeline_target: Option<PipelineTarget>,
50 inflight_full_block_requests: Vec<FetchFullBlockFuture<Client>>,
52 inflight_block_range_requests: Vec<FetchFullBlockRangeFuture<Client>>,
54 event_sender: EventSender<BeaconConsensusEngineEvent<N::Primitives>>,
56 range_buffered_blocks: BinaryHeap<Reverse<OrderedSealedBlock<HeaderTy<N>, BodyTy<N>>>>,
59 max_block: Option<BlockNumber>,
62 metrics: EngineSyncMetrics,
64}
65
66impl<N, Client> EngineSyncController<N, Client>
67where
68 N: ProviderNodeTypes,
69 Client: BlockClient,
70{
71 pub(crate) fn new(
73 pipeline: Pipeline<N>,
74 client: Client,
75 pipeline_task_spawner: Box<dyn TaskSpawner>,
76 max_block: Option<BlockNumber>,
77 chain_spec: Arc<N::ChainSpec>,
78 event_sender: EventSender<BeaconConsensusEngineEvent<N::Primitives>>,
79 ) -> Self {
80 Self {
81 full_block_client: FullBlockClient::new(
82 client,
83 Arc::new(EthBeaconConsensus::new(chain_spec)),
84 ),
85 pipeline_task_spawner,
86 pipeline_state: PipelineState::Idle(Some(pipeline)),
87 pending_pipeline_target: None,
88 inflight_full_block_requests: Vec::new(),
89 inflight_block_range_requests: Vec::new(),
90 range_buffered_blocks: BinaryHeap::new(),
91 event_sender,
92 max_block,
93 metrics: EngineSyncMetrics::default(),
94 }
95 }
96}
97
98impl<N, Client> EngineSyncController<N, Client>
99where
100 N: ProviderNodeTypes,
101 Client: BlockClient<Header = HeaderTy<N>, Body = BodyTy<N>> + 'static,
102{
103 fn update_block_download_metrics(&self) {
105 self.metrics.active_block_downloads.set(self.inflight_full_block_requests.len() as f64);
106 }
108
109 #[cfg(test)]
111 pub(crate) fn set_max_block(&mut self, block: BlockNumber) {
112 self.max_block = Some(block);
113 }
114
115 pub(crate) fn clear_block_download_requests(&mut self) {
117 self.inflight_full_block_requests.clear();
118 self.inflight_block_range_requests.clear();
119 self.range_buffered_blocks.clear();
120 self.update_block_download_metrics();
121 }
122
123 pub(crate) fn cancel_full_block_request(&mut self, hash: B256) {
125 self.inflight_full_block_requests.retain(|req| *req.hash() != hash);
126 self.update_block_download_metrics();
127 }
128
129 #[allow(dead_code)]
131 pub(crate) const fn is_pipeline_sync_pending(&self) -> bool {
132 self.pending_pipeline_target.is_some() && self.pipeline_state.is_idle()
133 }
134
135 pub(crate) const fn is_pipeline_idle(&self) -> bool {
137 self.pipeline_state.is_idle()
138 }
139
140 pub(crate) const fn is_pipeline_active(&self) -> bool {
142 !self.is_pipeline_idle()
143 }
144
145 pub(crate) fn is_inflight_request(&self, hash: B256) -> bool {
147 self.inflight_full_block_requests.iter().any(|req| *req.hash() == hash)
148 }
149
150 pub(crate) fn download_block_range(&mut self, hash: B256, count: u64) {
155 if count == 1 {
156 self.download_full_block(hash);
157 } else {
158 trace!(
159 target: "consensus::engine",
160 ?hash,
161 ?count,
162 "start downloading full block range."
163 );
164
165 self.event_sender.notify(BeaconConsensusEngineEvent::LiveSyncProgress(
167 ConsensusEngineLiveSyncProgress::DownloadingBlocks {
168 remaining_blocks: count,
169 target: hash,
170 },
171 ));
172 let request = self.full_block_client.get_full_block_range(hash, count);
173 self.inflight_block_range_requests.push(request);
174 }
175
176 }
179
180 pub(crate) fn download_full_block(&mut self, hash: B256) -> bool {
185 if self.is_inflight_request(hash) {
186 return false
187 }
188 trace!(
189 target: "consensus::engine::sync",
190 ?hash,
191 "Start downloading full block"
192 );
193
194 self.event_sender.notify(BeaconConsensusEngineEvent::LiveSyncProgress(
196 ConsensusEngineLiveSyncProgress::DownloadingBlocks {
197 remaining_blocks: 1,
198 target: hash,
199 },
200 ));
201
202 let request = self.full_block_client.get_full_block(hash);
203 self.inflight_full_block_requests.push(request);
204
205 self.update_block_download_metrics();
206
207 true
208 }
209
210 pub(crate) fn set_pipeline_sync_target(&mut self, target: PipelineTarget) {
214 if target.sync_target().is_some_and(|target| target.is_zero()) {
215 trace!(
216 target: "consensus::engine::sync",
217 "Pipeline target cannot be zero hash."
218 );
219 return
221 }
222 self.pending_pipeline_target = Some(target);
223 }
224
225 pub(crate) fn has_reached_max_block(&self, progress: BlockNumber) -> bool {
229 let has_reached_max_block = self.max_block.is_some_and(|target| progress >= target);
230 if has_reached_max_block {
231 trace!(
232 target: "consensus::engine::sync",
233 ?progress,
234 max_block = ?self.max_block,
235 "Consensus engine reached max block"
236 );
237 }
238 has_reached_max_block
239 }
240
241 fn poll_pipeline(&mut self, cx: &mut Context<'_>) -> Poll<EngineSyncEvent<N::Primitives>> {
245 let res = match self.pipeline_state {
246 PipelineState::Idle(_) => return Poll::Pending,
247 PipelineState::Running(ref mut fut) => {
248 ready!(fut.poll_unpin(cx))
249 }
250 };
251 let ev = match res {
252 Ok((pipeline, result)) => {
253 let minimum_block_number = pipeline.minimum_block_number();
254 let reached_max_block =
255 self.has_reached_max_block(minimum_block_number.unwrap_or_default());
256 self.pipeline_state = PipelineState::Idle(Some(pipeline));
257 EngineSyncEvent::PipelineFinished { result, reached_max_block }
258 }
259 Err(_) => {
260 EngineSyncEvent::PipelineTaskDropped
262 }
263 };
264 Poll::Ready(ev)
265 }
266
267 fn try_spawn_pipeline(&mut self) -> Option<EngineSyncEvent<N::Primitives>> {
270 match &mut self.pipeline_state {
271 PipelineState::Idle(pipeline) => {
272 let target = self.pending_pipeline_target.take()?;
273 let (tx, rx) = oneshot::channel();
274
275 let pipeline = pipeline.take().expect("exists");
276 self.pipeline_task_spawner.spawn_critical_blocking(
277 "pipeline task",
278 Box::pin(async move {
279 let result = pipeline.run_as_fut(Some(target)).await;
280 let _ = tx.send(result);
281 }),
282 );
283 self.pipeline_state = PipelineState::Running(rx);
284
285 self.clear_block_download_requests();
288
289 Some(EngineSyncEvent::PipelineStarted(Some(target)))
290 }
291 PipelineState::Running(_) => None,
292 }
293 }
294
295 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<EngineSyncEvent<N::Primitives>> {
297 if let Some(event) = self.try_spawn_pipeline() {
299 return Poll::Ready(event)
300 }
301
302 if !self.is_pipeline_idle() {
304 if let Poll::Ready(event) = self.poll_pipeline(cx) {
306 return Poll::Ready(event)
307 }
308 }
309
310 for idx in (0..self.inflight_full_block_requests.len()).rev() {
312 let mut request = self.inflight_full_block_requests.swap_remove(idx);
313 if let Poll::Ready(block) = request.poll_unpin(cx) {
314 trace!(target: "consensus::engine", block=?block.num_hash(), "Received single full block, buffering");
315 self.range_buffered_blocks.push(Reverse(OrderedSealedBlock(block)));
316 } else {
317 self.inflight_full_block_requests.push(request);
319 }
320 }
321
322 for idx in (0..self.inflight_block_range_requests.len()).rev() {
324 let mut request = self.inflight_block_range_requests.swap_remove(idx);
325 if let Poll::Ready(blocks) = request.poll_unpin(cx) {
326 trace!(target: "consensus::engine", len=?blocks.len(), first=?blocks.first().map(|b| b.num_hash()), last=?blocks.last().map(|b| b.num_hash()), "Received full block range, buffering");
327 self.range_buffered_blocks
328 .extend(blocks.into_iter().map(OrderedSealedBlock).map(Reverse));
329 } else {
330 self.inflight_block_range_requests.push(request);
332 }
333 }
334
335 self.update_block_download_metrics();
336
337 if let Some(block) = self.range_buffered_blocks.pop() {
339 while let Some(peek) = self.range_buffered_blocks.peek_mut() {
341 if peek.0 .0.hash() == block.0 .0.hash() {
342 PeekMut::pop(peek);
343 } else {
344 break
345 }
346 }
347 return Poll::Ready(EngineSyncEvent::FetchedFullBlock(block.0 .0))
348 }
349
350 Poll::Pending
351 }
352}
353
354#[derive(Debug, Clone, PartialEq, Eq)]
356struct OrderedSealedBlock<H = Header, B = BlockBody>(SealedBlock<H, B>);
357
358impl<H, B> PartialOrd for OrderedSealedBlock<H, B>
359where
360 H: reth_primitives_traits::BlockHeader + 'static,
361 B: reth_primitives_traits::BlockBody + 'static,
362{
363 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
364 Some(self.cmp(other))
365 }
366}
367
368impl<H, B> Ord for OrderedSealedBlock<H, B>
369where
370 H: reth_primitives_traits::BlockHeader + 'static,
371 B: reth_primitives_traits::BlockBody + 'static,
372{
373 fn cmp(&self, other: &Self) -> Ordering {
374 self.0.number().cmp(&other.0.number())
375 }
376}
377
378#[derive(Debug)]
380pub(crate) enum EngineSyncEvent<N: NodePrimitives = EthPrimitives> {
381 FetchedFullBlock(SealedBlock<N::BlockHeader, N::BlockBody>),
383 PipelineStarted(Option<PipelineTarget>),
387 PipelineFinished {
391 result: Result<ControlFlow, PipelineError>,
393 reached_max_block: bool,
397 },
398 PipelineTaskDropped,
401}
402
403enum PipelineState<N: ProviderNodeTypes> {
413 Idle(Option<Pipeline<N>>),
415 Running(oneshot::Receiver<PipelineWithResult<N>>),
417}
418
419impl<N: ProviderNodeTypes> PipelineState<N> {
420 const fn is_idle(&self) -> bool {
422 matches!(self, Self::Idle(_))
423 }
424}
425
426#[cfg(test)]
427mod tests {
428 use super::*;
429 use alloy_consensus::Header;
430 use alloy_eips::eip1559::ETHEREUM_BLOCK_GAS_LIMIT;
431 use assert_matches::assert_matches;
432 use futures::poll;
433 use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
434 use reth_network_p2p::{either::Either, test_utils::TestFullBlockClient, EthBlockClient};
435 use reth_primitives::{BlockBody, SealedHeader};
436 use reth_provider::{
437 test_utils::{create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB},
438 ExecutionOutcome,
439 };
440 use reth_prune_types::PruneModes;
441 use reth_stages::{test_utils::TestStages, ExecOutput, StageError};
442 use reth_stages_api::StageCheckpoint;
443 use reth_static_file::StaticFileProducer;
444 use reth_tasks::TokioTaskExecutor;
445 use std::{collections::VecDeque, future::poll_fn, ops::Range};
446 use tokio::sync::watch;
447
448 struct TestPipelineBuilder {
449 pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
450 executor_results: Vec<ExecutionOutcome>,
451 max_block: Option<BlockNumber>,
452 }
453
454 impl TestPipelineBuilder {
455 const fn new() -> Self {
457 Self {
458 pipeline_exec_outputs: VecDeque::new(),
459 executor_results: Vec::new(),
460 max_block: None,
461 }
462 }
463
464 fn with_pipeline_exec_outputs(
466 mut self,
467 pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
468 ) -> Self {
469 self.pipeline_exec_outputs = pipeline_exec_outputs;
470 self
471 }
472
473 #[allow(dead_code)]
475 fn with_executor_results(mut self, executor_results: Vec<ExecutionOutcome>) -> Self {
476 self.executor_results = executor_results;
477 self
478 }
479
480 #[allow(dead_code)]
482 const fn with_max_block(mut self, max_block: BlockNumber) -> Self {
483 self.max_block = Some(max_block);
484 self
485 }
486
487 fn build(self, chain_spec: Arc<ChainSpec>) -> Pipeline<MockNodeTypesWithDB> {
489 reth_tracing::init_test_tracing();
490
491 let (tip_tx, _tip_rx) = watch::channel(B256::default());
493 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
494 .add_stages(TestStages::new(self.pipeline_exec_outputs, Default::default()))
495 .with_tip_sender(tip_tx);
496
497 if let Some(max_block) = self.max_block {
498 pipeline = pipeline.with_max_block(max_block);
499 }
500
501 let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec);
502
503 let static_file_producer =
504 StaticFileProducer::new(provider_factory.clone(), PruneModes::default());
505
506 pipeline.build(provider_factory, static_file_producer)
507 }
508 }
509
510 struct TestSyncControllerBuilder<Client> {
511 max_block: Option<BlockNumber>,
512 client: Option<Client>,
513 }
514
515 impl<Client> TestSyncControllerBuilder<Client> {
516 const fn new() -> Self {
518 Self { max_block: None, client: None }
519 }
520
521 #[allow(dead_code)]
523 const fn with_max_block(mut self, max_block: BlockNumber) -> Self {
524 self.max_block = Some(max_block);
525 self
526 }
527
528 fn with_client(mut self, client: Client) -> Self {
530 self.client = Some(client);
531 self
532 }
533
534 fn build<N>(
536 self,
537 pipeline: Pipeline<N>,
538 chain_spec: Arc<N::ChainSpec>,
539 ) -> EngineSyncController<N, Either<Client, TestFullBlockClient>>
540 where
541 N: ProviderNodeTypes,
542 Client: EthBlockClient + 'static,
543 {
544 let client = self
545 .client
546 .map(Either::Left)
547 .unwrap_or_else(|| Either::Right(TestFullBlockClient::default()));
548
549 EngineSyncController::new(
550 pipeline,
551 client,
552 Box::<TokioTaskExecutor>::default(),
553 self.max_block,
554 chain_spec,
555 Default::default(),
556 )
557 }
558 }
559
560 #[tokio::test]
561 async fn pipeline_started_after_setting_target() {
562 let chain_spec = Arc::new(
563 ChainSpecBuilder::default()
564 .chain(MAINNET.chain)
565 .genesis(MAINNET.genesis.clone())
566 .paris_activated()
567 .build(),
568 );
569
570 let client = TestFullBlockClient::default();
571 insert_headers_into_client(&client, SealedHeader::default(), 0..10);
572 let pipeline = TestPipelineBuilder::new()
574 .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
575 checkpoint: StageCheckpoint::new(5),
576 done: true,
577 })]))
578 .build(chain_spec.clone());
579
580 let mut sync_controller = TestSyncControllerBuilder::new()
581 .with_client(client.clone())
582 .build(pipeline, chain_spec);
583
584 let tip = client.highest_block().expect("there should be blocks here");
585 sync_controller.set_pipeline_sync_target(tip.hash().into());
586
587 let sync_future = poll_fn(|cx| sync_controller.poll(cx));
588 let next_event = poll!(sync_future);
589
590 assert_matches!(next_event, Poll::Ready(EngineSyncEvent::PipelineStarted(Some(target))) => {
593 assert_eq!(target.sync_target().unwrap(), tip.hash());
594 });
595
596 let sync_future = poll_fn(|cx| sync_controller.poll(cx));
598 let next_ready = sync_future.await;
599 assert_matches!(next_ready, EngineSyncEvent::PipelineFinished { result, reached_max_block } => {
600 assert_matches!(result, Ok(control_flow) => assert_eq!(control_flow, ControlFlow::Continue { block_number: 5 }));
601 assert!(!reached_max_block);
603 });
604 }
605
606 fn insert_headers_into_client(
607 client: &TestFullBlockClient,
608 genesis_header: SealedHeader,
609 range: Range<usize>,
610 ) {
611 let mut sealed_header = genesis_header;
612 let body = BlockBody::default();
613 for _ in range {
614 let (mut header, hash) = sealed_header.split();
615 header.parent_hash = hash;
617 header.number += 1;
618 header.timestamp += 1;
619 sealed_header = SealedHeader::seal(header);
620 client.insert(sealed_header.clone(), body.clone());
621 }
622 }
623
624 #[tokio::test]
625 async fn controller_sends_range_request() {
626 let chain_spec = Arc::new(
627 ChainSpecBuilder::default()
628 .chain(MAINNET.chain)
629 .genesis(MAINNET.genesis.clone())
630 .paris_activated()
631 .build(),
632 );
633
634 let client = TestFullBlockClient::default();
635 let header = Header {
636 base_fee_per_gas: Some(7),
637 gas_limit: ETHEREUM_BLOCK_GAS_LIMIT,
638 ..Default::default()
639 };
640 let header = SealedHeader::seal(header);
641 insert_headers_into_client(&client, header, 0..10);
642
643 let pipeline = TestPipelineBuilder::new().build(chain_spec.clone());
645
646 let mut sync_controller = TestSyncControllerBuilder::new()
647 .with_client(client.clone())
648 .build(pipeline, chain_spec);
649
650 let tip = client.highest_block().expect("there should be blocks here");
651
652 sync_controller.download_block_range(tip.hash(), tip.number);
654
655 assert_eq!(sync_controller.inflight_block_range_requests.len(), 1);
657
658 let first_req = sync_controller.inflight_block_range_requests.first().unwrap();
660 assert_eq!(first_req.start_hash(), tip.hash());
661 assert_eq!(first_req.count(), tip.number);
662
663 for num in 1..=10 {
665 let sync_future = poll_fn(|cx| sync_controller.poll(cx));
666 let next_ready = sync_future.await;
667 assert_matches!(next_ready, EngineSyncEvent::FetchedFullBlock(block) => {
668 assert_eq!(block.number, num);
669 });
670 }
671 }
672}