reth_beacon_consensus/engine/
sync.rs

1//! Sync management for the engine implementation.
2
3use crate::{
4    engine::metrics::EngineSyncMetrics, BeaconConsensusEngineEvent,
5    ConsensusEngineLiveSyncProgress, EthBeaconConsensus,
6};
7use alloy_consensus::Header;
8use alloy_primitives::{BlockNumber, B256};
9use futures::FutureExt;
10use reth_network_p2p::{
11    full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient},
12    BlockClient,
13};
14use reth_node_types::{BodyTy, HeaderTy};
15use reth_primitives::{BlockBody, EthPrimitives, NodePrimitives, SealedBlock};
16use reth_provider::providers::ProviderNodeTypes;
17use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineTarget, PipelineWithResult};
18use reth_tasks::TaskSpawner;
19use reth_tokio_util::EventSender;
20use std::{
21    cmp::{Ordering, Reverse},
22    collections::{binary_heap::PeekMut, BinaryHeap},
23    sync::Arc,
24    task::{ready, Context, Poll},
25};
26use tokio::sync::oneshot;
27use tracing::trace;
28
29/// Manages syncing under the control of the engine.
30///
31/// This type controls the [Pipeline] and supports (single) full block downloads.
32///
33/// Caution: If the pipeline is running, this type will not emit blocks downloaded from the network
34/// [`EngineSyncEvent::FetchedFullBlock`] until the pipeline is idle to prevent commits to the
35/// database while the pipeline is still active.
36pub(crate) struct EngineSyncController<N, Client>
37where
38    N: ProviderNodeTypes,
39    Client: BlockClient,
40{
41    /// A downloader that can download full blocks from the network.
42    full_block_client: FullBlockClient<Client>,
43    /// The type that can spawn the pipeline task.
44    pipeline_task_spawner: Box<dyn TaskSpawner>,
45    /// The current state of the pipeline.
46    /// The pipeline is used for large ranges.
47    pipeline_state: PipelineState<N>,
48    /// Pending target block for the pipeline to sync
49    pending_pipeline_target: Option<PipelineTarget>,
50    /// In-flight full block requests in progress.
51    inflight_full_block_requests: Vec<FetchFullBlockFuture<Client>>,
52    /// In-flight full block _range_ requests in progress.
53    inflight_block_range_requests: Vec<FetchFullBlockRangeFuture<Client>>,
54    /// Sender for engine events.
55    event_sender: EventSender<BeaconConsensusEngineEvent<N::Primitives>>,
56    /// Buffered blocks from downloads - this is a min-heap of blocks, using the block number for
57    /// ordering. This means the blocks will be popped from the heap with ascending block numbers.
58    range_buffered_blocks: BinaryHeap<Reverse<OrderedSealedBlock<HeaderTy<N>, BodyTy<N>>>>,
59    /// Max block after which the consensus engine would terminate the sync. Used for debugging
60    /// purposes.
61    max_block: Option<BlockNumber>,
62    /// Engine sync metrics.
63    metrics: EngineSyncMetrics,
64}
65
66impl<N, Client> EngineSyncController<N, Client>
67where
68    N: ProviderNodeTypes,
69    Client: BlockClient,
70{
71    /// Create a new instance
72    pub(crate) fn new(
73        pipeline: Pipeline<N>,
74        client: Client,
75        pipeline_task_spawner: Box<dyn TaskSpawner>,
76        max_block: Option<BlockNumber>,
77        chain_spec: Arc<N::ChainSpec>,
78        event_sender: EventSender<BeaconConsensusEngineEvent<N::Primitives>>,
79    ) -> Self {
80        Self {
81            full_block_client: FullBlockClient::new(
82                client,
83                Arc::new(EthBeaconConsensus::new(chain_spec)),
84            ),
85            pipeline_task_spawner,
86            pipeline_state: PipelineState::Idle(Some(pipeline)),
87            pending_pipeline_target: None,
88            inflight_full_block_requests: Vec::new(),
89            inflight_block_range_requests: Vec::new(),
90            range_buffered_blocks: BinaryHeap::new(),
91            event_sender,
92            max_block,
93            metrics: EngineSyncMetrics::default(),
94        }
95    }
96}
97
98impl<N, Client> EngineSyncController<N, Client>
99where
100    N: ProviderNodeTypes,
101    Client: BlockClient<Header = HeaderTy<N>, Body = BodyTy<N>> + 'static,
102{
103    /// Sets the metrics for the active downloads
104    fn update_block_download_metrics(&self) {
105        self.metrics.active_block_downloads.set(self.inflight_full_block_requests.len() as f64);
106        // TODO: full block range metrics
107    }
108
109    /// Sets the max block value for testing
110    #[cfg(test)]
111    pub(crate) fn set_max_block(&mut self, block: BlockNumber) {
112        self.max_block = Some(block);
113    }
114
115    /// Cancels all download requests that are in progress and buffered blocks.
116    pub(crate) fn clear_block_download_requests(&mut self) {
117        self.inflight_full_block_requests.clear();
118        self.inflight_block_range_requests.clear();
119        self.range_buffered_blocks.clear();
120        self.update_block_download_metrics();
121    }
122
123    /// Cancels the full block request with the given hash.
124    pub(crate) fn cancel_full_block_request(&mut self, hash: B256) {
125        self.inflight_full_block_requests.retain(|req| *req.hash() != hash);
126        self.update_block_download_metrics();
127    }
128
129    /// Returns `true` if a pipeline target is queued and will be triggered on the next `poll`.
130    #[allow(dead_code)]
131    pub(crate) const fn is_pipeline_sync_pending(&self) -> bool {
132        self.pending_pipeline_target.is_some() && self.pipeline_state.is_idle()
133    }
134
135    /// Returns `true` if the pipeline is idle.
136    pub(crate) const fn is_pipeline_idle(&self) -> bool {
137        self.pipeline_state.is_idle()
138    }
139
140    /// Returns `true` if the pipeline is active.
141    pub(crate) const fn is_pipeline_active(&self) -> bool {
142        !self.is_pipeline_idle()
143    }
144
145    /// Returns true if there's already a request for the given hash.
146    pub(crate) fn is_inflight_request(&self, hash: B256) -> bool {
147        self.inflight_full_block_requests.iter().any(|req| *req.hash() == hash)
148    }
149
150    /// Starts requesting a range of blocks from the network, in reverse from the given hash.
151    ///
152    /// If the `count` is 1, this will use the `download_full_block` method instead, because it
153    /// downloads headers and bodies for the block concurrently.
154    pub(crate) fn download_block_range(&mut self, hash: B256, count: u64) {
155        if count == 1 {
156            self.download_full_block(hash);
157        } else {
158            trace!(
159                target: "consensus::engine",
160                ?hash,
161                ?count,
162                "start downloading full block range."
163            );
164
165            // notify listeners that we're downloading a block range
166            self.event_sender.notify(BeaconConsensusEngineEvent::LiveSyncProgress(
167                ConsensusEngineLiveSyncProgress::DownloadingBlocks {
168                    remaining_blocks: count,
169                    target: hash,
170                },
171            ));
172            let request = self.full_block_client.get_full_block_range(hash, count);
173            self.inflight_block_range_requests.push(request);
174        }
175
176        // // TODO: need more metrics for block ranges
177        // self.update_block_download_metrics();
178    }
179
180    /// Starts requesting a full block from the network.
181    ///
182    /// Returns `true` if the request was started, `false` if there's already a request for the
183    /// given hash.
184    pub(crate) fn download_full_block(&mut self, hash: B256) -> bool {
185        if self.is_inflight_request(hash) {
186            return false
187        }
188        trace!(
189            target: "consensus::engine::sync",
190            ?hash,
191            "Start downloading full block"
192        );
193
194        // notify listeners that we're downloading a block
195        self.event_sender.notify(BeaconConsensusEngineEvent::LiveSyncProgress(
196            ConsensusEngineLiveSyncProgress::DownloadingBlocks {
197                remaining_blocks: 1,
198                target: hash,
199            },
200        ));
201
202        let request = self.full_block_client.get_full_block(hash);
203        self.inflight_full_block_requests.push(request);
204
205        self.update_block_download_metrics();
206
207        true
208    }
209
210    /// Sets a new target to sync the pipeline to.
211    ///
212    /// But ensures the target is not the zero hash.
213    pub(crate) fn set_pipeline_sync_target(&mut self, target: PipelineTarget) {
214        if target.sync_target().is_some_and(|target| target.is_zero()) {
215            trace!(
216                target: "consensus::engine::sync",
217                "Pipeline target cannot be zero hash."
218            );
219            // precaution to never sync to the zero hash
220            return
221        }
222        self.pending_pipeline_target = Some(target);
223    }
224
225    /// Check if the engine reached max block as specified by `max_block` parameter.
226    ///
227    /// Note: this is mainly for debugging purposes.
228    pub(crate) fn has_reached_max_block(&self, progress: BlockNumber) -> bool {
229        let has_reached_max_block = self.max_block.is_some_and(|target| progress >= target);
230        if has_reached_max_block {
231            trace!(
232                target: "consensus::engine::sync",
233                ?progress,
234                max_block = ?self.max_block,
235                "Consensus engine reached max block"
236            );
237        }
238        has_reached_max_block
239    }
240
241    /// Advances the pipeline state.
242    ///
243    /// This checks for the result in the channel, or returns pending if the pipeline is idle.
244    fn poll_pipeline(&mut self, cx: &mut Context<'_>) -> Poll<EngineSyncEvent<N::Primitives>> {
245        let res = match self.pipeline_state {
246            PipelineState::Idle(_) => return Poll::Pending,
247            PipelineState::Running(ref mut fut) => {
248                ready!(fut.poll_unpin(cx))
249            }
250        };
251        let ev = match res {
252            Ok((pipeline, result)) => {
253                let minimum_block_number = pipeline.minimum_block_number();
254                let reached_max_block =
255                    self.has_reached_max_block(minimum_block_number.unwrap_or_default());
256                self.pipeline_state = PipelineState::Idle(Some(pipeline));
257                EngineSyncEvent::PipelineFinished { result, reached_max_block }
258            }
259            Err(_) => {
260                // failed to receive the pipeline
261                EngineSyncEvent::PipelineTaskDropped
262            }
263        };
264        Poll::Ready(ev)
265    }
266
267    /// This will spawn the pipeline if it is idle and a target is set or if the pipeline is set to
268    /// run continuously.
269    fn try_spawn_pipeline(&mut self) -> Option<EngineSyncEvent<N::Primitives>> {
270        match &mut self.pipeline_state {
271            PipelineState::Idle(pipeline) => {
272                let target = self.pending_pipeline_target.take()?;
273                let (tx, rx) = oneshot::channel();
274
275                let pipeline = pipeline.take().expect("exists");
276                self.pipeline_task_spawner.spawn_critical_blocking(
277                    "pipeline task",
278                    Box::pin(async move {
279                        let result = pipeline.run_as_fut(Some(target)).await;
280                        let _ = tx.send(result);
281                    }),
282                );
283                self.pipeline_state = PipelineState::Running(rx);
284
285                // we also clear any pending full block requests because we expect them to be
286                // outdated (included in the range the pipeline is syncing anyway)
287                self.clear_block_download_requests();
288
289                Some(EngineSyncEvent::PipelineStarted(Some(target)))
290            }
291            PipelineState::Running(_) => None,
292        }
293    }
294
295    /// Advances the sync process.
296    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<EngineSyncEvent<N::Primitives>> {
297        // try to spawn a pipeline if a target is set
298        if let Some(event) = self.try_spawn_pipeline() {
299            return Poll::Ready(event)
300        }
301
302        // make sure we poll the pipeline if it's active, and return any ready pipeline events
303        if !self.is_pipeline_idle() {
304            // advance the pipeline
305            if let Poll::Ready(event) = self.poll_pipeline(cx) {
306                return Poll::Ready(event)
307            }
308        }
309
310        // advance all full block requests
311        for idx in (0..self.inflight_full_block_requests.len()).rev() {
312            let mut request = self.inflight_full_block_requests.swap_remove(idx);
313            if let Poll::Ready(block) = request.poll_unpin(cx) {
314                trace!(target: "consensus::engine", block=?block.num_hash(), "Received single full block, buffering");
315                self.range_buffered_blocks.push(Reverse(OrderedSealedBlock(block)));
316            } else {
317                // still pending
318                self.inflight_full_block_requests.push(request);
319            }
320        }
321
322        // advance all full block range requests
323        for idx in (0..self.inflight_block_range_requests.len()).rev() {
324            let mut request = self.inflight_block_range_requests.swap_remove(idx);
325            if let Poll::Ready(blocks) = request.poll_unpin(cx) {
326                trace!(target: "consensus::engine", len=?blocks.len(), first=?blocks.first().map(|b| b.num_hash()), last=?blocks.last().map(|b| b.num_hash()), "Received full block range, buffering");
327                self.range_buffered_blocks
328                    .extend(blocks.into_iter().map(OrderedSealedBlock).map(Reverse));
329            } else {
330                // still pending
331                self.inflight_block_range_requests.push(request);
332            }
333        }
334
335        self.update_block_download_metrics();
336
337        // drain an element of the block buffer if there are any
338        if let Some(block) = self.range_buffered_blocks.pop() {
339            // peek ahead and pop duplicates
340            while let Some(peek) = self.range_buffered_blocks.peek_mut() {
341                if peek.0 .0.hash() == block.0 .0.hash() {
342                    PeekMut::pop(peek);
343                } else {
344                    break
345                }
346            }
347            return Poll::Ready(EngineSyncEvent::FetchedFullBlock(block.0 .0))
348        }
349
350        Poll::Pending
351    }
352}
353
354/// A wrapper type around [`SealedBlock`] that implements the [Ord] trait by block number.
355#[derive(Debug, Clone, PartialEq, Eq)]
356struct OrderedSealedBlock<H = Header, B = BlockBody>(SealedBlock<H, B>);
357
358impl<H, B> PartialOrd for OrderedSealedBlock<H, B>
359where
360    H: reth_primitives_traits::BlockHeader + 'static,
361    B: reth_primitives_traits::BlockBody + 'static,
362{
363    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
364        Some(self.cmp(other))
365    }
366}
367
368impl<H, B> Ord for OrderedSealedBlock<H, B>
369where
370    H: reth_primitives_traits::BlockHeader + 'static,
371    B: reth_primitives_traits::BlockBody + 'static,
372{
373    fn cmp(&self, other: &Self) -> Ordering {
374        self.0.number().cmp(&other.0.number())
375    }
376}
377
378/// The event type emitted by the [`EngineSyncController`].
379#[derive(Debug)]
380pub(crate) enum EngineSyncEvent<N: NodePrimitives = EthPrimitives> {
381    /// A full block has been downloaded from the network.
382    FetchedFullBlock(SealedBlock<N::BlockHeader, N::BlockBody>),
383    /// Pipeline started syncing
384    ///
385    /// This is none if the pipeline is triggered without a specific target.
386    PipelineStarted(Option<PipelineTarget>),
387    /// Pipeline finished
388    ///
389    /// If this is returned, the pipeline is idle.
390    PipelineFinished {
391        /// Final result of the pipeline run.
392        result: Result<ControlFlow, PipelineError>,
393        /// Whether the pipeline reached the configured `max_block`.
394        ///
395        /// Note: this is only relevant in debugging scenarios.
396        reached_max_block: bool,
397    },
398    /// Pipeline task was dropped after it was started, unable to receive it because channel
399    /// closed. This would indicate a panicked pipeline task
400    PipelineTaskDropped,
401}
402
403/// The possible pipeline states within the sync controller.
404///
405/// [`PipelineState::Idle`] means that the pipeline is currently idle.
406/// [`PipelineState::Running`] means that the pipeline is currently running.
407///
408/// NOTE: The differentiation between these two states is important, because when the pipeline is
409/// running, it acquires the write lock over the database. This means that we cannot forward to the
410/// blockchain tree any messages that would result in database writes, since it would result in a
411/// deadlock.
412enum PipelineState<N: ProviderNodeTypes> {
413    /// Pipeline is idle.
414    Idle(Option<Pipeline<N>>),
415    /// Pipeline is running and waiting for a response
416    Running(oneshot::Receiver<PipelineWithResult<N>>),
417}
418
419impl<N: ProviderNodeTypes> PipelineState<N> {
420    /// Returns `true` if the state matches idle.
421    const fn is_idle(&self) -> bool {
422        matches!(self, Self::Idle(_))
423    }
424}
425
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::Header;
    use alloy_eips::eip1559::ETHEREUM_BLOCK_GAS_LIMIT;
    use assert_matches::assert_matches;
    use futures::poll;
    use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
    use reth_network_p2p::{either::Either, test_utils::TestFullBlockClient, EthBlockClient};
    use reth_primitives::{BlockBody, SealedHeader};
    use reth_provider::{
        test_utils::{create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB},
        ExecutionOutcome,
    };
    use reth_prune_types::PruneModes;
    use reth_stages::{test_utils::TestStages, ExecOutput, StageError};
    use reth_stages_api::StageCheckpoint;
    use reth_static_file::StaticFileProducer;
    use reth_tasks::TokioTaskExecutor;
    use std::{collections::VecDeque, future::poll_fn, ops::Range};
    use tokio::sync::watch;

    /// Builder for the test [`Pipeline`] handed to the sync controller.
    struct TestPipelineBuilder {
        pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
        executor_results: Vec<ExecutionOutcome>,
        max_block: Option<BlockNumber>,
    }

    impl TestPipelineBuilder {
        /// Creates a builder without stage outputs, executor results or max block.
        const fn new() -> Self {
            Self {
                pipeline_exec_outputs: VecDeque::new(),
                executor_results: Vec::new(),
                max_block: None,
            }
        }

        /// Sets the outputs the test stages of the pipeline will report.
        fn with_pipeline_exec_outputs(
            mut self,
            pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
        ) -> Self {
            self.pipeline_exec_outputs = pipeline_exec_outputs;
            self
        }

        /// Sets the executor results to use for the test consensus engine.
        #[allow(dead_code)]
        fn with_executor_results(mut self, executor_results: Vec<ExecutionOutcome>) -> Self {
            self.executor_results = executor_results;
            self
        }

        /// Sets the max block for the pipeline to run.
        #[allow(dead_code)]
        const fn with_max_block(mut self, max_block: BlockNumber) -> Self {
            self.max_block = Some(max_block);
            self
        }

        /// Builds the pipeline on top of a test provider factory for the given chain spec.
        fn build(self, chain_spec: Arc<ChainSpec>) -> Pipeline<MockNodeTypesWithDB> {
            reth_tracing::init_test_tracing();

            // the receiving half of the tip channel is not needed by these tests
            let (tip_tx, _tip_rx) = watch::channel(B256::default());
            let stages = TestStages::new(self.pipeline_exec_outputs, Default::default());
            let mut builder =
                Pipeline::<MockNodeTypesWithDB>::builder().add_stages(stages).with_tip_sender(tip_tx);

            if let Some(max_block) = self.max_block {
                builder = builder.with_max_block(max_block);
            }

            let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec);
            let static_file_producer =
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default());

            builder.build(provider_factory, static_file_producer)
        }
    }

    /// Builder for the [`EngineSyncController`] under test.
    struct TestSyncControllerBuilder<Client> {
        max_block: Option<BlockNumber>,
        client: Option<Client>,
    }

    impl<Client> TestSyncControllerBuilder<Client> {
        /// Creates a builder without max block and without client.
        const fn new() -> Self {
            Self { max_block: None, client: None }
        }

        /// Sets the max block for the pipeline to run.
        #[allow(dead_code)]
        const fn with_max_block(mut self, max_block: BlockNumber) -> Self {
            self.max_block = Some(max_block);
            self
        }

        /// Sets the client to use for network operations.
        fn with_client(mut self, client: Client) -> Self {
            self.client = Some(client);
            self
        }

        /// Builds the sync controller, falling back to a default [`TestFullBlockClient`] if no
        /// client was set.
        fn build<N>(
            self,
            pipeline: Pipeline<N>,
            chain_spec: Arc<N::ChainSpec>,
        ) -> EngineSyncController<N, Either<Client, TestFullBlockClient>>
        where
            N: ProviderNodeTypes,
            Client: EthBlockClient + 'static,
        {
            let Self { max_block, client } = self;
            let client = match client {
                Some(client) => Either::Left(client),
                None => Either::Right(TestFullBlockClient::default()),
            };

            EngineSyncController::new(
                pipeline,
                client,
                Box::<TokioTaskExecutor>::default(),
                max_block,
                chain_spec,
                Default::default(),
            )
        }
    }

    /// Chain spec shared by the tests: mainnet chain and genesis with Paris activated.
    fn paris_mainnet_chain_spec() -> Arc<ChainSpec> {
        Arc::new(
            ChainSpecBuilder::default()
                .chain(MAINNET.chain)
                .genesis(MAINNET.genesis.clone())
                .paris_activated()
                .build(),
        )
    }

    #[tokio::test]
    async fn pipeline_started_after_setting_target() {
        let chain_spec = paris_mainnet_chain_spec();

        let client = TestFullBlockClient::default();
        insert_headers_into_client(&client, SealedHeader::default(), 0..10);

        // force the pipeline to be "done" after 5 blocks
        let done_at_five = ExecOutput { checkpoint: StageCheckpoint::new(5), done: true };
        let pipeline = TestPipelineBuilder::new()
            .with_pipeline_exec_outputs(VecDeque::from([Ok(done_at_five)]))
            .build(chain_spec.clone());

        let mut sync_controller = TestSyncControllerBuilder::new()
            .with_client(client.clone())
            .build(pipeline, chain_spec);

        let tip = client.highest_block().expect("there should be blocks here");
        sync_controller.set_pipeline_sync_target(tip.hash().into());

        let first_poll = poll_fn(|cx| sync_controller.poll(cx));
        let first_event = poll!(first_poll);

        // the sync target was set, so the pipeline is spawned immediately: the very first poll
        // must be Ready with PipelineStarted
        assert_matches!(first_event, Poll::Ready(EngineSyncEvent::PipelineStarted(Some(target))) => {
            assert_eq!(target.sync_target().unwrap(), tip.hash());
        });

        // afterwards the pipeline is expected to finish in a good state
        let second_event = poll_fn(|cx| sync_controller.poll(cx)).await;
        assert_matches!(second_event, EngineSyncEvent::PipelineFinished { result, reached_max_block } => {
            assert_matches!(result, Ok(control_flow) => assert_eq!(control_flow, ControlFlow::Continue { block_number: 5 }));
            // no max block configured
            assert!(!reached_max_block);
        });
    }

    /// Inserts one block per element of `range` into the client, each extending the previous one,
    /// starting on top of `genesis_header`. All blocks share a default body.
    fn insert_headers_into_client(
        client: &TestFullBlockClient,
        genesis_header: SealedHeader,
        range: Range<usize>,
    ) {
        let body = BlockBody::default();
        let mut parent = genesis_header;
        for _ in range {
            let (mut header, parent_hash) = parent.split();
            // derive the child header from its parent
            header.parent_hash = parent_hash;
            header.number += 1;
            header.timestamp += 1;
            parent = SealedHeader::seal(header);
            client.insert(parent.clone(), body.clone());
        }
    }

    #[tokio::test]
    async fn controller_sends_range_request() {
        let chain_spec = paris_mainnet_chain_spec();

        let client = TestFullBlockClient::default();
        let genesis = SealedHeader::seal(Header {
            base_fee_per_gas: Some(7),
            gas_limit: ETHEREUM_BLOCK_GAS_LIMIT,
            ..Default::default()
        });
        insert_headers_into_client(&client, genesis, 0..10);

        // set up a pipeline
        let pipeline = TestPipelineBuilder::new().build(chain_spec.clone());

        let mut sync_controller = TestSyncControllerBuilder::new()
            .with_client(client.clone())
            .build(pipeline, chain_spec);

        let tip = client.highest_block().expect("there should be blocks here");

        // request the whole range below the tip
        sync_controller.download_block_range(tip.hash(), tip.number);

        // exactly one range request must be in flight
        assert_eq!(sync_controller.inflight_block_range_requests.len(), 1);

        // and it must cover the requested range
        let range_request = sync_controller.inflight_block_range_requests.first().unwrap();
        assert_eq!(range_request.start_hash(), tip.hash());
        assert_eq!(range_request.count(), tip.number);

        // the blocks must be emitted in ascending order
        for num in 1..=10 {
            let next_ready = poll_fn(|cx| sync_controller.poll(cx)).await;
            assert_matches!(next_ready, EngineSyncEvent::FetchedFullBlock(block) => {
                assert_eq!(block.number, num);
            });
        }
    }
}