reth_downloaders/headers/
reverse_headers.rs

1//! A headers downloader that can handle multiple requests concurrently.
2
3use super::task::TaskDownloader;
4use crate::metrics::HeaderDownloaderMetrics;
5use alloy_consensus::BlockHeader;
6use alloy_eips::BlockHashOrNumber;
7use alloy_primitives::{BlockNumber, B256};
8use futures::{stream::Stream, FutureExt};
9use futures_util::{stream::FuturesUnordered, StreamExt};
10use rayon::prelude::*;
11use reth_config::config::HeadersConfig;
12use reth_consensus::HeaderValidator;
13use reth_network_p2p::{
14    error::{DownloadError, DownloadResult, PeerRequestResult},
15    headers::{
16        client::{HeadersClient, HeadersRequest},
17        downloader::{validate_header_download, HeaderDownloader, SyncTarget},
18        error::{HeadersDownloaderError, HeadersDownloaderResult},
19    },
20    priority::Priority,
21};
22use reth_network_peers::PeerId;
23use reth_primitives::{GotExpected, SealedHeader};
24use reth_tasks::{TaskSpawner, TokioTaskExecutor};
25use std::{
26    cmp::{Ordering, Reverse},
27    collections::{binary_heap::PeekMut, BinaryHeap},
28    future::Future,
29    pin::Pin,
30    sync::Arc,
31    task::{ready, Context, Poll},
32};
33use thiserror::Error;
34use tracing::{error, trace};
35
/// A heuristic that is used to determine the number of requests that should be prepared for a peer.
/// This should ensure that there are always requests lined up for peers to handle while the
/// downloader is yielding a next batch of headers that is being committed to the database.
///
/// See [`ReverseHeadersDownloader::concurrent_request_limit`], where this multiplies the number
/// of connected peers to derive the concurrent request target.
const REQUESTS_PER_PEER_MULTIPLIER: usize = 5;
40
/// Wrapper for internal downloader errors.
#[derive(Error, Debug)]
enum ReverseHeadersDownloaderError<H> {
    /// A downloader-level error, e.g. a detached head that cannot be attached to the local chain.
    #[error(transparent)]
    Downloader(#[from] HeadersDownloaderError<H>),
    /// A per-request/response error; the offending request is re-submitted (see
    /// `on_headers_error`). Boxed to keep this enum small.
    #[error(transparent)]
    Response(#[from] Box<HeadersResponseError>),
}
49
50impl<H> From<HeadersResponseError> for ReverseHeadersDownloaderError<H> {
51    fn from(value: HeadersResponseError) -> Self {
52        Self::Response(Box::new(value))
53    }
54}
55
/// Downloads headers concurrently.
///
/// This [`HeaderDownloader`] downloads headers using the configured [`HeadersClient`].
/// Headers can be requested by hash or block number and take a `limit` parameter. This downloader
/// tries to fill the gap between the local head of the node and the chain tip by issuing multiple
/// requests at a time but yielding them in batches on [`Stream::poll_next`].
///
/// **Note:** This downloader downloads in reverse, see also
/// [`reth_network_p2p::headers::client::HeadersDirection`], this means the batches of headers that
/// this downloader yields will start at the chain tip and move towards the local head: falling
/// block numbers.
#[must_use = "Stream does nothing unless polled"]
#[derive(Debug)]
pub struct ReverseHeadersDownloader<H: HeadersClient> {
    /// Consensus client used to validate headers
    consensus: Arc<dyn HeaderValidator<H::Header>>,
    /// Client used to download headers.
    client: Arc<H>,
    /// The local head of the chain.
    local_head: Option<SealedHeader<H::Header>>,
    /// Block we want to close the gap to.
    sync_target: Option<SyncTargetBlock>,
    /// The block number to use for requests (moves towards the local head as requests are
    /// issued, since downloading happens in reverse).
    next_request_block_number: u64,
    /// Keeps track of the block we need to validate next.
    lowest_validated_header: Option<SealedHeader<H::Header>>,
    /// Tip block number to start validating from (in reverse)
    next_chain_tip_block_number: u64,
    /// The batch size per one request
    request_limit: u64,
    /// Minimum amount of requests to handle concurrently.
    min_concurrent_requests: usize,
    /// Maximum amount of requests to handle concurrently.
    max_concurrent_requests: usize,
    /// The number of block headers to return at once
    stream_batch_size: usize,
    /// Maximum amount of received headers to buffer internally.
    max_buffered_responses: usize,
    /// Contains the request to retrieve the headers for the sync target
    ///
    /// This will give us the block number of the `sync_target`, after which we can send multiple
    /// requests at a time.
    sync_target_request: Option<HeadersRequestFuture<H::Output>>,
    /// requests in progress; futures resolve in any order
    in_progress_queue: FuturesUnordered<HeadersRequestFuture<H::Output>>,
    /// Buffered, unvalidated responses, kept in a heap keyed by block number so the next
    /// expected response can be checked first (see `try_validate_buffered`).
    buffered_responses: BinaryHeap<OrderedHeadersResponse<H::Header>>,
    /// Buffered, _sorted_ and validated headers ready to be returned.
    ///
    /// Note: headers are sorted from high to low
    queued_validated_headers: Vec<SealedHeader<H::Header>>,
    /// Header downloader metrics.
    metrics: HeaderDownloaderMetrics,
}
110
111// === impl ReverseHeadersDownloader ===
112
113impl<H> ReverseHeadersDownloader<H>
114where
115    H: HeadersClient<Header: reth_primitives_traits::BlockHeader> + 'static,
116{
117    /// Convenience method to create a [`ReverseHeadersDownloaderBuilder`] without importing it
118    pub fn builder() -> ReverseHeadersDownloaderBuilder {
119        ReverseHeadersDownloaderBuilder::default()
120    }
121
122    /// Returns the block number the local node is at.
123    #[inline]
124    fn local_block_number(&self) -> Option<BlockNumber> {
125        self.local_head.as_ref().map(|h| h.number())
126    }
127
128    /// Returns the existing local head block number
129    ///
130    /// # Panics
131    ///
132    /// If the local head has not been set.
133    #[inline]
134    fn existing_local_block_number(&self) -> BlockNumber {
135        self.local_head.as_ref().expect("is initialized").number()
136    }
137
138    /// Returns the existing sync target.
139    ///
140    /// # Panics
141    ///
142    /// If the sync target has never been set.
143    #[inline]
144    fn existing_sync_target(&self) -> SyncTargetBlock {
145        self.sync_target.as_ref().expect("is initialized").clone()
146    }
147
148    /// Max requests to handle at the same time
149    ///
150    /// This depends on the number of active peers but will always be
151    /// [`min_concurrent_requests`..`max_concurrent_requests`]
152    #[inline]
153    fn concurrent_request_limit(&self) -> usize {
154        let num_peers = self.client.num_connected_peers();
155
156        // we try to keep more requests than available peers active so that there's always a
157        // followup request available for a peer
158        let dynamic_target = num_peers * REQUESTS_PER_PEER_MULTIPLIER;
159        let max_dynamic = dynamic_target.max(self.min_concurrent_requests);
160
161        // If only a few peers are connected we keep it low
162        if num_peers < self.min_concurrent_requests {
163            return max_dynamic
164        }
165
166        max_dynamic.min(self.max_concurrent_requests)
167    }
168
169    /// Returns the next header request
170    ///
171    /// This will advance the current block towards the local head.
172    ///
173    /// Returns `None` if no more requests are required.
174    fn next_request(&mut self) -> Option<HeadersRequest> {
175        if let Some(local_head) = self.local_block_number() {
176            if self.next_request_block_number > local_head {
177                let request = calc_next_request(
178                    local_head,
179                    self.next_request_block_number,
180                    self.request_limit,
181                );
182                // need to shift the tracked request block number based on the number of requested
183                // headers so follow-up requests will use that as start.
184                self.next_request_block_number -= request.limit;
185
186                return Some(request)
187            }
188        }
189
190        None
191    }
192
193    /// Returns the next header to use for validation.
194    ///
195    /// Since this downloader downloads blocks with falling block number, this will return the
196    /// lowest (in terms of block number) validated header.
197    ///
198    /// This is either the last `queued_validated_headers`, or if has been drained entirely the
199    /// `lowest_validated_header`.
200    ///
201    /// This only returns `None` if we haven't fetched the initial chain tip yet.
202    fn lowest_validated_header(&self) -> Option<&SealedHeader<H::Header>> {
203        self.queued_validated_headers.last().or(self.lowest_validated_header.as_ref())
204    }
205
206    /// Validate that the received header matches the expected sync target.
207    fn validate_sync_target(
208        &self,
209        header: &SealedHeader<H::Header>,
210        request: HeadersRequest,
211        peer_id: PeerId,
212    ) -> Result<(), Box<HeadersResponseError>> {
213        match self.existing_sync_target() {
214            SyncTargetBlock::Hash(hash) | SyncTargetBlock::HashAndNumber { hash, .. }
215                if header.hash() != hash =>
216            {
217                Err(Box::new(HeadersResponseError {
218                    request,
219                    peer_id: Some(peer_id),
220                    error: DownloadError::InvalidTip(
221                        GotExpected { got: header.hash(), expected: hash }.into(),
222                    ),
223                }))
224            }
225            SyncTargetBlock::Number(number) if header.number() != number => {
226                Err(Box::new(HeadersResponseError {
227                    request,
228                    peer_id: Some(peer_id),
229                    error: DownloadError::InvalidTipNumber(GotExpected {
230                        got: header.number(),
231                        expected: number,
232                    }),
233                }))
234            }
235            _ => Ok(()),
236        }
237    }
238
    /// Processes the next headers in line.
    ///
    /// This will validate all headers and insert them into the validated buffer.
    ///
    /// Returns an error if the given headers are invalid.
    ///
    /// Caution: this expects the `headers` to be sorted with _falling_ block numbers
    fn process_next_headers(
        &mut self,
        request: HeadersRequest,
        headers: Vec<H::Header>,
        peer_id: PeerId,
    ) -> Result<(), ReverseHeadersDownloaderError<H::Header>> {
        let mut validated = Vec::with_capacity(headers.len());

        // Seal (hash) the headers in parallel via rayon before validating them sequentially.
        let sealed_headers = headers.into_par_iter().map(SealedHeader::seal).collect::<Vec<_>>();
        for parent in sealed_headers {
            // Validate that the header is the parent header of the last validated header.
            if let Some(validated_header) =
                validated.last().or_else(|| self.lowest_validated_header())
            {
                if let Err(error) = self.validate(validated_header, &parent) {
                    trace!(target: "downloaders::headers", %error ,"Failed to validate header");
                    return Err(
                        HeadersResponseError { request, peer_id: Some(peer_id), error }.into()
                    )
                }
            } else {
                // No previously validated header exists yet, so this header must be the sync
                // target itself.
                self.validate_sync_target(&parent, request.clone(), peer_id)?;
            }

            validated.push(parent);
        }

        // If the last (smallest) validated header attaches to the local head, validate it.
        if let Some((last_header, head)) = validated
            .last_mut()
            .zip(self.local_head.as_ref())
            .filter(|(last, head)| last.number() == head.number() + 1)
        {
            // Every header must be valid on its own
            if let Err(error) = self.consensus.validate_header(&*last_header) {
                trace!(target: "downloaders::headers", %error, "Failed to validate header");
                return Err(HeadersResponseError {
                    request,
                    peer_id: Some(peer_id),
                    error: DownloadError::HeaderValidation {
                        hash: head.hash(),
                        number: head.number(),
                        error: Box::new(error),
                    },
                }
                .into())
            }

            // If the header is valid on its own, but not against its parent, we return it as
            // detached head error.
            if let Err(error) = self.consensus.validate_header_against_parent(&*last_header, head) {
                // Replace the last header with a detached variant
                error!(target: "downloaders::headers", %error, number = last_header.number(), hash = ?last_header.hash(), "Header cannot be attached to known canonical chain");
                return Err(HeadersDownloaderError::DetachedHead {
                    local_head: Box::new(head.clone()),
                    header: Box::new(last_header.clone()),
                    error: Box::new(error),
                }
                .into())
            }
        }

        // update tracked block info (falling block number); `validated` is non-empty here since
        // callers only invoke this with a non-empty response
        self.next_chain_tip_block_number =
            validated.last().expect("exists").number().saturating_sub(1);
        self.queued_validated_headers.extend(validated);

        Ok(())
    }
315
    /// Updates the state based on the given `target_block_number`
    ///
    /// There are three different outcomes:
    ///  * This is the first time this is called: current `sync_target` block is still `None`. In
    ///    which case we're initializing the request trackers to `next_block`
    ///  * The `target_block_number` is _higher_ than the current target. In which case we start
    ///    over with a new range
    ///  * The `target_block_number` is _lower_ than the current target or the _same_. In which case
    ///    we don't need to update the request trackers but need to ensure already buffered headers
    ///    are _not_ higher than the new `target_block_number`.
    fn on_block_number_update(&mut self, target_block_number: u64, next_block: u64) {
        // Update the trackers
        if let Some(old_target) =
            self.sync_target.as_mut().and_then(|t| t.replace_number(target_block_number))
        {
            if target_block_number > old_target {
                // the new target is higher than the old target we need to update the
                // request tracker and reset everything
                self.next_request_block_number = next_block;
                self.next_chain_tip_block_number = next_block;
                self.clear();
            } else {
                // ensure already validated headers are in range; since headers are sorted from
                // high to low, the out-of-range entries form a prefix of the buffer
                let skip = self
                    .queued_validated_headers
                    .iter()
                    .take_while(|last| last.number() > target_block_number)
                    .count();
                // removes all headers that are higher than current target
                self.queued_validated_headers.drain(..skip);
            }
        } else {
            // this occurs on the initial sync target request: no previous number was tracked, so
            // just initialize both request trackers
            self.next_request_block_number = next_block;
            self.next_chain_tip_block_number = next_block;
        }
    }
353
    /// Handles the response for the request for the sync target
    ///
    /// On success this verifies the received header actually matches the configured target
    /// (by hash or number), updates the request trackers to start from the target's parent,
    /// and queues the target header as the first validated header.
    fn on_sync_target_outcome(
        &mut self,
        response: HeadersRequestOutcome<H::Header>,
    ) -> Result<(), ReverseHeadersDownloaderError<H::Header>> {
        let sync_target = self.existing_sync_target();
        let HeadersRequestOutcome { request, outcome } = response;
        match outcome {
            Ok(res) => {
                let (peer_id, mut headers) = res.split();

                // update total downloaded metric
                self.metrics.total_downloaded.increment(headers.len() as u64);

                // sort headers from highest to lowest block number
                headers.sort_unstable_by_key(|h| Reverse(h.number()));

                if headers.is_empty() {
                    return Err(HeadersResponseError {
                        request,
                        peer_id: Some(peer_id),
                        error: DownloadError::EmptyResponse,
                    }
                    .into())
                }

                // after the descending sort, index 0 holds the highest header: the target
                let header = headers.swap_remove(0);
                let target = SealedHeader::seal(header);

                // ensure the response actually matches the requested sync target
                match sync_target {
                    SyncTargetBlock::Hash(hash) | SyncTargetBlock::HashAndNumber { hash, .. } => {
                        if target.hash() != hash {
                            return Err(HeadersResponseError {
                                request,
                                peer_id: Some(peer_id),
                                error: DownloadError::InvalidTip(
                                    GotExpected { got: target.hash(), expected: hash }.into(),
                                ),
                            }
                            .into())
                        }
                    }
                    SyncTargetBlock::Number(number) => {
                        if target.number() != number {
                            return Err(HeadersResponseError {
                                request,
                                peer_id: Some(peer_id),
                                error: DownloadError::InvalidTipNumber(GotExpected {
                                    got: target.number(),
                                    expected: number,
                                }),
                            }
                            .into())
                        }
                    }
                }

                trace!(target: "downloaders::headers", head=?self.local_block_number(), hash=?target.hash(), number=%target.number(), "Received sync target");

                // This is the next block we need to start issuing requests from
                let parent_block_number = target.number().saturating_sub(1);
                self.on_block_number_update(target.number(), parent_block_number);

                self.queued_validated_headers.push(target);

                // try to validate all buffered responses blocked by this successful response
                self.try_validate_buffered()
                    .map(Err::<(), ReverseHeadersDownloaderError<H::Header>>)
                    .transpose()?;

                Ok(())
            }
            Err(err) => {
                Err(HeadersResponseError { request, peer_id: None, error: err.into() }.into())
            }
        }
    }
431
    /// Invoked when we received a response
    ///
    /// Rejects empty, short, or mismatched responses. A valid response is either processed
    /// immediately (if it is the next expected one) or buffered until the responses that
    /// precede it (higher block numbers) have been validated.
    fn on_headers_outcome(
        &mut self,
        response: HeadersRequestOutcome<H::Header>,
    ) -> Result<(), ReverseHeadersDownloaderError<H::Header>> {
        let requested_block_number = response.block_number();
        let HeadersRequestOutcome { request, outcome } = response;

        match outcome {
            Ok(res) => {
                let (peer_id, mut headers) = res.split();

                // update total downloaded metric
                self.metrics.total_downloaded.increment(headers.len() as u64);

                trace!(target: "downloaders::headers", len=%headers.len(), "Received headers response");

                if headers.is_empty() {
                    return Err(HeadersResponseError {
                        request,
                        peer_id: Some(peer_id),
                        error: DownloadError::EmptyResponse,
                    }
                    .into())
                }

                // the peer must deliver exactly as many headers as requested
                if (headers.len() as u64) != request.limit {
                    return Err(HeadersResponseError {
                        peer_id: Some(peer_id),
                        error: DownloadError::HeadersResponseTooShort(GotExpected {
                            got: headers.len() as u64,
                            expected: request.limit,
                        }),
                        request,
                    }
                    .into())
                }

                // sort headers from highest to lowest block number
                headers.sort_unstable_by_key(|h| Reverse(h.number()));

                // validate the response
                let highest = &headers[0];

                trace!(target: "downloaders::headers", requested_block_number, highest=?highest.number(), "Validating non-empty headers response");

                // the response must start at the block number that was requested
                if highest.number() != requested_block_number {
                    return Err(HeadersResponseError {
                        request,
                        peer_id: Some(peer_id),
                        error: DownloadError::HeadersResponseStartBlockMismatch(GotExpected {
                            got: highest.number(),
                            expected: requested_block_number,
                        }),
                    }
                    .into())
                }

                // check if the response is the next expected
                if highest.number() == self.next_chain_tip_block_number {
                    // is next response, validate it
                    self.process_next_headers(request, headers, peer_id)?;
                    // try to validate all buffered responses blocked by this successful response
                    self.try_validate_buffered()
                        .map(Err::<(), ReverseHeadersDownloaderError<H::Header>>)
                        .transpose()?;
                } else if highest.number() > self.existing_local_block_number() {
                    self.metrics.buffered_responses.increment(1.);
                    // can't validate yet
                    self.buffered_responses.push(OrderedHeadersResponse {
                        headers,
                        request,
                        peer_id,
                    })
                }
                // NOTE: responses entirely at or below the local head fall through here and are
                // dropped, since they are no longer needed

                Ok(())
            }
            // most likely a noop, because this error
            // would've been handled by the fetcher internally
            Err(err) => {
                trace!(target: "downloaders::headers", %err, "Response error");
                Err(HeadersResponseError { request, peer_id: None, error: err.into() }.into())
            }
        }
    }
518
519    fn penalize_peer(&self, peer_id: Option<PeerId>, error: &DownloadError) {
520        // Penalize the peer for bad response
521        if let Some(peer_id) = peer_id {
522            trace!(target: "downloaders::headers", ?peer_id, %error, "Penalizing peer");
523            self.client.report_bad_message(peer_id);
524        }
525    }
526
527    /// Handles the error of a bad response
528    ///
529    /// This will re-submit the request.
530    fn on_headers_error(&self, err: Box<HeadersResponseError>) {
531        let HeadersResponseError { request, peer_id, error } = *err;
532
533        self.penalize_peer(peer_id, &error);
534
535        // Update error metric
536        self.metrics.increment_errors(&error);
537
538        // Re-submit the request
539        self.submit_request(request, Priority::High);
540    }
541
    /// Attempts to validate the buffered responses
    ///
    /// Returns an error if the next expected response was popped, but failed validation.
    fn try_validate_buffered(&mut self) -> Option<ReverseHeadersDownloaderError<H::Header>> {
        loop {
            // Check to see if we've already received the next value
            let next_response = self.buffered_responses.peek_mut()?;
            let next_block_number = next_response.block_number();
            match next_block_number.cmp(&self.next_chain_tip_block_number) {
                // Response starts below the next expected block: keep it buffered (not popped)
                // and stop, it becomes relevant once the chain tip tracker catches down to it.
                Ordering::Less => return None,
                Ordering::Equal => {
                    // This is exactly the next expected response: pop and validate it.
                    let OrderedHeadersResponse { headers, request, peer_id } =
                        PeekMut::pop(next_response);
                    self.metrics.buffered_responses.decrement(1.);

                    if let Err(err) = self.process_next_headers(request, headers, peer_id) {
                        return Some(err)
                    }
                }
                Ordering::Greater => {
                    // Response starts above the next expected block: it is stale, discard it.
                    self.metrics.buffered_responses.decrement(1.);
                    PeekMut::pop(next_response);
                }
            }
        }
    }
568
    /// Returns the request for the `sync_target` header.
    ///
    /// A single falling request for just the target header; its response resolves the target's
    /// block number, after which regular batched requests can be issued.
    const fn get_sync_target_request(&self, start: BlockHashOrNumber) -> HeadersRequest {
        HeadersRequest::falling(start, 1)
    }
573
574    /// Starts a request future
575    fn submit_request(&self, request: HeadersRequest, priority: Priority) {
576        trace!(target: "downloaders::headers", ?request, "Submitting headers request");
577        self.in_progress_queue.push(self.request_fut(request, priority));
578        self.metrics.in_flight_requests.increment(1.);
579    }
580
581    fn request_fut(
582        &self,
583        request: HeadersRequest,
584        priority: Priority,
585    ) -> HeadersRequestFuture<H::Output> {
586        let client = Arc::clone(&self.client);
587        HeadersRequestFuture {
588            request: Some(request.clone()),
589            fut: client.get_headers_with_priority(request, priority),
590        }
591    }
592
    /// Validate whether the header is valid in relation to its parent
    ///
    /// Delegates to [`validate_header_download`] using the configured consensus validator.
    fn validate(
        &self,
        header: &SealedHeader<H::Header>,
        parent: &SealedHeader<H::Header>,
    ) -> DownloadResult<()> {
        validate_header_download(&self.consensus, header, parent)
    }
601
602    /// Clears all requests/responses.
603    fn clear(&mut self) {
604        self.lowest_validated_header.take();
605        self.queued_validated_headers = Vec::new();
606        self.buffered_responses = BinaryHeap::new();
607        self.in_progress_queue.clear();
608
609        self.metrics.in_flight_requests.set(0.);
610        self.metrics.buffered_responses.set(0.);
611    }
612
    /// Splits off the next batch of headers
    ///
    /// Returns up to `stream_batch_size` of the highest queued headers (headers are kept sorted
    /// from high to low block number), leaving the remainder queued.
    fn split_next_batch(&mut self) -> Vec<SealedHeader<H::Header>> {
        let batch_size = self.stream_batch_size.min(self.queued_validated_headers.len());
        // After split_off, `rem` holds the tail; the swap leaves the batch (head) in `rem` and
        // the tail back in `queued_validated_headers`.
        let mut rem = self.queued_validated_headers.split_off(batch_size);
        std::mem::swap(&mut rem, &mut self.queued_validated_headers);
        // If the downloader consumer does not flush headers at the same rate that the downloader
        // queues them, then the `queued_validated_headers` buffer can grow unbounded.
        //
        // The semantics of `split_off` state that the capacity of the original buffer is
        // unchanged, so queued_validated_headers will then have only `batch_size` elements, and
        // its original capacity. Because `rem` is initially populated with elements `[batch_size,
        // len)` of `queued_validated_headers`, it will have a capacity of at least `len -
        // batch_size`, and the total memory allocated by the two buffers will be around double the
        // original size of `queued_validated_headers`.
        //
        // These are then mem::swapped, leaving `rem` with a large capacity, but small length.
        //
        // To prevent these allocations from leaking to the consumer, we shrink the capacity of the
        // new buffer. The total memory allocated should then be not much more than the original
        // size of `queued_validated_headers`.
        rem.shrink_to_fit();
        rem
    }
636}
637
impl<H> ReverseHeadersDownloader<H>
where
    H: HeadersClient,
    Self: HeaderDownloader + 'static,
{
    /// Spawns the downloader task via [`tokio::task::spawn`]
    pub fn into_task(self) -> TaskDownloader<<Self as HeaderDownloader>::Header> {
        // Delegates to `into_task_with` using the default tokio-based executor.
        self.into_task_with(&TokioTaskExecutor::default())
    }

    /// Convert the downloader into a [`TaskDownloader`] by spawning it via the given `spawner`.
    pub fn into_task_with<S>(
        self,
        spawner: &S,
    ) -> TaskDownloader<<Self as HeaderDownloader>::Header>
    where
        S: TaskSpawner,
    {
        TaskDownloader::spawn_with(self, spawner)
    }
}
659
660impl<H> HeaderDownloader for ReverseHeadersDownloader<H>
661where
662    H: HeadersClient<Header: reth_primitives_traits::BlockHeader> + 'static,
663{
664    type Header = H::Header;
665
666    fn update_local_head(&mut self, head: SealedHeader<H::Header>) {
667        // ensure we're only yielding headers that are in range and follow the current local head.
668        while self
669            .queued_validated_headers
670            .last()
671            .is_some_and(|last| last.number() <= head.number())
672        {
673            // headers are sorted high to low
674            self.queued_validated_headers.pop();
675        }
676        // update the local head
677        self.local_head = Some(head);
678    }
679
680    /// If the given target is different from the current target, we need to update the sync target
681    fn update_sync_target(&mut self, target: SyncTarget) {
682        let current_tip = self.sync_target.as_ref().and_then(|t| t.hash());
683        match target {
684            SyncTarget::Tip(tip) => {
685                if Some(tip) != current_tip {
686                    trace!(target: "downloaders::headers", current=?current_tip, new=?tip, "Update sync target");
687                    let new_sync_target = SyncTargetBlock::from_hash(tip);
688
689                    // if the new sync target is the next queued request we don't need to re-start
690                    // the target update
691                    if let Some(target_number) = self
692                        .queued_validated_headers
693                        .first()
694                        .filter(|h| h.hash() == tip)
695                        .map(|h| h.number())
696                    {
697                        self.sync_target = Some(new_sync_target.with_number(target_number));
698                        return
699                    }
700
701                    trace!(target: "downloaders::headers", new=?target, "Request new sync target");
702                    self.metrics.out_of_order_requests.increment(1);
703                    self.sync_target = Some(new_sync_target);
704                    self.sync_target_request = Some(
705                        self.request_fut(self.get_sync_target_request(tip.into()), Priority::High),
706                    );
707                }
708            }
709            SyncTarget::Gap(existing) => {
710                let target = existing.parent;
711                if Some(target) != current_tip {
712                    // there could be a sync target request in progress
713                    self.sync_target_request.take();
714                    // If the target has changed, update the request pointers based on the new
715                    // targeted block number
716                    let parent_block_number = existing.block.number.saturating_sub(1);
717
718                    trace!(target: "downloaders::headers", current=?current_tip, new=?target, %parent_block_number, "Updated sync target");
719
720                    // Update the sync target hash
721                    self.sync_target = match self.sync_target.take() {
722                        Some(sync_target) => Some(sync_target.with_hash(target)),
723                        None => Some(SyncTargetBlock::from_hash(target)),
724                    };
725                    self.on_block_number_update(parent_block_number, parent_block_number);
726                }
727            }
728            SyncTarget::TipNum(num) => {
729                let current_tip_num = self.sync_target.as_ref().and_then(|t| t.number());
730                if Some(num) != current_tip_num {
731                    trace!(target: "downloaders::headers", %num, "Updating sync target based on num");
732                    // just update the sync target
733                    self.sync_target = Some(SyncTargetBlock::from_number(num));
734                    self.sync_target_request = Some(
735                        self.request_fut(self.get_sync_target_request(num.into()), Priority::High),
736                    );
737                }
738            }
739        }
740    }
741
    fn set_batch_size(&mut self, batch_size: usize) {
        // Adjusts how many validated headers are yielded per `Stream::poll_next` batch.
        self.stream_batch_size = batch_size;
    }
745}
746
impl<H> Stream for ReverseHeadersDownloader<H>
where
    H: HeadersClient<Header: reth_primitives_traits::BlockHeader> + 'static,
{
    type Item = HeadersDownloaderResult<Vec<SealedHeader<H::Header>>, H::Header>;

    /// Drives the download: resolves the sync target first, then keeps the request queue at
    /// capacity and yields validated header batches of `stream_batch_size` headers.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        // The downloader boundaries (local head and sync target) have to be set in order
        // to start downloading data.
        if this.local_head.is_none() || this.sync_target.is_none() {
            trace!(
                target: "downloaders::headers",
                head=?this.local_block_number(),
                sync_target=?this.sync_target,
                "The downloader sync boundaries have not been set"
            );
            return Poll::Pending
        }

        // If we have a new tip request we need to complete that first before we send batched
        // requests
        while let Some(mut req) = this.sync_target_request.take() {
            match req.poll_unpin(cx) {
                Poll::Ready(outcome) => {
                    match this.on_sync_target_outcome(outcome) {
                        // sync target resolved, proceed with the batched requests below
                        Ok(()) => break,
                        Err(ReverseHeadersDownloaderError::Response(error)) => {
                            trace!(target: "downloaders::headers", %error, "invalid sync target response");
                            if error.is_channel_closed() {
                                // download channel closed which means the network was dropped
                                return Poll::Ready(None)
                            }

                            this.penalize_peer(error.peer_id, &error.error);
                            this.metrics.increment_errors(&error.error);
                            // retry the sync target request with high priority
                            this.sync_target_request =
                                Some(this.request_fut(error.request, Priority::High));
                        }
                        Err(ReverseHeadersDownloaderError::Downloader(error)) => {
                            this.clear();
                            return Poll::Ready(Some(Err(error)))
                        }
                    };
                }
                Poll::Pending => {
                    // still in flight, park the request again before yielding
                    this.sync_target_request = Some(req);
                    return Poll::Pending
                }
            }
        }

        // shrink the buffer after handling sync target outcomes
        this.buffered_responses.shrink_to_fit();

        // this loop will submit new requests and poll them, if a new batch is ready it is returned
        // The actual work is done by the receiver of the request channel, this means, polling the
        // request future is just reading from a `oneshot::Receiver`. Hence, this loop tries to keep
        // the downloader at capacity at all times. The order of loops is as follows:
        // 1. poll futures to make room for followup requests (this will also prepare validated
        //    headers for 3.)
        // 2. exhaust all capacity by sending requests
        // 3. return batch, if enough validated
        // 4. return Pending if 2.) did not submit a new request, else continue
        loop {
            // poll requests
            while let Poll::Ready(Some(outcome)) = this.in_progress_queue.poll_next_unpin(cx) {
                this.metrics.in_flight_requests.decrement(1.);
                // handle response
                match this.on_headers_outcome(outcome) {
                    Ok(()) => (),
                    Err(ReverseHeadersDownloaderError::Response(error)) => {
                        if error.is_channel_closed() {
                            // download channel closed which means the network was dropped
                            return Poll::Ready(None)
                        }
                        this.on_headers_error(error);
                    }
                    Err(ReverseHeadersDownloaderError::Downloader(error)) => {
                        this.clear();
                        return Poll::Ready(Some(Err(error)))
                    }
                };
            }

            // shrink the buffer after handling headers outcomes
            this.buffered_responses.shrink_to_fit();

            // marks the loop's exit condition: exit if no requests submitted
            let mut progress = false;

            let concurrent_request_limit = this.concurrent_request_limit();
            // populate requests, but only while the response buffer has capacity left
            while this.in_progress_queue.len() < concurrent_request_limit &&
                this.buffered_responses.len() < this.max_buffered_responses
            {
                if let Some(request) = this.next_request() {
                    trace!(
                        target: "downloaders::headers",
                        "Requesting headers {request:?}"
                    );
                    progress = true;
                    this.submit_request(request, Priority::Normal);
                } else {
                    // no more requests
                    break
                }
            }

            // yield next batch
            if this.queued_validated_headers.len() >= this.stream_batch_size {
                let next_batch = this.split_next_batch();

                // Note: if this would drain all headers, we need to keep the lowest (last index)
                // around so we can continue validating headers responses.
                if this.queued_validated_headers.is_empty() {
                    this.lowest_validated_header = next_batch.last().cloned();
                }

                trace!(target: "downloaders::headers", batch=%next_batch.len(), "Returning validated batch");

                this.metrics.total_flushed.increment(next_batch.len() as u64);
                return Poll::Ready(Some(Ok(next_batch)))
            }

            if !progress {
                break
            }
        }

        // all requests are handled, stream is finished
        if this.in_progress_queue.is_empty() {
            let next_batch = this.split_next_batch();
            if next_batch.is_empty() {
                this.clear();
                return Poll::Ready(None)
            }
            this.metrics.total_flushed.increment(next_batch.len() as u64);
            return Poll::Ready(Some(Ok(next_batch)))
        }

        Poll::Pending
    }
}
891
/// A future that returns a list of headers on success.
#[derive(Debug)]
struct HeadersRequestFuture<F> {
    /// The request this future answers; taken (`Option::take`) once the future resolves.
    request: Option<HeadersRequest>,
    /// The inner client future yielding the peer's response.
    fut: F,
}
898
899impl<F, H> Future for HeadersRequestFuture<F>
900where
901    F: Future<Output = PeerRequestResult<Vec<H>>> + Sync + Send + Unpin,
902{
903    type Output = HeadersRequestOutcome<H>;
904
905    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
906        let this = self.get_mut();
907        let outcome = ready!(this.fut.poll_unpin(cx));
908        let request = this.request.take().unwrap();
909
910        Poll::Ready(HeadersRequestOutcome { request, outcome })
911    }
912}
913
/// The outcome of the [`HeadersRequestFuture`]
struct HeadersRequestOutcome<H> {
    /// The request that was sent to the peer.
    request: HeadersRequest,
    /// The peer's response (or transport error) for that request.
    outcome: PeerRequestResult<Vec<H>>,
}
919
// === impl HeadersRequestOutcome ===
921
impl<H> HeadersRequestOutcome<H> {
    /// The block number the request started at.
    ///
    /// Batched requests are built by `calc_next_request`, which always uses a number start,
    /// hence the `expect`.
    fn block_number(&self) -> u64 {
        self.request.start.as_number().expect("is number")
    }
}
927
/// Wrapper type to order responses
#[derive(Debug)]
struct OrderedHeadersResponse<H> {
    /// The headers returned by the peer.
    headers: Vec<H>,
    /// The request these headers answer; its start block determines the ordering.
    request: HeadersRequest,
    /// The peer that sent this response.
    peer_id: PeerId,
}
935
936// === impl OrderedHeadersResponse ===
937
impl<H> OrderedHeadersResponse<H> {
    /// The block number the underlying request started at.
    ///
    /// Buffered responses stem from requests built by `calc_next_request`, which always use a
    /// number start, hence the `expect`.
    fn block_number(&self) -> u64 {
        self.request.start.as_number().expect("is number")
    }
}
943
944impl<H> PartialEq for OrderedHeadersResponse<H> {
945    fn eq(&self, other: &Self) -> bool {
946        self.block_number() == other.block_number()
947    }
948}
949
950impl<H> Eq for OrderedHeadersResponse<H> {}
951
impl<H> PartialOrd for OrderedHeadersResponse<H> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // canonical delegation to the total order below
        Some(self.cmp(other))
    }
}
957
impl<H> Ord for OrderedHeadersResponse<H> {
    /// Orders responses by the requested start block; in a max-`BinaryHeap` the response with
    /// the highest block number is popped first, matching the reverse download direction.
    fn cmp(&self, other: &Self) -> Ordering {
        self.block_number().cmp(&other.block_number())
    }
}
963
/// Type returned if a bad response was processed
#[derive(Debug, Error)]
#[error("error requesting headers from peer {peer_id:?}: {error}; request: {request:?}")]
struct HeadersResponseError {
    /// The request that produced the bad response.
    request: HeadersRequest,
    /// The peer that answered, if the failure is attributable to a specific peer.
    peer_id: Option<PeerId>,
    /// The underlying download error.
    #[source]
    error: DownloadError,
}
973
974impl HeadersResponseError {
975    /// Returns true if the error was caused by a closed channel to the network.
976    const fn is_channel_closed(&self) -> bool {
977        if let DownloadError::RequestError(ref err) = self.error {
978            return err.is_channel_closed()
979        }
980        false
981    }
982}
983
/// The block to which we want to close the gap: (local head...sync target]
/// This tracks the sync target block, so this could be either a block number or hash.
#[derive(Clone, Debug)]
pub enum SyncTargetBlock {
    /// Only the hash of the targeted block is known.
    Hash(B256),
    /// Only the number of the targeted block is known.
    Number(u64),
    /// Both the block hash and number of the targeted block are known.
    HashAndNumber {
        /// Block hash of the targeted block
        hash: B256,
        /// Block number of the targeted block
        number: u64,
    },
}
1000
1001impl SyncTargetBlock {
1002    /// Create new instance from hash.
1003    const fn from_hash(hash: B256) -> Self {
1004        Self::Hash(hash)
1005    }
1006
1007    /// Create new instance from number.
1008    const fn from_number(num: u64) -> Self {
1009        Self::Number(num)
1010    }
1011
1012    /// Set the hash for the sync target.
1013    const fn with_hash(self, hash: B256) -> Self {
1014        match self {
1015            Self::Hash(_) => Self::Hash(hash),
1016            Self::Number(number) | Self::HashAndNumber { number, .. } => {
1017                Self::HashAndNumber { hash, number }
1018            }
1019        }
1020    }
1021
1022    /// Set a number on the instance.
1023    const fn with_number(self, number: u64) -> Self {
1024        match self {
1025            Self::Hash(hash) | Self::HashAndNumber { hash, .. } => {
1026                Self::HashAndNumber { hash, number }
1027            }
1028            Self::Number(_) => Self::Number(number),
1029        }
1030    }
1031
1032    /// Replace the target block number, and return the old block number, if it was set.
1033    ///
1034    /// If the target block is a hash, this be converted into a `HashAndNumber`, but return `None`.
1035    /// The semantics should be equivalent to that of `Option::replace`.
1036    fn replace_number(&mut self, number: u64) -> Option<u64> {
1037        match self {
1038            Self::Hash(hash) => {
1039                *self = Self::HashAndNumber { hash: *hash, number };
1040                None
1041            }
1042            Self::Number(old_number) => {
1043                let res = Some(*old_number);
1044                *self = Self::Number(number);
1045                res
1046            }
1047            Self::HashAndNumber { number: old_number, hash } => {
1048                let res = Some(*old_number);
1049                *self = Self::HashAndNumber { hash: *hash, number };
1050                res
1051            }
1052        }
1053    }
1054
1055    /// Return the hash of the target block, if it is set.
1056    const fn hash(&self) -> Option<B256> {
1057        match self {
1058            Self::Hash(hash) | Self::HashAndNumber { hash, .. } => Some(*hash),
1059            Self::Number(_) => None,
1060        }
1061    }
1062
1063    /// Return the block number of the sync target, if it is set.
1064    const fn number(&self) -> Option<u64> {
1065        match self {
1066            Self::Hash(_) => None,
1067            Self::Number(number) | Self::HashAndNumber { number, .. } => Some(*number),
1068        }
1069    }
1070}
1071
/// The builder for [`ReverseHeadersDownloader`] with
/// some default settings
#[derive(Debug)]
pub struct ReverseHeadersDownloaderBuilder {
    /// The batch size per one request
    request_limit: u64,
    /// The number of headers yielded per `Stream` batch
    stream_batch_size: usize,
    /// Minimum number of concurrent requests to keep active
    min_concurrent_requests: usize,
    /// Maximum number of concurrent requests
    max_concurrent_requests: usize,
    /// How many responses to buffer
    max_buffered_responses: usize,
}
1087
1088impl ReverseHeadersDownloaderBuilder {
1089    /// Creates a new [`ReverseHeadersDownloaderBuilder`] with configurations based on the provided
1090    /// [`HeadersConfig`].
1091    pub fn new(config: HeadersConfig) -> Self {
1092        Self::default()
1093            .request_limit(config.downloader_request_limit)
1094            .min_concurrent_requests(config.downloader_min_concurrent_requests)
1095            .max_concurrent_requests(config.downloader_max_concurrent_requests)
1096            .max_buffered_responses(config.downloader_max_buffered_responses)
1097            .stream_batch_size(config.commit_threshold as usize)
1098    }
1099}
1100
1101impl Default for ReverseHeadersDownloaderBuilder {
1102    fn default() -> Self {
1103        Self {
1104            stream_batch_size: 10_000,
1105            // This is just below the max number of headers commonly in a headers response (1024), see also <https://github.com/ethereum/go-ethereum/blob/b0d44338bbcefee044f1f635a84487cbbd8f0538/eth/protocols/eth/handler.go#L38-L40>
1106            // with ~500bytes per header this around 0.5MB per request max
1107            request_limit: 1_000,
1108            max_concurrent_requests: 100,
1109            min_concurrent_requests: 5,
1110            max_buffered_responses: 100,
1111        }
1112    }
1113}
1114
1115impl ReverseHeadersDownloaderBuilder {
1116    /// Set the request batch size.
1117    ///
1118    /// This determines the `limit` for a `GetBlockHeaders` requests, the number of headers we ask
1119    /// for.
1120    pub const fn request_limit(mut self, limit: u64) -> Self {
1121        self.request_limit = limit;
1122        self
1123    }
1124
1125    /// Set the stream batch size
1126    ///
1127    /// This determines the number of headers the [`ReverseHeadersDownloader`] will yield on
1128    /// `Stream::next`. This will be the amount of headers the headers stage will commit at a
1129    /// time.
1130    pub const fn stream_batch_size(mut self, size: usize) -> Self {
1131        self.stream_batch_size = size;
1132        self
1133    }
1134
1135    /// Set the min amount of concurrent requests.
1136    ///
1137    /// If there's capacity the [`ReverseHeadersDownloader`] will keep at least this many requests
1138    /// active at a time.
1139    pub const fn min_concurrent_requests(mut self, min_concurrent_requests: usize) -> Self {
1140        self.min_concurrent_requests = min_concurrent_requests;
1141        self
1142    }
1143
1144    /// Set the max amount of concurrent requests.
1145    ///
1146    /// The downloader's concurrent requests won't exceed the given amount.
1147    pub const fn max_concurrent_requests(mut self, max_concurrent_requests: usize) -> Self {
1148        self.max_concurrent_requests = max_concurrent_requests;
1149        self
1150    }
1151
1152    /// How many responses to buffer internally.
1153    ///
1154    /// This essentially determines how much memory the downloader can use for buffering responses
1155    /// that arrive out of order. The total number of buffered headers is `request_limit *
1156    /// max_buffered_responses`. If the [`ReverseHeadersDownloader`]'s buffered responses exceeds
1157    /// this threshold it waits until there's capacity again before sending new requests.
1158    pub const fn max_buffered_responses(mut self, max_buffered_responses: usize) -> Self {
1159        self.max_buffered_responses = max_buffered_responses;
1160        self
1161    }
1162
1163    /// Build [`ReverseHeadersDownloader`] with provided consensus
1164    /// and header client implementations
1165    pub fn build<H>(
1166        self,
1167        client: H,
1168        consensus: Arc<dyn HeaderValidator<H::Header>>,
1169    ) -> ReverseHeadersDownloader<H>
1170    where
1171        H: HeadersClient + 'static,
1172    {
1173        let Self {
1174            request_limit,
1175            stream_batch_size,
1176            min_concurrent_requests,
1177            max_concurrent_requests,
1178            max_buffered_responses,
1179        } = self;
1180        ReverseHeadersDownloader {
1181            consensus,
1182            client: Arc::new(client),
1183            local_head: None,
1184            sync_target: None,
1185            // Note: we set these to `0` first, they'll be updated once the sync target response is
1186            // handled and only used afterwards
1187            next_request_block_number: 0,
1188            next_chain_tip_block_number: 0,
1189            lowest_validated_header: None,
1190            request_limit,
1191            min_concurrent_requests,
1192            max_concurrent_requests,
1193            stream_batch_size,
1194            max_buffered_responses,
1195            sync_target_request: None,
1196            in_progress_queue: Default::default(),
1197            buffered_responses: Default::default(),
1198            queued_validated_headers: Default::default(),
1199            metrics: Default::default(),
1200        }
1201    }
1202}
1203
1204/// Configures and returns the next [`HeadersRequest`] based on the given parameters
1205///
1206/// The request will start at the given `next_request_block_number` block.
1207/// The `limit` of the request will either be the targeted `request_limit` or the difference of
1208/// `next_request_block_number` and the `local_head` in case this is smaller than the targeted
1209/// `request_limit`.
1210#[inline]
1211fn calc_next_request(
1212    local_head: u64,
1213    next_request_block_number: u64,
1214    request_limit: u64,
1215) -> HeadersRequest {
1216    // downloading is in reverse
1217    let diff = next_request_block_number - local_head;
1218    let limit = diff.min(request_limit);
1219    let start = next_request_block_number;
1220    HeadersRequest::falling(start.into(), limit)
1221}
1222
#[cfg(test)]
mod tests {
    use super::*;
    use crate::headers::test_utils::child_header;
    use alloy_consensus::Header;
    use alloy_eips::{eip1898::BlockWithParent, BlockNumHash};
    use assert_matches::assert_matches;
    use reth_consensus::test_utils::TestConsensus;
    use reth_network_p2p::test_utils::TestHeadersClient;

