reth_network/fetch/
client.rs1use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse};
4use alloy_primitives::B256;
5use futures::{future, future::Either};
6use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives};
7use reth_network_api::test_utils::PeersHandle;
8use reth_network_p2p::{
9 bodies::client::{BodiesClient, BodiesFut},
10 download::DownloadClient,
11 error::{PeerRequestResult, RequestError},
12 headers::client::{HeadersClient, HeadersRequest},
13 priority::Priority,
14};
15use reth_network_peers::PeerId;
16use reth_network_types::ReputationChangeKind;
17use std::sync::{
18 atomic::{AtomicUsize, Ordering},
19 Arc,
20};
21use tokio::sync::{mpsc::UnboundedSender, oneshot};
22
23#[cfg_attr(doc, aquamarine::aquamarine)]
24#[derive(Debug, Clone)]
31pub struct FetchClient<N: NetworkPrimitives = EthNetworkPrimitives> {
32 pub(crate) request_tx: UnboundedSender<DownloadRequest<N>>,
34 pub(crate) peers_handle: PeersHandle,
36 pub(crate) num_active_peers: Arc<AtomicUsize>,
38}
39
40impl<N: NetworkPrimitives> DownloadClient for FetchClient<N> {
41 fn report_bad_message(&self, peer_id: PeerId) {
42 self.peers_handle.reputation_change(peer_id, ReputationChangeKind::BadMessage);
43 }
44
45 fn num_connected_peers(&self) -> usize {
46 self.num_active_peers.load(Ordering::Relaxed)
47 }
48}
49
50type HeadersClientFuture<T> = Either<FlattenedResponse<T>, future::Ready<T>>;
53
54impl<N: NetworkPrimitives> HeadersClient for FetchClient<N> {
55 type Header = N::BlockHeader;
56 type Output = HeadersClientFuture<PeerRequestResult<Vec<N::BlockHeader>>>;
57
58 fn get_headers_with_priority(
60 &self,
61 request: HeadersRequest,
62 priority: Priority,
63 ) -> Self::Output {
64 let (response, rx) = oneshot::channel();
65 if self
66 .request_tx
67 .send(DownloadRequest::GetBlockHeaders { request, response, priority })
68 .is_ok()
69 {
70 Either::Left(FlattenedResponse::from(rx))
71 } else {
72 Either::Right(future::err(RequestError::ChannelClosed))
73 }
74 }
75}
76
77impl<N: NetworkPrimitives> BodiesClient for FetchClient<N> {
78 type Body = N::BlockBody;
79 type Output = BodiesFut<N::BlockBody>;
80
81 fn get_block_bodies_with_priority(
83 &self,
84 request: Vec<B256>,
85 priority: Priority,
86 ) -> Self::Output {
87 let (response, rx) = oneshot::channel();
88 if self
89 .request_tx
90 .send(DownloadRequest::GetBlockBodies { request, response, priority })
91 .is_ok()
92 {
93 Box::pin(FlattenedResponse::from(rx))
94 } else {
95 Box::pin(future::err(RequestError::ChannelClosed))
96 }
97 }
98}