reth_engine_tree/
download.rs

1//! Handler that can download blocks on demand (e.g. from the network).
2
3use crate::{engine::DownloadRequest, metrics::BlockDownloaderMetrics};
4use alloy_consensus::BlockHeader;
5use alloy_primitives::B256;
6use futures::FutureExt;
7use reth_consensus::Consensus;
8use reth_network_p2p::{
9    full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient},
10    BlockClient,
11};
12use reth_primitives::{SealedBlockFor, SealedBlockWithSenders};
13use reth_primitives_traits::Block;
14use std::{
15    cmp::{Ordering, Reverse},
16    collections::{binary_heap::PeekMut, BinaryHeap, HashSet, VecDeque},
17    fmt::Debug,
18    sync::Arc,
19    task::{Context, Poll},
20};
21use tracing::trace;
22
23/// A trait that can download blocks on demand.
24pub trait BlockDownloader: Send + Sync {
25    /// Type of the block being downloaded.
26    type Block: Block;
27
28    /// Handle an action.
29    fn on_action(&mut self, action: DownloadAction);
30
31    /// Advance in progress requests if any
32    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DownloadOutcome<Self::Block>>;
33}
34
35/// Actions that can be performed by the block downloader.
36#[derive(Debug)]
37pub enum DownloadAction {
38    /// Stop downloading blocks.
39    Clear,
40    /// Download given blocks
41    Download(DownloadRequest),
42}
43
44/// Outcome of downloaded blocks.
45#[derive(Debug)]
46pub enum DownloadOutcome<B: Block> {
47    /// Downloaded blocks.
48    Blocks(Vec<SealedBlockWithSenders<B>>),
49    /// New download started.
50    NewDownloadStarted {
51        /// How many blocks are pending in this download.
52        remaining_blocks: u64,
53        /// The hash of the highest block of this download.
54        target: B256,
55    },
56}
57
58/// Basic [`BlockDownloader`].
59#[allow(missing_debug_implementations)]
60pub struct BasicBlockDownloader<Client, B: Block>
61where
62    Client: BlockClient + 'static,
63{
64    /// A downloader that can download full blocks from the network.
65    full_block_client: FullBlockClient<Client>,
66    /// In-flight full block requests in progress.
67    inflight_full_block_requests: Vec<FetchFullBlockFuture<Client>>,
68    /// In-flight full block _range_ requests in progress.
69    inflight_block_range_requests: Vec<FetchFullBlockRangeFuture<Client>>,
70    /// Buffered blocks from downloads - this is a min-heap of blocks, using the block number for
71    /// ordering. This means the blocks will be popped from the heap with ascending block numbers.
72    set_buffered_blocks: BinaryHeap<Reverse<OrderedSealedBlockWithSenders<B>>>,
73    /// Engine download metrics.
74    metrics: BlockDownloaderMetrics,
75    /// Pending events to be emitted.
76    pending_events: VecDeque<DownloadOutcome<B>>,
77}
78
79impl<Client, B> BasicBlockDownloader<Client, B>
80where
81    Client: BlockClient<Header = B::Header, Body = B::Body> + 'static,
82    B: Block,
83{
84    /// Create a new instance
85    pub fn new(
86        client: Client,
87        consensus: Arc<dyn Consensus<Client::Header, Client::Body>>,
88    ) -> Self {
89        Self {
90            full_block_client: FullBlockClient::new(client, consensus),
91            inflight_full_block_requests: Vec::new(),
92            inflight_block_range_requests: Vec::new(),
93            set_buffered_blocks: BinaryHeap::new(),
94            metrics: BlockDownloaderMetrics::default(),
95            pending_events: Default::default(),
96        }
97    }
98
99    /// Clears the stored inflight requests.
100    fn clear(&mut self) {
101        self.inflight_full_block_requests.clear();
102        self.inflight_block_range_requests.clear();
103        self.set_buffered_blocks.clear();
104        self.update_block_download_metrics();
105    }
106
107    /// Processes a download request.
108    fn download(&mut self, request: DownloadRequest) {
109        match request {
110            DownloadRequest::BlockSet(hashes) => self.download_block_set(hashes),
111            DownloadRequest::BlockRange(hash, count) => self.download_block_range(hash, count),
112        }
113    }
114
115    /// Processes a block set download request.
116    fn download_block_set(&mut self, hashes: HashSet<B256>) {
117        for hash in hashes {
118            self.download_full_block(hash);
119        }
120    }
121
122    /// Processes a block range download request.
123    fn download_block_range(&mut self, hash: B256, count: u64) {
124        if count == 1 {
125            self.download_full_block(hash);
126        } else {
127            trace!(
128                target: "consensus::engine",
129                ?hash,
130                ?count,
131                "start downloading full block range."
132            );
133
134            let request = self.full_block_client.get_full_block_range(hash, count);
135            self.push_pending_event(DownloadOutcome::NewDownloadStarted {
136                remaining_blocks: request.count(),
137                target: request.start_hash(),
138            });
139            self.inflight_block_range_requests.push(request);
140        }
141    }
142
143    /// Starts requesting a full block from the network.
144    ///
145    /// Returns `true` if the request was started, `false` if there's already a request for the
146    /// given hash.
147    fn download_full_block(&mut self, hash: B256) -> bool {
148        if self.is_inflight_request(hash) {
149            return false
150        }
151        self.push_pending_event(DownloadOutcome::NewDownloadStarted {
152            remaining_blocks: 1,
153            target: hash,
154        });
155
156        trace!(
157            target: "consensus::engine::sync",
158            ?hash,
159            "Start downloading full block"
160        );
161
162        let request = self.full_block_client.get_full_block(hash);
163        self.inflight_full_block_requests.push(request);
164
165        self.update_block_download_metrics();
166
167        true
168    }
169
170    /// Returns true if there's already a request for the given hash.
171    fn is_inflight_request(&self, hash: B256) -> bool {
172        self.inflight_full_block_requests.iter().any(|req| *req.hash() == hash)
173    }
174
175    /// Sets the metrics for the active downloads
176    fn update_block_download_metrics(&self) {
177        let blocks = self.inflight_full_block_requests.len() +
178            self.inflight_block_range_requests.iter().map(|r| r.count() as usize).sum::<usize>();
179        self.metrics.active_block_downloads.set(blocks as f64);
180    }
181
182    /// Adds a pending event to the FIFO queue.
183    fn push_pending_event(&mut self, pending_event: DownloadOutcome<B>) {
184        self.pending_events.push_back(pending_event);
185    }
186
187    /// Removes a pending event from the FIFO queue.
188    fn pop_pending_event(&mut self) -> Option<DownloadOutcome<B>> {
189        self.pending_events.pop_front()
190    }
191}
192
193impl<Client, B> BlockDownloader for BasicBlockDownloader<Client, B>
194where
195    Client: BlockClient<Header = B::Header, Body = B::Body>,
196    B: Block,
197{
198    type Block = B;
199
200    /// Handles incoming download actions.
201    fn on_action(&mut self, action: DownloadAction) {
202        match action {
203            DownloadAction::Clear => self.clear(),
204            DownloadAction::Download(request) => self.download(request),
205        }
206    }
207
208    /// Advances the download process.
209    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DownloadOutcome<B>> {
210        if let Some(pending_event) = self.pop_pending_event() {
211            return Poll::Ready(pending_event);
212        }
213
214        // advance all full block requests
215        for idx in (0..self.inflight_full_block_requests.len()).rev() {
216            let mut request = self.inflight_full_block_requests.swap_remove(idx);
217            if let Poll::Ready(block) = request.poll_unpin(cx) {
218                trace!(target: "consensus::engine", block=?block.num_hash(), "Received single full block, buffering");
219                self.set_buffered_blocks.push(Reverse(block.into()));
220            } else {
221                // still pending
222                self.inflight_full_block_requests.push(request);
223            }
224        }
225
226        // advance all full block range requests
227        for idx in (0..self.inflight_block_range_requests.len()).rev() {
228            let mut request = self.inflight_block_range_requests.swap_remove(idx);
229            if let Poll::Ready(blocks) = request.poll_unpin(cx) {
230                trace!(target: "consensus::engine", len=?blocks.len(), first=?blocks.first().map(|b| b.num_hash()), last=?blocks.last().map(|b| b.num_hash()), "Received full block range, buffering");
231                self.set_buffered_blocks.extend(
232                    blocks
233                        .into_iter()
234                        .map(|b| {
235                            let senders = b.senders().unwrap_or_default();
236                            OrderedSealedBlockWithSenders(SealedBlockWithSenders {
237                                block: b,
238                                senders,
239                            })
240                        })
241                        .map(Reverse),
242                );
243            } else {
244                // still pending
245                self.inflight_block_range_requests.push(request);
246            }
247        }
248
249        self.update_block_download_metrics();
250
251        if self.set_buffered_blocks.is_empty() {
252            return Poll::Pending;
253        }
254
255        // drain all unique element of the block buffer if there are any
256        let mut downloaded_blocks: Vec<SealedBlockWithSenders<B>> =
257            Vec::with_capacity(self.set_buffered_blocks.len());
258        while let Some(block) = self.set_buffered_blocks.pop() {
259            // peek ahead and pop duplicates
260            while let Some(peek) = self.set_buffered_blocks.peek_mut() {
261                if peek.0 .0.hash() == block.0 .0.hash() {
262                    PeekMut::pop(peek);
263                } else {
264                    break
265                }
266            }
267            downloaded_blocks.push(block.0.into());
268        }
269        Poll::Ready(DownloadOutcome::Blocks(downloaded_blocks))
270    }
271}
272
273/// A wrapper type around [`SealedBlockWithSenders`] that implements the [Ord]
274/// trait by block number.
275#[derive(Debug, Clone, PartialEq, Eq)]
276struct OrderedSealedBlockWithSenders<B: Block>(SealedBlockWithSenders<B>);
277
278impl<B: Block> PartialOrd for OrderedSealedBlockWithSenders<B> {
279    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
280        Some(self.cmp(other))
281    }
282}
283
284impl<B: Block> Ord for OrderedSealedBlockWithSenders<B> {
285    fn cmp(&self, other: &Self) -> Ordering {
286        self.0.number().cmp(&other.0.number())
287    }
288}
289
290impl<B: Block> From<SealedBlockFor<B>> for OrderedSealedBlockWithSenders<B> {
291    fn from(block: SealedBlockFor<B>) -> Self {
292        let senders = block.senders().unwrap_or_default();
293        Self(SealedBlockWithSenders { block, senders })
294    }
295}
296
297impl<B: Block> From<OrderedSealedBlockWithSenders<B>> for SealedBlockWithSenders<B> {
298    fn from(value: OrderedSealedBlockWithSenders<B>) -> Self {
299        let senders = value.0.senders;
300        Self { block: value.0.block, senders }
301    }
302}
303
304/// A [`BlockDownloader`] that does nothing.
305#[derive(Debug, Clone, Default)]
306#[non_exhaustive]
307pub struct NoopBlockDownloader<B>(core::marker::PhantomData<B>);
308
309impl<B: Block> BlockDownloader for NoopBlockDownloader<B> {
310    type Block = B;
311
312    fn on_action(&mut self, _event: DownloadAction) {}
313
314    fn poll(&mut self, _cx: &mut Context<'_>) -> Poll<DownloadOutcome<B>> {
315        Poll::Pending
316    }
317}
318
319#[cfg(test)]
320mod tests {
321    use super::*;
322    use crate::test_utils::insert_headers_into_client;
323    use alloy_consensus::Header;
324    use alloy_eips::eip1559::ETHEREUM_BLOCK_GAS_LIMIT;
325    use assert_matches::assert_matches;
326    use reth_beacon_consensus::EthBeaconConsensus;
327    use reth_chainspec::{ChainSpecBuilder, MAINNET};
328    use reth_network_p2p::test_utils::TestFullBlockClient;
329    use reth_primitives::SealedHeader;
330    use std::{future::poll_fn, sync::Arc};
331
332    struct TestHarness {
333        block_downloader: BasicBlockDownloader<TestFullBlockClient, reth_primitives::Block>,
334        client: TestFullBlockClient,
335    }
336
337    impl TestHarness {
338        fn new(total_blocks: usize) -> Self {
339            let chain_spec = Arc::new(
340                ChainSpecBuilder::default()
341                    .chain(MAINNET.chain)
342                    .genesis(MAINNET.genesis.clone())
343                    .paris_activated()
344                    .build(),
345            );
346
347            let client = TestFullBlockClient::default();
348            let header = Header {
349                base_fee_per_gas: Some(7),
350                gas_limit: ETHEREUM_BLOCK_GAS_LIMIT,
351                ..Default::default()
352            };
353            let header = SealedHeader::seal(header);
354
355            insert_headers_into_client(&client, header, 0..total_blocks);
356            let consensus = Arc::new(EthBeaconConsensus::new(chain_spec));
357
358            let block_downloader = BasicBlockDownloader::new(client.clone(), consensus);
359            Self { block_downloader, client }
360        }
361    }
362
363    #[tokio::test]
364    async fn block_downloader_range_request() {
365        const TOTAL_BLOCKS: usize = 10;
366        let TestHarness { mut block_downloader, client } = TestHarness::new(TOTAL_BLOCKS);
367        let tip = client.highest_block().expect("there should be blocks here");
368
369        // send block range download request
370        block_downloader.on_action(DownloadAction::Download(DownloadRequest::BlockRange(
371            tip.hash(),
372            tip.number,
373        )));
374
375        // ensure we have one in flight range request
376        assert_eq!(block_downloader.inflight_block_range_requests.len(), 1);
377
378        // ensure the range request is made correctly
379        let first_req = block_downloader.inflight_block_range_requests.first().unwrap();
380        assert_eq!(first_req.start_hash(), tip.hash());
381        assert_eq!(first_req.count(), tip.number);
382
383        // poll downloader
384        let sync_future = poll_fn(|cx| block_downloader.poll(cx));
385        let next_ready = sync_future.await;
386
387        assert_matches!(next_ready, DownloadOutcome::NewDownloadStarted { remaining_blocks, .. } => {
388            assert_eq!(remaining_blocks, TOTAL_BLOCKS as u64);
389        });
390
391        let sync_future = poll_fn(|cx| block_downloader.poll(cx));
392        let next_ready = sync_future.await;
393
394        assert_matches!(next_ready, DownloadOutcome::Blocks(blocks) => {
395            // ensure all blocks were obtained
396            assert_eq!(blocks.len(), TOTAL_BLOCKS);
397
398            // ensure they are in ascending order
399            for num in 1..=TOTAL_BLOCKS {
400                assert_eq!(blocks[num-1].number(), num as u64);
401            }
402        });
403    }
404
405    #[tokio::test]
406    async fn block_downloader_set_request() {
407        const TOTAL_BLOCKS: usize = 2;
408        let TestHarness { mut block_downloader, client } = TestHarness::new(TOTAL_BLOCKS);
409
410        let tip = client.highest_block().expect("there should be blocks here");
411
412        // send block set download request
413        block_downloader.on_action(DownloadAction::Download(DownloadRequest::BlockSet(
414            HashSet::from([tip.hash(), tip.parent_hash]),
415        )));
416
417        // ensure we have TOTAL_BLOCKS in flight full block request
418        assert_eq!(block_downloader.inflight_full_block_requests.len(), TOTAL_BLOCKS);
419
420        // poll downloader
421        for _ in 0..TOTAL_BLOCKS {
422            let sync_future = poll_fn(|cx| block_downloader.poll(cx));
423            let next_ready = sync_future.await;
424
425            assert_matches!(next_ready, DownloadOutcome::NewDownloadStarted { remaining_blocks, .. } => {
426                assert_eq!(remaining_blocks, 1);
427            });
428        }
429
430        let sync_future = poll_fn(|cx| block_downloader.poll(cx));
431        let next_ready = sync_future.await;
432        assert_matches!(next_ready, DownloadOutcome::Blocks(blocks) => {
433            // ensure all blocks were obtained
434            assert_eq!(blocks.len(), TOTAL_BLOCKS);
435
436            // ensure they are in ascending order
437            for num in 1..=TOTAL_BLOCKS {
438                assert_eq!(blocks[num-1].number(), num as u64);
439            }
440        });
441    }
442
443    #[tokio::test]
444    async fn block_downloader_clear_request() {
445        const TOTAL_BLOCKS: usize = 10;
446        let TestHarness { mut block_downloader, client } = TestHarness::new(TOTAL_BLOCKS);
447
448        let tip = client.highest_block().expect("there should be blocks here");
449
450        // send block range download request
451        block_downloader.on_action(DownloadAction::Download(DownloadRequest::BlockRange(
452            tip.hash(),
453            tip.number,
454        )));
455
456        // send block set download request
457        let download_set = HashSet::from([tip.hash(), tip.parent_hash]);
458        block_downloader
459            .on_action(DownloadAction::Download(DownloadRequest::BlockSet(download_set.clone())));
460
461        // ensure we have one in flight range request
462        assert_eq!(block_downloader.inflight_block_range_requests.len(), 1);
463
464        // ensure the range request is made correctly
465        let first_req = block_downloader.inflight_block_range_requests.first().unwrap();
466        assert_eq!(first_req.start_hash(), tip.hash());
467        assert_eq!(first_req.count(), tip.number);
468
469        // ensure we have download_set.len() in flight full block request
470        assert_eq!(block_downloader.inflight_full_block_requests.len(), download_set.len());
471
472        // send clear request
473        block_downloader.on_action(DownloadAction::Clear);
474
475        // ensure we have no in flight range request
476        assert_eq!(block_downloader.inflight_block_range_requests.len(), 0);
477
478        // ensure we have no in flight full block request
479        assert_eq!(block_downloader.inflight_full_block_requests.len(), 0);
480    }
481}