    /// Tests that `replace_number` works the same way as `Option::replace`
    #[test]
    fn test_replace_number_semantics() {
        struct Fixture {
            // input fields (both SyncTargetBlock and Option<u64>)
            sync_target_block: SyncTargetBlock,
            sync_target_option: Option<u64>,

            // option to replace
            replace_number: u64,

            // expected method result
            expected_result: Option<u64>,

            // output state
            new_number: u64,
        }

        let fixtures = vec![
            Fixture {
                sync_target_block: SyncTargetBlock::Hash(B256::random()),
                // Hash maps to None here, all other variants map to Some
                sync_target_option: None,
                replace_number: 1,
                expected_result: None,
                new_number: 1,
            },
            Fixture {
                sync_target_block: SyncTargetBlock::Number(1),
                sync_target_option: Some(1),
                replace_number: 2,
                expected_result: Some(1),
                new_number: 2,
            },
            Fixture {
                sync_target_block: SyncTargetBlock::HashAndNumber {
                    hash: B256::random(),
                    number: 1,
                },
                sync_target_option: Some(1),
                replace_number: 2,
                expected_result: Some(1),
                new_number: 2,
            },
        ];

        for fixture in fixtures {
            let mut sync_target_block = fixture.sync_target_block;
            let result = sync_target_block.replace_number(fixture.replace_number);
            assert_eq!(result, fixture.expected_result);
            assert_eq!(sync_target_block.number(), Some(fixture.new_number));

            let mut sync_target_option = fixture.sync_target_option;
            let option_result = sync_target_option.replace(fixture.replace_number);
            assert_eq!(option_result, fixture.expected_result);
            assert_eq!(sync_target_option, Some(fixture.new_number));
        }
    }

