1use super::task::TaskDownloader;
4use crate::metrics::HeaderDownloaderMetrics;
5use alloy_consensus::BlockHeader;
6use alloy_eips::BlockHashOrNumber;
7use alloy_primitives::{BlockNumber, B256};
8use futures::{stream::Stream, FutureExt};
9use futures_util::{stream::FuturesUnordered, StreamExt};
10use rayon::prelude::*;
11use reth_config::config::HeadersConfig;
12use reth_consensus::HeaderValidator;
13use reth_network_p2p::{
14 error::{DownloadError, DownloadResult, PeerRequestResult},
15 headers::{
16 client::{HeadersClient, HeadersRequest},
17 downloader::{validate_header_download, HeaderDownloader, SyncTarget},
18 error::{HeadersDownloaderError, HeadersDownloaderResult},
19 },
20 priority::Priority,
21};
22use reth_network_peers::PeerId;
23use reth_primitives::{GotExpected, SealedHeader};
24use reth_tasks::{TaskSpawner, TokioTaskExecutor};
25use std::{
26 cmp::{Ordering, Reverse},
27 collections::{binary_heap::PeekMut, BinaryHeap},
28 future::Future,
29 pin::Pin,
30 sync::Arc,
31 task::{ready, Context, Poll},
32};
33use thiserror::Error;
34use tracing::{error, trace};
35
/// Factor applied to the number of connected peers when computing the dynamic target for
/// concurrently active header requests.
const REQUESTS_PER_PEER_MULTIPLIER: usize = 5;
40
41#[derive(Error, Debug)]
43enum ReverseHeadersDownloaderError<H> {
44 #[error(transparent)]
45 Downloader(#[from] HeadersDownloaderError<H>),
46 #[error(transparent)]
47 Response(#[from] Box<HeadersResponseError>),
48}
49
50impl<H> From<HeadersResponseError> for ReverseHeadersDownloaderError<H> {
51 fn from(value: HeadersResponseError) -> Self {
52 Self::Response(Box::new(value))
53 }
54}
55
56#[must_use = "Stream does nothing unless polled"]
68#[derive(Debug)]
69pub struct ReverseHeadersDownloader<H: HeadersClient> {
70 consensus: Arc<dyn HeaderValidator<H::Header>>,
72 client: Arc<H>,
74 local_head: Option<SealedHeader<H::Header>>,
76 sync_target: Option<SyncTargetBlock>,
78 next_request_block_number: u64,
80 lowest_validated_header: Option<SealedHeader<H::Header>>,
82 next_chain_tip_block_number: u64,
84 request_limit: u64,
86 min_concurrent_requests: usize,
88 max_concurrent_requests: usize,
90 stream_batch_size: usize,
92 max_buffered_responses: usize,
94 sync_target_request: Option<HeadersRequestFuture<H::Output>>,
99 in_progress_queue: FuturesUnordered<HeadersRequestFuture<H::Output>>,
101 buffered_responses: BinaryHeap<OrderedHeadersResponse<H::Header>>,
103 queued_validated_headers: Vec<SealedHeader<H::Header>>,
107 metrics: HeaderDownloaderMetrics,
109}
110
111impl<H> ReverseHeadersDownloader<H>
114where
115 H: HeadersClient<Header: reth_primitives_traits::BlockHeader> + 'static,
116{
117 pub fn builder() -> ReverseHeadersDownloaderBuilder {
119 ReverseHeadersDownloaderBuilder::default()
120 }
121
122 #[inline]
124 fn local_block_number(&self) -> Option<BlockNumber> {
125 self.local_head.as_ref().map(|h| h.number())
126 }
127
128 #[inline]
134 fn existing_local_block_number(&self) -> BlockNumber {
135 self.local_head.as_ref().expect("is initialized").number()
136 }
137
138 #[inline]
144 fn existing_sync_target(&self) -> SyncTargetBlock {
145 self.sync_target.as_ref().expect("is initialized").clone()
146 }
147
148 #[inline]
153 fn concurrent_request_limit(&self) -> usize {
154 let num_peers = self.client.num_connected_peers();
155
156 let dynamic_target = num_peers * REQUESTS_PER_PEER_MULTIPLIER;
159 let max_dynamic = dynamic_target.max(self.min_concurrent_requests);
160
161 if num_peers < self.min_concurrent_requests {
163 return max_dynamic
164 }
165
166 max_dynamic.min(self.max_concurrent_requests)
167 }
168
169 fn next_request(&mut self) -> Option<HeadersRequest> {
175 if let Some(local_head) = self.local_block_number() {
176 if self.next_request_block_number > local_head {
177 let request = calc_next_request(
178 local_head,
179 self.next_request_block_number,
180 self.request_limit,
181 );
182 self.next_request_block_number -= request.limit;
185
186 return Some(request)
187 }
188 }
189
190 None
191 }
192
193 fn lowest_validated_header(&self) -> Option<&SealedHeader<H::Header>> {
203 self.queued_validated_headers.last().or(self.lowest_validated_header.as_ref())
204 }
205
206 fn validate_sync_target(
208 &self,
209 header: &SealedHeader<H::Header>,
210 request: HeadersRequest,
211 peer_id: PeerId,
212 ) -> Result<(), Box<HeadersResponseError>> {
213 match self.existing_sync_target() {
214 SyncTargetBlock::Hash(hash) | SyncTargetBlock::HashAndNumber { hash, .. }
215 if header.hash() != hash =>
216 {
217 Err(Box::new(HeadersResponseError {
218 request,
219 peer_id: Some(peer_id),
220 error: DownloadError::InvalidTip(
221 GotExpected { got: header.hash(), expected: hash }.into(),
222 ),
223 }))
224 }
225 SyncTargetBlock::Number(number) if header.number() != number => {
226 Err(Box::new(HeadersResponseError {
227 request,
228 peer_id: Some(peer_id),
229 error: DownloadError::InvalidTipNumber(GotExpected {
230 got: header.number(),
231 expected: number,
232 }),
233 }))
234 }
235 _ => Ok(()),
236 }
237 }
238
239 fn process_next_headers(
247 &mut self,
248 request: HeadersRequest,
249 headers: Vec<H::Header>,
250 peer_id: PeerId,
251 ) -> Result<(), ReverseHeadersDownloaderError<H::Header>> {
252 let mut validated = Vec::with_capacity(headers.len());
253
254 let sealed_headers = headers.into_par_iter().map(SealedHeader::seal).collect::<Vec<_>>();
255 for parent in sealed_headers {
256 if let Some(validated_header) =
258 validated.last().or_else(|| self.lowest_validated_header())
259 {
260 if let Err(error) = self.validate(validated_header, &parent) {
261 trace!(target: "downloaders::headers", %error ,"Failed to validate header");
262 return Err(
263 HeadersResponseError { request, peer_id: Some(peer_id), error }.into()
264 )
265 }
266 } else {
267 self.validate_sync_target(&parent, request.clone(), peer_id)?;
268 }
269
270 validated.push(parent);
271 }
272
273 if let Some((last_header, head)) = validated
275 .last_mut()
276 .zip(self.local_head.as_ref())
277 .filter(|(last, head)| last.number() == head.number() + 1)
278 {
279 if let Err(error) = self.consensus.validate_header(&*last_header) {
281 trace!(target: "downloaders::headers", %error, "Failed to validate header");
282 return Err(HeadersResponseError {
283 request,
284 peer_id: Some(peer_id),
285 error: DownloadError::HeaderValidation {
286 hash: head.hash(),
287 number: head.number(),
288 error: Box::new(error),
289 },
290 }
291 .into())
292 }
293
294 if let Err(error) = self.consensus.validate_header_against_parent(&*last_header, head) {
297 error!(target: "downloaders::headers", %error, number = last_header.number(), hash = ?last_header.hash(), "Header cannot be attached to known canonical chain");
299 return Err(HeadersDownloaderError::DetachedHead {
300 local_head: Box::new(head.clone()),
301 header: Box::new(last_header.clone()),
302 error: Box::new(error),
303 }
304 .into())
305 }
306 }
307
308 self.next_chain_tip_block_number =
310 validated.last().expect("exists").number().saturating_sub(1);
311 self.queued_validated_headers.extend(validated);
312
313 Ok(())
314 }
315
316 fn on_block_number_update(&mut self, target_block_number: u64, next_block: u64) {
327 if let Some(old_target) =
329 self.sync_target.as_mut().and_then(|t| t.replace_number(target_block_number))
330 {
331 if target_block_number > old_target {
332 self.next_request_block_number = next_block;
335 self.next_chain_tip_block_number = next_block;
336 self.clear();
337 } else {
338 let skip = self
340 .queued_validated_headers
341 .iter()
342 .take_while(|last| last.number() > target_block_number)
343 .count();
344 self.queued_validated_headers.drain(..skip);
346 }
347 } else {
348 self.next_request_block_number = next_block;
350 self.next_chain_tip_block_number = next_block;
351 }
352 }
353
354 fn on_sync_target_outcome(
356 &mut self,
357 response: HeadersRequestOutcome<H::Header>,
358 ) -> Result<(), ReverseHeadersDownloaderError<H::Header>> {
359 let sync_target = self.existing_sync_target();
360 let HeadersRequestOutcome { request, outcome } = response;
361 match outcome {
362 Ok(res) => {
363 let (peer_id, mut headers) = res.split();
364
365 self.metrics.total_downloaded.increment(headers.len() as u64);
367
368 headers.sort_unstable_by_key(|h| Reverse(h.number()));
370
371 if headers.is_empty() {
372 return Err(HeadersResponseError {
373 request,
374 peer_id: Some(peer_id),
375 error: DownloadError::EmptyResponse,
376 }
377 .into())
378 }
379
380 let header = headers.swap_remove(0);
381 let target = SealedHeader::seal(header);
382
383 match sync_target {
384 SyncTargetBlock::Hash(hash) | SyncTargetBlock::HashAndNumber { hash, .. } => {
385 if target.hash() != hash {
386 return Err(HeadersResponseError {
387 request,
388 peer_id: Some(peer_id),
389 error: DownloadError::InvalidTip(
390 GotExpected { got: target.hash(), expected: hash }.into(),
391 ),
392 }
393 .into())
394 }
395 }
396 SyncTargetBlock::Number(number) => {
397 if target.number() != number {
398 return Err(HeadersResponseError {
399 request,
400 peer_id: Some(peer_id),
401 error: DownloadError::InvalidTipNumber(GotExpected {
402 got: target.number(),
403 expected: number,
404 }),
405 }
406 .into())
407 }
408 }
409 }
410
411 trace!(target: "downloaders::headers", head=?self.local_block_number(), hash=?target.hash(), number=%target.number(), "Received sync target");
412
413 let parent_block_number = target.number().saturating_sub(1);
415 self.on_block_number_update(target.number(), parent_block_number);
416
417 self.queued_validated_headers.push(target);
418
419 self.try_validate_buffered()
421 .map(Err::<(), ReverseHeadersDownloaderError<H::Header>>)
422 .transpose()?;
423
424 Ok(())
425 }
426 Err(err) => {
427 Err(HeadersResponseError { request, peer_id: None, error: err.into() }.into())
428 }
429 }
430 }
431
432 fn on_headers_outcome(
434 &mut self,
435 response: HeadersRequestOutcome<H::Header>,
436 ) -> Result<(), ReverseHeadersDownloaderError<H::Header>> {
437 let requested_block_number = response.block_number();
438 let HeadersRequestOutcome { request, outcome } = response;
439
440 match outcome {
441 Ok(res) => {
442 let (peer_id, mut headers) = res.split();
443
444 self.metrics.total_downloaded.increment(headers.len() as u64);
446
447 trace!(target: "downloaders::headers", len=%headers.len(), "Received headers response");
448
449 if headers.is_empty() {
450 return Err(HeadersResponseError {
451 request,
452 peer_id: Some(peer_id),
453 error: DownloadError::EmptyResponse,
454 }
455 .into())
456 }
457
458 if (headers.len() as u64) != request.limit {
459 return Err(HeadersResponseError {
460 peer_id: Some(peer_id),
461 error: DownloadError::HeadersResponseTooShort(GotExpected {
462 got: headers.len() as u64,
463 expected: request.limit,
464 }),
465 request,
466 }
467 .into())
468 }
469
470 headers.sort_unstable_by_key(|h| Reverse(h.number()));
472
473 let highest = &headers[0];
475
476 trace!(target: "downloaders::headers", requested_block_number, highest=?highest.number(), "Validating non-empty headers response");
477
478 if highest.number() != requested_block_number {
479 return Err(HeadersResponseError {
480 request,
481 peer_id: Some(peer_id),
482 error: DownloadError::HeadersResponseStartBlockMismatch(GotExpected {
483 got: highest.number(),
484 expected: requested_block_number,
485 }),
486 }
487 .into())
488 }
489
490 if highest.number() == self.next_chain_tip_block_number {
492 self.process_next_headers(request, headers, peer_id)?;
494 self.try_validate_buffered()
496 .map(Err::<(), ReverseHeadersDownloaderError<H::Header>>)
497 .transpose()?;
498 } else if highest.number() > self.existing_local_block_number() {
499 self.metrics.buffered_responses.increment(1.);
500 self.buffered_responses.push(OrderedHeadersResponse {
502 headers,
503 request,
504 peer_id,
505 })
506 }
507
508 Ok(())
509 }
510 Err(err) => {
513 trace!(target: "downloaders::headers", %err, "Response error");
514 Err(HeadersResponseError { request, peer_id: None, error: err.into() }.into())
515 }
516 }
517 }
518
519 fn penalize_peer(&self, peer_id: Option<PeerId>, error: &DownloadError) {
520 if let Some(peer_id) = peer_id {
522 trace!(target: "downloaders::headers", ?peer_id, %error, "Penalizing peer");
523 self.client.report_bad_message(peer_id);
524 }
525 }
526
527 fn on_headers_error(&self, err: Box<HeadersResponseError>) {
531 let HeadersResponseError { request, peer_id, error } = *err;
532
533 self.penalize_peer(peer_id, &error);
534
535 self.metrics.increment_errors(&error);
537
538 self.submit_request(request, Priority::High);
540 }
541
542 fn try_validate_buffered(&mut self) -> Option<ReverseHeadersDownloaderError<H::Header>> {
546 loop {
547 let next_response = self.buffered_responses.peek_mut()?;
549 let next_block_number = next_response.block_number();
550 match next_block_number.cmp(&self.next_chain_tip_block_number) {
551 Ordering::Less => return None,
552 Ordering::Equal => {
553 let OrderedHeadersResponse { headers, request, peer_id } =
554 PeekMut::pop(next_response);
555 self.metrics.buffered_responses.decrement(1.);
556
557 if let Err(err) = self.process_next_headers(request, headers, peer_id) {
558 return Some(err)
559 }
560 }
561 Ordering::Greater => {
562 self.metrics.buffered_responses.decrement(1.);
563 PeekMut::pop(next_response);
564 }
565 }
566 }
567 }
568
569 const fn get_sync_target_request(&self, start: BlockHashOrNumber) -> HeadersRequest {
571 HeadersRequest::falling(start, 1)
572 }
573
574 fn submit_request(&self, request: HeadersRequest, priority: Priority) {
576 trace!(target: "downloaders::headers", ?request, "Submitting headers request");
577 self.in_progress_queue.push(self.request_fut(request, priority));
578 self.metrics.in_flight_requests.increment(1.);
579 }
580
581 fn request_fut(
582 &self,
583 request: HeadersRequest,
584 priority: Priority,
585 ) -> HeadersRequestFuture<H::Output> {
586 let client = Arc::clone(&self.client);
587 HeadersRequestFuture {
588 request: Some(request.clone()),
589 fut: client.get_headers_with_priority(request, priority),
590 }
591 }
592
593 fn validate(
595 &self,
596 header: &SealedHeader<H::Header>,
597 parent: &SealedHeader<H::Header>,
598 ) -> DownloadResult<()> {
599 validate_header_download(&self.consensus, header, parent)
600 }
601
602 fn clear(&mut self) {
604 self.lowest_validated_header.take();
605 self.queued_validated_headers = Vec::new();
606 self.buffered_responses = BinaryHeap::new();
607 self.in_progress_queue.clear();
608
609 self.metrics.in_flight_requests.set(0.);
610 self.metrics.buffered_responses.set(0.);
611 }
612
613 fn split_next_batch(&mut self) -> Vec<SealedHeader<H::Header>> {
615 let batch_size = self.stream_batch_size.min(self.queued_validated_headers.len());
616 let mut rem = self.queued_validated_headers.split_off(batch_size);
617 std::mem::swap(&mut rem, &mut self.queued_validated_headers);
618 rem.shrink_to_fit();
634 rem
635 }
636}
637
638impl<H> ReverseHeadersDownloader<H>
639where
640 H: HeadersClient,
641 Self: HeaderDownloader + 'static,
642{
643 pub fn into_task(self) -> TaskDownloader<<Self as HeaderDownloader>::Header> {
645 self.into_task_with(&TokioTaskExecutor::default())
646 }
647
648 pub fn into_task_with<S>(
650 self,
651 spawner: &S,
652 ) -> TaskDownloader<<Self as HeaderDownloader>::Header>
653 where
654 S: TaskSpawner,
655 {
656 TaskDownloader::spawn_with(self, spawner)
657 }
658}
659
660impl<H> HeaderDownloader for ReverseHeadersDownloader<H>
661where
662 H: HeadersClient<Header: reth_primitives_traits::BlockHeader> + 'static,
663{
664 type Header = H::Header;
665
666 fn update_local_head(&mut self, head: SealedHeader<H::Header>) {
667 while self
669 .queued_validated_headers
670 .last()
671 .is_some_and(|last| last.number() <= head.number())
672 {
673 self.queued_validated_headers.pop();
675 }
676 self.local_head = Some(head);
678 }
679
680 fn update_sync_target(&mut self, target: SyncTarget) {
682 let current_tip = self.sync_target.as_ref().and_then(|t| t.hash());
683 match target {
684 SyncTarget::Tip(tip) => {
685 if Some(tip) != current_tip {
686 trace!(target: "downloaders::headers", current=?current_tip, new=?tip, "Update sync target");
687 let new_sync_target = SyncTargetBlock::from_hash(tip);
688
689 if let Some(target_number) = self
692 .queued_validated_headers
693 .first()
694 .filter(|h| h.hash() == tip)
695 .map(|h| h.number())
696 {
697 self.sync_target = Some(new_sync_target.with_number(target_number));
698 return
699 }
700
701 trace!(target: "downloaders::headers", new=?target, "Request new sync target");
702 self.metrics.out_of_order_requests.increment(1);
703 self.sync_target = Some(new_sync_target);
704 self.sync_target_request = Some(
705 self.request_fut(self.get_sync_target_request(tip.into()), Priority::High),
706 );
707 }
708 }
709 SyncTarget::Gap(existing) => {
710 let target = existing.parent;
711 if Some(target) != current_tip {
712 self.sync_target_request.take();
714 let parent_block_number = existing.block.number.saturating_sub(1);
717
718 trace!(target: "downloaders::headers", current=?current_tip, new=?target, %parent_block_number, "Updated sync target");
719
720 self.sync_target = match self.sync_target.take() {
722 Some(sync_target) => Some(sync_target.with_hash(target)),
723 None => Some(SyncTargetBlock::from_hash(target)),
724 };
725 self.on_block_number_update(parent_block_number, parent_block_number);
726 }
727 }
728 SyncTarget::TipNum(num) => {
729 let current_tip_num = self.sync_target.as_ref().and_then(|t| t.number());
730 if Some(num) != current_tip_num {
731 trace!(target: "downloaders::headers", %num, "Updating sync target based on num");
732 self.sync_target = Some(SyncTargetBlock::from_number(num));
734 self.sync_target_request = Some(
735 self.request_fut(self.get_sync_target_request(num.into()), Priority::High),
736 );
737 }
738 }
739 }
740 }
741
742 fn set_batch_size(&mut self, batch_size: usize) {
743 self.stream_batch_size = batch_size;
744 }
745}
746
747impl<H> Stream for ReverseHeadersDownloader<H>
748where
749 H: HeadersClient<Header: reth_primitives_traits::BlockHeader> + 'static,
750{
751 type Item = HeadersDownloaderResult<Vec<SealedHeader<H::Header>>, H::Header>;
752
753 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
754 let this = self.get_mut();
755
756 if this.local_head.is_none() || this.sync_target.is_none() {
759 trace!(
760 target: "downloaders::headers",
761 head=?this.local_block_number(),
762 sync_target=?this.sync_target,
763 "The downloader sync boundaries have not been set"
764 );
765 return Poll::Pending
766 }
767
768 while let Some(mut req) = this.sync_target_request.take() {
771 match req.poll_unpin(cx) {
772 Poll::Ready(outcome) => {
773 match this.on_sync_target_outcome(outcome) {
774 Ok(()) => break,
775 Err(ReverseHeadersDownloaderError::Response(error)) => {
776 trace!(target: "downloaders::headers", %error, "invalid sync target response");
777 if error.is_channel_closed() {
778 return Poll::Ready(None)
780 }
781
782 this.penalize_peer(error.peer_id, &error.error);
783 this.metrics.increment_errors(&error.error);
784 this.sync_target_request =
785 Some(this.request_fut(error.request, Priority::High));
786 }
787 Err(ReverseHeadersDownloaderError::Downloader(error)) => {
788 this.clear();
789 return Poll::Ready(Some(Err(error)))
790 }
791 };
792 }
793 Poll::Pending => {
794 this.sync_target_request = Some(req);
795 return Poll::Pending
796 }
797 }
798 }
799
800 this.buffered_responses.shrink_to_fit();
802
803 loop {
812 while let Poll::Ready(Some(outcome)) = this.in_progress_queue.poll_next_unpin(cx) {
814 this.metrics.in_flight_requests.decrement(1.);
815 match this.on_headers_outcome(outcome) {
817 Ok(()) => (),
818 Err(ReverseHeadersDownloaderError::Response(error)) => {
819 if error.is_channel_closed() {
820 return Poll::Ready(None)
822 }
823 this.on_headers_error(error);
824 }
825 Err(ReverseHeadersDownloaderError::Downloader(error)) => {
826 this.clear();
827 return Poll::Ready(Some(Err(error)))
828 }
829 };
830 }
831
832 this.buffered_responses.shrink_to_fit();
834
835 let mut progress = false;
837
838 let concurrent_request_limit = this.concurrent_request_limit();
839 while this.in_progress_queue.len() < concurrent_request_limit &&
841 this.buffered_responses.len() < this.max_buffered_responses
842 {
843 if let Some(request) = this.next_request() {
844 trace!(
845 target: "downloaders::headers",
846 "Requesting headers {request:?}"
847 );
848 progress = true;
849 this.submit_request(request, Priority::Normal);
850 } else {
851 break
853 }
854 }
855
856 if this.queued_validated_headers.len() >= this.stream_batch_size {
858 let next_batch = this.split_next_batch();
859
860 if this.queued_validated_headers.is_empty() {
863 this.lowest_validated_header = next_batch.last().cloned();
864 }
865
866 trace!(target: "downloaders::headers", batch=%next_batch.len(), "Returning validated batch");
867
868 this.metrics.total_flushed.increment(next_batch.len() as u64);
869 return Poll::Ready(Some(Ok(next_batch)))
870 }
871
872 if !progress {
873 break
874 }
875 }
876
877 if this.in_progress_queue.is_empty() {
879 let next_batch = this.split_next_batch();
880 if next_batch.is_empty() {
881 this.clear();
882 return Poll::Ready(None)
883 }
884 this.metrics.total_flushed.increment(next_batch.len() as u64);
885 return Poll::Ready(Some(Ok(next_batch)))
886 }
887
888 Poll::Pending
889 }
890}
891
892#[derive(Debug)]
894struct HeadersRequestFuture<F> {
895 request: Option<HeadersRequest>,
896 fut: F,
897}
898
899impl<F, H> Future for HeadersRequestFuture<F>
900where
901 F: Future<Output = PeerRequestResult<Vec<H>>> + Sync + Send + Unpin,
902{
903 type Output = HeadersRequestOutcome<H>;
904
905 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
906 let this = self.get_mut();
907 let outcome = ready!(this.fut.poll_unpin(cx));
908 let request = this.request.take().unwrap();
909
910 Poll::Ready(HeadersRequestOutcome { request, outcome })
911 }
912}
913
914struct HeadersRequestOutcome<H> {
916 request: HeadersRequest,
917 outcome: PeerRequestResult<Vec<H>>,
918}
919
920impl<H> HeadersRequestOutcome<H> {
923 fn block_number(&self) -> u64 {
924 self.request.start.as_number().expect("is number")
925 }
926}
927
928#[derive(Debug)]
930struct OrderedHeadersResponse<H> {
931 headers: Vec<H>,
932 request: HeadersRequest,
933 peer_id: PeerId,
934}
935
936impl<H> OrderedHeadersResponse<H> {
939 fn block_number(&self) -> u64 {
940 self.request.start.as_number().expect("is number")
941 }
942}
943
944impl<H> PartialEq for OrderedHeadersResponse<H> {
945 fn eq(&self, other: &Self) -> bool {
946 self.block_number() == other.block_number()
947 }
948}
949
950impl<H> Eq for OrderedHeadersResponse<H> {}
951
952impl<H> PartialOrd for OrderedHeadersResponse<H> {
953 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
954 Some(self.cmp(other))
955 }
956}
957
958impl<H> Ord for OrderedHeadersResponse<H> {
959 fn cmp(&self, other: &Self) -> Ordering {
960 self.block_number().cmp(&other.block_number())
961 }
962}
963
964#[derive(Debug, Error)]
966#[error("error requesting headers from peer {peer_id:?}: {error}; request: {request:?}")]
967struct HeadersResponseError {
968 request: HeadersRequest,
969 peer_id: Option<PeerId>,
970 #[source]
971 error: DownloadError,
972}
973
974impl HeadersResponseError {
975 const fn is_channel_closed(&self) -> bool {
977 if let DownloadError::RequestError(ref err) = self.error {
978 return err.is_channel_closed()
979 }
980 false
981 }
982}
983
984#[derive(Clone, Debug)]
987pub enum SyncTargetBlock {
988 Hash(B256),
990 Number(u64),
992 HashAndNumber {
994 hash: B256,
996 number: u64,
998 },
999}
1000
1001impl SyncTargetBlock {
1002 const fn from_hash(hash: B256) -> Self {
1004 Self::Hash(hash)
1005 }
1006
1007 const fn from_number(num: u64) -> Self {
1009 Self::Number(num)
1010 }
1011
1012 const fn with_hash(self, hash: B256) -> Self {
1014 match self {
1015 Self::Hash(_) => Self::Hash(hash),
1016 Self::Number(number) | Self::HashAndNumber { number, .. } => {
1017 Self::HashAndNumber { hash, number }
1018 }
1019 }
1020 }
1021
1022 const fn with_number(self, number: u64) -> Self {
1024 match self {
1025 Self::Hash(hash) | Self::HashAndNumber { hash, .. } => {
1026 Self::HashAndNumber { hash, number }
1027 }
1028 Self::Number(_) => Self::Number(number),
1029 }
1030 }
1031
1032 fn replace_number(&mut self, number: u64) -> Option<u64> {
1037 match self {
1038 Self::Hash(hash) => {
1039 *self = Self::HashAndNumber { hash: *hash, number };
1040 None
1041 }
1042 Self::Number(old_number) => {
1043 let res = Some(*old_number);
1044 *self = Self::Number(number);
1045 res
1046 }
1047 Self::HashAndNumber { number: old_number, hash } => {
1048 let res = Some(*old_number);
1049 *self = Self::HashAndNumber { hash: *hash, number };
1050 res
1051 }
1052 }
1053 }
1054
1055 const fn hash(&self) -> Option<B256> {
1057 match self {
1058 Self::Hash(hash) | Self::HashAndNumber { hash, .. } => Some(*hash),
1059 Self::Number(_) => None,
1060 }
1061 }
1062
1063 const fn number(&self) -> Option<u64> {
1065 match self {
1066 Self::Hash(_) => None,
1067 Self::Number(number) | Self::HashAndNumber { number, .. } => Some(*number),
1068 }
1069 }
1070}
1071
/// Builder for [`ReverseHeadersDownloader`].
#[derive(Debug)]
pub struct ReverseHeadersDownloaderBuilder {
    /// Maximum number of headers requested per request.
    request_limit: u64,
    /// Number of validated headers that are yielded as one batch.
    stream_batch_size: usize,
    /// Lower bound used when computing the concurrent request limit.
    min_concurrent_requests: usize,
    /// Upper bound used when computing the concurrent request limit.
    max_concurrent_requests: usize,
    /// Number of buffered responses at which no further requests are submitted.
    max_buffered_responses: usize,
}
1087
1088impl ReverseHeadersDownloaderBuilder {
1089 pub fn new(config: HeadersConfig) -> Self {
1092 Self::default()
1093 .request_limit(config.downloader_request_limit)
1094 .min_concurrent_requests(config.downloader_min_concurrent_requests)
1095 .max_concurrent_requests(config.downloader_max_concurrent_requests)
1096 .max_buffered_responses(config.downloader_max_buffered_responses)
1097 .stream_batch_size(config.commit_threshold as usize)
1098 }
1099}
1100
1101impl Default for ReverseHeadersDownloaderBuilder {
1102 fn default() -> Self {
1103 Self {
1104 stream_batch_size: 10_000,
1105 request_limit: 1_000,
1108 max_concurrent_requests: 100,
1109 min_concurrent_requests: 5,
1110 max_buffered_responses: 100,
1111 }
1112 }
1113}
1114
1115impl ReverseHeadersDownloaderBuilder {
1116 pub const fn request_limit(mut self, limit: u64) -> Self {
1121 self.request_limit = limit;
1122 self
1123 }
1124
1125 pub const fn stream_batch_size(mut self, size: usize) -> Self {
1131 self.stream_batch_size = size;
1132 self
1133 }
1134
1135 pub const fn min_concurrent_requests(mut self, min_concurrent_requests: usize) -> Self {
1140 self.min_concurrent_requests = min_concurrent_requests;
1141 self
1142 }
1143
1144 pub const fn max_concurrent_requests(mut self, max_concurrent_requests: usize) -> Self {
1148 self.max_concurrent_requests = max_concurrent_requests;
1149 self
1150 }
1151
1152 pub const fn max_buffered_responses(mut self, max_buffered_responses: usize) -> Self {
1159 self.max_buffered_responses = max_buffered_responses;
1160 self
1161 }
1162
1163 pub fn build<H>(
1166 self,
1167 client: H,
1168 consensus: Arc<dyn HeaderValidator<H::Header>>,
1169 ) -> ReverseHeadersDownloader<H>
1170 where
1171 H: HeadersClient + 'static,
1172 {
1173 let Self {
1174 request_limit,
1175 stream_batch_size,
1176 min_concurrent_requests,
1177 max_concurrent_requests,
1178 max_buffered_responses,
1179 } = self;
1180 ReverseHeadersDownloader {
1181 consensus,
1182 client: Arc::new(client),
1183 local_head: None,
1184 sync_target: None,
1185 next_request_block_number: 0,
1188 next_chain_tip_block_number: 0,
1189 lowest_validated_header: None,
1190 request_limit,
1191 min_concurrent_requests,
1192 max_concurrent_requests,
1193 stream_batch_size,
1194 max_buffered_responses,
1195 sync_target_request: None,
1196 in_progress_queue: Default::default(),
1197 buffered_responses: Default::default(),
1198 queued_validated_headers: Default::default(),
1199 metrics: Default::default(),
1200 }
1201 }
1202}
1203
1204#[inline]
1211fn calc_next_request(
1212 local_head: u64,
1213 next_request_block_number: u64,
1214 request_limit: u64,
1215) -> HeadersRequest {
1216 let diff = next_request_block_number - local_head;
1218 let limit = diff.min(request_limit);
1219 let start = next_request_block_number;
1220 HeadersRequest::falling(start.into(), limit)
1221}
1222
1223#[cfg(test)]
1224mod tests {
1225 use super::*;
1226 use crate::headers::test_utils::child_header;
1227 use alloy_consensus::Header;
1228 use alloy_eips::{eip1898::BlockWithParent, BlockNumHash};
1229 use assert_matches::assert_matches;
1230 use reth_consensus::test_utils::TestConsensus;
1231 use reth_network_p2p::test_utils::TestHeadersClient;
1232
1233 #[test]
1235 fn test_replace_number_semantics() {
1236 struct Fixture {
1237 sync_target_block: SyncTargetBlock,
1239 sync_target_option: Option<u64>,
1240
1241 replace_number: u64,
1243
1244 expected_result: Option<u64>,
1246
1247 new_number: u64,
1249 }
1250
1251 let fixtures = vec![
1252 Fixture {
1253 sync_target_block: SyncTargetBlock::Hash(B256::random()),
1254 sync_target_option: None,
1256 replace_number: 1,
1257 expected_result: None,
1258 new_number: 1,
1259 },
1260 Fixture {
1261 sync_target_block: SyncTargetBlock::Number(1),
1262 sync_target_option: Some(1),
1263 replace_number: 2,
1264 expected_result: Some(1),
1265 new_number: 2,
1266 },
1267 Fixture {
1268 sync_target_block: SyncTargetBlock::HashAndNumber {
1269 hash: B256::random(),
1270 number: 1,
1271 },
1272 sync_target_option: Some(1),
1273 replace_number: 2,
1274 expected_result: Some(1),
1275 new_number: 2,
1276 },
1277 ];
1278
1279 for fixture in fixtures {
1280 let mut sync_target_block = fixture.sync_target_block;
1281 let result = sync_target_block.replace_number(fixture.replace_number);
1282 assert_eq!(result, fixture.expected_result);
1283 assert_eq!(sync_target_block.number(), Some(fixture.new_number));
1284
1285 let mut sync_target_option = fixture.sync_target_option;
1286 let option_result = sync_target_option.replace(fixture.replace_number);
1287 assert_eq!(option_result, fixture.expected_result);
1288 assert_eq!(sync_target_option, Some(fixture.new_number));
1289 }
1290 }
1291
1292 #[test]
1294 fn test_sync_target_update() {
1295 let client = Arc::new(TestHeadersClient::default());
1296
1297 let genesis = SealedHeader::default();
1298
1299 let mut downloader = ReverseHeadersDownloaderBuilder::default()
1300 .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
1301 downloader.update_local_head(genesis);
1302 downloader.update_sync_target(SyncTarget::Tip(B256::random()));
1303
1304 downloader.sync_target_request.take();
1305
1306 let target = SyncTarget::Tip(B256::random());
1307 downloader.update_sync_target(target);
1308 assert!(downloader.sync_target_request.is_some());
1309
1310 downloader.sync_target_request.take();
1311 let target = SyncTarget::Gap(BlockWithParent {
1312 block: BlockNumHash::new(0, B256::random()),
1313 parent: Default::default(),
1314 });
1315 downloader.update_sync_target(target);
1316 assert!(downloader.sync_target_request.is_none());
1317 assert_matches!(
1318 downloader.sync_target,
1319 Some(target) => target.number().is_some()
1320 );
1321 }
1322
1323 #[test]
1325 fn test_head_update() {
1326 let client = Arc::new(TestHeadersClient::default());
1327
1328 let header: SealedHeader = SealedHeader::default();
1329
1330 let mut downloader = ReverseHeadersDownloaderBuilder::default()
1331 .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
1332 downloader.update_local_head(header.clone());
1333 downloader.update_sync_target(SyncTarget::Tip(B256::random()));
1334
1335 downloader.queued_validated_headers.push(header.clone());
1336 let mut next = header.as_ref().clone();
1337 next.number += 1;
1338 downloader.update_local_head(SealedHeader::new(next, B256::random()));
1339 assert!(downloader.queued_validated_headers.is_empty());
1340 }
1341
#[test]
/// Checks the request produced by `calc_next_request` for a fixed next block and batch size:
/// the request always starts at the next block, and its limit is the batch size when the local
/// head is far below, or `1` when the local head is directly below the next block.
fn test_request_calc() {
    let next = 1000;
    let batch_size = 2;

    // Runs one calculation, checks the shared `start` expectation and hands back the limit.
    let limit_with_local_head = |local| {
        let request = calc_next_request(local, next, batch_size);
        assert_eq!(request.start, next.into());
        request.limit
    };

    // Local head at 0: a full batch fits.
    assert_eq!(limit_with_local_head(0), batch_size);

    // Local head at 999: only a single header is requested.
    assert_eq!(limit_with_local_head(999), 1);
}
1360
#[test]
/// Drains `next_request` until it returns `None` and checks that each request starts where the
/// previous one ended, that the limits sum to the starting block number, and that
/// `next_request_block_number` ends up equal to `local_block_number()`.
fn test_next_request() {
    let request_limit = 99;
    let first_block = 1000;

    let headers_client = Arc::new(TestHeadersClient::default());
    let mut downloader = ReverseHeadersDownloaderBuilder::default()
        .request_limit(request_limit)
        .build(Arc::clone(&headers_client), Arc::new(TestConsensus::default()));
    downloader.update_local_head(SealedHeader::default());
    downloader.update_sync_target(SyncTarget::Tip(B256::random()));

    // Override the request pointer so the walk begins at a known block.
    downloader.next_request_block_number = first_block;

    let mut requested = 0;
    for request in std::iter::from_fn(|| downloader.next_request()) {
        assert_eq!(request.start, (first_block - requested).into());
        requested += request.limit;
    }

    assert_eq!(requested, first_block);
    assert_eq!(Some(downloader.next_request_block_number), downloader.local_block_number());
}
1386
#[test]
/// Checks that a `BinaryHeap` of `OrderedHeadersResponse` pops the response with the higher
/// start block number first.
fn test_resp_order() {
    // Builds an empty response whose request starts at the given block number.
    let response_starting_at = |number: u64| OrderedHeadersResponse::<Header> {
        headers: vec![],
        request: HeadersRequest { start: number.into(), limit: 0, direction: Default::default() },
        peer_id: Default::default(),
    };

    let hi = 1u64;
    let lo = 0u64;

    let mut heap = BinaryHeap::new();
    heap.push(response_starting_at(hi));
    heap.push(response_starting_at(lo));

    assert_eq!(heap.pop().unwrap().block_number(), hi);
    assert_eq!(heap.pop().unwrap().block_number(), lo);
}
1407
1408 #[tokio::test]
1409 async fn download_at_fork_head() {
1410 reth_tracing::init_test_tracing();
1411
1412 let client = Arc::new(TestHeadersClient::default());
1413
1414 let p3 = SealedHeader::default();
1415 let p2 = child_header(&p3);
1416 let p1 = child_header(&p2);
1417 let p0 = child_header(&p1);
1418
1419 let mut downloader = ReverseHeadersDownloaderBuilder::default()
1420 .stream_batch_size(3)
1421 .request_limit(3)
1422 .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
1423 downloader.update_local_head(p3.clone());
1424 downloader.update_sync_target(SyncTarget::Tip(p0.hash()));
1425
1426 client
1427 .extend(vec![
1428 p0.as_ref().clone(),
1429 p1.as_ref().clone(),
1430 p2.as_ref().clone(),
1431 p3.as_ref().clone(),
1432 ])
1433 .await;
1434
1435 let headers = downloader.next().await.unwrap();
1436 assert_eq!(headers, Ok(vec![p0, p1, p2,]));
1437 assert!(downloader.buffered_responses.is_empty());
1438 assert!(downloader.next().await.is_none());
1439 assert!(downloader.next().await.is_none());
1440 }
1441
1442 #[tokio::test]
1443 async fn download_one_by_one() {
1444 reth_tracing::init_test_tracing();
1445 let p3 = SealedHeader::default();
1446 let p2 = child_header(&p3);
1447 let p1 = child_header(&p2);
1448 let p0 = child_header(&p1);
1449
1450 let client = Arc::new(TestHeadersClient::default());
1451 let mut downloader = ReverseHeadersDownloaderBuilder::default()
1452 .stream_batch_size(1)
1453 .request_limit(1)
1454 .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
1455 downloader.update_local_head(p3.clone());
1456 downloader.update_sync_target(SyncTarget::Tip(p0.hash()));
1457
1458 client
1459 .extend(vec![
1460 p0.as_ref().clone(),
1461 p1.as_ref().clone(),
1462 p2.as_ref().clone(),
1463 p3.as_ref().clone(),
1464 ])
1465 .await;
1466
1467 let headers = downloader.next().await.unwrap();
1468 assert_eq!(headers, Ok(vec![p0]));
1469 let headers = headers.unwrap();
1470 assert_eq!(headers.capacity(), headers.len());
1471
1472 let headers = downloader.next().await.unwrap();
1473 assert_eq!(headers, Ok(vec![p1]));
1474 let headers = headers.unwrap();
1475 assert_eq!(headers.capacity(), headers.len());
1476
1477 let headers = downloader.next().await.unwrap();
1478 assert_eq!(headers, Ok(vec![p2]));
1479 let headers = headers.unwrap();
1480 assert_eq!(headers.capacity(), headers.len());
1481
1482 assert!(downloader.next().await.is_none());
1483 }
1484
1485 #[tokio::test]
1486 async fn download_one_by_one_larger_request_limit() {
1487 reth_tracing::init_test_tracing();
1488 let p3 = SealedHeader::default();
1489 let p2 = child_header(&p3);
1490 let p1 = child_header(&p2);
1491 let p0 = child_header(&p1);
1492
1493 let client = Arc::new(TestHeadersClient::default());
1494 let mut downloader = ReverseHeadersDownloaderBuilder::default()
1495 .stream_batch_size(1)
1496 .request_limit(3)
1497 .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
1498 downloader.update_local_head(p3.clone());
1499 downloader.update_sync_target(SyncTarget::Tip(p0.hash()));
1500
1501 client
1502 .extend(vec![
1503 p0.as_ref().clone(),
1504 p1.as_ref().clone(),
1505 p2.as_ref().clone(),
1506 p3.as_ref().clone(),
1507 ])
1508 .await;
1509
1510 let headers = downloader.next().await.unwrap();
1511 assert_eq!(headers, Ok(vec![p0]));
1512 let headers = headers.unwrap();
1513 assert_eq!(headers.capacity(), headers.len());
1514
1515 let headers = downloader.next().await.unwrap();
1516 assert_eq!(headers, Ok(vec![p1]));
1517 let headers = headers.unwrap();
1518 assert_eq!(headers.capacity(), headers.len());
1519
1520 let headers = downloader.next().await.unwrap();
1521 assert_eq!(headers, Ok(vec![p2]));
1522 let headers = headers.unwrap();
1523 assert_eq!(headers.capacity(), headers.len());
1524
1525 assert!(downloader.next().await.is_none());
1526 }
1527}