1use crate::{engine::DownloadRequest, metrics::BlockDownloaderMetrics};
4use alloy_consensus::BlockHeader;
5use alloy_primitives::B256;
6use futures::FutureExt;
7use reth_consensus::Consensus;
8use reth_network_p2p::{
9 full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient},
10 BlockClient,
11};
12use reth_primitives::{SealedBlockFor, SealedBlockWithSenders};
13use reth_primitives_traits::Block;
14use std::{
15 cmp::{Ordering, Reverse},
16 collections::{binary_heap::PeekMut, BinaryHeap, HashSet, VecDeque},
17 fmt::Debug,
18 sync::Arc,
19 task::{Context, Poll},
20};
21use tracing::trace;
22
/// A component that downloads blocks on request and is driven by polling.
pub trait BlockDownloader: Send + Sync {
    /// The block type yielded by this downloader.
    type Block: Block;

    /// Applies the given [`DownloadAction`] to the downloader.
    fn on_action(&mut self, action: DownloadAction);

    /// Drives the downloader, returning the next [`DownloadOutcome`] when one is available.
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DownloadOutcome<Self::Block>>;
}
34
/// Actions that can be sent to a [`BlockDownloader`].
#[derive(Debug)]
pub enum DownloadAction {
    /// Discard the downloader's current work.
    Clear,
    /// Start downloading the blocks described by the wrapped request.
    Download(DownloadRequest),
}
43
/// Events produced by polling a [`BlockDownloader`].
#[derive(Debug)]
pub enum DownloadOutcome<B: Block> {
    /// A batch of downloaded blocks.
    Blocks(Vec<SealedBlockWithSenders<B>>),
    /// A new download request was registered.
    NewDownloadStarted {
        /// Number of blocks covered by the new request.
        remaining_blocks: u64,
        /// Hash the new request was started for.
        target: B256,
    },
}
57
/// A [`BlockDownloader`] backed by a [`FullBlockClient`].
// NOTE(review): presumably `Debug` is skipped because the request futures do not implement it;
// confirm before removing the `allow`.
#[allow(missing_debug_implementations)]
pub struct BasicBlockDownloader<Client, B: Block>
where
    Client: BlockClient + 'static,
{
    /// Client used to create single-block and block-range requests.
    full_block_client: FullBlockClient<Client>,
    /// Single-block requests that have not completed yet.
    inflight_full_block_requests: Vec<FetchFullBlockFuture<Client>>,
    /// Block-range requests that have not completed yet.
    inflight_block_range_requests: Vec<FetchFullBlockRangeFuture<Client>>,
    /// Downloaded blocks waiting to be emitted. The `Reverse` wrapper turns the max-heap into a
    /// min-heap, so blocks are popped in ascending order.
    set_buffered_blocks: BinaryHeap<Reverse<OrderedSealedBlockWithSenders<B>>>,
    /// Metrics about active block downloads.
    metrics: BlockDownloaderMetrics,
    /// Events queued for emission, consumed in FIFO order.
    pending_events: VecDeque<DownloadOutcome<B>>,
}
78
79impl<Client, B> BasicBlockDownloader<Client, B>
80where
81 Client: BlockClient<Header = B::Header, Body = B::Body> + 'static,
82 B: Block,
83{
84 pub fn new(
86 client: Client,
87 consensus: Arc<dyn Consensus<Client::Header, Client::Body>>,
88 ) -> Self {
89 Self {
90 full_block_client: FullBlockClient::new(client, consensus),
91 inflight_full_block_requests: Vec::new(),
92 inflight_block_range_requests: Vec::new(),
93 set_buffered_blocks: BinaryHeap::new(),
94 metrics: BlockDownloaderMetrics::default(),
95 pending_events: Default::default(),
96 }
97 }
98
99 fn clear(&mut self) {
101 self.inflight_full_block_requests.clear();
102 self.inflight_block_range_requests.clear();
103 self.set_buffered_blocks.clear();
104 self.update_block_download_metrics();
105 }
106
107 fn download(&mut self, request: DownloadRequest) {
109 match request {
110 DownloadRequest::BlockSet(hashes) => self.download_block_set(hashes),
111 DownloadRequest::BlockRange(hash, count) => self.download_block_range(hash, count),
112 }
113 }
114
115 fn download_block_set(&mut self, hashes: HashSet<B256>) {
117 for hash in hashes {
118 self.download_full_block(hash);
119 }
120 }
121
122 fn download_block_range(&mut self, hash: B256, count: u64) {
124 if count == 1 {
125 self.download_full_block(hash);
126 } else {
127 trace!(
128 target: "consensus::engine",
129 ?hash,
130 ?count,
131 "start downloading full block range."
132 );
133
134 let request = self.full_block_client.get_full_block_range(hash, count);
135 self.push_pending_event(DownloadOutcome::NewDownloadStarted {
136 remaining_blocks: request.count(),
137 target: request.start_hash(),
138 });
139 self.inflight_block_range_requests.push(request);
140 }
141 }
142
143 fn download_full_block(&mut self, hash: B256) -> bool {
148 if self.is_inflight_request(hash) {
149 return false
150 }
151 self.push_pending_event(DownloadOutcome::NewDownloadStarted {
152 remaining_blocks: 1,
153 target: hash,
154 });
155
156 trace!(
157 target: "consensus::engine::sync",
158 ?hash,
159 "Start downloading full block"
160 );
161
162 let request = self.full_block_client.get_full_block(hash);
163 self.inflight_full_block_requests.push(request);
164
165 self.update_block_download_metrics();
166
167 true
168 }
169
170 fn is_inflight_request(&self, hash: B256) -> bool {
172 self.inflight_full_block_requests.iter().any(|req| *req.hash() == hash)
173 }
174
175 fn update_block_download_metrics(&self) {
177 let blocks = self.inflight_full_block_requests.len() +
178 self.inflight_block_range_requests.iter().map(|r| r.count() as usize).sum::<usize>();
179 self.metrics.active_block_downloads.set(blocks as f64);
180 }
181
182 fn push_pending_event(&mut self, pending_event: DownloadOutcome<B>) {
184 self.pending_events.push_back(pending_event);
185 }
186
187 fn pop_pending_event(&mut self) -> Option<DownloadOutcome<B>> {
189 self.pending_events.pop_front()
190 }
191}
192
193impl<Client, B> BlockDownloader for BasicBlockDownloader<Client, B>
194where
195 Client: BlockClient<Header = B::Header, Body = B::Body>,
196 B: Block,
197{
198 type Block = B;
199
200 fn on_action(&mut self, action: DownloadAction) {
202 match action {
203 DownloadAction::Clear => self.clear(),
204 DownloadAction::Download(request) => self.download(request),
205 }
206 }
207
208 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DownloadOutcome<B>> {
210 if let Some(pending_event) = self.pop_pending_event() {
211 return Poll::Ready(pending_event);
212 }
213
214 for idx in (0..self.inflight_full_block_requests.len()).rev() {
216 let mut request = self.inflight_full_block_requests.swap_remove(idx);
217 if let Poll::Ready(block) = request.poll_unpin(cx) {
218 trace!(target: "consensus::engine", block=?block.num_hash(), "Received single full block, buffering");
219 self.set_buffered_blocks.push(Reverse(block.into()));
220 } else {
221 self.inflight_full_block_requests.push(request);
223 }
224 }
225
226 for idx in (0..self.inflight_block_range_requests.len()).rev() {
228 let mut request = self.inflight_block_range_requests.swap_remove(idx);
229 if let Poll::Ready(blocks) = request.poll_unpin(cx) {
230 trace!(target: "consensus::engine", len=?blocks.len(), first=?blocks.first().map(|b| b.num_hash()), last=?blocks.last().map(|b| b.num_hash()), "Received full block range, buffering");
231 self.set_buffered_blocks.extend(
232 blocks
233 .into_iter()
234 .map(|b| {
235 let senders = b.senders().unwrap_or_default();
236 OrderedSealedBlockWithSenders(SealedBlockWithSenders {
237 block: b,
238 senders,
239 })
240 })
241 .map(Reverse),
242 );
243 } else {
244 self.inflight_block_range_requests.push(request);
246 }
247 }
248
249 self.update_block_download_metrics();
250
251 if self.set_buffered_blocks.is_empty() {
252 return Poll::Pending;
253 }
254
255 let mut downloaded_blocks: Vec<SealedBlockWithSenders<B>> =
257 Vec::with_capacity(self.set_buffered_blocks.len());
258 while let Some(block) = self.set_buffered_blocks.pop() {
259 while let Some(peek) = self.set_buffered_blocks.peek_mut() {
261 if peek.0 .0.hash() == block.0 .0.hash() {
262 PeekMut::pop(peek);
263 } else {
264 break
265 }
266 }
267 downloaded_blocks.push(block.0.into());
268 }
269 Poll::Ready(DownloadOutcome::Blocks(downloaded_blocks))
270 }
271}
272
/// Wrapper around [`SealedBlockWithSenders`] with its own `Ord` implementation, so blocks can be
/// kept in a [`BinaryHeap`].
#[derive(Debug, Clone, PartialEq, Eq)]
struct OrderedSealedBlockWithSenders<B: Block>(SealedBlockWithSenders<B>);
277
278impl<B: Block> PartialOrd for OrderedSealedBlockWithSenders<B> {
279 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
280 Some(self.cmp(other))
281 }
282}
283
284impl<B: Block> Ord for OrderedSealedBlockWithSenders<B> {
285 fn cmp(&self, other: &Self) -> Ordering {
286 self.0.number().cmp(&other.0.number())
287 }
288}
289
290impl<B: Block> From<SealedBlockFor<B>> for OrderedSealedBlockWithSenders<B> {
291 fn from(block: SealedBlockFor<B>) -> Self {
292 let senders = block.senders().unwrap_or_default();
293 Self(SealedBlockWithSenders { block, senders })
294 }
295}
296
297impl<B: Block> From<OrderedSealedBlockWithSenders<B>> for SealedBlockWithSenders<B> {
298 fn from(value: OrderedSealedBlockWithSenders<B>) -> Self {
299 let senders = value.0.senders;
300 Self { block: value.0.block, senders }
301 }
302}
303
/// A [`BlockDownloader`] that ignores every action and never yields an outcome.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct NoopBlockDownloader<B>(core::marker::PhantomData<B>);
308
309impl<B: Block> BlockDownloader for NoopBlockDownloader<B> {
310 type Block = B;
311
312 fn on_action(&mut self, _event: DownloadAction) {}
313
314 fn poll(&mut self, _cx: &mut Context<'_>) -> Poll<DownloadOutcome<B>> {
315 Poll::Pending
316 }
317}
318
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::insert_headers_into_client;
    use alloy_consensus::Header;
    use alloy_eips::eip1559::ETHEREUM_BLOCK_GAS_LIMIT;
    use assert_matches::assert_matches;
    use reth_beacon_consensus::EthBeaconConsensus;
    use reth_chainspec::{ChainSpecBuilder, MAINNET};
    use reth_network_p2p::test_utils::TestFullBlockClient;
    use reth_primitives::SealedHeader;
    use std::{future::poll_fn, sync::Arc};

    /// A downloader under test together with the client it fetches from.
    struct TestHarness {
        block_downloader: BasicBlockDownloader<TestFullBlockClient, reth_primitives::Block>,
        client: TestFullBlockClient,
    }

    impl TestHarness {
        /// Builds a harness whose client was populated via `insert_headers_into_client` with the
        /// range `0..total_blocks`.
        fn new(total_blocks: usize) -> Self {
            let client = TestFullBlockClient::default();
            let start = SealedHeader::seal(Header {
                base_fee_per_gas: Some(7),
                gas_limit: ETHEREUM_BLOCK_GAS_LIMIT,
                ..Default::default()
            });
            insert_headers_into_client(&client, start, 0..total_blocks);

            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(MAINNET.genesis.clone())
                    .paris_activated()
                    .build(),
            );
            let consensus = Arc::new(EthBeaconConsensus::new(chain_spec));

            let block_downloader = BasicBlockDownloader::new(client.clone(), consensus);
            Self { block_downloader, client }
        }
    }

    #[tokio::test]
    async fn block_downloader_range_request() {
        const TOTAL_BLOCKS: usize = 10;
        let TestHarness { mut block_downloader, client } = TestHarness::new(TOTAL_BLOCKS);
        let tip = client.highest_block().expect("there should be blocks here");

        // Request the whole chain as one range ending at the tip.
        let range_request = DownloadRequest::BlockRange(tip.hash(), tip.number);
        block_downloader.on_action(DownloadAction::Download(range_request));

        // Exactly one range request is registered, and it matches what was asked for.
        assert_eq!(block_downloader.inflight_block_range_requests.len(), 1);
        let in_flight = block_downloader.inflight_block_range_requests.first().unwrap();
        assert_eq!(in_flight.start_hash(), tip.hash());
        assert_eq!(in_flight.count(), tip.number);

        // The first outcome announces the new download.
        let started = poll_fn(|cx| block_downloader.poll(cx)).await;
        assert_matches!(started, DownloadOutcome::NewDownloadStarted { remaining_blocks, .. } => {
            assert_eq!(remaining_blocks, TOTAL_BLOCKS as u64);
        });

        // The second outcome carries every block in ascending order.
        let downloaded = poll_fn(|cx| block_downloader.poll(cx)).await;
        assert_matches!(downloaded, DownloadOutcome::Blocks(blocks) => {
            assert_eq!(blocks.len(), TOTAL_BLOCKS);

            for (block, expected_number) in blocks.iter().zip(1..=TOTAL_BLOCKS as u64) {
                assert_eq!(block.number(), expected_number);
            }
        });
    }

    #[tokio::test]
    async fn block_downloader_set_request() {
        const TOTAL_BLOCKS: usize = 2;
        let TestHarness { mut block_downloader, client } = TestHarness::new(TOTAL_BLOCKS);

        let tip = client.highest_block().expect("there should be blocks here");

        // Request the tip and its parent as a set of hashes.
        let wanted = HashSet::from([tip.hash(), tip.parent_hash]);
        block_downloader.on_action(DownloadAction::Download(DownloadRequest::BlockSet(wanted)));

        // One single-block request per hash.
        assert_eq!(block_downloader.inflight_full_block_requests.len(), TOTAL_BLOCKS);

        // Each request announces itself with exactly one remaining block.
        for _ in 0..TOTAL_BLOCKS {
            let started = poll_fn(|cx| block_downloader.poll(cx)).await;

            assert_matches!(started, DownloadOutcome::NewDownloadStarted { remaining_blocks, .. } => {
                assert_eq!(remaining_blocks, 1);
            });
        }

        // Both blocks then arrive together, in ascending order.
        let downloaded = poll_fn(|cx| block_downloader.poll(cx)).await;
        assert_matches!(downloaded, DownloadOutcome::Blocks(blocks) => {
            assert_eq!(blocks.len(), TOTAL_BLOCKS);

            for (block, expected_number) in blocks.iter().zip(1..=TOTAL_BLOCKS as u64) {
                assert_eq!(block.number(), expected_number);
            }
        });
    }

    #[tokio::test]
    async fn block_downloader_clear_request() {
        const TOTAL_BLOCKS: usize = 10;
        let TestHarness { mut block_downloader, client } = TestHarness::new(TOTAL_BLOCKS);

        let tip = client.highest_block().expect("there should be blocks here");

        // Register a range request first ...
        let range_request = DownloadRequest::BlockRange(tip.hash(), tip.number);
        block_downloader.on_action(DownloadAction::Download(range_request));

        // ... followed by a set request.
        let download_set = HashSet::from([tip.hash(), tip.parent_hash]);
        let set_request = DownloadRequest::BlockSet(download_set.clone());
        block_downloader.on_action(DownloadAction::Download(set_request));

        // The range request is registered as asked.
        assert_eq!(block_downloader.inflight_block_range_requests.len(), 1);
        let in_flight = block_downloader.inflight_block_range_requests.first().unwrap();
        assert_eq!(in_flight.start_hash(), tip.hash());
        assert_eq!(in_flight.count(), tip.number);

        // So is one single-block request per hash of the set.
        assert_eq!(block_downloader.inflight_full_block_requests.len(), download_set.len());

        // Clearing drops every in-flight request.
        block_downloader.on_action(DownloadAction::Clear);

        assert_eq!(block_downloader.inflight_block_range_requests.len(), 0);
        assert_eq!(block_downloader.inflight_full_block_requests.len(), 0);
    }
}