    /// Tests that updating the sync target issues a new target request only when needed
    #[test]
    fn test_sync_target_update() {
        let client = Arc::new(TestHeadersClient::default());

        let genesis = SealedHeader::default();

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(genesis);
        downloader.update_sync_target(SyncTarget::Tip(B256::random()));

        downloader.sync_target_request.take();

        // a fresh tip hash must trigger a new sync target request
        let target = SyncTarget::Tip(B256::random());
        downloader.update_sync_target(target);
        assert!(downloader.sync_target_request.is_some());

        downloader.sync_target_request.take();
        // a gap target carries the block number, so no extra request is needed
        let target = SyncTarget::Gap(BlockWithParent {
            block: BlockNumHash::new(0, B256::random()),
            parent: Default::default(),
        });
        downloader.update_sync_target(target);
        assert!(downloader.sync_target_request.is_none());
        assert_matches!(
            downloader.sync_target,
            Some(target) => target.number().is_some()
        );
    }

    /// Tests that updating the local head prunes queued validated headers at/below the head
    #[test]
    fn test_head_update() {
        let client = Arc::new(TestHeadersClient::default());

        let header: SealedHeader = SealedHeader::default();

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(header.clone());
        downloader.update_sync_target(SyncTarget::Tip(B256::random()));

        downloader.queued_validated_headers.push(header.clone());
        let mut next = header.as_ref().clone();
        next.number += 1;
        downloader.update_local_head(SealedHeader::new(next, B256::random()));
        assert!(downloader.queued_validated_headers.is_empty());
    }

