reth_network_p2p/
full_block.rs

1use super::headers::client::HeadersRequest;
2use crate::{
3    bodies::client::{BodiesClient, SingleBodyRequest},
4    download::DownloadClient,
5    error::PeerRequestResult,
6    headers::client::{HeadersClient, SingleHeaderRequest},
7    priority::Priority,
8    BlockClient,
9};
10use alloy_consensus::BlockHeader;
11use alloy_primitives::{Sealable, B256};
12use core::marker::PhantomData;
13use reth_consensus::{Consensus, ConsensusError};
14use reth_eth_wire_types::{EthNetworkPrimitives, HeadersDirection, NetworkPrimitives};
15use reth_network_peers::{PeerId, WithPeerId};
16use reth_primitives_traits::{SealedBlock, SealedHeader};
17use std::{
18    cmp::Reverse,
19    collections::{HashMap, VecDeque},
20    fmt::Debug,
21    future::Future,
22    hash::Hash,
23    pin::Pin,
24    sync::Arc,
25    task::{ready, Context, Poll},
26};
27use tracing::debug;
28
29/// A Client that can fetch full blocks from the network.
30#[derive(Debug, Clone)]
31pub struct FullBlockClient<Client>
32where
33    Client: BlockClient,
34{
35    client: Client,
36    consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
37}
38
39impl<Client> FullBlockClient<Client>
40where
41    Client: BlockClient,
42{
43    /// Creates a new instance of `FullBlockClient`.
44    pub fn new(
45        client: Client,
46        consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
47    ) -> Self {
48        Self { client, consensus }
49    }
50
51    /// Returns a client with Test consensus
52    #[cfg(any(test, feature = "test-utils"))]
53    pub fn test_client(client: Client) -> Self {
54        Self::new(client, Arc::new(reth_consensus::test_utils::TestConsensus::default()))
55    }
56}
57
58impl<Client> FullBlockClient<Client>
59where
60    Client: BlockClient,
61{
62    /// Returns a future that fetches the [`SealedBlock`] for the given hash.
63    ///
64    /// Note: this future is cancel safe
65    ///
66    /// Caution: This does no validation of body (transactions) response but guarantees that the
67    /// [`SealedHeader`] matches the requested hash.
68    pub fn get_full_block(&self, hash: B256) -> FetchFullBlockFuture<Client> {
69        let client = self.client.clone();
70        FetchFullBlockFuture {
71            hash,
72            consensus: self.consensus.clone(),
73            request: FullBlockRequest {
74                header: Some(client.get_header(hash.into())),
75                body: Some(client.get_block_body(hash)),
76            },
77            client,
78            header: None,
79            body: None,
80        }
81    }
82
83    /// Returns a future that fetches [`SealedBlock`]s for the given hash and count.
84    ///
85    /// Note: this future is cancel safe
86    ///
87    /// Caution: This does no validation of body (transactions) responses but guarantees that
88    /// the starting [`SealedHeader`] matches the requested hash, and that the number of headers and
89    /// bodies received matches the requested limit.
90    ///
91    /// The returned future yields bodies in falling order, i.e. with descending block numbers.
92    pub fn get_full_block_range(
93        &self,
94        hash: B256,
95        count: u64,
96    ) -> FetchFullBlockRangeFuture<Client> {
97        let client = self.client.clone();
98        FetchFullBlockRangeFuture {
99            start_hash: hash,
100            count,
101            request: FullBlockRangeRequest {
102                headers: Some(client.get_headers(HeadersRequest::falling(hash.into(), count))),
103                bodies: None,
104            },
105            client,
106            headers: None,
107            pending_headers: VecDeque::new(),
108            bodies: HashMap::default(),
109            consensus: Arc::clone(&self.consensus),
110        }
111    }
112}
113
114/// A future that downloads a full block from the network.
115///
116/// This will attempt to fetch both the header and body for the given block hash at the same time.
117/// When both requests succeed, the future will yield the full block.
118#[must_use = "futures do nothing unless polled"]
119pub struct FetchFullBlockFuture<Client>
120where
121    Client: BlockClient,
122{
123    client: Client,
124    consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
125    hash: B256,
126    request: FullBlockRequest<Client>,
127    header: Option<SealedHeader<Client::Header>>,
128    body: Option<BodyResponse<Client::Body>>,
129}
130
131impl<Client> FetchFullBlockFuture<Client>
132where
133    Client: BlockClient<Header: BlockHeader>,
134{
135    /// Returns the hash of the block being requested.
136    pub const fn hash(&self) -> &B256 {
137        &self.hash
138    }
139
140    /// If the header request is already complete, this returns the block number
141    pub fn block_number(&self) -> Option<u64> {
142        self.header.as_ref().map(|h| h.number())
143    }
144
145    /// Returns the [`SealedBlock`] if the request is complete and valid.
146    fn take_block(&mut self) -> Option<SealedBlock<Client::Block>> {
147        if self.header.is_none() || self.body.is_none() {
148            return None
149        }
150
151        let header = self.header.take().unwrap();
152        let resp = self.body.take().unwrap();
153        match resp {
154            BodyResponse::Validated(body) => Some(SealedBlock::from_sealed_parts(header, body)),
155            BodyResponse::PendingValidation(resp) => {
156                // ensure the block is valid, else retry
157                if let Err(err) = self.consensus.validate_body_against_header(resp.data(), &header)
158                {
159                    debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
160                    self.client.report_bad_message(resp.peer_id());
161                    self.header = Some(header);
162                    self.request.body = Some(self.client.get_block_body(self.hash));
163                    return None
164                }
165                Some(SealedBlock::from_sealed_parts(header, resp.into_data()))
166            }
167        }
168    }
169
170    fn on_block_response(&mut self, resp: WithPeerId<Client::Body>) {
171        if let Some(ref header) = self.header {
172            if let Err(err) = self.consensus.validate_body_against_header(resp.data(), header) {
173                debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
174                self.client.report_bad_message(resp.peer_id());
175                return
176            }
177            self.body = Some(BodyResponse::Validated(resp.into_data()));
178            return
179        }
180        self.body = Some(BodyResponse::PendingValidation(resp));
181    }
182}
183
184impl<Client> Future for FetchFullBlockFuture<Client>
185where
186    Client: BlockClient<Header: BlockHeader + Sealable> + 'static,
187{
188    type Output = SealedBlock<Client::Block>;
189
190    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
191        let this = self.get_mut();
192
193        // preemptive yield point
194        let mut budget = 4;
195
196        loop {
197            match ready!(this.request.poll(cx)) {
198                ResponseResult::Header(res) => {
199                    match res {
200                        Ok(maybe_header) => {
201                            let (peer, maybe_header) =
202                                maybe_header.map(|h| h.map(SealedHeader::seal_slow)).split();
203                            if let Some(header) = maybe_header {
204                                if header.hash() == this.hash {
205                                    this.header = Some(header);
206                                } else {
207                                    debug!(target: "downloaders", expected=?this.hash, received=?header.hash(), "Received wrong header");
208                                    // received a different header than requested
209                                    this.client.report_bad_message(peer)
210                                }
211                            }
212                        }
213                        Err(err) => {
214                            debug!(target: "downloaders", %err, ?this.hash, "Header download failed");
215                        }
216                    }
217
218                    if this.header.is_none() {
219                        // received bad response
220                        this.request.header = Some(this.client.get_header(this.hash.into()));
221                    }
222                }
223                ResponseResult::Body(res) => {
224                    match res {
225                        Ok(maybe_body) => {
226                            if let Some(body) = maybe_body.transpose() {
227                                this.on_block_response(body);
228                            }
229                        }
230                        Err(err) => {
231                            debug!(target: "downloaders", %err, ?this.hash, "Body download failed");
232                        }
233                    }
234                    if this.body.is_none() {
235                        // received bad response
236                        this.request.body = Some(this.client.get_block_body(this.hash));
237                    }
238                }
239            }
240
241            if let Some(res) = this.take_block() {
242                return Poll::Ready(res)
243            }
244
245            // ensure we still have enough budget for another iteration
246            budget -= 1;
247            if budget == 0 {
248                // make sure we're woken up again
249                cx.waker().wake_by_ref();
250                return Poll::Pending
251            }
252        }
253    }
254}
255
256impl<Client> Debug for FetchFullBlockFuture<Client>
257where
258    Client: BlockClient<Header: Debug, Body: Debug>,
259{
260    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261        f.debug_struct("FetchFullBlockFuture")
262            .field("hash", &self.hash)
263            .field("header", &self.header)
264            .field("body", &self.body)
265            .finish()
266    }
267}
268
269struct FullBlockRequest<Client>
270where
271    Client: BlockClient,
272{
273    header: Option<SingleHeaderRequest<<Client as HeadersClient>::Output>>,
274    body: Option<SingleBodyRequest<<Client as BodiesClient>::Output>>,
275}
276
277impl<Client> FullBlockRequest<Client>
278where
279    Client: BlockClient,
280{
281    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ResponseResult<Client::Header, Client::Body>> {
282        if let Some(fut) = Pin::new(&mut self.header).as_pin_mut() {
283            if let Poll::Ready(res) = fut.poll(cx) {
284                self.header = None;
285                return Poll::Ready(ResponseResult::Header(res))
286            }
287        }
288
289        if let Some(fut) = Pin::new(&mut self.body).as_pin_mut() {
290            if let Poll::Ready(res) = fut.poll(cx) {
291                self.body = None;
292                return Poll::Ready(ResponseResult::Body(res))
293            }
294        }
295
296        Poll::Pending
297    }
298}
299
300/// The result of a request for a single header or body. This is yielded by the `FullBlockRequest`
301/// future.
302enum ResponseResult<H, B> {
303    Header(PeerRequestResult<Option<H>>),
304    Body(PeerRequestResult<Option<B>>),
305}
306
307/// The response of a body request.
308#[derive(Debug)]
309enum BodyResponse<B> {
310    /// Already validated against transaction root of header
311    Validated(B),
312    /// Still needs to be validated against header
313    PendingValidation(WithPeerId<B>),
314}
315/// A future that downloads a range of full blocks from the network.
316///
317/// This first fetches the headers for the given range using the inner `Client`. Once the request
318/// is complete, it will fetch the bodies for the headers it received.
319///
320/// Once the bodies request completes, the [`SealedBlock`]s will be assembled and the future will
321/// yield the full block range.
322///
323/// The full block range will be returned with falling block numbers, i.e. in descending order.
324///
325/// NOTE: this assumes that bodies responses are returned by the client in the same order as the
326/// hash array used to request them.
327#[must_use = "futures do nothing unless polled"]
328#[expect(missing_debug_implementations)]
329pub struct FetchFullBlockRangeFuture<Client>
330where
331    Client: BlockClient,
332{
333    /// The client used to fetch headers and bodies.
334    client: Client,
335    /// The consensus instance used to validate the blocks.
336    consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
337    /// The block hash to start fetching from (inclusive).
338    start_hash: B256,
339    /// How many blocks to fetch: `len([start_hash, ..]) == count`
340    count: u64,
341    /// Requests for headers and bodies that are in progress.
342    request: FullBlockRangeRequest<Client>,
343    /// Fetched headers.
344    headers: Option<Vec<SealedHeader<Client::Header>>>,
345    /// The next headers to request bodies for. This is drained as responses are received.
346    pending_headers: VecDeque<SealedHeader<Client::Header>>,
347    /// The bodies that have been received so far.
348    bodies: HashMap<SealedHeader<Client::Header>, BodyResponse<Client::Body>>,
349}
350
351impl<Client> FetchFullBlockRangeFuture<Client>
352where
353    Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>,
354{
355    /// Returns the block hashes for the given range, if they are available.
356    pub fn range_block_hashes(&self) -> Option<Vec<B256>> {
357        self.headers.as_ref().map(|h| h.iter().map(|h| h.hash()).collect())
358    }
359
360    /// Returns whether or not the bodies map is fully populated with requested headers and bodies.
361    fn is_bodies_complete(&self) -> bool {
362        self.bodies.len() == self.count as usize
363    }
364
365    /// Inserts a block body, matching it with the `next_header`.
366    ///
367    /// Note: this assumes the response matches the next header in the queue.
368    fn insert_body(&mut self, body_response: BodyResponse<Client::Body>) {
369        if let Some(header) = self.pending_headers.pop_front() {
370            self.bodies.insert(header, body_response);
371        }
372    }
373
374    /// Inserts multiple block bodies.
375    fn insert_bodies(&mut self, bodies: impl IntoIterator<Item = BodyResponse<Client::Body>>) {
376        for body in bodies {
377            self.insert_body(body);
378        }
379    }
380
381    /// Returns the remaining hashes for the bodies request, based on the headers that still exist
382    /// in the `root_map`.
383    fn remaining_bodies_hashes(&self) -> Vec<B256> {
384        self.pending_headers.iter().map(|h| h.hash()).collect()
385    }
386
387    /// Returns the [`SealedBlock`]s if the request is complete and valid.
388    ///
389    /// The request is complete if the number of blocks requested is equal to the number of blocks
390    /// received. The request is valid if the returned bodies match the roots in the headers.
391    ///
392    /// These are returned in falling order starting with the requested `hash`, i.e. with
393    /// descending block numbers.
394    fn take_blocks(&mut self) -> Option<Vec<SealedBlock<Client::Block>>> {
395        if !self.is_bodies_complete() {
396            // not done with bodies yet
397            return None
398        }
399
400        let headers = self.headers.take()?;
401        let mut needs_retry = false;
402        let mut valid_responses = Vec::new();
403
404        for header in &headers {
405            if let Some(body_resp) = self.bodies.remove(header) {
406                // validate body w.r.t. the hashes in the header, only inserting into the response
407                let body = match body_resp {
408                    BodyResponse::Validated(body) => body,
409                    BodyResponse::PendingValidation(resp) => {
410                        // ensure the block is valid, else retry
411                        if let Err(err) =
412                            self.consensus.validate_body_against_header(resp.data(), header)
413                        {
414                            debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body in range response");
415                            self.client.report_bad_message(resp.peer_id());
416
417                            // get body that doesn't match, put back into vecdeque, and retry it
418                            self.pending_headers.push_back(header.clone());
419                            needs_retry = true;
420                            continue
421                        }
422
423                        resp.into_data()
424                    }
425                };
426
427                valid_responses
428                    .push(SealedBlock::<Client::Block>::from_sealed_parts(header.clone(), body));
429            }
430        }
431
432        if needs_retry {
433            // put response hashes back into bodies map since we aren't returning them as a
434            // response
435            for block in valid_responses {
436                let (header, body) = block.split_sealed_header_body();
437                self.bodies.insert(header, BodyResponse::Validated(body));
438            }
439
440            // put headers back since they were `take`n before
441            self.headers = Some(headers);
442
443            // create response for failing bodies
444            let hashes = self.remaining_bodies_hashes();
445            self.request.bodies = Some(self.client.get_block_bodies(hashes));
446            return None
447        }
448
449        Some(valid_responses)
450    }
451
452    fn on_headers_response(&mut self, headers: WithPeerId<Vec<Client::Header>>) {
453        let (peer, mut headers_falling) =
454            headers.map(|h| h.into_iter().map(SealedHeader::seal_slow).collect::<Vec<_>>()).split();
455
456        // fill in the response if it's the correct length
457        if headers_falling.len() == self.count as usize {
458            // sort headers from highest to lowest block number
459            headers_falling.sort_unstable_by_key(|h| Reverse(h.number()));
460
461            // check the starting hash
462            if headers_falling[0].hash() == self.start_hash {
463                let headers_rising = headers_falling.iter().rev().cloned().collect::<Vec<_>>();
464                // check if the downloaded headers are valid
465                if let Err(err) = self.consensus.validate_header_range(&headers_rising) {
466                    debug!(target: "downloaders", %err, ?self.start_hash, "Received bad header response");
467                    self.client.report_bad_message(peer);
468                }
469
470                // get the bodies request so it can be polled later
471                let hashes = headers_falling.iter().map(|h| h.hash()).collect::<Vec<_>>();
472
473                // populate the pending headers
474                self.pending_headers = headers_falling.clone().into();
475
476                // set the actual request if it hasn't been started yet
477                if !self.has_bodies_request_started() {
478                    // request the bodies for the downloaded headers
479                    self.request.bodies = Some(self.client.get_block_bodies(hashes));
480                }
481
482                // set the headers response
483                self.headers = Some(headers_falling);
484            } else {
485                // received a different header than requested
486                self.client.report_bad_message(peer);
487            }
488        }
489    }
490
491    /// Returns whether or not a bodies request has been started, returning false if there is no
492    /// pending request.
493    const fn has_bodies_request_started(&self) -> bool {
494        self.request.bodies.is_some()
495    }
496
497    /// Returns the start hash for the request
498    pub const fn start_hash(&self) -> B256 {
499        self.start_hash
500    }
501
502    /// Returns the block count for the request
503    pub const fn count(&self) -> u64 {
504        self.count
505    }
506}
507
508impl<Client> Future for FetchFullBlockRangeFuture<Client>
509where
510    Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq> + 'static,
511{
512    type Output = Vec<SealedBlock<Client::Block>>;
513
514    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
515        let this = self.get_mut();
516
517        loop {
518            match ready!(this.request.poll(cx)) {
519                // This branch handles headers responses from peers - it first ensures that the
520                // starting hash and number of headers matches what we requested.
521                //
522                // If these don't match, we penalize the peer and retry the request.
523                // If they do match, we sort the headers by block number and start the request for
524                // the corresponding block bodies.
525                //
526                // The next result that should be yielded by `poll` is the bodies response.
527                RangeResponseResult::Header(res) => {
528                    match res {
529                        Ok(headers) => {
530                            this.on_headers_response(headers);
531                        }
532                        Err(err) => {
533                            debug!(target: "downloaders", %err, ?this.start_hash, "Header range download failed");
534                        }
535                    }
536
537                    if this.headers.is_none() {
538                        // did not receive a correct response yet, retry
539                        this.request.headers = Some(this.client.get_headers(HeadersRequest {
540                            start: this.start_hash.into(),
541                            limit: this.count,
542                            direction: HeadersDirection::Falling,
543                        }));
544                    }
545                }
546                // This branch handles block body responses from peers - it first inserts the
547                // bodies into the `bodies` map, and then checks if the request is complete.
548                //
549                // If the request is not complete, and we need to request more bodies, we send
550                // a bodies request for the headers we don't yet have bodies for.
551                RangeResponseResult::Body(res) => {
552                    match res {
553                        Ok(bodies_resp) => {
554                            let (peer, new_bodies) = bodies_resp.split();
555
556                            // first insert the received bodies
557                            this.insert_bodies(
558                                new_bodies
559                                    .into_iter()
560                                    .map(|resp| WithPeerId::new(peer, resp))
561                                    .map(BodyResponse::PendingValidation),
562                            );
563
564                            if !this.is_bodies_complete() {
565                                // get remaining hashes so we can send the next request
566                                let req_hashes = this.remaining_bodies_hashes();
567
568                                // set a new request
569                                this.request.bodies = Some(this.client.get_block_bodies(req_hashes))
570                            }
571                        }
572                        Err(err) => {
573                            debug!(target: "downloaders", %err, ?this.start_hash, "Body range download failed");
574                        }
575                    }
576                    if this.bodies.is_empty() {
577                        // received bad response, re-request headers
578                        // TODO: convert this into two futures, one which is a headers range
579                        // future, and one which is a bodies range future.
580                        //
581                        // The headers range future should yield the bodies range future.
582                        // The bodies range future should not have an Option<Vec<B256>>, it should
583                        // have a populated Vec<B256> from the successful headers range future.
584                        //
585                        // This is optimal because we can not send a bodies request without
586                        // first completing the headers request. This way we can get rid of the
587                        // following `if let Some`. A bodies request should never be sent before
588                        // the headers request completes, so this should always be `Some` anyways.
589                        let hashes = this.remaining_bodies_hashes();
590                        if !hashes.is_empty() {
591                            this.request.bodies = Some(this.client.get_block_bodies(hashes));
592                        }
593                    }
594                }
595            }
596
597            if let Some(res) = this.take_blocks() {
598                return Poll::Ready(res)
599            }
600        }
601    }
602}
603
604/// A request for a range of full blocks. Polling this will poll the inner headers and bodies
605/// futures until they return responses. It will return either the header or body result, depending
606/// on which future successfully returned.
607struct FullBlockRangeRequest<Client>
608where
609    Client: BlockClient,
610{
611    headers: Option<<Client as HeadersClient>::Output>,
612    bodies: Option<<Client as BodiesClient>::Output>,
613}
614
615impl<Client> FullBlockRangeRequest<Client>
616where
617    Client: BlockClient,
618{
619    fn poll(
620        &mut self,
621        cx: &mut Context<'_>,
622    ) -> Poll<RangeResponseResult<Client::Header, Client::Body>> {
623        if let Some(fut) = Pin::new(&mut self.headers).as_pin_mut() {
624            if let Poll::Ready(res) = fut.poll(cx) {
625                self.headers = None;
626                return Poll::Ready(RangeResponseResult::Header(res))
627            }
628        }
629
630        if let Some(fut) = Pin::new(&mut self.bodies).as_pin_mut() {
631            if let Poll::Ready(res) = fut.poll(cx) {
632                self.bodies = None;
633                return Poll::Ready(RangeResponseResult::Body(res))
634            }
635        }
636
637        Poll::Pending
638    }
639}
640
641// The result of a request for headers or block bodies. This is yielded by the
642// `FullBlockRangeRequest` future.
643enum RangeResponseResult<H, B> {
644    Header(PeerRequestResult<Vec<H>>),
645    Body(PeerRequestResult<Vec<B>>),
646}
647
648/// A headers+bodies client implementation that does nothing.
649#[derive(Debug, Default, Clone)]
650#[non_exhaustive]
651pub struct NoopFullBlockClient<Net = EthNetworkPrimitives>(PhantomData<Net>);
652
653/// Implements the `DownloadClient` trait for the `NoopFullBlockClient` struct.
654impl<Net> DownloadClient for NoopFullBlockClient<Net>
655where
656    Net: Debug + Send + Sync,
657{
658    /// Reports a bad message received from a peer.
659    ///
660    /// # Arguments
661    ///
662    /// * `_peer_id` - Identifier for the peer sending the bad message (unused in this
663    ///   implementation).
664    fn report_bad_message(&self, _peer_id: PeerId) {}
665
666    /// Retrieves the number of connected peers.
667    ///
668    /// # Returns
669    ///
670    /// The number of connected peers, which is always zero in this implementation.
671    fn num_connected_peers(&self) -> usize {
672        0
673    }
674}
675
676/// Implements the `BodiesClient` trait for the `NoopFullBlockClient` struct.
677impl<Net> BodiesClient for NoopFullBlockClient<Net>
678where
679    Net: NetworkPrimitives,
680{
681    type Body = Net::BlockBody;
682    /// Defines the output type of the function.
683    type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Body>>>;
684
685    /// Retrieves block bodies based on provided hashes and priority.
686    ///
687    /// # Arguments
688    ///
689    /// * `_hashes` - A vector of block hashes (unused in this implementation).
690    /// * `_priority` - Priority level for block body retrieval (unused in this implementation).
691    ///
692    /// # Returns
693    ///
694    /// A future containing an empty vector of block bodies and a randomly generated `PeerId`.
695    fn get_block_bodies_with_priority(
696        &self,
697        _hashes: Vec<B256>,
698        _priority: Priority,
699    ) -> Self::Output {
700        // Create a future that immediately returns an empty vector of block bodies and a random
701        // PeerId.
702        futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
703    }
704}
705
706impl<Net> HeadersClient for NoopFullBlockClient<Net>
707where
708    Net: NetworkPrimitives,
709{
710    type Header = Net::BlockHeader;
711    /// The output type representing a future containing a peer request result with a vector of
712    /// headers.
713    type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Header>>>;
714
715    /// Retrieves headers with a specified priority level.
716    ///
717    /// This implementation does nothing and returns an empty vector of headers.
718    ///
719    /// # Arguments
720    ///
721    /// * `_request` - A request for headers (unused in this implementation).
722    /// * `_priority` - The priority level for the headers request (unused in this implementation).
723    ///
724    /// # Returns
725    ///
726    /// Always returns a ready future with an empty vector of headers wrapped in a
727    /// `PeerRequestResult`.
728    fn get_headers_with_priority(
729        &self,
730        _request: HeadersRequest,
731        _priority: Priority,
732    ) -> Self::Output {
733        futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
734    }
735}
736
737impl<Net> BlockClient for NoopFullBlockClient<Net>
738where
739    Net: NetworkPrimitives,
740{
741    type Block = Net::Block;
742}
743
744#[cfg(test)]
745mod tests {
746    use reth_ethereum_primitives::BlockBody;
747
748    use super::*;
749    use crate::test_utils::TestFullBlockClient;
750    use std::ops::Range;
751
752    #[tokio::test]
753    async fn download_single_full_block() {
754        let client = TestFullBlockClient::default();
755        let header: SealedHeader = SealedHeader::default();
756        let body = BlockBody::default();
757        client.insert(header.clone(), body.clone());
758        let client = FullBlockClient::test_client(client);
759
760        let received = client.get_full_block(header.hash()).await;
761        assert_eq!(received, SealedBlock::from_sealed_parts(header, body));
762    }
763
764    #[tokio::test]
765    async fn download_single_full_block_range() {
766        let client = TestFullBlockClient::default();
767        let header: SealedHeader = SealedHeader::default();
768        let body = BlockBody::default();
769        client.insert(header.clone(), body.clone());
770        let client = FullBlockClient::test_client(client);
771
772        let received = client.get_full_block_range(header.hash(), 1).await;
773        let received = received.first().expect("response should include a block");
774        assert_eq!(*received, SealedBlock::from_sealed_parts(header, body));
775    }
776
777    /// Inserts headers and returns the last header and block body.
778    fn insert_headers_into_client(
779        client: &TestFullBlockClient,
780        range: Range<usize>,
781    ) -> (SealedHeader, BlockBody) {
782        let mut sealed_header: SealedHeader = SealedHeader::default();
783        let body = BlockBody::default();
784        for _ in range {
785            let (mut header, hash) = sealed_header.split();
786            // update to the next header
787            header.parent_hash = hash;
788            header.number += 1;
789
790            sealed_header = SealedHeader::seal_slow(header);
791
792            client.insert(sealed_header.clone(), body.clone());
793        }
794
795        (sealed_header, body)
796    }
797
798    #[tokio::test]
799    async fn download_full_block_range() {
800        let client = TestFullBlockClient::default();
801        let (header, body) = insert_headers_into_client(&client, 0..50);
802        let client = FullBlockClient::test_client(client);
803
804        let received = client.get_full_block_range(header.hash(), 1).await;
805        let received = received.first().expect("response should include a block");
806        assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));
807
808        let received = client.get_full_block_range(header.hash(), 10).await;
809        assert_eq!(received.len(), 10);
810        for (i, block) in received.iter().enumerate() {
811            let expected_number = header.number - i as u64;
812            assert_eq!(block.number, expected_number);
813        }
814    }
815
816    #[tokio::test]
817    async fn download_full_block_range_over_soft_limit() {
818        // default soft limit is 20, so we will request 50 blocks
819        let client = TestFullBlockClient::default();
820        let (header, body) = insert_headers_into_client(&client, 0..50);
821        let client = FullBlockClient::test_client(client);
822
823        let received = client.get_full_block_range(header.hash(), 1).await;
824        let received = received.first().expect("response should include a block");
825        assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));
826
827        let received = client.get_full_block_range(header.hash(), 50).await;
828        assert_eq!(received.len(), 50);
829        for (i, block) in received.iter().enumerate() {
830            let expected_number = header.number - i as u64;
831            assert_eq!(block.number, expected_number);
832        }
833    }
834
835    #[tokio::test]
836    async fn download_full_block_range_with_invalid_header() {
837        let client = TestFullBlockClient::default();
838        let range_length: usize = 3;
839        let (header, _) = insert_headers_into_client(&client, 0..range_length);
840
841        let test_consensus = reth_consensus::test_utils::TestConsensus::default();
842        test_consensus.set_fail_validation(true);
843        test_consensus.set_fail_body_against_header(false);
844        let client = FullBlockClient::new(client, Arc::new(test_consensus));
845
846        let received = client.get_full_block_range(header.hash(), range_length as u64).await;
847
848        assert_eq!(received.len(), range_length);
849        for (i, block) in received.iter().enumerate() {
850            let expected_number = header.number - i as u64;
851            assert_eq!(block.number, expected_number);
852        }
853    }
854}