reth_engine_tree/
download.rs

1//! Handler that can download blocks on demand (e.g. from the network).
2
3use crate::{engine::DownloadRequest, metrics::BlockDownloaderMetrics};
4use alloy_consensus::BlockHeader;
5use alloy_primitives::B256;
6use futures::FutureExt;
7use reth_consensus::{Consensus, ConsensusError};
8use reth_network_p2p::{
9    full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient},
10    BlockClient,
11};
12use reth_primitives_traits::{Block, RecoveredBlock, SealedBlock};
13use std::{
14    cmp::{Ordering, Reverse},
15    collections::{binary_heap::PeekMut, BinaryHeap, HashSet, VecDeque},
16    fmt::Debug,
17    sync::Arc,
18    task::{Context, Poll},
19};
20use tracing::trace;
21
/// A trait that can download blocks on demand.
///
/// An implementation is driven through two entry points: [`BlockDownloader::on_action`] hands it
/// work (or tells it to stop), and [`BlockDownloader::poll`] retrieves the next
/// [`DownloadOutcome`] once one is available.
pub trait BlockDownloader: Send + Sync {
    /// Type of the block being downloaded.
    type Block: Block;

    /// Handles a [`DownloadAction`], i.e. a request to download blocks or to stop downloading.
    fn on_action(&mut self, action: DownloadAction);

