reth_network/fetch/
mod.rs

1//! Fetch data from the network.
2
3mod client;
4
5pub use client::FetchClient;
6
7use crate::message::BlockRequest;
8use alloy_primitives::B256;
9use futures::StreamExt;
10use reth_eth_wire::{EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives};
11use reth_network_api::test_utils::PeersHandle;
12use reth_network_p2p::{
13    error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult},
14    headers::client::HeadersRequest,
15    priority::Priority,
16};
17use reth_network_peers::PeerId;
18use reth_network_types::ReputationChangeKind;
19use std::{
20    collections::{HashMap, VecDeque},
21    sync::{
22        atomic::{AtomicU64, AtomicUsize, Ordering},
23        Arc,
24    },
25    task::{Context, Poll},
26};
27use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
28use tokio_stream::wrappers::UnboundedReceiverStream;
29
/// An in-flight `GetBlockHeaders` request: the original [`HeadersRequest`] paired with the
/// channel on which the peer's headers response is delivered.
type InflightHeadersRequest<H> = Request<HeadersRequest, PeerRequestResult<Vec<H>>>;
/// An in-flight `GetBlockBodies` request: the requested `B256` values (presumably block hashes)
/// paired with the channel on which the peer's bodies response is delivered.
type InflightBodiesRequest<B> = Request<Vec<B256>, PeerRequestResult<Vec<B>>>;
32
/// Manages data fetching operations.
///
/// This type is hooked into the staged sync pipeline: it delegates download requests to
/// available peers and sends each response back to the requester once it is ready.
///
/// This type maintains a list of connected peers that are available for requests.
#[derive(Debug)]
pub struct StateFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Currently active [`GetBlockHeaders`] requests, keyed by the peer serving them.
    inflight_headers_requests: HashMap<PeerId, InflightHeadersRequest<N::BlockHeader>>,
    /// Currently active [`GetBlockBodies`] requests, keyed by the peer serving them.
    inflight_bodies_requests: HashMap<PeerId, InflightBodiesRequest<N::BlockBody>>,
    /// Tracked peers together with their request state; only idle peers are selected for new
    /// requests.
    peers: HashMap<PeerId, Peer>,
    /// The handle to the peers manager, cloned into every [`FetchClient`].
    peers_handle: PeersHandle,
    /// Number of active peer sessions the node's currently handling.
    num_active_peers: Arc<AtomicUsize>,
    /// Requests queued for processing; high-priority requests are placed ahead of normal ones.
    queued_requests: VecDeque<DownloadRequest<N>>,
    /// Receiver for new incoming download requests
    download_requests_rx: UnboundedReceiverStream<DownloadRequest<N>>,
    /// Sender for download requests, used to detach a [`FetchClient`]
    download_requests_tx: UnboundedSender<DownloadRequest<N>>,
}
58
// === impl StateFetcher ===
60
61impl<N: NetworkPrimitives> StateFetcher<N> {
62    pub(crate) fn new(peers_handle: PeersHandle, num_active_peers: Arc<AtomicUsize>) -> Self {
63        let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel();
64        Self {
65            inflight_headers_requests: Default::default(),
66            inflight_bodies_requests: Default::default(),
67            peers: Default::default(),
68            peers_handle,
69            num_active_peers,
70            queued_requests: Default::default(),
71            download_requests_rx: UnboundedReceiverStream::new(download_requests_rx),
72            download_requests_tx,
73        }
74    }
75
76    /// Invoked when connected to a new peer.
77    pub(crate) fn new_active_peer(
78        &mut self,
79        peer_id: PeerId,
80        best_hash: B256,
81        best_number: u64,
82        timeout: Arc<AtomicU64>,
83    ) {
84        self.peers.insert(
85            peer_id,
86            Peer {
87                state: PeerState::Idle,
88                best_hash,
89                best_number,
90                timeout,
91                last_response_likely_bad: false,
92            },
93        );
94    }
95
96    /// Removes the peer from the peer list, after which it is no longer available for future
97    /// requests.
98    ///
99    /// Invoked when an active session was closed.
100    ///
101    /// This cancels also inflight request and sends an error to the receiver.
102    pub(crate) fn on_session_closed(&mut self, peer: &PeerId) {
103        self.peers.remove(peer);
104        if let Some(req) = self.inflight_headers_requests.remove(peer) {
105            let _ = req.response.send(Err(RequestError::ConnectionDropped));
106        }
107        if let Some(req) = self.inflight_bodies_requests.remove(peer) {
108            let _ = req.response.send(Err(RequestError::ConnectionDropped));
109        }
110    }
111
112    /// Updates the block information for the peer.
113    ///
114    /// Returns `true` if this a newer block
115    pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) -> bool {
116        if let Some(peer) = self.peers.get_mut(peer_id) {
117            if number > peer.best_number {
118                peer.best_hash = hash;
119                peer.best_number = number;
120                return true
121            }
122        }
123        false
124    }
125
126    /// Invoked when an active session is about to be disconnected.
127    pub(crate) fn on_pending_disconnect(&mut self, peer_id: &PeerId) {
128        if let Some(peer) = self.peers.get_mut(peer_id) {
129            peer.state = PeerState::Closing;
130        }
131    }
132
133    /// Returns the _next_ idle peer that's ready to accept a request,
134    /// prioritizing those with the lowest timeout/latency and those that recently responded with
135    /// adequate data.
136    fn next_best_peer(&self) -> Option<PeerId> {
137        let mut idle = self.peers.iter().filter(|(_, peer)| peer.state.is_idle());
138
139        let mut best_peer = idle.next()?;
140
141        for maybe_better in idle {
142            // replace best peer if our current best peer sent us a bad response last time
143            if best_peer.1.last_response_likely_bad && !maybe_better.1.last_response_likely_bad {
144                best_peer = maybe_better;
145                continue
146            }
147
148            // replace best peer if this peer has better rtt
149            if maybe_better.1.timeout() < best_peer.1.timeout() &&
150                !maybe_better.1.last_response_likely_bad
151            {
152                best_peer = maybe_better;
153            }
154        }
155
156        Some(*best_peer.0)
157    }
158
159    /// Returns the next action to return
160    fn poll_action(&mut self) -> PollAction {
161        // we only check and not pop here since we don't know yet whether a peer is available.
162        if self.queued_requests.is_empty() {
163            return PollAction::NoRequests
164        }
165
166        let Some(peer_id) = self.next_best_peer() else { return PollAction::NoPeersAvailable };
167
168        let request = self.queued_requests.pop_front().expect("not empty");
169        let request = self.prepare_block_request(peer_id, request);
170
171        PollAction::Ready(FetchAction::BlockRequest { peer_id, request })
172    }
173
174    /// Advance the state the syncer
175    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchAction> {
176        // drain buffered actions first
177        loop {
178            let no_peers_available = match self.poll_action() {
179                PollAction::Ready(action) => return Poll::Ready(action),
180                PollAction::NoRequests => false,
181                PollAction::NoPeersAvailable => true,
182            };
183
184            loop {
185                // poll incoming requests
186                match self.download_requests_rx.poll_next_unpin(cx) {
187                    Poll::Ready(Some(request)) => match request.get_priority() {
188                        Priority::High => {
189                            // find the first normal request and queue before, add this request to
190                            // the back of the high-priority queue
191                            let pos = self
192                                .queued_requests
193                                .iter()
194                                .position(|req| req.is_normal_priority())
195                                .unwrap_or(0);
196                            self.queued_requests.insert(pos, request);
197                        }
198                        Priority::Normal => {
199                            self.queued_requests.push_back(request);
200                        }
201                    },
202                    Poll::Ready(None) => {
203                        unreachable!("channel can't close")
204                    }
205                    Poll::Pending => break,
206                }
207            }
208
209            if self.queued_requests.is_empty() || no_peers_available {
210                return Poll::Pending
211            }
212        }
213    }
214
215    /// Handles a new request to a peer.
216    ///
217    /// Caution: this assumes the peer exists and is idle
218    fn prepare_block_request(&mut self, peer_id: PeerId, req: DownloadRequest<N>) -> BlockRequest {
219        // update the peer's state
220        if let Some(peer) = self.peers.get_mut(&peer_id) {
221            peer.state = req.peer_state();
222        }
223
224        match req {
225            DownloadRequest::GetBlockHeaders { request, response, .. } => {
226                let inflight = Request { request: request.clone(), response };
227                self.inflight_headers_requests.insert(peer_id, inflight);
228                let HeadersRequest { start, limit, direction } = request;
229                BlockRequest::GetBlockHeaders(GetBlockHeaders {
230                    start_block: start,
231                    limit,
232                    skip: 0,
233                    direction,
234                })
235            }
236            DownloadRequest::GetBlockBodies { request, response, .. } => {
237                let inflight = Request { request: request.clone(), response };
238                self.inflight_bodies_requests.insert(peer_id, inflight);
239                BlockRequest::GetBlockBodies(GetBlockBodies(request))
240            }
241        }
242    }
243
244    /// Returns a new followup request for the peer.
245    ///
246    /// Caution: this expects that the peer is _not_ closed.
247    fn followup_request(&mut self, peer_id: PeerId) -> Option<BlockResponseOutcome> {
248        let req = self.queued_requests.pop_front()?;
249        let req = self.prepare_block_request(peer_id, req);
250        Some(BlockResponseOutcome::Request(peer_id, req))
251    }
252
253    /// Called on a `GetBlockHeaders` response from a peer.
254    ///
255    /// This delegates the response and returns a [`BlockResponseOutcome`] to either queue in a
256    /// direct followup request or get the peer reported if the response was a
257    /// [`EthResponseValidator::reputation_change_err`]
258    pub(crate) fn on_block_headers_response(
259        &mut self,
260        peer_id: PeerId,
261        res: RequestResult<Vec<N::BlockHeader>>,
262    ) -> Option<BlockResponseOutcome> {
263        let is_error = res.is_err();
264        let maybe_reputation_change = res.reputation_change_err();
265
266        let resp = self.inflight_headers_requests.remove(&peer_id);
267
268        let is_likely_bad_response =
269            resp.as_ref().is_some_and(|r| res.is_likely_bad_headers_response(&r.request));
270
271        if let Some(resp) = resp {
272            // delegate the response
273            let _ = resp.response.send(res.map(|h| (peer_id, h).into()));
274        }
275
276        if let Some(peer) = self.peers.get_mut(&peer_id) {
277            // update the peer's response state
278            peer.last_response_likely_bad = is_likely_bad_response;
279
280            // If the peer is still ready to accept new requests, we try to send a followup
281            // request immediately.
282            if peer.state.on_request_finished() && !is_error && !is_likely_bad_response {
283                return self.followup_request(peer_id)
284            }
285        }
286
287        // if the response was an `Err` worth reporting the peer for then we return a `BadResponse`
288        // outcome
289        maybe_reputation_change
290            .map(|reputation_change| BlockResponseOutcome::BadResponse(peer_id, reputation_change))
291    }
292
293    /// Called on a `GetBlockBodies` response from a peer
294    pub(crate) fn on_block_bodies_response(
295        &mut self,
296        peer_id: PeerId,
297        res: RequestResult<Vec<N::BlockBody>>,
298    ) -> Option<BlockResponseOutcome> {
299        let is_likely_bad_response = res.as_ref().map_or(true, |bodies| bodies.is_empty());
300
301        if let Some(resp) = self.inflight_bodies_requests.remove(&peer_id) {
302            let _ = resp.response.send(res.map(|b| (peer_id, b).into()));
303        }
304        if let Some(peer) = self.peers.get_mut(&peer_id) {
305            // update the peer's response state
306            peer.last_response_likely_bad = is_likely_bad_response;
307
308            if peer.state.on_request_finished() && !is_likely_bad_response {
309                return self.followup_request(peer_id)
310            }
311        }
312        None
313    }
314
315    /// Returns a new [`FetchClient`] that can send requests to this type.
316    pub(crate) fn client(&self) -> FetchClient<N> {
317        FetchClient {
318            request_tx: self.download_requests_tx.clone(),
319            peers_handle: self.peers_handle.clone(),
320            num_active_peers: Arc::clone(&self.num_active_peers),
321        }
322    }
323}
324
/// The outcome of [`StateFetcher::poll_action`]
enum PollAction {
    /// A queued request was assigned to a peer; the contained action is ready to be emitted.
    Ready(FetchAction),
    /// The request queue is empty, so there is nothing to dispatch.
    NoRequests,
    /// Requests are queued, but no idle peer is available to take one.
    NoPeersAvailable,
}
331
/// Represents a connected peer
#[derive(Debug)]
struct Peer {
    /// The state this peer currently resides in.
    state: PeerState,
    /// Best known hash that the peer has
    best_hash: B256,
    /// Tracks the best number of the peer.
    best_number: u64,
    /// Tracks the current timeout value we use for the peer.
    ///
    /// Shared handle; the value is read on demand, so updates made through other clones of the
    /// `Arc` are observed by later reads.
    timeout: Arc<AtomicU64>,
    /// Tracks whether the peer has recently responded with a likely bad response.
    ///
    /// This is used to de-rank the peer if there are other peers available.
    /// This exists because empty responses may not be penalized (e.g. when blocks near the tip are
    /// downloaded), but we still want to avoid requesting from the same peer again if it has the
    /// lowest timeout.
    last_response_likely_bad: bool,
}

