reth_network/
eth_requests.rs

1//! Blocks/Headers management for the p2p network.
2
3use crate::{
4    budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
5    metrics::EthRequestHandlerMetrics,
6};
7use alloy_consensus::BlockHeader;
8use alloy_eips::BlockHashOrNumber;
9use alloy_rlp::Encodable;
10use futures::StreamExt;
11use reth_eth_wire::{
12    BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetNodeData,
13    GetReceipts, HeadersDirection, NetworkPrimitives, NodeData, Receipts,
14};
15use reth_network_api::test_utils::PeersHandle;
16use reth_network_p2p::error::RequestResult;
17use reth_network_peers::PeerId;
18use reth_primitives_traits::Block;
19use reth_storage_api::{BlockReader, HeaderProvider, ReceiptProvider};
20use std::{
21    future::Future,
22    pin::Pin,
23    task::{Context, Poll},
24    time::Duration,
25};
26use tokio::sync::{mpsc::Receiver, oneshot};
27use tokio_stream::wrappers::ReceiverStream;
28
29// Limits: <https://github.com/ethereum/go-ethereum/blob/b0d44338bbcefee044f1f635a84487cbbd8f0538/eth/protocols/eth/handler.go#L34-L56>
30
/// Maximum number of per-block receipt lists to serve in a single response.
///
/// Used to limit lookups: the receipts handler stops once it has collected this many entries
/// (one entry per requested block).
const MAX_RECEIPTS_SERVE: usize = 1024;

/// Maximum number of block headers to serve in a single response.
///
/// Used to limit lookups.
const MAX_HEADERS_SERVE: usize = 1024;

/// Maximum number of block bodies to serve in a single response.
///
/// Used to limit lookups. With 24KB block sizes nowadays, the practical limit will always be
/// `SOFT_RESPONSE_LIMIT`.
const MAX_BODIES_SERVE: usize = 1024;