    /// Advances in-progress requests, if any.
    ///
    /// Returns [`Poll::Ready`] with the next [`DownloadOutcome`], or [`Poll::Pending`] when
    /// there is nothing to report yet.
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DownloadOutcome<Self::Block>>;
}
33
/// Actions that can be performed by the block downloader.
///
/// Passed to [`BlockDownloader::on_action`].
#[derive(Debug)]
pub enum DownloadAction {
    /// Stop downloading blocks.
    Clear,
    /// Download the blocks described by the given [`DownloadRequest`].
    Download(DownloadRequest),
}
42
/// Outcome of downloaded blocks.
///
/// Returned from [`BlockDownloader::poll`].
#[derive(Debug)]
pub enum DownloadOutcome<B: Block> {
    /// Downloaded blocks.
    Blocks(Vec<RecoveredBlock<B>>),
    /// A new download was started.
    NewDownloadStarted {
        /// How many blocks are pending in this download.
        remaining_blocks: u64,
        /// The hash of the highest block of this download.
        target: B256,
    },
}
56
/// Basic [`BlockDownloader`].
///
/// Fetches single blocks and block ranges through a [`FullBlockClient`], keeps the resulting
/// futures until they complete, and buffers finished blocks in a heap ordered by block number
/// before handing them out.
#[expect(missing_debug_implementations)]
pub struct BasicBlockDownloader<Client, B: Block>
where
    Client: BlockClient + 'static,
{
    /// A downloader that can download full blocks from the network.
    full_block_client: FullBlockClient<Client>,
    /// In-flight full block requests in progress.
    inflight_full_block_requests: Vec<FetchFullBlockFuture<Client>>,
    /// In-flight full block _range_ requests in progress.
    inflight_block_range_requests: Vec<FetchFullBlockRangeFuture<Client>>,
    /// Buffered blocks from downloads - this is a min-heap of blocks, using the block number for
    /// ordering. This means the blocks will be popped from the heap with ascending block numbers.
    set_buffered_blocks: BinaryHeap<Reverse<OrderedRecoveredBlock<B>>>,
    /// Engine download metrics.
    metrics: BlockDownloaderMetrics,
    /// Pending events to be emitted, in FIFO order.
    pending_events: VecDeque<DownloadOutcome<B>>,
}
77
78impl<Client, B> BasicBlockDownloader<Client, B>
79where
80    Client: BlockClient<Block = B> + 'static,
81    B: Block,
82{
83    /// Create a new instance
84    pub fn new(client: Client, consensus: Arc<dyn Consensus<B, Error = ConsensusError>>) -> Self {
85        Self {
86            full_block_client: FullBlockClient::new(client, consensus),
87            inflight_full_block_requests: Vec::new(),
88            inflight_block_range_requests: Vec::new(),
89            set_buffered_blocks: BinaryHeap::new(),
90            metrics: BlockDownloaderMetrics::default(),
91            pending_events: Default::default(),
92        }
93    }
94
95    /// Clears the stored inflight requests.
96    fn clear(&mut self) {
97        self.inflight_full_block_requests.clear();
98        self.inflight_block_range_requests.clear();
99        self.set_buffered_blocks.clear();
100        self.update_block_download_metrics();
101    }
102
103    /// Processes a download request.
104    fn download(&mut self, request: DownloadRequest) {
105        match request {
106            DownloadRequest::BlockSet(hashes) => self.download_block_set(hashes),
107            DownloadRequest::BlockRange(hash, count) => self.download_block_range(hash, count),
108        }
109    }
110
111    /// Processes a block set download request.
112    fn download_block_set(&mut self, hashes: HashSet<B256>) {
113        for hash in hashes {
114            self.download_full_block(hash);
115        }
116    }
117
118    /// Processes a block range download request.
119    fn download_block_range(&mut self, hash: B256, count: u64) {
120        if count == 1 {
121            self.download_full_block(hash);
122        } else {
123            trace!(
124                target: "consensus::engine",
125                ?hash,
126                ?count,
127                "start downloading full block range."
128            );
129
130            let request = self.full_block_client.get_full_block_range(hash, count);
131            self.push_pending_event(DownloadOutcome::NewDownloadStarted {
132                remaining_blocks: request.count(),
133                target: request.start_hash(),
134            });
135            self.inflight_block_range_requests.push(request);
136
137            self.update_block_download_metrics();
138        }
139    }
140
141    /// Starts requesting a full block from the network.
142    ///
143    /// Returns `true` if the request was started, `false` if there's already a request for the
144    /// given hash.
145    fn download_full_block(&mut self, hash: B256) -> bool {
146        if self.is_inflight_request(hash) {
147            return false
148        }
149        self.push_pending_event(DownloadOutcome::NewDownloadStarted {
150            remaining_blocks: 1,
151            target: hash,
152        });
153
154        trace!(
155            target: "consensus::engine::sync",
156            ?hash,
157            "Start downloading full block"
158        );
159
160        let request = self.full_block_client.get_full_block(hash);
161        self.inflight_full_block_requests.push(request);
162
163        self.update_block_download_metrics();
164
165        true
166    }
167
168    /// Returns true if there's already a request for the given hash.
169    fn is_inflight_request(&self, hash: B256) -> bool {
170        self.inflight_full_block_requests.iter().any(|req| *req.hash() == hash)
171    }
172
173    /// Sets the metrics for the active downloads
174    fn update_block_download_metrics(&self) {
175        let blocks = self.inflight_full_block_requests.len() +
176            self.inflight_block_range_requests.iter().map(|r| r.count() as usize).sum::<usize>();
177        self.metrics.active_block_downloads.set(blocks as f64);
178    }
179
180    /// Adds a pending event to the FIFO queue.
181    fn push_pending_event(&mut self, pending_event: DownloadOutcome<B>) {
182        self.pending_events.push_back(pending_event);
183    }
184
185    /// Removes a pending event from the FIFO queue.
186    fn pop_pending_event(&mut self) -> Option<DownloadOutcome<B>> {
187        self.pending_events.pop_front()
188    }
189}
190
191impl<Client, B> BlockDownloader for BasicBlockDownloader<Client, B>
192where
193    Client: BlockClient<Block = B>,
194    B: Block,
195{
196    type Block = B;
197
198    /// Handles incoming download actions.
199    fn on_action(&mut self, action: DownloadAction) {
200        match action {
201            DownloadAction::Clear => self.clear(),
202            DownloadAction::Download(request) => self.download(request),
203        }
204    }
205
206    /// Advances the download process.
207    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DownloadOutcome<B>> {
208        if let Some(pending_event) = self.pop_pending_event() {
209            return Poll::Ready(pending_event);
210        }
211
212        // advance all full block requests
213        for idx in (0..self.inflight_full_block_requests.len()).rev() {
214            let mut request = self.inflight_full_block_requests.swap_remove(idx);
215            if let Poll::Ready(block) = request.poll_unpin(cx) {
216                trace!(target: "consensus::engine", block=?block.num_hash(), "Received single full block, buffering");
217                self.set_buffered_blocks.push(Reverse(block.into()));
218            } else {
219                // still pending
220                self.inflight_full_block_requests.push(request);
221            }
222        }
223
224        // advance all full block range requests
225        for idx in (0..self.inflight_block_range_requests.len()).rev() {
226            let mut request = self.inflight_block_range_requests.swap_remove(idx);
227            if let Poll::Ready(blocks) = request.poll_unpin(cx) {
228                trace!(target: "consensus::engine", len=?blocks.len(), first=?blocks.first().map(|b| b.num_hash()), last=?blocks.last().map(|b| b.num_hash()), "Received full block range, buffering");
229                self.set_buffered_blocks.extend(
230                    blocks
231                        .into_iter()
232                        .map(|b| {
233                            let senders = b.senders().unwrap_or_default();
234                            OrderedRecoveredBlock(RecoveredBlock::new_sealed(b, senders))
235                        })
236                        .map(Reverse),
237                );
238            } else {
239                // still pending
240                self.inflight_block_range_requests.push(request);
241            }
242        }
243
244        self.update_block_download_metrics();
245
246        if self.set_buffered_blocks.is_empty() {
247            return Poll::Pending;
248        }
249
250        // drain all unique element of the block buffer if there are any
251        let mut downloaded_blocks: Vec<RecoveredBlock<B>> =
252            Vec::with_capacity(self.set_buffered_blocks.len());
253        while let Some(block) = self.set_buffered_blocks.pop() {
254            // peek ahead and pop duplicates
255            while let Some(peek) = self.set_buffered_blocks.peek_mut() {
256                if peek.0 .0.hash() == block.0 .0.hash() {
257                    PeekMut::pop(peek);
258                } else {
259                    break
260                }
261            }
262            downloaded_blocks.push(block.0.into());
263        }
264        Poll::Ready(DownloadOutcome::Blocks(downloaded_blocks))
265    }
266}
267
268/// A wrapper type around [`RecoveredBlock`] that implements the [Ord]
269/// trait by block number.
270#[derive(Debug, Clone, PartialEq, Eq)]
271struct OrderedRecoveredBlock<B: Block>(RecoveredBlock<B>);
272
273impl<B: Block> PartialOrd for OrderedRecoveredBlock<B> {
274    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
275        Some(self.cmp(other))
276    }
277}
278
279impl<B: Block> Ord for OrderedRecoveredBlock<B> {
280    fn cmp(&self, other: &Self) -> Ordering {
281        self.0.number().cmp(&other.0.number())
282    }
283}
284
285impl<B: Block> From<SealedBlock<B>> for OrderedRecoveredBlock<B> {
286    fn from(block: SealedBlock<B>) -> Self {
287        let senders = block.senders().unwrap_or_default();
288        Self(RecoveredBlock::new_sealed(block, senders))
289    }
290}
291
292impl<B: Block> From<OrderedRecoveredBlock<B>> for RecoveredBlock<B> {
293    fn from(value: OrderedRecoveredBlock<B>) -> Self {
294        value.0
295    }
296}
297
/// A [`BlockDownloader`] that does nothing.
///
/// `B` is only a type-level marker, no value of it is ever stored. `Debug`, `Clone` and
/// `Default` are therefore implemented by hand: deriving them would add `B: Debug`, `B: Clone`
/// and `B: Default` bounds that the marker does not need.
#[non_exhaustive]
pub struct NoopBlockDownloader<B>(core::marker::PhantomData<B>);