impl Peer {
    /// Returns the peer's current timeout value, read with a relaxed atomic load.
    fn timeout(&self) -> u64 {
        self.timeout.load(Ordering::Relaxed)
    }
}
357
/// The request-handling state of a single peer.
#[derive(Debug)]
enum PeerState {
    /// Peer is currently not handling requests and is available.
    Idle,
    /// Peer is handling a `GetBlockHeaders` request.
    GetBlockHeaders,
    /// Peer is handling a `GetBlockBodies` request.
    GetBlockBodies,
    /// Peer session is about to close
    Closing,
}

// === impl PeerState ===

impl PeerState {
    /// Returns true if the peer is currently idle.
    const fn is_idle(&self) -> bool {
        match self {
            Self::Idle => true,
            Self::GetBlockHeaders | Self::GetBlockBodies | Self::Closing => false,
        }
    }

    /// Resets the state to `Idle` after a response was received.
    ///
    /// A peer that is already `Closing` stays `Closing`.
    ///
    /// Returns `true` if the peer is ready for another request.
    fn on_request_finished(&mut self) -> bool {
        if matches!(self, Self::Closing) {
            return false
        }

        *self = Self::Idle;
        true
    }
}
392
/// A request that waits for a response from the network, so it can send it back through the
/// response channel.
///
/// `Req` is the type of the issued request and `Resp` the type delivered to the requester.
#[derive(Debug)]
struct Request<Req, Resp> {
    /// The issued request object
    // TODO: this can be attached to the response in error case
    #[allow(dead_code)]
    request: Req,
    /// One-shot channel on which the response (or an error) is delivered to the requester.
    response: oneshot::Sender<Resp>,
}
403
404/// Requests that can be sent to the Syncer from a [`FetchClient`]
405#[derive(Debug)]
406pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
407    /// Download the requested headers and send response through channel
408    GetBlockHeaders {
409        request: HeadersRequest,
410        response: oneshot::Sender<PeerRequestResult<Vec<N::BlockHeader>>>,
411        priority: Priority,
412    },
413    /// Download the requested headers and send response through channel
414    GetBlockBodies {
415        request: Vec<B256>,
416        response: oneshot::Sender<PeerRequestResult<Vec<N::BlockBody>>>,
417        priority: Priority,
418    },
419}
420
421// === impl DownloadRequest ===
422
423impl<N: NetworkPrimitives> DownloadRequest<N> {
424    /// Returns the corresponding state for a peer that handles the request.
425    const fn peer_state(&self) -> PeerState {
426        match self {
427            Self::GetBlockHeaders { .. } => PeerState::GetBlockHeaders,
428            Self::GetBlockBodies { .. } => PeerState::GetBlockBodies,
429        }
430    }
431
432    /// Returns the requested priority of this request
433    const fn get_priority(&self) -> &Priority {
434        match self {
435            Self::GetBlockHeaders { priority, .. } | Self::GetBlockBodies { priority, .. } => {
436                priority
437            }
438        }
439    }
440
441    /// Returns `true` if this request is normal priority.
442    const fn is_normal_priority(&self) -> bool {
443        self.get_priority().is_normal()
444    }
445}
446
/// An action the fetcher can emit from [`StateFetcher::poll`].
pub(crate) enum FetchAction {
    /// Dispatch an eth request to the given peer.
    BlockRequest {
        /// The targeted recipient for the request
        peer_id: PeerId,
        /// The request to send
        request: BlockRequest,
    },
}
457
/// Outcome of a processed response.
///
/// Returned after processing a response.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum BlockResponseOutcome {
    /// Continue with another request to the peer.
    Request(PeerId, BlockRequest),
    /// The peer's response was an error worth reporting; carries the reputation change to apply
    /// to that peer.
    BadResponse(PeerId, ReputationChangeKind),
}
468
469#[cfg(test)]
470mod tests {
471    use super::*;
472    use crate::{peers::PeersManager, PeersConfig};
473    use alloy_consensus::Header;
474    use alloy_primitives::B512;
475    use std::future::poll_fn;
476
    /// Polling must stay pending both with an empty queue and with a queued request but no peer
    /// to serve it.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_poll_fetcher() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());

        poll_fn(move |cx| {
            // nothing queued yet
            assert!(fetcher.poll(cx).is_pending());
            let (tx, _rx) = oneshot::channel();
            fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
                request: vec![],
                response: tx,
                priority: Priority::default(),
            });
            // a request is queued, but no peer was registered to take it
            assert!(fetcher.poll(cx).is_pending());

            Poll::Ready(())
        })
        .await;
    }
