reth_network/fetch/
client.rs

1//! A client implementation that can interact with the network and download data.
2
3use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse};
4use alloy_primitives::B256;
5use futures::{future, future::Either};
6use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives};
7use reth_network_api::test_utils::PeersHandle;
8use reth_network_p2p::{
9    bodies::client::{BodiesClient, BodiesFut},
10    download::DownloadClient,
11    error::{PeerRequestResult, RequestError},
12    headers::client::{HeadersClient, HeadersRequest},
13    priority::Priority,
14};
15use reth_network_peers::PeerId;
16use reth_network_types::ReputationChangeKind;
17use std::sync::{
18    atomic::{AtomicUsize, Ordering},
19    Arc,
20};
21use tokio::sync::{mpsc::UnboundedSender, oneshot};
22
23#[cfg_attr(doc, aquamarine::aquamarine)]
24/// Front-end API for fetching data from the network.
25///
26/// Following diagram illustrates how a request, See [`HeadersClient::get_headers`] and
27/// [`BodiesClient::get_block_bodies`] is handled internally.
28///
29/// include_mmd!("docs/mermaid/fetch-client.mmd")
30#[derive(Debug, Clone)]
31pub struct FetchClient<N: NetworkPrimitives = EthNetworkPrimitives> {
32    /// Sender half of the request channel.
33    pub(crate) request_tx: UnboundedSender<DownloadRequest<N>>,
34    /// The handle to the peers
35    pub(crate) peers_handle: PeersHandle,
36    /// Number of active peer sessions the node's currently handling.
37    pub(crate) num_active_peers: Arc<AtomicUsize>,
38}
39
40impl<N: NetworkPrimitives> DownloadClient for FetchClient<N> {
41    fn report_bad_message(&self, peer_id: PeerId) {
42        self.peers_handle.reputation_change(peer_id, ReputationChangeKind::BadMessage);
43    }
44
45    fn num_connected_peers(&self) -> usize {
46        self.num_active_peers.load(Ordering::Relaxed)
47    }
48}
49
50// The `Output` future of the [HeadersClient] impl of [FetchClient] that either returns a response
51// or an error.
52type HeadersClientFuture<T> = Either<FlattenedResponse<T>, future::Ready<T>>;
53
54impl<N: NetworkPrimitives> HeadersClient for FetchClient<N> {
55    type Header = N::BlockHeader;
56    type Output = HeadersClientFuture<PeerRequestResult<Vec<N::BlockHeader>>>;
57
58    /// Sends a `GetBlockHeaders` request to an available peer.
59    fn get_headers_with_priority(
60        &self,
61        request: HeadersRequest,
62        priority: Priority,
63    ) -> Self::Output {
64        let (response, rx) = oneshot::channel();
65        if self
66            .request_tx
67            .send(DownloadRequest::GetBlockHeaders { request, response, priority })
68            .is_ok()
69        {
70            Either::Left(FlattenedResponse::from(rx))
71        } else {
72            Either::Right(future::err(RequestError::ChannelClosed))
73        }
74    }
75}
76
77impl<N: NetworkPrimitives> BodiesClient for FetchClient<N> {
78    type Body = N::BlockBody;
79    type Output = BodiesFut<N::BlockBody>;
80
81    /// Sends a `GetBlockBodies` request to an available peer.
82    fn get_block_bodies_with_priority(
83        &self,
84        request: Vec<B256>,
85        priority: Priority,
86    ) -> Self::Output {
87        let (response, rx) = oneshot::channel();
88        if self
89            .request_tx
90            .send(DownloadRequest::GetBlockBodies { request, response, priority })
91            .is_ok()
92        {
93            Box::pin(FlattenedResponse::from(rx))
94        } else {
95            Box::pin(future::err(RequestError::ChannelClosed))
96        }
97    }
98}