    /// Tests that the request limit is capped by the distance to the local head
    #[test]
    fn test_request_calc() {
        // request an entire batch
        let local = 0;
        let next = 1000;
        let batch_size = 2;
        let request = calc_next_request(local, next, batch_size);
        assert_eq!(request.start, next.into());
        assert_eq!(request.limit, batch_size);

        // only request 1
        let local = 999;
        let next = 1000;
        let batch_size = 2;
        let request = calc_next_request(local, next, batch_size);
        assert_eq!(request.start, next.into());
        assert_eq!(request.limit, 1);
    }

    /// Tests that successive `next_request`s walk down from the start block to the local head
    #[test]
    fn test_next_request() {
        let client = Arc::new(TestHeadersClient::default());

        let genesis = SealedHeader::default();

        let batch_size = 99;
        let start = 1000;
        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .request_limit(batch_size)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(genesis);
        downloader.update_sync_target(SyncTarget::Tip(B256::random()));

        downloader.next_request_block_number = start;

        let mut total = 0;
        while let Some(req) = downloader.next_request() {
            assert_eq!(req.start, (start - total).into());
            total += req.limit;
        }
        assert_eq!(total, start);
        assert_eq!(Some(downloader.next_request_block_number), downloader.local_block_number());
    }

    /// Tests that buffered responses pop in max-heap order: highest block number first
    #[test]
    fn test_resp_order() {
        let mut heap = BinaryHeap::new();
        let hi = 1u64;
        heap.push(OrderedHeadersResponse::<Header> {
            headers: vec![],
            request: HeadersRequest { start: hi.into(), limit: 0, direction: Default::default() },
            peer_id: Default::default(),
        });

        let lo = 0u64;
        heap.push(OrderedHeadersResponse {
            headers: vec![],
            request: HeadersRequest { start: lo.into(), limit: 0, direction: Default::default() },
            peer_id: Default::default(),
        });

        assert_eq!(heap.pop().unwrap().block_number(), hi);
        assert_eq!(heap.pop().unwrap().block_number(), lo);
    }