impl<B> core::fmt::Debug for NoopBlockDownloader<B> {
    /// Produces the same tuple-struct rendering a derived impl would.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_tuple("NoopBlockDownloader").field(&self.0).finish()
    }
}

impl<B> Clone for NoopBlockDownloader<B> {
    /// There is no state to copy, a fresh marker is equivalent.
    fn clone(&self) -> Self {
        Self(core::marker::PhantomData)
    }
}

impl<B> Default for NoopBlockDownloader<B> {
    /// Creates the (stateless) no-op downloader.
    fn default() -> Self {
        Self(core::marker::PhantomData)
    }
}
302
303impl<B: Block> BlockDownloader for NoopBlockDownloader<B> {
304    type Block = B;
305
306    fn on_action(&mut self, _event: DownloadAction) {}
307
308    fn poll(&mut self, _cx: &mut Context<'_>) -> Poll<DownloadOutcome<B>> {
309        Poll::Pending
310    }
311}
312
#[cfg(test)]
mod tests {
    //! Tests for [`BasicBlockDownloader`] driven by a [`TestFullBlockClient`].

    use super::*;
    use crate::test_utils::insert_headers_into_client;
    use alloy_consensus::Header;
    use alloy_eips::eip1559::ETHEREUM_BLOCK_GAS_LIMIT_30M;
    use assert_matches::assert_matches;
    use reth_chainspec::{ChainSpecBuilder, MAINNET};
    use reth_ethereum_consensus::EthBeaconConsensus;
    use reth_network_p2p::test_utils::TestFullBlockClient;
    use reth_primitives_traits::SealedHeader;
    use std::{future::poll_fn, sync::Arc};