497
498    #[tokio::test]
499    async fn test_peer_rotation() {
500        let manager = PeersManager::new(PeersConfig::default());
501        let mut fetcher =
502            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
503        // Add a few random peers
504        let peer1 = B512::random();
505        let peer2 = B512::random();
506        fetcher.new_active_peer(peer1, B256::random(), 1, Arc::new(AtomicU64::new(1)));
507        fetcher.new_active_peer(peer2, B256::random(), 2, Arc::new(AtomicU64::new(1)));
508
509        let first_peer = fetcher.next_best_peer().unwrap();
510        assert!(first_peer == peer1 || first_peer == peer2);
511        // Pending disconnect for first_peer
512        fetcher.on_pending_disconnect(&first_peer);
513        // first_peer now isn't idle, so we should get other peer
514        let second_peer = fetcher.next_best_peer().unwrap();
515        assert!(first_peer == peer1 || first_peer == peer2);
516        assert_ne!(first_peer, second_peer);
517        // without idle peers, returns None
518        fetcher.on_pending_disconnect(&second_peer);
519        assert_eq!(fetcher.next_best_peer(), None);
520    }
521
522    #[tokio::test]
523    async fn test_peer_prioritization() {
524        let manager = PeersManager::new(PeersConfig::default());
525        let mut fetcher =
526            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
527        // Add a few random peers
528        let peer1 = B512::random();
529        let peer2 = B512::random();
530        let peer3 = B512::random();
531
532        let peer2_timeout = Arc::new(AtomicU64::new(300));
533
534        fetcher.new_active_peer(peer1, B256::random(), 1, Arc::new(AtomicU64::new(30)));
535        fetcher.new_active_peer(peer2, B256::random(), 2, Arc::clone(&peer2_timeout));
536        fetcher.new_active_peer(peer3, B256::random(), 3, Arc::new(AtomicU64::new(50)));
537
538        // Must always get peer1 (lowest timeout)
539        assert_eq!(fetcher.next_best_peer(), Some(peer1));
540        assert_eq!(fetcher.next_best_peer(), Some(peer1));
541        // peer2's timeout changes below peer1's
542        peer2_timeout.store(10, Ordering::Relaxed);
543        // Then we get peer 2 always (now lowest)
544        assert_eq!(fetcher.next_best_peer(), Some(peer2));
545        assert_eq!(fetcher.next_best_peer(), Some(peer2));
546    }
547
548    #[tokio::test]
549    async fn test_on_block_headers_response() {
550        let manager = PeersManager::new(PeersConfig::default());
551        let mut fetcher =
552            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
553        let peer_id = B512::random();
554
555        assert_eq!(fetcher.on_block_headers_response(peer_id, Ok(vec![Header::default()])), None);
556
557        assert_eq!(
558            fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)),
559            Some(BlockResponseOutcome::BadResponse(peer_id, ReputationChangeKind::Timeout))
560        );
561        assert_eq!(
562            fetcher.on_block_headers_response(peer_id, Err(RequestError::BadResponse)),
563            None
564        );
565        assert_eq!(
566            fetcher.on_block_headers_response(peer_id, Err(RequestError::ChannelClosed)),
567            None
568        );
569        assert_eq!(
570            fetcher.on_block_headers_response(peer_id, Err(RequestError::ConnectionDropped)),
571            None
572        );
573        assert_eq!(
574            fetcher.on_block_headers_response(peer_id, Err(RequestError::UnsupportedCapability)),
575            None
576        );
577    }
578
579    #[tokio::test]
580    async fn test_header_response_outcome() {
581        let manager = PeersManager::new(PeersConfig::default());
582        let mut fetcher =
583            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
584        let peer_id = B512::random();
585
586        let request_pair = || {
587            let (tx, _rx) = oneshot::channel();
588            let req = Request {
589                request: HeadersRequest {
590                    start: 0u64.into(),
591                    limit: 1,
592                    direction: Default::default(),
593                },
594                response: tx,
595            };
596            let header = Header { number: 0, ..Default::default() };
597            (req, header)
598        };
599
600        fetcher.new_active_peer(
601            peer_id,
602            Default::default(),
603            Default::default(),
604            Default::default(),
605        );
606
607        let (req, header) = request_pair();
608        fetcher.inflight_headers_requests.insert(peer_id, req);
609
610        let outcome = fetcher.on_block_headers_response(peer_id, Ok(vec![header]));
611        assert!(outcome.is_none());
612        assert!(fetcher.peers[&peer_id].state.is_idle());
613
614        let outcome =
615            fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)).unwrap();
616
617        assert!(EthResponseValidator::reputation_change_err(&Err::<Vec<Header>, _>(
618            RequestError::Timeout
619        ))
620        .is_some());
621
622        match outcome {
623            BlockResponseOutcome::BadResponse(peer, _) => {
624                assert_eq!(peer, peer_id)
625            }
626            BlockResponseOutcome::Request(_, _) => {
627                unreachable!()
628            }
629        };
630
631        assert!(fetcher.peers[&peer_id].state.is_idle());
632    }
633}