1use crate::{
4 budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
5 metrics::EthRequestHandlerMetrics,
6};
7use alloy_consensus::BlockHeader;
8use alloy_eips::BlockHashOrNumber;
9use alloy_rlp::Encodable;
10use futures::StreamExt;
11use reth_eth_wire::{
12 BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetNodeData,
13 GetReceipts, HeadersDirection, NetworkPrimitives, NodeData, Receipts,
14};
15use reth_network_api::test_utils::PeersHandle;
16use reth_network_p2p::error::RequestResult;
17use reth_network_peers::PeerId;
18use reth_primitives_traits::Block;
19use reth_storage_api::{BlockReader, HeaderProvider, ReceiptProvider};
20use std::{
21 future::Future,
22 pin::Pin,
23 task::{Context, Poll},
24 time::Duration,
25};
26use tokio::sync::{mpsc::Receiver, oneshot};
27use tokio_stream::wrappers::ReceiverStream;
28
// Limits applied while assembling responses.
//
// Each `MAX_*_SERVE` constant caps the number of items placed in a single response of the
// corresponding kind; the handlers stop collecting once the response holds that many entries.

/// Maximum number of per-block receipt lists returned for one `GetReceipts` request.
const MAX_RECEIPTS_SERVE: usize = 1024;

/// Maximum number of headers returned for one `GetBlockHeaders` request.
const MAX_HEADERS_SERVE: usize = 1024;

/// Maximum number of block bodies returned for one `GetBlockBodies` request.
const MAX_BODIES_SERVE: usize = 1024;