    /// Bundles the downloader under test with the client it fetches from, so tests can inspect
    /// both.
    struct TestHarness {
        block_downloader:
            BasicBlockDownloader<TestFullBlockClient, reth_ethereum_primitives::Block>,
        client: TestFullBlockClient,
    }

    impl TestHarness {
        /// Creates a downloader backed by a test client that was populated via
        /// `insert_headers_into_client` for the range `0..total_blocks`.
        fn new(total_blocks: usize) -> Self {
            // mainnet genesis with Paris activated
            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(MAINNET.genesis.clone())
                    .paris_activated()
                    .build(),
            );

            let client = TestFullBlockClient::default();
            // header the inserted chain is derived from
            let header = Header {
                base_fee_per_gas: Some(7),
                gas_limit: ETHEREUM_BLOCK_GAS_LIMIT_30M,
                ..Default::default()
            };
            let header = SealedHeader::seal_slow(header);

            insert_headers_into_client(&client, header, 0..total_blocks);
            let consensus = Arc::new(EthBeaconConsensus::new(chain_spec));

            // the downloader gets a clone, the harness keeps the client for assertions
            let block_downloader = BasicBlockDownloader::new(client.clone(), consensus);
            Self { block_downloader, client }
        }
    }

    /// A range request is tracked as one in-flight range request, announced with a
    /// `NewDownloadStarted` event and finally resolves to all blocks in ascending order.
    #[tokio::test]
    async fn block_downloader_range_request() {
        const TOTAL_BLOCKS: usize = 10;
        let TestHarness { mut block_downloader, client } = TestHarness::new(TOTAL_BLOCKS);
        let tip = client.highest_block().expect("there should be blocks here");

        // send block range download request
        block_downloader.on_action(DownloadAction::Download(DownloadRequest::BlockRange(
            tip.hash(),
            tip.number,
        )));

        // ensure we have one in flight range request
        assert_eq!(block_downloader.inflight_block_range_requests.len(), 1);

        // ensure the range request is made correctly
        let first_req = block_downloader.inflight_block_range_requests.first().unwrap();
        assert_eq!(first_req.start_hash(), tip.hash());
        assert_eq!(first_req.count(), tip.number);

        // poll downloader: the queued start event comes first
        let sync_future = poll_fn(|cx| block_downloader.poll(cx));
        let next_ready = sync_future.await;

        assert_matches!(next_ready, DownloadOutcome::NewDownloadStarted { remaining_blocks, .. } => {
            assert_eq!(remaining_blocks, TOTAL_BLOCKS as u64);
        });

        // the next ready outcome carries the downloaded blocks
        let sync_future = poll_fn(|cx| block_downloader.poll(cx));
        let next_ready = sync_future.await;

        assert_matches!(next_ready, DownloadOutcome::Blocks(blocks) => {
            // ensure all blocks were obtained
            assert_eq!(blocks.len(), TOTAL_BLOCKS);

            // ensure they are in ascending order
            for num in 1..=TOTAL_BLOCKS {
                assert_eq!(blocks[num-1].number(), num as u64);
            }
        });
    }

    /// A set request spawns one single-block request (and one start event) per hash and
    /// resolves to all requested blocks in ascending order.
    #[tokio::test]
    async fn block_downloader_set_request() {
        const TOTAL_BLOCKS: usize = 2;
        let TestHarness { mut block_downloader, client } = TestHarness::new(TOTAL_BLOCKS);

        let tip = client.highest_block().expect("there should be blocks here");

        // send block set download request
        block_downloader.on_action(DownloadAction::Download(DownloadRequest::BlockSet(
            HashSet::from([tip.hash(), tip.parent_hash]),
        )));

        // ensure we have TOTAL_BLOCKS in flight full block request
        assert_eq!(block_downloader.inflight_full_block_requests.len(), TOTAL_BLOCKS);

        // poll downloader: one start event per requested block is emitted first
        for _ in 0..TOTAL_BLOCKS {
            let sync_future = poll_fn(|cx| block_downloader.poll(cx));
            let next_ready = sync_future.await;

            assert_matches!(next_ready, DownloadOutcome::NewDownloadStarted { remaining_blocks, .. } => {
                assert_eq!(remaining_blocks, 1);
            });
        }

        // afterwards the downloaded blocks are returned
        let sync_future = poll_fn(|cx| block_downloader.poll(cx));
        let next_ready = sync_future.await;
        assert_matches!(next_ready, DownloadOutcome::Blocks(blocks) => {
            // ensure all blocks were obtained
            assert_eq!(blocks.len(), TOTAL_BLOCKS);

            // ensure they are in ascending order
            for num in 1..=TOTAL_BLOCKS {
                assert_eq!(blocks[num-1].number(), num as u64);
            }
        });
    }

    /// `DownloadAction::Clear` drops every in-flight single-block and range request.
    #[tokio::test]
    async fn block_downloader_clear_request() {
        const TOTAL_BLOCKS: usize = 10;
        let TestHarness { mut block_downloader, client } = TestHarness::new(TOTAL_BLOCKS);

        let tip = client.highest_block().expect("there should be blocks here");

        // send block range download request
        block_downloader.on_action(DownloadAction::Download(DownloadRequest::BlockRange(
            tip.hash(),
            tip.number,
        )));

        // send block set download request
        let download_set = HashSet::from([tip.hash(), tip.parent_hash]);
        block_downloader
            .on_action(DownloadAction::Download(DownloadRequest::BlockSet(download_set.clone())));

        // ensure we have one in flight range request
        assert_eq!(block_downloader.inflight_block_range_requests.len(), 1);

        // ensure the range request is made correctly
        let first_req = block_downloader.inflight_block_range_requests.first().unwrap();
        assert_eq!(first_req.start_hash(), tip.hash());
        assert_eq!(first_req.count(), tip.number);

        // ensure we have download_set.len() in flight full block request
        assert_eq!(block_downloader.inflight_full_block_requests.len(), download_set.len());

        // send clear request
        block_downloader.on_action(DownloadAction::Clear);

        // ensure we have no in flight range request
        assert_eq!(block_downloader.inflight_block_range_requests.len(), 0);

        // ensure we have no in flight full block request
        assert_eq!(block_downloader.inflight_full_block_requests.len(), 0);
    }
}