/// Maximum size, in bytes, of replies to data retrievals (2 MiB).
///
/// The limit is "soft": handlers compare the accumulated `length()` of the collected items
/// against it only after appending an item, so a response can exceed it by the last item added.
const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024;
49
50/// Manages eth related requests on top of the p2p network.
51///
52/// This can be spawned to another task and is supposed to be run as background service.
53#[derive(Debug)]
54#[must_use = "Manager does nothing unless polled."]
55pub struct EthRequestHandler<C, N: NetworkPrimitives = EthNetworkPrimitives> {
56    /// The client type that can interact with the chain.
57    client: C,
58    /// Used for reporting peers.
59    // TODO use to report spammers
60    #[allow(dead_code)]
61    peers: PeersHandle,
62    /// Incoming request from the [`NetworkManager`](crate::NetworkManager).
63    incoming_requests: ReceiverStream<IncomingEthRequest<N>>,
64    /// Metrics for the eth request handler.
65    metrics: EthRequestHandlerMetrics,
66}
67
68// === impl EthRequestHandler ===
69impl<C, N: NetworkPrimitives> EthRequestHandler<C, N> {
70    /// Create a new instance
71    pub fn new(client: C, peers: PeersHandle, incoming: Receiver<IncomingEthRequest<N>>) -> Self {
72        Self {
73            client,
74            peers,
75            incoming_requests: ReceiverStream::new(incoming),
76            metrics: Default::default(),
77        }
78    }
79}
80
81impl<C, N> EthRequestHandler<C, N>
82where
83    N: NetworkPrimitives,
84    C: BlockReader + HeaderProvider + ReceiptProvider<Receipt = reth_primitives::Receipt>,
85{
86    /// Returns the list of requested headers
87    fn get_headers_response(&self, request: GetBlockHeaders) -> Vec<C::Header> {
88        let GetBlockHeaders { start_block, limit, skip, direction } = request;
89
90        let mut headers = Vec::new();
91
92        let mut block: BlockHashOrNumber = match start_block {
93            BlockHashOrNumber::Hash(start) => start.into(),
94            BlockHashOrNumber::Number(num) => {
95                let Some(hash) = self.client.block_hash(num).unwrap_or_default() else {
96                    return headers
97                };
98                hash.into()
99            }
100        };
101
102        let skip = skip as u64;
103        let mut total_bytes = 0;
104
105        for _ in 0..limit {
106            if let Some(header) = self.client.header_by_hash_or_number(block).unwrap_or_default() {
107                match direction {
108                    HeadersDirection::Rising => {
109                        if let Some(next) = (header.number() + 1).checked_add(skip) {
110                            block = next.into()
111                        } else {
112                            break
113                        }
114                    }
115                    HeadersDirection::Falling => {
116                        if skip > 0 {
117                            // prevent under flows for block.number == 0 and `block.number - skip <
118                            // 0`
119                            if let Some(next) =
120                                header.number().checked_sub(1).and_then(|num| num.checked_sub(skip))
121                            {
122                                block = next.into()
123                            } else {
124                                break
125                            }
126                        } else {
127                            block = header.parent_hash().into()
128                        }
129                    }
130                }
131
132                total_bytes += header.length();
133                headers.push(header);
134
135                if headers.len() >= MAX_HEADERS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
136                    break
137                }
138            } else {
139                break
140            }
141        }
142
143        headers
144    }
145
146    fn on_headers_request(
147        &self,
148        _peer_id: PeerId,
149        request: GetBlockHeaders,
150        response: oneshot::Sender<RequestResult<BlockHeaders<C::Header>>>,
151    ) {
152        self.metrics.eth_headers_requests_received_total.increment(1);
153        let headers = self.get_headers_response(request);
154        let _ = response.send(Ok(BlockHeaders(headers)));
155    }
156
157    fn on_bodies_request(
158        &self,
159        _peer_id: PeerId,
160        request: GetBlockBodies,
161        response: oneshot::Sender<
162            RequestResult<BlockBodies<<C::Block as reth_primitives_traits::Block>::Body>>,
163        >,
164    ) {
165        self.metrics.eth_bodies_requests_received_total.increment(1);
166        let mut bodies = Vec::new();
167
168        let mut total_bytes = 0;
169
170        for hash in request.0 {
171            if let Some(block) = self.client.block_by_hash(hash).unwrap_or_default() {
172                let (_, body) = block.split();
173                total_bytes += body.length();
174                bodies.push(body);
175
176                if bodies.len() >= MAX_BODIES_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
177                    break
178                }
179            } else {
180                break
181            }
182        }
183
184        let _ = response.send(Ok(BlockBodies(bodies)));
185    }
186
187    fn on_receipts_request(
188        &self,
189        _peer_id: PeerId,
190        request: GetReceipts,
191        response: oneshot::Sender<RequestResult<Receipts>>,
192    ) {
193        self.metrics.eth_receipts_requests_received_total.increment(1);
194
195        let mut receipts = Vec::new();
196
197        let mut total_bytes = 0;
198
199        for hash in request.0 {
200            if let Some(receipts_by_block) =
201                self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
202            {
203                let receipt = receipts_by_block
204                    .into_iter()
205                    .map(|receipt| receipt.with_bloom())
206                    .collect::<Vec<_>>();
207
208                total_bytes += receipt.length();
209                receipts.push(receipt);
210
211                if receipts.len() >= MAX_RECEIPTS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
212                    break
213                }
214            } else {
215                break
216            }
217        }
218
219        let _ = response.send(Ok(Receipts(receipts)));
220    }
221}
222
223/// An endless future.
224///
225/// This should be spawned or used as part of `tokio::select!`.
226impl<C, N> Future for EthRequestHandler<C, N>
227where
228    N: NetworkPrimitives,
229    C: BlockReader<Block = N::Block, Receipt = reth_primitives::Receipt>
230        + HeaderProvider<Header = N::BlockHeader>
231        + Unpin,
232{
233    type Output = ();
234
235    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
236        let this = self.get_mut();
237
238        let mut acc = Duration::ZERO;
239        let maybe_more_incoming_requests = metered_poll_nested_stream_with_budget!(
240            acc,
241            "net::eth",
242            "Incoming eth requests stream",
243            DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS,
244            this.incoming_requests.poll_next_unpin(cx),
245            |incoming| {
246                match incoming {
247                    IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => {
248                        this.on_headers_request(peer_id, request, response)
249                    }
250                    IncomingEthRequest::GetBlockBodies { peer_id, request, response } => {
251                        this.on_bodies_request(peer_id, request, response)
252                    }
253                    IncomingEthRequest::GetNodeData { .. } => {
254                        this.metrics.eth_node_data_requests_received_total.increment(1);
255                    }
256                    IncomingEthRequest::GetReceipts { peer_id, request, response } => {
257                        this.on_receipts_request(peer_id, request, response)
258                    }
259                }
260            },
261        );
262
263        this.metrics.acc_duration_poll_eth_req_handler.set(acc.as_secs_f64());
264
265        // stream is fully drained and import futures pending
266        if maybe_more_incoming_requests {
267            // make sure we're woken up again
268            cx.waker().wake_by_ref();
269            return Poll::Pending
270        }
271
272        Poll::Pending
273    }
274}
275
276/// All `eth` request related to blocks delegated by the network.
277#[derive(Debug)]
278pub enum IncomingEthRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
279    /// Request Block headers from the peer.
280    ///
281    /// The response should be sent through the channel.
282    GetBlockHeaders {
283        /// The ID of the peer to request block headers from.
284        peer_id: PeerId,
285        /// The specific block headers requested.
286        request: GetBlockHeaders,
287        /// The channel sender for the response containing block headers.
288        response: oneshot::Sender<RequestResult<BlockHeaders<N::BlockHeader>>>,
289    },
290    /// Request Block bodies from the peer.
291    ///
292    /// The response should be sent through the channel.
293    GetBlockBodies {
294        /// The ID of the peer to request block bodies from.
295        peer_id: PeerId,
296        /// The specific block bodies requested.
297        request: GetBlockBodies,
298        /// The channel sender for the response containing block bodies.
299        response: oneshot::Sender<RequestResult<BlockBodies<N::BlockBody>>>,
300    },
301    /// Request Node Data from the peer.
302    ///
303    /// The response should be sent through the channel.
304    GetNodeData {
305        /// The ID of the peer to request node data from.
306        peer_id: PeerId,
307        /// The specific node data requested.
308        request: GetNodeData,
309        /// The channel sender for the response containing node data.
310        response: oneshot::Sender<RequestResult<NodeData>>,
311    },
312    /// Request Receipts from the peer.
313    ///
314    /// The response should be sent through the channel.
315    GetReceipts {
316        /// The ID of the peer to request receipts from.
317        peer_id: PeerId,
318        /// The specific receipts requested.
319        request: GetReceipts,
320        /// The channel sender for the response containing receipts.
321        response: oneshot::Sender<RequestResult<Receipts>>,
322    },
323}