/// Soft cap on the accumulated encoded length of a response: 2 MiB.
///
/// The check runs after an item has been appended, so a response may exceed this value by the
/// size of its last item.
const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024;
49
/// Serves `eth` data requests (headers, bodies, receipts) out of a storage client.
///
/// The handler is a [`Future`]: it only does work while being polled, at which point it drains
/// the channel of [`IncomingEthRequest`]s and answers each one through the request's response
/// sender.
#[derive(Debug)]
#[must_use = "Manager does nothing unless polled."]
pub struct EthRequestHandler<C, N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Storage client used to look up block hashes, headers, blocks and receipts.
    client: C,
    /// Handle to the peer set.
    // NOTE(review): not read anywhere in this file, hence the `dead_code` allowance —
    // presumably kept for peer reporting; confirm before removing.
    #[allow(dead_code)]
    peers: PeersHandle,
    /// Stream of requests to serve, wrapping the receiver passed to the constructor.
    incoming_requests: ReceiverStream<IncomingEthRequest<N>>,
    /// Request counters and poll-duration gauge for this handler.
    metrics: EthRequestHandlerMetrics,
}
67
68impl<C, N: NetworkPrimitives> EthRequestHandler<C, N> {
70 pub fn new(client: C, peers: PeersHandle, incoming: Receiver<IncomingEthRequest<N>>) -> Self {
72 Self {
73 client,
74 peers,
75 incoming_requests: ReceiverStream::new(incoming),
76 metrics: Default::default(),
77 }
78 }
79}
80
81impl<C, N> EthRequestHandler<C, N>
82where
83 N: NetworkPrimitives,
84 C: BlockReader + HeaderProvider + ReceiptProvider<Receipt = reth_primitives::Receipt>,
85{
86 fn get_headers_response(&self, request: GetBlockHeaders) -> Vec<C::Header> {
88 let GetBlockHeaders { start_block, limit, skip, direction } = request;
89
90 let mut headers = Vec::new();
91
92 let mut block: BlockHashOrNumber = match start_block {
93 BlockHashOrNumber::Hash(start) => start.into(),
94 BlockHashOrNumber::Number(num) => {
95 let Some(hash) = self.client.block_hash(num).unwrap_or_default() else {
96 return headers
97 };
98 hash.into()
99 }
100 };
101
102 let skip = skip as u64;
103 let mut total_bytes = 0;
104
105 for _ in 0..limit {
106 if let Some(header) = self.client.header_by_hash_or_number(block).unwrap_or_default() {
107 match direction {
108 HeadersDirection::Rising => {
109 if let Some(next) = (header.number() + 1).checked_add(skip) {
110 block = next.into()
111 } else {
112 break
113 }
114 }
115 HeadersDirection::Falling => {
116 if skip > 0 {
117 if let Some(next) =
120 header.number().checked_sub(1).and_then(|num| num.checked_sub(skip))
121 {
122 block = next.into()
123 } else {
124 break
125 }
126 } else {
127 block = header.parent_hash().into()
128 }
129 }
130 }
131
132 total_bytes += header.length();
133 headers.push(header);
134
135 if headers.len() >= MAX_HEADERS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
136 break
137 }
138 } else {
139 break
140 }
141 }
142
143 headers
144 }
145
146 fn on_headers_request(
147 &self,
148 _peer_id: PeerId,
149 request: GetBlockHeaders,
150 response: oneshot::Sender<RequestResult<BlockHeaders<C::Header>>>,
151 ) {
152 self.metrics.eth_headers_requests_received_total.increment(1);
153 let headers = self.get_headers_response(request);
154 let _ = response.send(Ok(BlockHeaders(headers)));
155 }
156
157 fn on_bodies_request(
158 &self,
159 _peer_id: PeerId,
160 request: GetBlockBodies,
161 response: oneshot::Sender<
162 RequestResult<BlockBodies<<C::Block as reth_primitives_traits::Block>::Body>>,
163 >,
164 ) {
165 self.metrics.eth_bodies_requests_received_total.increment(1);
166 let mut bodies = Vec::new();
167
168 let mut total_bytes = 0;
169
170 for hash in request.0 {
171 if let Some(block) = self.client.block_by_hash(hash).unwrap_or_default() {
172 let (_, body) = block.split();
173 total_bytes += body.length();
174 bodies.push(body);
175
176 if bodies.len() >= MAX_BODIES_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
177 break
178 }
179 } else {
180 break
181 }
182 }
183
184 let _ = response.send(Ok(BlockBodies(bodies)));
185 }
186
187 fn on_receipts_request(
188 &self,
189 _peer_id: PeerId,
190 request: GetReceipts,
191 response: oneshot::Sender<RequestResult<Receipts>>,
192 ) {
193 self.metrics.eth_receipts_requests_received_total.increment(1);
194
195 let mut receipts = Vec::new();
196
197 let mut total_bytes = 0;
198
199 for hash in request.0 {
200 if let Some(receipts_by_block) =
201 self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
202 {
203 let receipt = receipts_by_block
204 .into_iter()
205 .map(|receipt| receipt.with_bloom())
206 .collect::<Vec<_>>();
207
208 total_bytes += receipt.length();
209 receipts.push(receipt);
210
211 if receipts.len() >= MAX_RECEIPTS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
212 break
213 }
214 } else {
215 break
216 }
217 }
218
219 let _ = response.send(Ok(Receipts(receipts)));
220 }
221}
222
223impl<C, N> Future for EthRequestHandler<C, N>
227where
228 N: NetworkPrimitives,
229 C: BlockReader<Block = N::Block, Receipt = reth_primitives::Receipt>
230 + HeaderProvider<Header = N::BlockHeader>
231 + Unpin,
232{
233 type Output = ();
234
235 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
236 let this = self.get_mut();
237
238 let mut acc = Duration::ZERO;
239 let maybe_more_incoming_requests = metered_poll_nested_stream_with_budget!(
240 acc,
241 "net::eth",
242 "Incoming eth requests stream",
243 DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS,
244 this.incoming_requests.poll_next_unpin(cx),
245 |incoming| {
246 match incoming {
247 IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => {
248 this.on_headers_request(peer_id, request, response)
249 }
250 IncomingEthRequest::GetBlockBodies { peer_id, request, response } => {
251 this.on_bodies_request(peer_id, request, response)
252 }
253 IncomingEthRequest::GetNodeData { .. } => {
254 this.metrics.eth_node_data_requests_received_total.increment(1);
255 }
256 IncomingEthRequest::GetReceipts { peer_id, request, response } => {
257 this.on_receipts_request(peer_id, request, response)
258 }
259 }
260 },
261 );
262
263 this.metrics.acc_duration_poll_eth_req_handler.set(acc.as_secs_f64());
264
265 if maybe_more_incoming_requests {
267 cx.waker().wake_by_ref();
269 return Poll::Pending
270 }
271
272 Poll::Pending
273 }
274}
275
/// A request delivered to the [`EthRequestHandler`], paired with the channel for its answer.
///
/// Every variant carries the peer id associated with the request, the request payload and a
/// oneshot sender for the result.
#[derive(Debug)]
pub enum IncomingEthRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Request for block headers.
    GetBlockHeaders {
        /// Id of the peer associated with this request.
        // NOTE(review): presumably the remote peer that issued the request — the handlers in
        // this file do not read it; confirm against the sender side.
        peer_id: PeerId,
        /// The header selection: start block, limit, skip and direction.
        request: GetBlockHeaders,
        /// Channel through which the collected headers are sent back.
        response: oneshot::Sender<RequestResult<BlockHeaders<N::BlockHeader>>>,
    },
    /// Request for block bodies.
    GetBlockBodies {
        /// Id of the peer associated with this request.
        peer_id: PeerId,
        /// The hashes of the blocks whose bodies are requested.
        request: GetBlockBodies,
        /// Channel through which the collected bodies are sent back.
        response: oneshot::Sender<RequestResult<BlockBodies<N::BlockBody>>>,
    },
    /// Request for node data.
    ///
    /// The handler in this file only counts these requests and drops them without sending a
    /// response.
    GetNodeData {
        /// Id of the peer associated with this request.
        peer_id: PeerId,
        /// The node data request payload.
        request: GetNodeData,
        /// Channel for the node data result.
        response: oneshot::Sender<RequestResult<NodeData>>,
    },
    /// Request for receipts.
    GetReceipts {
        /// Id of the peer associated with this request.
        peer_id: PeerId,
        /// The hashes of the blocks whose receipts are requested.
        request: GetReceipts,
        /// Channel through which the collected receipts are sent back.
        response: oneshot::Sender<RequestResult<Receipts>>,
    },
}