1use super::headers::client::HeadersRequest;
2use crate::{
3 bodies::client::{BodiesClient, SingleBodyRequest},
4 download::DownloadClient,
5 error::PeerRequestResult,
6 headers::client::{HeadersClient, SingleHeaderRequest},
7 priority::Priority,
8 BlockClient,
9};
10use alloy_consensus::BlockHeader;
11use alloy_primitives::{Sealable, B256};
12use core::marker::PhantomData;
13use reth_consensus::{Consensus, ConsensusError};
14use reth_eth_wire_types::{EthNetworkPrimitives, HeadersDirection, NetworkPrimitives};
15use reth_network_peers::{PeerId, WithPeerId};
16use reth_primitives_traits::{SealedBlock, SealedHeader};
17use std::{
18 cmp::Reverse,
19 collections::{HashMap, VecDeque},
20 fmt::Debug,
21 future::Future,
22 hash::Hash,
23 pin::Pin,
24 sync::Arc,
25 task::{ready, Context, Poll},
26};
27use tracing::debug;
28
/// A client that downloads full blocks (header and body) through a [`BlockClient`].
///
/// The [`Consensus`] implementation stored here is handed to every future created by this
/// client, where it is used to check downloaded data.
#[derive(Debug, Clone)]
pub struct FullBlockClient<Client>
where
    Client: BlockClient,
{
    /// The client used to issue header and body requests.
    client: Client,
    /// The consensus implementation used to validate downloaded data.
    consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
}
38
39impl<Client> FullBlockClient<Client>
40where
41 Client: BlockClient,
42{
43 pub fn new(
45 client: Client,
46 consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
47 ) -> Self {
48 Self { client, consensus }
49 }
50
51 #[cfg(any(test, feature = "test-utils"))]
53 pub fn test_client(client: Client) -> Self {
54 Self::new(client, Arc::new(reth_consensus::test_utils::TestConsensus::default()))
55 }
56}
57
58impl<Client> FullBlockClient<Client>
59where
60 Client: BlockClient,
61{
62 pub fn get_full_block(&self, hash: B256) -> FetchFullBlockFuture<Client> {
69 let client = self.client.clone();
70 FetchFullBlockFuture {
71 hash,
72 consensus: self.consensus.clone(),
73 request: FullBlockRequest {
74 header: Some(client.get_header(hash.into())),
75 body: Some(client.get_block_body(hash)),
76 },
77 client,
78 header: None,
79 body: None,
80 }
81 }
82
83 pub fn get_full_block_range(
93 &self,
94 hash: B256,
95 count: u64,
96 ) -> FetchFullBlockRangeFuture<Client> {
97 let client = self.client.clone();
98 FetchFullBlockRangeFuture {
99 start_hash: hash,
100 count,
101 request: FullBlockRangeRequest {
102 headers: Some(client.get_headers(HeadersRequest::falling(hash.into(), count))),
103 bodies: None,
104 },
105 client,
106 headers: None,
107 pending_headers: VecDeque::new(),
108 bodies: HashMap::default(),
109 consensus: Arc::clone(&self.consensus),
110 }
111 }
112}
113
/// A future that downloads a single full block.
///
/// It tracks a header request and a body request for the same block hash and resolves with the
/// assembled [`SealedBlock`] once both parts are available and the body has been validated
/// against the header.
#[must_use = "futures do nothing unless polled"]
pub struct FetchFullBlockFuture<Client>
where
    Client: BlockClient,
{
    /// The client used to (re-)issue requests and to report misbehaving peers.
    client: Client,
    /// The consensus implementation used to validate the body against the header.
    consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
    /// The hash of the requested block.
    hash: B256,
    /// The requests that are currently in flight.
    request: FullBlockRequest<Client>,
    /// The downloaded header, once one matching `hash` has been received.
    header: Option<SealedHeader<Client::Header>>,
    /// The downloaded body, either already validated or still awaiting validation.
    body: Option<BodyResponse<Client::Body>>,
}
130
131impl<Client> FetchFullBlockFuture<Client>
132where
133 Client: BlockClient<Header: BlockHeader>,
134{
135 pub const fn hash(&self) -> &B256 {
137 &self.hash
138 }
139
140 pub fn block_number(&self) -> Option<u64> {
142 self.header.as_ref().map(|h| h.number())
143 }
144
145 fn take_block(&mut self) -> Option<SealedBlock<Client::Block>> {
147 if self.header.is_none() || self.body.is_none() {
148 return None
149 }
150
151 let header = self.header.take().unwrap();
152 let resp = self.body.take().unwrap();
153 match resp {
154 BodyResponse::Validated(body) => Some(SealedBlock::from_sealed_parts(header, body)),
155 BodyResponse::PendingValidation(resp) => {
156 if let Err(err) = self.consensus.validate_body_against_header(resp.data(), &header)
158 {
159 debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
160 self.client.report_bad_message(resp.peer_id());
161 self.header = Some(header);
162 self.request.body = Some(self.client.get_block_body(self.hash));
163 return None
164 }
165 Some(SealedBlock::from_sealed_parts(header, resp.into_data()))
166 }
167 }
168 }
169
170 fn on_block_response(&mut self, resp: WithPeerId<Client::Body>) {
171 if let Some(ref header) = self.header {
172 if let Err(err) = self.consensus.validate_body_against_header(resp.data(), header) {
173 debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
174 self.client.report_bad_message(resp.peer_id());
175 return
176 }
177 self.body = Some(BodyResponse::Validated(resp.into_data()));
178 return
179 }
180 self.body = Some(BodyResponse::PendingValidation(resp));
181 }
182}
183
/// Drives the header and body requests until a complete, validated block can be returned.
impl<Client> Future for FetchFullBlockFuture<Client>
where
    Client: BlockClient<Header: BlockHeader + Sealable> + 'static,
{
    type Output = SealedBlock<Client::Block>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        // Upper bound on the number of responses handled per `poll` call.
        let mut budget = 4;

        loop {
            // Returns `Poll::Pending` from this function if no request has completed.
            match ready!(this.request.poll(cx)) {
                ResponseResult::Header(res) => {
                    match res {
                        Ok(maybe_header) => {
                            // Seal the header so its hash can be compared with the request.
                            let (peer, maybe_header) =
                                maybe_header.map(|h| h.map(SealedHeader::seal_slow)).split();
                            if let Some(header) = maybe_header {
                                if header.hash() == this.hash {
                                    this.header = Some(header);
                                } else {
                                    debug!(target: "downloaders", expected=?this.hash, received=?header.hash(), "Received wrong header");
                                    // The peer sent a header other than the requested one.
                                    this.client.report_bad_message(peer)
                                }
                            }
                        }
                        Err(err) => {
                            debug!(target: "downloaders", %err, ?this.hash, "Header download failed");
                        }
                    }

                    if this.header.is_none() {
                        // No usable header was received, request it again.
                        this.request.header = Some(this.client.get_header(this.hash.into()));
                    }
                }
                ResponseResult::Body(res) => {
                    match res {
                        Ok(maybe_body) => {
                            if let Some(body) = maybe_body.transpose() {
                                this.on_block_response(body);
                            }
                        }
                        Err(err) => {
                            debug!(target: "downloaders", %err, ?this.hash, "Body download failed");
                        }
                    }
                    if this.body.is_none() {
                        // No usable body was received, request it again.
                        this.request.body = Some(this.client.get_block_body(this.hash));
                    }
                }
            }

            if let Some(res) = this.take_block() {
                return Poll::Ready(res)
            }

            // Stop looping once the budget for this `poll` call is used up.
            budget -= 1;
            if budget == 0 {
                // Schedule another poll before yielding.
                cx.waker().wake_by_ref();
                return Poll::Pending
            }
        }
    }
}
255
256impl<Client> Debug for FetchFullBlockFuture<Client>
257where
258 Client: BlockClient<Header: Debug, Body: Debug>,
259{
260 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261 f.debug_struct("FetchFullBlockFuture")
262 .field("hash", &self.hash)
263 .field("header", &self.header)
264 .field("body", &self.body)
265 .finish()
266 }
267}
268
/// The in-flight requests of a [`FetchFullBlockFuture`].
struct FullBlockRequest<Client>
where
    Client: BlockClient,
{
    /// The header request, `None` while no header request is in flight.
    header: Option<SingleHeaderRequest<<Client as HeadersClient>::Output>>,
    /// The body request, `None` while no body request is in flight.
    body: Option<SingleBodyRequest<<Client as BodiesClient>::Output>>,
}
276
277impl<Client> FullBlockRequest<Client>
278where
279 Client: BlockClient,
280{
281 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ResponseResult<Client::Header, Client::Body>> {
282 if let Some(fut) = Pin::new(&mut self.header).as_pin_mut() {
283 if let Poll::Ready(res) = fut.poll(cx) {
284 self.header = None;
285 return Poll::Ready(ResponseResult::Header(res))
286 }
287 }
288
289 if let Some(fut) = Pin::new(&mut self.body).as_pin_mut() {
290 if let Poll::Ready(res) = fut.poll(cx) {
291 self.body = None;
292 return Poll::Ready(ResponseResult::Body(res))
293 }
294 }
295
296 Poll::Pending
297 }
298}
299
/// The outcome of a completed single-block request, tagged with the kind of request it belongs
/// to.
enum ResponseResult<H, B> {
    /// The result of a header request.
    Header(PeerRequestResult<Option<H>>),
    /// The result of a body request.
    Body(PeerRequestResult<Option<B>>),
}
306
/// A downloaded block body together with its validation state.
#[derive(Debug)]
enum BodyResponse<B> {
    /// A body that has already been validated against its header.
    Validated(B),
    /// A body that still has to be validated against its header; the peer is kept so it can be
    /// reported if validation fails.
    PendingValidation(WithPeerId<B>),
}
/// A future that downloads a range of full blocks.
///
/// The headers are requested first, starting at `start_hash` and walking towards lower block
/// numbers. Once an acceptable headers response has arrived, the bodies are requested (possibly
/// in several rounds) and validated against their headers before the blocks are returned.
#[must_use = "futures do nothing unless polled"]
#[expect(missing_debug_implementations)]
pub struct FetchFullBlockRangeFuture<Client>
where
    Client: BlockClient,
{
    /// The client used to (re-)issue requests and to report misbehaving peers.
    client: Client,
    /// The consensus implementation used to validate headers and bodies.
    consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
    /// The hash of the block the range starts at.
    start_hash: B256,
    /// The number of blocks to fetch.
    count: u64,
    /// The requests that are currently in flight.
    request: FullBlockRangeRequest<Client>,
    /// The accepted headers, sorted by descending block number.
    headers: Option<Vec<SealedHeader<Client::Header>>>,
    /// The headers that are still waiting for a body, in request order.
    pending_headers: VecDeque<SealedHeader<Client::Header>>,
    /// The bodies received so far, keyed by the header they were assigned to.
    bodies: HashMap<SealedHeader<Client::Header>, BodyResponse<Client::Body>>,
}
350
351impl<Client> FetchFullBlockRangeFuture<Client>
352where
353 Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>,
354{
355 pub fn range_block_hashes(&self) -> Option<Vec<B256>> {
357 self.headers.as_ref().map(|h| h.iter().map(|h| h.hash()).collect())
358 }
359
360 fn is_bodies_complete(&self) -> bool {
362 self.bodies.len() == self.count as usize
363 }
364
365 fn insert_body(&mut self, body_response: BodyResponse<Client::Body>) {
369 if let Some(header) = self.pending_headers.pop_front() {
370 self.bodies.insert(header, body_response);
371 }
372 }
373
374 fn insert_bodies(&mut self, bodies: impl IntoIterator<Item = BodyResponse<Client::Body>>) {
376 for body in bodies {
377 self.insert_body(body);
378 }
379 }
380
381 fn remaining_bodies_hashes(&self) -> Vec<B256> {
384 self.pending_headers.iter().map(|h| h.hash()).collect()
385 }
386
387 fn take_blocks(&mut self) -> Option<Vec<SealedBlock<Client::Block>>> {
395 if !self.is_bodies_complete() {
396 return None
398 }
399
400 let headers = self.headers.take()?;
401 let mut needs_retry = false;
402 let mut valid_responses = Vec::new();
403
404 for header in &headers {
405 if let Some(body_resp) = self.bodies.remove(header) {
406 let body = match body_resp {
408 BodyResponse::Validated(body) => body,
409 BodyResponse::PendingValidation(resp) => {
410 if let Err(err) =
412 self.consensus.validate_body_against_header(resp.data(), header)
413 {
414 debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body in range response");
415 self.client.report_bad_message(resp.peer_id());
416
417 self.pending_headers.push_back(header.clone());
419 needs_retry = true;
420 continue
421 }
422
423 resp.into_data()
424 }
425 };
426
427 valid_responses
428 .push(SealedBlock::<Client::Block>::from_sealed_parts(header.clone(), body));
429 }
430 }
431
432 if needs_retry {
433 for block in valid_responses {
436 let (header, body) = block.split_sealed_header_body();
437 self.bodies.insert(header, BodyResponse::Validated(body));
438 }
439
440 self.headers = Some(headers);
442
443 let hashes = self.remaining_bodies_hashes();
445 self.request.bodies = Some(self.client.get_block_bodies(hashes));
446 return None
447 }
448
449 Some(valid_responses)
450 }
451
452 fn on_headers_response(&mut self, headers: WithPeerId<Vec<Client::Header>>) {
453 let (peer, mut headers_falling) =
454 headers.map(|h| h.into_iter().map(SealedHeader::seal_slow).collect::<Vec<_>>()).split();
455
456 if headers_falling.len() == self.count as usize {
458 headers_falling.sort_unstable_by_key(|h| Reverse(h.number()));
460
461 if headers_falling[0].hash() == self.start_hash {
463 let headers_rising = headers_falling.iter().rev().cloned().collect::<Vec<_>>();
464 if let Err(err) = self.consensus.validate_header_range(&headers_rising) {
466 debug!(target: "downloaders", %err, ?self.start_hash, "Received bad header response");
467 self.client.report_bad_message(peer);
468 }
469
470 let hashes = headers_falling.iter().map(|h| h.hash()).collect::<Vec<_>>();
472
473 self.pending_headers = headers_falling.clone().into();
475
476 if !self.has_bodies_request_started() {
478 self.request.bodies = Some(self.client.get_block_bodies(hashes));
480 }
481
482 self.headers = Some(headers_falling);
484 } else {
485 self.client.report_bad_message(peer);
487 }
488 }
489 }
490
491 const fn has_bodies_request_started(&self) -> bool {
494 self.request.bodies.is_some()
495 }
496
497 pub const fn start_hash(&self) -> B256 {
499 self.start_hash
500 }
501
502 pub const fn count(&self) -> u64 {
504 self.count
505 }
506}
507
508impl<Client> Future for FetchFullBlockRangeFuture<Client>
509where
510 Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq> + 'static,
511{
512 type Output = Vec<SealedBlock<Client::Block>>;
513
514 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
515 let this = self.get_mut();
516
517 loop {
518 match ready!(this.request.poll(cx)) {
519 RangeResponseResult::Header(res) => {
528 match res {
529 Ok(headers) => {
530 this.on_headers_response(headers);
531 }
532 Err(err) => {
533 debug!(target: "downloaders", %err, ?this.start_hash, "Header range download failed");
534 }
535 }
536
537 if this.headers.is_none() {
538 this.request.headers = Some(this.client.get_headers(HeadersRequest {
540 start: this.start_hash.into(),
541 limit: this.count,
542 direction: HeadersDirection::Falling,
543 }));
544 }
545 }
546 RangeResponseResult::Body(res) => {
552 match res {
553 Ok(bodies_resp) => {
554 let (peer, new_bodies) = bodies_resp.split();
555
556 this.insert_bodies(
558 new_bodies
559 .into_iter()
560 .map(|resp| WithPeerId::new(peer, resp))
561 .map(BodyResponse::PendingValidation),
562 );
563
564 if !this.is_bodies_complete() {
565 let req_hashes = this.remaining_bodies_hashes();
567
568 this.request.bodies = Some(this.client.get_block_bodies(req_hashes))
570 }
571 }
572 Err(err) => {
573 debug!(target: "downloaders", %err, ?this.start_hash, "Body range download failed");
574 }
575 }
576 if this.bodies.is_empty() {
577 let hashes = this.remaining_bodies_hashes();
590 if !hashes.is_empty() {
591 this.request.bodies = Some(this.client.get_block_bodies(hashes));
592 }
593 }
594 }
595 }
596
597 if let Some(res) = this.take_blocks() {
598 return Poll::Ready(res)
599 }
600 }
601 }
602}
603
/// The in-flight requests of a [`FetchFullBlockRangeFuture`].
struct FullBlockRangeRequest<Client>
where
    Client: BlockClient,
{
    /// The headers request, `None` while no headers request is in flight.
    headers: Option<<Client as HeadersClient>::Output>,
    /// The bodies request, `None` while no bodies request is in flight.
    bodies: Option<<Client as BodiesClient>::Output>,
}
614
615impl<Client> FullBlockRangeRequest<Client>
616where
617 Client: BlockClient,
618{
619 fn poll(
620 &mut self,
621 cx: &mut Context<'_>,
622 ) -> Poll<RangeResponseResult<Client::Header, Client::Body>> {
623 if let Some(fut) = Pin::new(&mut self.headers).as_pin_mut() {
624 if let Poll::Ready(res) = fut.poll(cx) {
625 self.headers = None;
626 return Poll::Ready(RangeResponseResult::Header(res))
627 }
628 }
629
630 if let Some(fut) = Pin::new(&mut self.bodies).as_pin_mut() {
631 if let Poll::Ready(res) = fut.poll(cx) {
632 self.bodies = None;
633 return Poll::Ready(RangeResponseResult::Body(res))
634 }
635 }
636
637 Poll::Pending
638 }
639}
640
/// The outcome of a completed range request, tagged with the kind of request it belongs to.
enum RangeResponseResult<H, B> {
    /// The result of a headers request.
    Header(PeerRequestResult<Vec<H>>),
    /// The result of a bodies request.
    Body(PeerRequestResult<Vec<B>>),
}
647
/// A [`BlockClient`] that does not download anything: every request is answered immediately
/// with an empty response.
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct NoopFullBlockClient<Net = EthNetworkPrimitives>(PhantomData<Net>);
652
/// [`DownloadClient`] implementation that ignores peer reports and has no peers.
impl<Net> DownloadClient for NoopFullBlockClient<Net>
where
    Net: Debug + Send + Sync,
{
    /// Does nothing; the reported peer is ignored.
    fn report_bad_message(&self, _peer_id: PeerId) {}

    /// Always returns `0`.
    fn num_connected_peers(&self) -> usize {
        0
    }
}
675
676impl<Net> BodiesClient for NoopFullBlockClient<Net>
678where
679 Net: NetworkPrimitives,
680{
681 type Body = Net::BlockBody;
682 type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Body>>>;
684
685 fn get_block_bodies_with_priority(
696 &self,
697 _hashes: Vec<B256>,
698 _priority: Priority,
699 ) -> Self::Output {
700 futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
703 }
704}
705
706impl<Net> HeadersClient for NoopFullBlockClient<Net>
707where
708 Net: NetworkPrimitives,
709{
710 type Header = Net::BlockHeader;
711 type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Header>>>;
714
715 fn get_headers_with_priority(
729 &self,
730 _request: HeadersRequest,
731 _priority: Priority,
732 ) -> Self::Output {
733 futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
734 }
735}
736
/// [`BlockClient`] implementation that uses the block type of the network primitives.
impl<Net> BlockClient for NoopFullBlockClient<Net>
where
    Net: NetworkPrimitives,
{
    type Block = Net::Block;
}
743
/// Tests for the full block futures, run against `TestFullBlockClient`.
#[cfg(test)]
mod tests {
    use reth_ethereum_primitives::BlockBody;