    /// Downloads the full gap between local head `p3` and tip `p0` as a single batch.
    #[tokio::test]
    async fn download_at_fork_head() {
        reth_tracing::init_test_tracing();

        let client = Arc::new(TestHeadersClient::default());

        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(3)
            .request_limit(3)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        client
            .extend(vec![
                p0.as_ref().clone(),
                p1.as_ref().clone(),
                p2.as_ref().clone(),
                p3.as_ref().clone(),
            ])
            .await;

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p0, p1, p2,]));
        assert!(downloader.buffered_responses.is_empty());
        assert!(downloader.next().await.is_none());
        assert!(downloader.next().await.is_none());
    }

    /// Streams headers one at a time with `stream_batch_size == request_limit == 1`.
    #[tokio::test]
    async fn download_one_by_one() {
        reth_tracing::init_test_tracing();
        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let client = Arc::new(TestHeadersClient::default());
        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(1)
            .request_limit(1)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        client
            .extend(vec![
                p0.as_ref().clone(),
                p1.as_ref().clone(),
                p2.as_ref().clone(),
                p3.as_ref().clone(),
            ])
            .await;

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p0]));
        let headers = headers.unwrap();
        assert_eq!(headers.capacity(), headers.len());

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p1]));
        let headers = headers.unwrap();
        assert_eq!(headers.capacity(), headers.len());

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p2]));
        let headers = headers.unwrap();
        assert_eq!(headers.capacity(), headers.len());

        assert!(downloader.next().await.is_none());
    }

    /// Same as `download_one_by_one`, but with a request limit larger than the stream batch
    /// size, so batches must be split out of larger responses.
    #[tokio::test]
    async fn download_one_by_one_larger_request_limit() {
        reth_tracing::init_test_tracing();
        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let client = Arc::new(TestHeadersClient::default());
        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(1)
            .request_limit(3)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        client
            .extend(vec![
                p0.as_ref().clone(),
                p1.as_ref().clone(),
                p2.as_ref().clone(),
                p3.as_ref().clone(),
            ])
            .await;

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p0]));
        let headers = headers.unwrap();
        assert_eq!(headers.capacity(), headers.len());

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p1]));
        let headers = headers.unwrap();
        assert_eq!(headers.capacity(), headers.len());

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p2]));
        let headers = headers.unwrap();
        assert_eq!(headers.capacity(), headers.len());

        assert!(downloader.next().await.is_none());
    }
}