    use super::*;
    use crate::test_utils::TestFullBlockClient;
    use std::ops::Range;

    /// A single block known to the client is downloaded by hash.
    #[tokio::test]
    async fn download_single_full_block() {
        let client = TestFullBlockClient::default();
        let header: SealedHeader = SealedHeader::default();
        let body = BlockBody::default();
        client.insert(header.clone(), body.clone());
        let client = FullBlockClient::test_client(client);

        let received = client.get_full_block(header.hash()).await;
        assert_eq!(received, SealedBlock::from_sealed_parts(header, body));
    }

    /// A range of length one yields exactly the requested block.
    #[tokio::test]
    async fn download_single_full_block_range() {
        let client = TestFullBlockClient::default();
        let header: SealedHeader = SealedHeader::default();
        let body = BlockBody::default();
        client.insert(header.clone(), body.clone());
        let client = FullBlockClient::test_client(client);

        let received = client.get_full_block_range(header.hash(), 1).await;
        let received = received.first().expect("response should include a block");
        assert_eq!(*received, SealedBlock::from_sealed_parts(header, body));
    }

    /// Inserts one block per element of `range` into the client, each block being the child of
    /// the previous one, and returns the last (highest) header together with the body shared by
    /// all blocks.
    fn insert_headers_into_client(
        client: &TestFullBlockClient,
        range: Range<usize>,
    ) -> (SealedHeader, BlockBody) {
        let mut sealed_header: SealedHeader = SealedHeader::default();
        let body = BlockBody::default();
        for _ in range {
            let (mut header, hash) = sealed_header.split();
            // Link the new header to the previous one.
            header.parent_hash = hash;
            header.number += 1;

            sealed_header = SealedHeader::seal_slow(header);

            client.insert(sealed_header.clone(), body.clone());
        }

        (sealed_header, body)
    }

    /// Ranges of one and of ten blocks are returned in descending block number order.
    #[tokio::test]
    async fn download_full_block_range() {
        let client = TestFullBlockClient::default();
        let (header, body) = insert_headers_into_client(&client, 0..50);
        let client = FullBlockClient::test_client(client);

        let received = client.get_full_block_range(header.hash(), 1).await;
        let received = received.first().expect("response should include a block");
        assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));

        let received = client.get_full_block_range(header.hash(), 10).await;
        assert_eq!(received.len(), 10);
        for (i, block) in received.iter().enumerate() {
            let expected_number = header.number - i as u64;
            assert_eq!(block.number, expected_number);
        }
    }

    /// A range of fifty blocks is returned completely.
    ///
    /// NOTE(review): the test name suggests that fifty exceeds the number of items the test
    /// client returns per response - confirm against `TestFullBlockClient`.
    #[tokio::test]
    async fn download_full_block_range_over_soft_limit() {
        let client = TestFullBlockClient::default();
        let (header, body) = insert_headers_into_client(&client, 0..50);
        let client = FullBlockClient::test_client(client);

        let received = client.get_full_block_range(header.hash(), 1).await;
        let received = received.first().expect("response should include a block");
        assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));

        let received = client.get_full_block_range(header.hash(), 50).await;
        assert_eq!(received.len(), 50);
        for (i, block) in received.iter().enumerate() {
            let expected_number = header.number - i as u64;
            assert_eq!(block.number, expected_number);
        }
    }

    /// The range is still returned when the test consensus is configured to fail validation
    /// (with body-against-header failures disabled).
    #[tokio::test]
    async fn download_full_block_range_with_invalid_header() {
        let client = TestFullBlockClient::default();
        let range_length: usize = 3;
        let (header, _) = insert_headers_into_client(&client, 0..range_length);

        let test_consensus = reth_consensus::test_utils::TestConsensus::default();
        test_consensus.set_fail_validation(true);
        test_consensus.set_fail_body_against_header(false);
        let client = FullBlockClient::new(client, Arc::new(test_consensus));

        let received = client.get_full_block_range(header.hash(), range_length as u64).await;

        assert_eq!(received.len(), range_length);
        for (i, block) in received.iter().enumerate() {
            let expected_number = header.number - i as u64;
            assert_eq!(block.number, expected_number);
        }
    }
}