reth_network/transactions/fetcher.rs

1//! `TransactionFetcher` is responsible for rate limiting and retry logic for fetching
2//! transactions. Upon receiving an announcement, functionality of the `TransactionFetcher` is
3//! used for filtering out hashes 1) for which the tx is already known and 2) for which the tx is
4//! unknown but the hash has already been seen in a previous announcement. The hashes that remain
5//! from an announcement are then packed into a request with respect to the [`EthVersion`] of the
6//! announcement. Any hashes that don't fit into the request are buffered in the
7//! `TransactionFetcher`. If, on the other hand, space remains, hashes that the peer has
8//! previously announced are taken out of the buffered hashes to fill the request up. The
9//! [`GetPooledTransactions`] request is then sent to the peer's session; this marks the peer as
10//! active with respect to `MAX_CONCURRENT_TX_REQUESTS_PER_PEER`.
11//!
12//! When hashes are buffered in the `TransactionsManager::on_new_pooled_transaction_hashes`
13//! pipeline, the announcing peer is stored as a fallback peer for those hashes. When
14//! [`TransactionsManager`] is polled, it checks if any fallback peer is idle. If so, it packs a
15//! request for that peer, filling it from the buffered hashes. It does so until there are no
16//! more idle peers or until the hashes buffer is empty.
17//!
18//! If a [`GetPooledTransactions`] request resolves with an error, the hashes in the request are
19//! buffered with respect to `MAX_REQUEST_RETRIES_PER_TX_HASH`. The same applies if the request
20//! resolves with partial success, that is, some of the requested hashes are not in the response;
21//! these are then buffered as well.
22//!
23//! Most healthy peers will send the same hashes in their announcements, as RLPx is a gossip
24//! protocol. This means it's unlikely that a valid hash will be buffered for very long
25//! before it's retried. Nonetheless, the capacity of the buffered hashes cache must be large
26//! enough to buffer many hashes during network failure, to allow for recovery.
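//!
//! As a minimal sketch of how these pieces fit together, the announcement handling path looks
//! roughly as follows. The `announcement`, `peer`, `bad_imports` and `sessions` values are
//! assumed to be produced elsewhere in the `TransactionsManager` pipeline, so this is
//! illustrative rather than a verbatim call sequence:
//!
//! ```ignore
//! let mut hashes_to_request = RequestTxHashes::default();
//!
//! // 1. drop bad-import hashes and hashes already tracked; for hashes with an inflight
//! //    request, the announcing peer is registered as a fallback peer instead
//! fetcher.filter_unseen_and_pending_hashes(
//!     &mut announcement,                          // ValidAnnouncementData (assumed)
//!     |hash| bad_imports.contains(hash),          // bad-import check (assumed)
//!     &peer_id,
//!     |peer_id| sessions.contains_key(&peer_id),  // active-session check (assumed)
//!     client_version,
//! );
//!
//! // 2. pack a request w.r.t. the announcement's eth version; left over hashes are buffered
//! //    with the announcing peer as fallback peer
//! let surplus_hashes = fetcher.pack_request(&mut hashes_to_request, announcement);
//! fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
//!
//! // 3. send the request to the peer's session; if that fails, buffer the hashes for retry
//! if let Some(failed_to_send) = fetcher.request_transactions_from_peer(hashes_to_request, &peer) {
//!     fetcher.buffer_hashes(failed_to_send, Some(peer_id));
//! }
//! ```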
27
28use super::{
29    config::TransactionFetcherConfig,
30    constants::{tx_fetcher::*, SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST},
31    MessageFilter, PeerMetadata, PooledTransactions,
32    SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
33};
34use crate::{
35    cache::{LruCache, LruMap},
36    duration_metered_exec,
37    metrics::TransactionFetcherMetrics,
38    transactions::{validation, PartiallyFilterMessage},
39};
40use alloy_primitives::TxHash;
41use derive_more::{Constructor, Deref};
42use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
43use pin_project::pin_project;
44use reth_eth_wire::{
45    DedupPayload, EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData,
46    PartiallyValidData, RequestTxHashes, ValidAnnouncementData,
47};
48use reth_eth_wire_types::{EthNetworkPrimitives, NetworkPrimitives};
49use reth_network_api::PeerRequest;
50use reth_network_p2p::error::{RequestError, RequestResult};
51use reth_network_peers::PeerId;
52use reth_primitives::PooledTransactionsElement;
53use reth_primitives_traits::SignedTransaction;
54use schnellru::ByLength;
55#[cfg(debug_assertions)]
56use smallvec::{smallvec, SmallVec};
57use std::{
58    collections::HashMap,
59    pin::Pin,
60    task::{ready, Context, Poll},
61    time::Duration,
62};
63use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
64use tracing::{debug, trace};
65use validation::FilterOutcome;
66
67/// The type responsible for fetching missing transactions from peers.
68///
69/// This will keep track of unique transaction hashes that are currently being fetched and submits
70/// new requests on announced hashes.
71#[derive(Debug)]
72#[pin_project]
73pub struct TransactionFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
74    /// All peers to which a [`GetPooledTransactions`] request is currently inflight.
75    pub active_peers: LruMap<PeerId, u8, ByLength>,
76    /// All currently active [`GetPooledTransactions`] requests.
77    ///
78    /// The set of hashes encompassed by these requests is a subset of all hashes in the fetcher.
79    /// It's disjoint from the set of hashes which are awaiting an idle fallback peer in order to
80    /// be fetched.
81    #[pin]
82    pub inflight_requests: FuturesUnordered<GetPooledTxRequestFut<N::PooledTransaction>>,
83    /// Hashes that are awaiting an idle fallback peer so they can be fetched.
84    ///
85    /// This is a subset of all hashes in the fetcher, and is disjoint from the set of hashes for
86    /// which a [`GetPooledTransactions`] request is inflight.
87    pub hashes_pending_fetch: LruCache<TxHash>,
88    /// Tracks all hashes in the transaction fetcher.
89    pub(super) hashes_fetch_inflight_and_pending_fetch: LruMap<TxHash, TxFetchMetadata, ByLength>,
90    /// Filter for valid announcement and response data.
91    pub(super) filter_valid_message: MessageFilter,
92    /// Info on capacity of the transaction fetcher.
93    pub info: TransactionFetcherInfo,
94    #[doc(hidden)]
95    metrics: TransactionFetcherMetrics,
96}
97
98impl<N: NetworkPrimitives> TransactionFetcher<N> {
99    /// Removes the peer from the active set.
100    pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) {
101        self.active_peers.remove(peer_id);
102    }
103
104    /// Updates metrics.
105    #[inline]
106    pub fn update_metrics(&self) {
107        let metrics = &self.metrics;
108
109        metrics.inflight_transaction_requests.set(self.inflight_requests.len() as f64);
110
111        let hashes_pending_fetch = self.hashes_pending_fetch.len() as f64;
112        let total_hashes = self.hashes_fetch_inflight_and_pending_fetch.len() as f64;
113
114        metrics.hashes_pending_fetch.set(hashes_pending_fetch);
115        metrics.hashes_inflight_transaction_requests.set(total_hashes - hashes_pending_fetch);
116    }
117
118    #[inline]
119    fn update_pending_fetch_cache_search_metrics(&self, durations: TxFetcherSearchDurations) {
120        let metrics = &self.metrics;
121
122        let TxFetcherSearchDurations { find_idle_peer, fill_request } = durations;
123        metrics
124            .duration_find_idle_fallback_peer_for_any_pending_hash
125            .set(find_idle_peer.as_secs_f64());
126        metrics.duration_fill_request_from_hashes_pending_fetch.set(fill_request.as_secs_f64());
127    }
128
129    /// Sets up the transaction fetcher with the given config.
130    pub fn with_transaction_fetcher_config(config: &TransactionFetcherConfig) -> Self {
131        let TransactionFetcherConfig {
132            max_inflight_requests,
133            max_capacity_cache_txns_pending_fetch,
134            ..
135        } = *config;
136
137        let info = config.clone().into();
138
139        let metrics = TransactionFetcherMetrics::default();
140        metrics.capacity_inflight_requests.increment(max_inflight_requests as u64);
141
142        Self {
143            active_peers: LruMap::new(max_inflight_requests),
144            hashes_pending_fetch: LruCache::new(max_capacity_cache_txns_pending_fetch),
145            hashes_fetch_inflight_and_pending_fetch: LruMap::new(
146                max_inflight_requests + max_capacity_cache_txns_pending_fetch,
147            ),
148            info,
149            metrics,
150            ..Default::default()
151        }
152    }
153
154    /// Removes the specified hashes from inflight tracking.
155    #[inline]
156    pub fn remove_hashes_from_transaction_fetcher<I>(&mut self, hashes: I)
157    where
158        I: IntoIterator<Item = TxHash>,
159    {
160        for hash in hashes {
161            self.hashes_fetch_inflight_and_pending_fetch.remove(&hash);
162            self.hashes_pending_fetch.remove(&hash);
163        }
164    }
165
166    /// Updates peer's activity status upon a resolved [`GetPooledTxRequest`].
167    fn decrement_inflight_request_count_for(&mut self, peer_id: &PeerId) {
168        let remove = || -> bool {
169            if let Some(inflight_count) = self.active_peers.get(peer_id) {
170                *inflight_count = inflight_count.saturating_sub(1);
171                if *inflight_count == 0 {
172                    return true
173                }
174            }
175            false
176        }();
177
178        if remove {
179            self.active_peers.remove(peer_id);
180        }
181    }
182
183    /// Returns `true` if the peer is idle with respect to `self.inflight_requests`.
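    ///
    /// A peer is idle if it has no tracked inflight requests, or fewer than
    /// `max_inflight_requests_per_peer`. Minimal sketch (the peer id is hypothetical):
    ///
    /// ```ignore
    /// let fetcher = TransactionFetcher::<EthNetworkPrimitives>::default();
    /// // a peer that has never been marked active is idle by definition
    /// assert!(fetcher.is_idle(&peer_id));
    /// ```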
184    pub fn is_idle(&self, peer_id: &PeerId) -> bool {
185        let Some(inflight_count) = self.active_peers.peek(peer_id) else { return true };
186        if *inflight_count < self.info.max_inflight_requests_per_peer {
187            return true
188        }
189        false
190    }
191
192    /// Returns any idle peer for the given hash.
193    pub fn get_idle_peer_for(
194        &self,
195        hash: TxHash,
196        is_session_active: impl Fn(&PeerId) -> bool,
197    ) -> Option<&PeerId> {
198        let TxFetchMetadata { fallback_peers, .. } =
199            self.hashes_fetch_inflight_and_pending_fetch.peek(&hash)?;
200
201        for peer_id in fallback_peers.iter() {
202            if self.is_idle(peer_id) && is_session_active(peer_id) {
203                return Some(peer_id)
204            }
205        }
206
207        None
208    }
209
210    /// Returns any idle peer for any hash pending fetch. If one is found, the corresponding
211    /// hash is written to the request buffer that is passed as parameter.
212    ///
213    /// Loops through the hashes pending fetch in lru order until one is found with an idle
214    /// fallback peer, or the budget passed as parameter is depleted, whichever happens first.
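    ///
    /// Minimal sketch of the call pattern (the session-check closure and budget value are
    /// hypothetical):
    ///
    /// ```ignore
    /// let mut hashes_to_request = RequestTxHashes::default();
    /// // search at most 100 pending hashes for one that has an idle fallback peer
    /// if let Some(peer_id) = fetcher.find_any_idle_fallback_peer_for_any_pending_hash(
    ///     &mut hashes_to_request,
    ///     |peer_id| sessions.contains_key(peer_id),
    ///     Some(100),
    /// ) {
    ///     // `hashes_to_request` now holds the one hash that was popped from the pending cache,
    ///     // and `peer_id` is an idle fallback peer for it
    /// }
    /// ```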
215    pub fn find_any_idle_fallback_peer_for_any_pending_hash(
216        &mut self,
217        hashes_to_request: &mut RequestTxHashes,
218        is_session_active: impl Fn(&PeerId) -> bool,
219        mut budget: Option<usize>, // search fallback peers for max `budget` lru pending hashes
220    ) -> Option<PeerId> {
221        let mut hashes_pending_fetch_iter = self.hashes_pending_fetch.iter();
222
223        let idle_peer = loop {
224            let &hash = hashes_pending_fetch_iter.next()?;
225
226            let idle_peer = self.get_idle_peer_for(hash, &is_session_active);
227
228            if idle_peer.is_some() {
229                hashes_to_request.insert(hash);
230                break idle_peer.copied()
231            }
232
233            if let Some(ref mut bud) = budget {
234                *bud = bud.saturating_sub(1);
235                if *bud == 0 {
236                    return None
237                }
238            }
239        };
240        let hash = hashes_to_request.iter().next()?;
241
242        // pop hash that is loaded in request buffer from cache of hashes pending fetch
243        drop(hashes_pending_fetch_iter);
244        _ = self.hashes_pending_fetch.remove(hash);
245
246        idle_peer
247    }
248
249    /// Packages hashes for a [`GetPooledTxRequest`] up to the limit. Takes a
250    /// [`RequestTxHashes`] buffer as parameter for filling with hashes to request.
251    ///
252    /// Returns left over hashes.
253    pub fn pack_request(
254        &self,
255        hashes_to_request: &mut RequestTxHashes,
256        hashes_from_announcement: ValidAnnouncementData,
257    ) -> RequestTxHashes {
258        if hashes_from_announcement.msg_version().is_eth68() {
259            return self.pack_request_eth68(hashes_to_request, hashes_from_announcement)
260        }
261        self.pack_request_eth66(hashes_to_request, hashes_from_announcement)
262    }
263
264    /// Packages hashes for a [`GetPooledTxRequest`] from an
265    /// [`Eth68`](reth_eth_wire::EthVersion::Eth68) announcement up to limit as defined by protocol
266    /// version 68. Takes a [`RequestTxHashes`] buffer as parameter for filling with hashes to
267    /// request.
268    ///
269    /// Returns left over hashes.
270    ///
271    /// Loops through the hashes passed as parameter and checks if each hash fits in the expected
272    /// response. If not, it's added to the surplus hashes. If it fits, it's added to the hashes
273    /// to request and the expected response size is accumulated.
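    ///
    /// As an illustration, with a soft response limit of 128 KiB assumed here for readability
    /// (the actual value is
    /// `DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ`),
    /// an announcement of three txns of 100 KiB, 20 KiB and 30 KiB packs the first two and
    /// returns the third as surplus. Sketch below; the entries are shown as plain
    /// `(hash, Some((type, size)))` tuples for brevity, whereas in practice they arrive wrapped
    /// in a type implementing [`HandleMempoolData`], e.g. [`ValidAnnouncementData`]:
    ///
    /// ```ignore
    /// let mut hashes_to_request = RequestTxHashes::default();
    /// let announced = announcement_data([
    ///     (hash_a, Some((0u8, 100 * 1024))), // fits, acc = 100 KiB
    ///     (hash_b, Some((2u8, 20 * 1024))),  // fits, acc = 120 KiB
    ///     (hash_c, Some((2u8, 30 * 1024))),  // would exceed 128 KiB -> surplus
    /// ]);
    /// let surplus = fetcher.pack_request_eth68(&mut hashes_to_request, announced);
    /// // `hashes_to_request` contains hash_a and hash_b, `surplus` contains hash_c
    /// ```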
274    pub fn pack_request_eth68(
275        &self,
276        hashes_to_request: &mut RequestTxHashes,
277        hashes_from_announcement: impl HandleMempoolData
278            + IntoIterator<Item = (TxHash, Option<(u8, usize)>)>,
279    ) -> RequestTxHashes {
280        let mut acc_size_response = 0;
281
282        let mut hashes_from_announcement_iter = hashes_from_announcement.into_iter();
283
284        if let Some((hash, Some((_ty, size)))) = hashes_from_announcement_iter.next() {
285            hashes_to_request.insert(hash);
286
287            // tx is really big, pack request with single tx
288            if size >= self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request {
289                return hashes_from_announcement_iter.collect()
290            }
291            acc_size_response = size;
292        }
293
294        let mut surplus_hashes = RequestTxHashes::default();
295
296        // folds hashes into the request based on the accumulated expected response size, adding
297        // selected hashes to the request list and the other hashes to the surplus list
298        loop {
299            let Some((hash, metadata)) = hashes_from_announcement_iter.next() else { break };
300
301            let Some((_ty, size)) = metadata else {
302                unreachable!("this method is called upon reception of an eth68 announcement")
303            };
304
305            let next_acc_size = acc_size_response + size;
306
307            if next_acc_size <=
308                self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request
309            {
310                // only update accumulated size of tx response if tx will fit in without exceeding
311                // soft limit
312                acc_size_response = next_acc_size;
313                _ = hashes_to_request.insert(hash)
314            } else {
315                _ = surplus_hashes.insert(hash)
316            }
317
318            let free_space =
319                self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request -
320                    acc_size_response;
321
322            if free_space < MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED {
323                break
324            }
325        }
326
327        surplus_hashes.extend(hashes_from_announcement_iter.map(|(hash, _metadata)| hash));
328
329        surplus_hashes
330    }
331
332    /// Packages hashes for a [`GetPooledTxRequest`] from an
333    /// [`Eth66`](reth_eth_wire::EthVersion::Eth66) announcement up to limit as defined by
334    /// protocol version 66. Takes a [`RequestTxHashes`] buffer as parameter for filling with
335    /// hashes to request.
336    ///
337    /// Returns left over hashes.
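    ///
    /// Minimal sketch: for an eth66 announcement only the hash count matters, so an announcement
    /// larger than `SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST` is truncated and
    /// the remainder is returned as surplus (the `announcement` value is hypothetical):
    ///
    /// ```ignore
    /// let mut hashes_to_request = RequestTxHashes::default();
    /// let surplus = fetcher.pack_request_eth66(&mut hashes_to_request, announcement);
    /// assert!(
    ///     hashes_to_request.len() <= SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST
    /// );
    /// ```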
338    pub fn pack_request_eth66(
339        &self,
340        hashes_to_request: &mut RequestTxHashes,
341        hashes_from_announcement: ValidAnnouncementData,
342    ) -> RequestTxHashes {
343        let (mut hashes, _version) = hashes_from_announcement.into_request_hashes();
344        if hashes.len() <= SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST {
345            *hashes_to_request = hashes;
346            hashes_to_request.shrink_to_fit();
347
348            RequestTxHashes::default()
349        } else {
350            let surplus_hashes =
351                hashes.retain_count(SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST);
352            *hashes_to_request = hashes;
353            hashes_to_request.shrink_to_fit();
354
355            surplus_hashes
356        }
357    }
358
359    /// Tries to buffer hashes for retry.
360    pub fn try_buffer_hashes_for_retry(
361        &mut self,
362        mut hashes: RequestTxHashes,
363        peer_failed_to_serve: &PeerId,
364    ) {
365        // It could be that the txns have been received over broadcast in the meantime. Remove
366        // the peer as fallback peer so it isn't requested again for these hashes.
367        hashes.retain(|hash| {
368            if let Some(entry) = self.hashes_fetch_inflight_and_pending_fetch.get(hash) {
369                entry.fallback_peers_mut().remove(peer_failed_to_serve);
370                return true
371            }
372            // tx has been seen over broadcast in the time it took for the request to resolve
373            false
374        });
375
376        self.buffer_hashes(hashes, None)
377    }
378
379    /// Buffers hashes. Note: Only peers that haven't yet tried to request the hashes should be
380    /// passed as the `fallback_peer` parameter! For re-buffering hashes on a failed request, use
381    /// [`TransactionFetcher::try_buffer_hashes_for_retry`]. Hashes that have already been
382    /// re-requested [`DEFAULT_MAX_RETRIES`] times are dropped.
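    ///
    /// Minimal sketch of the two call patterns (the hash sets and peer id are hypothetical):
    ///
    /// ```ignore
    /// // the peer announced hashes that didn't fit into the request; it hasn't been asked for
    /// // them yet, so it's a valid fallback peer
    /// fetcher.buffer_hashes(surplus_hashes, Some(announcing_peer_id));
    ///
    /// // a request to a peer failed; don't pass that peer as fallback, use the retry path
    /// fetcher.try_buffer_hashes_for_retry(requested_hashes, &failed_peer_id);
    /// ```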
383    pub fn buffer_hashes(&mut self, hashes: RequestTxHashes, fallback_peer: Option<PeerId>) {
384        let mut max_retried_and_evicted_hashes = vec![];
385
386        for hash in hashes {
387            // hash could have been evicted from bounded lru map
388            if self.hashes_fetch_inflight_and_pending_fetch.peek(&hash).is_none() {
389                continue
390            }
391
392            let Some(TxFetchMetadata { retries, fallback_peers, .. }) =
393                self.hashes_fetch_inflight_and_pending_fetch.get(&hash)
394            else {
395                return
396            };
397
398            if let Some(peer_id) = fallback_peer {
399                // peer has not yet requested hash
400                fallback_peers.insert(peer_id);
401            } else {
402                if *retries >= DEFAULT_MAX_RETRIES {
403                    trace!(target: "net::tx",
404                        %hash,
405                        retries,
406                        "retry limit for `GetPooledTransactions` requests reached for hash, dropping hash"
407                    );
408
409                    max_retried_and_evicted_hashes.push(hash);
410                    continue
411                }
412                *retries += 1;
413            }
414            if let (_, Some(evicted_hash)) = self.hashes_pending_fetch.insert_and_get_evicted(hash)
415            {
416                max_retried_and_evicted_hashes.push(evicted_hash);
417            }
418        }
419
420        self.remove_hashes_from_transaction_fetcher(max_retried_and_evicted_hashes);
421    }
422
423    /// Tries to request hashes pending fetch.
424    ///
425    /// Finds the first buffered hash with a fallback peer that is idle, if any. Fills the rest of
426    /// the request by checking the transactions seen by the peer against the buffer.
427    pub fn on_fetch_pending_hashes(
428        &mut self,
429        peers: &HashMap<PeerId, PeerMetadata<N>>,
430        has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
431    ) {
432        let mut hashes_to_request = RequestTxHashes::default();
433        let is_session_active = |peer_id: &PeerId| peers.contains_key(peer_id);
434
435        let mut search_durations = TxFetcherSearchDurations::default();
436
437        // budget to look for an idle peer before giving up
438        let budget_find_idle_fallback_peer = self
439            .search_breadth_budget_find_idle_fallback_peer(&has_capacity_wrt_pending_pool_imports);
440
441        let peer_id = duration_metered_exec!(
442            {
443                let Some(peer_id) = self.find_any_idle_fallback_peer_for_any_pending_hash(
444                    &mut hashes_to_request,
445                    is_session_active,
446                    budget_find_idle_fallback_peer,
447                ) else {
448                    // no peers are idle or budget is depleted
449                    return
450                };
451
452                peer_id
453            },
454            search_durations.find_idle_peer
455        );
456
457        // peer should always exist since `is_session_active` already checked
458        let Some(peer) = peers.get(&peer_id) else { return };
459        let conn_eth_version = peer.version;
460
461        // fill the request with more hashes pending fetch that have been announced by the peer.
462        // the search for more hashes is done with respect to the given budget, which determines
463        // how many hashes to loop through before giving up. if no more hashes are found within
464        // the budget, the single hash that was taken out of the cache above is sent in a request.
465        let budget_fill_request = self
466            .search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
467                &has_capacity_wrt_pending_pool_imports,
468            );
469
470        duration_metered_exec!(
471            {
472                self.fill_request_from_hashes_pending_fetch(
473                    &mut hashes_to_request,
474                    &peer.seen_transactions,
475                    budget_fill_request,
476                )
477            },
478            search_durations.fill_request
479        );
480
481        self.update_pending_fetch_cache_search_metrics(search_durations);
482
483        trace!(target: "net::tx",
484            peer_id=format!("{peer_id:#}"),
485            hashes=?*hashes_to_request,
486            %conn_eth_version,
487            "requesting hashes that were stored pending fetch from peer"
488        );
489
490        // request the buffered missing transactions
491        if let Some(failed_to_request_hashes) =
492            self.request_transactions_from_peer(hashes_to_request, peer)
493        {
494            trace!(target: "net::tx",
495                peer_id=format!("{peer_id:#}"),
496                ?failed_to_request_hashes,
497                %conn_eth_version,
498                "failed sending request to peer's session, buffering hashes"
499            );
500
501            self.buffer_hashes(failed_to_request_hashes, Some(peer_id));
502        }
503    }
504
505    /// Filters out hashes that have been seen before. For hashes that have already been seen, the
506    /// peer is added as a fallback peer.
507    pub fn filter_unseen_and_pending_hashes(
508        &mut self,
509        new_announced_hashes: &mut ValidAnnouncementData,
510        is_tx_bad_import: impl Fn(&TxHash) -> bool,
511        peer_id: &PeerId,
512        is_session_active: impl Fn(PeerId) -> bool,
513        client_version: &str,
514    ) {
515        #[cfg(not(debug_assertions))]
516        let mut previously_unseen_hashes_count = 0;
517        #[cfg(debug_assertions)]
518        let mut previously_unseen_hashes = Vec::with_capacity(new_announced_hashes.len() / 4);
519
520        let msg_version = new_announced_hashes.msg_version();
521
522        // filter out inflight hashes, and register the peer as fallback for all inflight hashes
523        new_announced_hashes.retain(|hash, metadata| {
524
525            // occupied entry
526
527            if let Some(TxFetchMetadata{ref mut fallback_peers, tx_encoded_length: ref mut previously_seen_size, ..}) = self.hashes_fetch_inflight_and_pending_fetch.peek_mut(hash) {
528                // update size metadata if available
529                if let Some((_ty, size)) = metadata {
530                    if let Some(prev_size) = previously_seen_size {
531                        // check if this peer is announcing a different size than a previous peer
532                        if size != prev_size {
533                            trace!(target: "net::tx",
534                                peer_id=format!("{peer_id:#}"),
535                                %hash,
536                                size,
537                                previously_seen_size,
538                                %client_version,
539                                "peer announced a different size for tx, this is especially worrying if one size is much bigger..."
540                            );
541                        }
542                    }
543                    // believe the most recent peer to announce tx
544                    *previously_seen_size = Some(*size);
545                }
546
547                // hash has been seen but is not inflight
548                if self.hashes_pending_fetch.remove(hash) {
549                    return true
550                }
551                // hash has been seen and is in flight. store peer as fallback peer.
552                //
553                // remove any ended sessions, so that in case of a full cache, alive peers aren't
554                // removed in favour of lru dead peers
555                let mut ended_sessions = vec![];
556                for &peer_id in fallback_peers.iter() {
557                    if !is_session_active(peer_id) {
558                        ended_sessions.push(peer_id);
559                    }
560                }
561                for peer_id in ended_sessions {
562                    fallback_peers.remove(&peer_id);
563                }
564
565                return false
566            }
567
568            // vacant entry
569
570            if is_tx_bad_import(hash) {
571                return false
572            }
573
574            #[cfg(not(debug_assertions))]
575            {
576                previously_unseen_hashes_count += 1;
577            }
578            #[cfg(debug_assertions)]
579            previously_unseen_hashes.push(*hash);
580
581            if self.hashes_fetch_inflight_and_pending_fetch.get_or_insert(*hash, ||
582                TxFetchMetadata{retries: 0, fallback_peers: LruCache::new(DEFAULT_MAX_COUNT_FALLBACK_PEERS as u32), tx_encoded_length: None}
583            ).is_none() {
584
585                debug!(target: "net::tx",
586                    peer_id=format!("{peer_id:#}"),
587                    %hash,
588                    ?msg_version,
589                    %client_version,
590                    "failed to cache new announced hash from peer in schnellru::LruMap, dropping hash"
591                );
592
593                return false
594            }
595            true
596        });
597
598        #[cfg(not(debug_assertions))]
599        trace!(target: "net::tx",
600            peer_id=format!("{peer_id:#}"),
601            previously_unseen_hashes_count=previously_unseen_hashes_count,
602            msg_version=?msg_version,
603            client_version=%client_version,
604            "received previously unseen hashes in announcement from peer"
605        );
606
607        #[cfg(debug_assertions)]
608        trace!(target: "net::tx",
609            peer_id=format!("{peer_id:#}"),
610            ?msg_version,
611            %client_version,
612            previously_unseen_hashes_len=previously_unseen_hashes.len(),
613            ?previously_unseen_hashes,
614            "received previously unseen hashes in announcement from peer"
615        );
616    }
617
618    /// Requests the missing transactions from the previously unseen announced hashes of the peer.
619    /// Returns the requested hashes if the request concurrency limit is reached or if the request
620    /// fails to send over the channel to the peer's session task.
621    ///
622    /// This filters all announced hashes that are already in flight, and requests the missing,
623    /// while marking the given peer as an alternative peer for the hashes that are already in
624    /// flight.
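    ///
    /// Minimal sketch of how the return value is handled (mirroring the call sites in this
    /// module):
    ///
    /// ```ignore
    /// // `None` means the request was sent to the peer's session; `Some(hashes)` means it was
    /// // not, and the hashes should be buffered again
    /// if let Some(failed_to_request_hashes) =
    ///     fetcher.request_transactions_from_peer(hashes_to_request, peer)
    /// {
    ///     fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
    /// }
    /// ```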
625    pub fn request_transactions_from_peer(
626        &mut self,
627        new_announced_hashes: RequestTxHashes,
628        peer: &PeerMetadata<N>,
629    ) -> Option<RequestTxHashes> {
630        let peer_id: PeerId = peer.request_tx.peer_id;
631        let conn_eth_version = peer.version;
632
633        if self.active_peers.len() >= self.info.max_inflight_requests {
634            trace!(target: "net::tx",
635                peer_id=format!("{peer_id:#}"),
636                hashes=?*new_announced_hashes,
637                %conn_eth_version,
638                max_inflight_transaction_requests=self.info.max_inflight_requests,
639                "limit for concurrent `GetPooledTransactions` requests reached, dropping request for hashes to peer"
640            );
641            return Some(new_announced_hashes)
642        }
643
644        let Some(inflight_count) = self.active_peers.get_or_insert(peer_id, || 0) else {
645            debug!(target: "net::tx",
646                peer_id=format!("{peer_id:#}"),
647                hashes=?*new_announced_hashes,
648                conn_eth_version=%conn_eth_version,
649                "failed to cache active peer in schnellru::LruMap, dropping request to peer"
650            );
651            return Some(new_announced_hashes)
652        };
653
654        if *inflight_count >= self.info.max_inflight_requests_per_peer {
655            trace!(target: "net::tx",
656                peer_id=format!("{peer_id:#}"),
657                hashes=?*new_announced_hashes,
658                %conn_eth_version,
659                max_concurrent_tx_reqs_per_peer=self.info.max_inflight_requests_per_peer,
660                "limit for concurrent `GetPooledTransactions` requests per peer reached"
661            );
662            return Some(new_announced_hashes)
663        }
664
665        #[cfg(debug_assertions)]
666        {
667            for hash in &new_announced_hashes {
668                if self.hashes_pending_fetch.contains(hash) {
669                    debug!(target: "net::tx", "`{}` should have been taken out of buffer before packing in a request, breaks invariant `@hashes_pending_fetch` and `@inflight_requests`, `@hashes_fetch_inflight_and_pending_fetch` for `{}`: {:?}",
670                        format!("{:?}", hash), // the hash that breaks the invariant
671                        format!("{:?}", new_announced_hashes),
672                        new_announced_hashes.iter().map(|hash| {
673                            let metadata = self.hashes_fetch_inflight_and_pending_fetch.get(hash);
674                            // log the retry count and encoded length tracked for each hash in the request
675                            (*hash, metadata.map(|m| (m.retries, m.tx_encoded_length)))
676                        }).collect::<Vec<(TxHash, Option<(u8, Option<usize>)>)>>())
677                }
678            }
679        }
680
681        let (response, rx) = oneshot::channel();
682        let req = PeerRequest::GetPooledTransactions {
683            request: GetPooledTransactions(new_announced_hashes.iter().copied().collect()),
684            response,
685        };
686
687        // try to send the request to the peer
688        if let Err(err) = peer.request_tx.try_send(req) {
689            // peer channel is full or closed
690            return match err {
691                TrySendError::Full(_) | TrySendError::Closed(_) => {
692                    self.metrics.egress_peer_channel_full.increment(1);
693                    Some(new_announced_hashes)
694                }
695            }
696        }
697
698        *inflight_count += 1;
699        // stores a new request future for the request
700        self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, new_announced_hashes, rx));
701
702        None
703    }
704
705    /// Tries to fill request with hashes pending fetch so that the expected [`PooledTransactions`]
706    /// response is full enough. A mutable reference to a list of hashes to request is passed as a
707    /// parameter, along with a budget; the budget ensures that the node stops searching for more
708    /// hashes once it is depleted. Under bad network conditions, the cache of
709    /// hashes pending fetch may become very full for a while. As the node recovers, the hashes
710    /// pending fetch cache should get smaller. The budget should aim to be big enough to loop
711    /// through all buffered hashes in good network conditions.
712    ///
713    /// The request hashes buffer is filled as if it's an eth68 request, i.e. smartly assemble
714    /// the request based on expected response size. For any hash missing size metadata, it is
715    /// guessed at [`AVERAGE_BYTE_SIZE_TX_ENCODED`].
716    ///
717    /// Loops through hashes pending fetch and does:
718    ///
719    /// 1. Check if a hash pending fetch is seen by peer.
720    /// 2. Optimistically include the hash in the request.
721    /// 3. Accumulate expected total response size.
722    /// 4. Check if acc size or hashes count is at limit; if so, stop looping.
723    /// 5. Remove hashes to request from cache of hashes pending fetch.
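    ///
    /// Minimal sketch of the call pattern used by [`Self::on_fetch_pending_hashes`] (the budget
    /// value is hypothetical):
    ///
    /// ```ignore
    /// // `hashes_to_request` already holds one hash for which `peer` is an idle fallback peer
    /// fetcher.fill_request_from_hashes_pending_fetch(
    ///     &mut hashes_to_request,
    ///     &peer.seen_transactions,
    ///     Some(100), // look at no more than 100 hashes pending fetch
    /// );
    /// ```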
724    pub fn fill_request_from_hashes_pending_fetch(
725        &mut self,
726        hashes_to_request: &mut RequestTxHashes,
727        seen_hashes: &LruCache<TxHash>,
728        mut budget_fill_request: Option<usize>, // check max `budget` lru pending hashes
729    ) {
730        let Some(hash) = hashes_to_request.iter().next() else { return };
731
732        let mut acc_size_response = self
733            .hashes_fetch_inflight_and_pending_fetch
734            .get(hash)
735            .and_then(|entry| entry.tx_encoded_len())
736            .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED);
737
738        // if request full enough already, we're satisfied, send request for single tx
739        if acc_size_response >=
740            DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES
741        {
742            return
743        }
744
745        // try to fill request by checking if any other hashes pending fetch (in lru order) are
746        // also seen by peer
747        for hash in self.hashes_pending_fetch.iter() {
748            // 1. Check if a hash pending fetch is seen by peer.
749            if !seen_hashes.contains(hash) {
750                continue
751            };
752
753            // 2. Optimistically include the hash in the request.
754            hashes_to_request.insert(*hash);
755
756            // 3. Accumulate expected total response size.
757            let size = self
758                .hashes_fetch_inflight_and_pending_fetch
759                .get(hash)
760                .and_then(|entry| entry.tx_encoded_len())
761                .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED);
762
763            acc_size_response += size;
764
765            // 4. Check if acc size or hashes count is at limit, if so stop looping.
766            // if expected response is full enough or the number of hashes in the request is
767            // enough, we're satisfied
768            if acc_size_response >=
769                DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES ||
770                hashes_to_request.len() >
771                    DEFAULT_SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST_ON_FETCH_PENDING_HASHES
772            {
773                break
774            }
775
776            if let Some(ref mut bud) = budget_fill_request {
777                *bud = bud.saturating_sub(1);
778                if *bud == 0 {
779                    return
780                }
781            }
782        }
783
784        // 5. Remove hashes to request from cache of hashes pending fetch.
785        for hash in hashes_to_request.iter() {
786            self.hashes_pending_fetch.remove(hash);
787        }
788    }
789
790    /// Returns `true` if [`TransactionFetcher`] has capacity to request pending hashes. Returns
791    /// `false` if [`TransactionFetcher`] is operating close to full capacity.
792    pub fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
793        let info = &self.info;
794
795        self.has_capacity(info.max_inflight_requests)
796    }
797
798    /// Returns `true` if the number of inflight requests is at or below a given tolerated max.
799    fn has_capacity(&self, max_inflight_requests: usize) -> bool {
800        self.inflight_requests.len() <= max_inflight_requests
801    }
802
803    /// Returns the limit to enforce when looking for any pending hash with an idle fallback peer.
804    ///
805    /// Returns `Some(limit)` if [`TransactionFetcher`] and the
806    /// [`TransactionPool`](reth_transaction_pool::TransactionPool) are operating close to full
807    /// capacity. Returns `None`, unlimited, if they are not that busy.
808    pub fn search_breadth_budget_find_idle_fallback_peer(
809        &self,
810        has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
811    ) -> Option<usize> {
812        let info = &self.info;
813
814        let tx_fetcher_has_capacity = self.has_capacity(
815            info.max_inflight_requests /
816                DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_IDLE_PEER,
817        );
818        let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports(
819            DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_IDLE_PEER,
820        );
821
822        if tx_fetcher_has_capacity && tx_pool_has_capacity {
823            // unlimited search breadth
824            None
825        } else {
826            // limited breadth of search for idle peer
827            let limit = DEFAULT_BUDGET_FIND_IDLE_FALLBACK_PEER;
828
829            trace!(target: "net::tx",
830                inflight_requests=self.inflight_requests.len(),
831                max_inflight_transaction_requests=info.max_inflight_requests,
832                hashes_pending_fetch=self.hashes_pending_fetch.len(),
833                limit,
834                "search breadth limited in search for idle fallback peer for some hash pending fetch"
835            );
836
837            Some(limit)
838        }
839    }
840
841    /// Returns the limit to enforce when looking for the intersection between hashes announced by
842    /// peer and hashes pending fetch.
843    ///
844    /// Returns `Some(limit)` if [`TransactionFetcher`] and the
845    /// [`TransactionPool`](reth_transaction_pool::TransactionPool) are operating close to full
846    /// capacity. Returns `None`, unlimited, if they are not that busy.
847    pub fn search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
848        &self,
849        has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
850    ) -> Option<usize> {
851        let info = &self.info;
852
853        let tx_fetcher_has_capacity = self.has_capacity(
854            info.max_inflight_requests /
855                DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_INTERSECTION,
856        );
857        let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports(
858            DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_INTERSECTION,
859        );
860
861        if tx_fetcher_has_capacity && tx_pool_has_capacity {
862            // unlimited search breadth
863            None
864        } else {
865            // limited breadth of search for the intersection
866            let limit = DEFAULT_BUDGET_FIND_INTERSECTION_ANNOUNCED_BY_PEER_AND_PENDING_FETCH;
867
868            trace!(target: "net::tx",
869                inflight_requests=self.inflight_requests.len(),
870                max_inflight_transaction_requests=self.info.max_inflight_requests,
871                hashes_pending_fetch=self.hashes_pending_fetch.len(),
872                limit=limit,
873                "search breadth limited in search for intersection of hashes announced by peer and hashes pending fetch"
874            );
875
876            Some(limit)
877        }
878    }
879
880    /// Returns the approx number of transactions that a [`GetPooledTransactions`] request will
881    /// have capacity for w.r.t. the given version of the protocol.
882    pub const fn approx_capacity_get_pooled_transactions_req(
883        &self,
884        announcement_version: EthVersion,
885    ) -> usize {
886        if announcement_version.is_eth68() {
887            approx_capacity_get_pooled_transactions_req_eth68(&self.info)
888        } else {
889            approx_capacity_get_pooled_transactions_req_eth66()
890        }
891    }
892
893    /// Processes a resolved [`GetPooledTransactions`] request. Queues the outcome as a
894    /// [`FetchEvent`], which will then be streamed by
895    /// [`TransactionsManager`](super::TransactionsManager).
896    pub fn on_resolved_get_pooled_transactions_request_fut(
897        &mut self,
898        response: GetPooledTxResponse<N::PooledTransaction>,
899    ) -> FetchEvent<N::PooledTransaction> {
900        // update peer activity, requests for buffered hashes can only be made to idle
901        // fallback peers
902        let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response;
903
904        self.decrement_inflight_request_count_for(&peer_id);
905
906        match result {
907            Ok(Ok(transactions)) => {
908                //
909                // 1. peer has failed to serve any of the hashes it has announced to us that we,
910                // in turn, have requested
911                //
912                if transactions.is_empty() {
913                    trace!(target: "net::tx",
914                        peer_id=format!("{peer_id:#}"),
915                        requested_hashes_len=requested_hashes.len(),
916                        "received empty `PooledTransactions` response from peer, peer failed to serve hashes it announced"
917                    );
918
919                    return FetchEvent::EmptyResponse { peer_id }
920                }
921
922                //
923                // 2. filter out hashes that we didn't request
924                //
925                let payload = UnverifiedPooledTransactions::new(transactions);
926
927                let unverified_len = payload.len();
928                let (verification_outcome, verified_payload) =
929                    payload.verify(&requested_hashes, &peer_id);
930
931                let unsolicited = unverified_len - verified_payload.len();
932                if unsolicited > 0 {
933                    self.metrics.unsolicited_transactions.increment(unsolicited as u64);
934                }
935                if verification_outcome == VerificationOutcome::ReportPeer {
936                    // todo: report peer for sending hashes that weren't requested
937                    trace!(target: "net::tx",
938                        peer_id=format!("{peer_id:#}"),
939                        unverified_len,
940                        verified_payload_len=verified_payload.len(),
941                        "received `PooledTransactions` response from peer with entries that didn't verify against request, filtered out transactions"
942                    );
943                }
944                // peer has only sent transactions that we didn't request
945                if verified_payload.is_empty() {
946                    return FetchEvent::FetchError { peer_id, error: RequestError::BadResponse }
947                }
948
949                //
950                // 3. stateless validation of payload, e.g. dedup
951                //
952                let unvalidated_payload_len = verified_payload.len();
953
954                let (validation_outcome, valid_payload) =
955                    self.filter_valid_message.partially_filter_valid_entries(verified_payload);
956
957                // todo: validate based on announced tx size/type and report peer for sending
958                // invalid response <https://github.com/paradigmxyz/reth/issues/6529>. requires
959                // passing the rlp encoded length down from active session along with the decoded
960                // tx.
961
962                if validation_outcome == FilterOutcome::ReportPeer {
963                    trace!(target: "net::tx",
964                        peer_id=format!("{peer_id:#}"),
965                        unvalidated_payload_len,
966                        valid_payload_len=valid_payload.len(),
967                        "received invalid `PooledTransactions` response from peer, filtered out duplicate entries"
968                    );
969                }
970                // valid payload will have at least one transaction at this point. even if the tx
971                // size/type announced by the peer is different from the actual tx size/type, pass on
972                // to pending pool imports pipeline for validation.
973
974                //
975                // 4. clear received hashes
976                //
977                let requested_hashes_len = requested_hashes.len();
978                let mut fetched = Vec::with_capacity(valid_payload.len());
979                requested_hashes.retain(|requested_hash| {
980                    if valid_payload.contains_key(requested_hash) {
981                        // hash is now known, stop tracking
982                        fetched.push(*requested_hash);
983                        return false
984                    }
985                    true
986                });
987                fetched.shrink_to_fit();
988                self.metrics.fetched_transactions.increment(fetched.len() as u64);
989
990                if fetched.len() < requested_hashes_len {
991                    trace!(target: "net::tx",
992                        peer_id=format!("{peer_id:#}"),
993                        requested_hashes_len=requested_hashes_len,
994                        fetched_len=fetched.len(),
995                        "peer failed to serve hashes it announced"
996                    );
997                }
998
999                //
1000                // 5. buffer left over hashes
1001                //
1002                self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
1003
1004                let transactions = valid_payload.into_data().into_values().collect();
1005
1006                FetchEvent::TransactionsFetched { peer_id, transactions }
1007            }
1008            Ok(Err(req_err)) => {
1009                self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
1010                FetchEvent::FetchError { peer_id, error: req_err }
1011            }
1012            Err(_) => {
1013                self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
1014                // request channel closed/dropped
1015                FetchEvent::FetchError { peer_id, error: RequestError::ChannelClosed }
1016            }
1017        }
1018    }
1019}
1020
1021impl<N: NetworkPrimitives> Stream for TransactionFetcher<N> {
1022    type Item = FetchEvent<N::PooledTransaction>;
1023
1024    /// Advances all inflight requests and returns the next event.
1025    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1026        // `FuturesUnordered` doesn't close when `None` is returned. so just return pending.
1027        // <https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=815be2b6c8003303757c3ced135f363e>
1028        if self.inflight_requests.is_empty() {
1029            return Poll::Pending
1030        }
1031
1032        if let Some(resp) = ready!(self.inflight_requests.poll_next_unpin(cx)) {
1033            return Poll::Ready(Some(self.on_resolved_get_pooled_transactions_request_fut(resp)))
1034        }
1035
1036        Poll::Pending
1037    }
1038}
1039
1040impl<T: NetworkPrimitives> Default for TransactionFetcher<T> {
1041    fn default() -> Self {
1042        Self {
1043            active_peers: LruMap::new(DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS),
1044            inflight_requests: Default::default(),
1045            hashes_pending_fetch: LruCache::new(DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH),
1046            hashes_fetch_inflight_and_pending_fetch: LruMap::new(
1047                DEFAULT_MAX_CAPACITY_CACHE_INFLIGHT_AND_PENDING_FETCH,
1048            ),
1049            filter_valid_message: Default::default(),
1050            info: TransactionFetcherInfo::default(),
1051            metrics: Default::default(),
1052        }
1053    }
1054}
1055
1056/// Metadata of a transaction hash that is yet to be fetched.
1057#[derive(Debug, Constructor)]
1058pub struct TxFetchMetadata {
1059    /// The number of times a request attempt has been made for the hash.
1060    retries: u8,
1061    /// Peers that have announced the hash, but to which a request attempt has not yet been made.
1062    fallback_peers: LruCache<PeerId>,
1063    /// Size metadata of the transaction if it has been seen in an eth68 announcement.
1064    // todo: store all seen sizes as a `(size, peer_id)` tuple to catch peers that respond with
1065    // a tx of another size than they announced. alternatively, store the size in the request fut
1066    // (won't catch peers announcing the wrong size for requests assembled from hashes pending fetch)
1067    tx_encoded_length: Option<usize>,
1068}
1069
1070impl TxFetchMetadata {
1071    /// Returns a mutable reference to the fallback peers cache for this transaction hash.
1072    pub fn fallback_peers_mut(&mut self) -> &mut LruCache<PeerId> {
1073        &mut self.fallback_peers
1074    }
1075
1076    /// Returns the size of the transaction, if its hash has been received in any
1077    /// [`Eth68`](reth_eth_wire::EthVersion::Eth68) announcement. If the transaction hash has only
1078    /// been seen in [`Eth66`](reth_eth_wire::EthVersion::Eth66) announcements so far, this will
1079    /// return `None`.
1080    pub const fn tx_encoded_len(&self) -> Option<usize> {
1081        self.tx_encoded_length
1082    }
1083}
1084
1085/// Represents possible events from fetching transactions.
1086#[derive(Debug)]
1087pub enum FetchEvent<T = PooledTransactionsElement> {
1088    /// Triggered when transactions are successfully fetched.
1089    TransactionsFetched {
1090        /// The ID of the peer from which transactions were fetched.
1091        peer_id: PeerId,
1092        /// The transactions that were fetched, if available.
1093        transactions: PooledTransactions<T>,
1094    },
1095    /// Triggered when there is an error in fetching transactions.
1096    FetchError {
1097        /// The ID of the peer from which an attempt to fetch transactions resulted in an error.
1098        peer_id: PeerId,
1099        /// The specific error that occurred while fetching.
1100        error: RequestError,
1101    },
1102    /// An empty response was received.
1103    EmptyResponse {
1104        /// The ID of the sender.
1105        peer_id: PeerId,
1106    },
1107}
1108
1109/// An inflight request for [`PooledTransactions`] from a peer.
1110#[derive(Debug)]
1111pub struct GetPooledTxRequest<T = PooledTransactionsElement> {
1112    peer_id: PeerId,
1113    /// Transaction hashes that were requested, for cleanup purposes
1114    requested_hashes: RequestTxHashes,
1115    response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
1116}
1117
1118/// Upon reception of a response, a [`GetPooledTxRequest`] is deconstructed to form a
1119/// [`GetPooledTxResponse`].
1120#[derive(Debug)]
1121pub struct GetPooledTxResponse<T = PooledTransactionsElement> {
1122    peer_id: PeerId,
1123    /// Transaction hashes that were requested, for cleanup purposes, since peer may only return a
1124    /// subset of requested hashes.
1125    requested_hashes: RequestTxHashes,
1126    result: Result<RequestResult<PooledTransactions<T>>, RecvError>,
1127}
1128
1129/// Stores the response receiver made by sending a [`GetPooledTransactions`] request to a peer's
1130/// session.
1131#[must_use = "futures do nothing unless polled"]
1132#[pin_project::pin_project]
1133#[derive(Debug)]
1134pub struct GetPooledTxRequestFut<T = PooledTransactionsElement> {
1135    #[pin]
1136    inner: Option<GetPooledTxRequest<T>>,
1137}
1138
1139impl<T> GetPooledTxRequestFut<T> {
1140    #[inline]
1141    const fn new(
1142        peer_id: PeerId,
1143        requested_hashes: RequestTxHashes,
1144        response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
1145    ) -> Self {
1146        Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) }
1147    }
1148}
1149
1150impl<T> Future for GetPooledTxRequestFut<T> {
1151    type Output = GetPooledTxResponse<T>;
1152
1153    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1154        let mut req = self.as_mut().project().inner.take().expect("polled after completion");
1155        match req.response.poll_unpin(cx) {
1156            Poll::Ready(result) => Poll::Ready(GetPooledTxResponse {
1157                peer_id: req.peer_id,
1158                requested_hashes: req.requested_hashes,
1159                result,
1160            }),
1161            Poll::Pending => {
1162                self.project().inner.set(Some(req));
1163                Poll::Pending
1164            }
1165        }
1166    }
1167}
1168
1169/// Wrapper of unverified [`PooledTransactions`].
1170#[derive(Debug, Constructor, Deref)]
1171pub struct UnverifiedPooledTransactions<T> {
1172    txns: PooledTransactions<T>,
1173}
1174
1175/// [`PooledTransactions`] that have been successfully verified.
1176#[derive(Debug, Constructor, Deref)]
1177pub struct VerifiedPooledTransactions<T> {
1178    txns: PooledTransactions<T>,
1179}
1180
1181impl<T: SignedTransaction> DedupPayload for VerifiedPooledTransactions<T> {
1182    type Value = T;
1183
1184    fn is_empty(&self) -> bool {
1185        self.txns.is_empty()
1186    }
1187
1188    fn len(&self) -> usize {
1189        self.txns.len()
1190    }
1191
1192    fn dedup(self) -> PartiallyValidData<Self::Value> {
1193        PartiallyValidData::from_raw_data(
1194            self.txns.into_iter().map(|tx| (*tx.tx_hash(), tx)).collect(),
1195            None,
1196        )
1197    }
1198}
1199
1200trait VerifyPooledTransactionsResponse {
1201    type Transaction: SignedTransaction;
1202
1203    fn verify(
1204        self,
1205        requested_hashes: &RequestTxHashes,
1206        peer_id: &PeerId,
1207    ) -> (VerificationOutcome, VerifiedPooledTransactions<Self::Transaction>);
1208}
1209
1210impl<T: SignedTransaction> VerifyPooledTransactionsResponse for UnverifiedPooledTransactions<T> {
1211    type Transaction = T;
1212
1213    fn verify(
1214        self,
1215        requested_hashes: &RequestTxHashes,
1216        _peer_id: &PeerId,
1217    ) -> (VerificationOutcome, VerifiedPooledTransactions<T>) {
1218        let mut verification_outcome = VerificationOutcome::Ok;
1219
1220        let Self { mut txns } = self;
1221
1222        #[cfg(debug_assertions)]
1223        let mut tx_hashes_not_requested: SmallVec<[TxHash; 16]> = smallvec!();
1224        #[cfg(not(debug_assertions))]
1225        let mut tx_hashes_not_requested_count = 0;
1226
1227        txns.0.retain(|tx| {
1228            if !requested_hashes.contains(tx.tx_hash()) {
1229                verification_outcome = VerificationOutcome::ReportPeer;
1230
1231                #[cfg(debug_assertions)]
1232                tx_hashes_not_requested.push(*tx.tx_hash());
1233                #[cfg(not(debug_assertions))]
1234                {
1235                    tx_hashes_not_requested_count += 1;
1236                }
1237
1238                return false
1239            }
1240            true
1241        });
1242
1243        #[cfg(debug_assertions)]
1244        if !tx_hashes_not_requested.is_empty() {
1245            trace!(target: "net::tx",
1246                peer_id=format!("{_peer_id:#}"),
1247                ?tx_hashes_not_requested,
1248                "transactions in `PooledTransactions` response from peer were not requested"
1249            );
1250        }
1251        #[cfg(not(debug_assertions))]
1252        if tx_hashes_not_requested_count != 0 {
1253            trace!(target: "net::tx",
1254                peer_id=format!("{_peer_id:#}"),
1255                tx_hashes_not_requested_count,
1256                "transactions in `PooledTransactions` response from peer were not requested"
1257            );
1258        }
1259
1260        (verification_outcome, VerifiedPooledTransactions::new(txns))
1261    }
1262}
1263
1264/// Outcome from verifying a [`PooledTransactions`] response. Signals to caller whether to penalize
1265/// the sender of the response or not.
1266#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1267pub enum VerificationOutcome {
1268    /// Peer behaves appropriately.
1269    Ok,
1270    /// A penalty should be flagged for the peer. Peer sent a response with unacceptably
1271    /// invalid entries.
1272    ReportPeer,
1273}
1274
1275/// Tracks stats about the [`TransactionFetcher`].
1276#[derive(Debug, Constructor)]
1277pub struct TransactionFetcherInfo {
1278    /// Max inflight [`GetPooledTransactions`] requests.
1279    pub max_inflight_requests: usize,
1280    /// Max inflight [`GetPooledTransactions`] requests per peer.
1281    pub max_inflight_requests_per_peer: u8,
1282    /// Soft limit for the byte size of the expected [`PooledTransactions`] response, upon packing
1283    /// a [`GetPooledTransactions`] request with hashes (by default less than 2 MiB worth of
1284    /// transactions is requested).
1285    pub soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
1286    /// Soft limit for the byte size of a [`PooledTransactions`] response, upon assembling the
1287    /// response. Spec'd at 2 MiB, but can be adjusted for research purposes.
1288    pub soft_limit_byte_size_pooled_transactions_response: usize,
1289    /// Max capacity of the cache of transaction hashes, for transactions that haven't been
1290    /// fetched yet. A transaction is pending fetch if its hash didn't fit into a
1291    /// [`GetPooledTransactions`] request yet, or it wasn't returned when requested from peers.
1292    pub max_capacity_cache_txns_pending_fetch: u32,
1293}
1294
1295impl Default for TransactionFetcherInfo {
1296    fn default() -> Self {
1297        Self::new(
1298            DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS as usize,
1299            DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
1300            DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
1301            SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
1302            DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
1303        )
1304    }
1305}
1306
1307impl From<TransactionFetcherConfig> for TransactionFetcherInfo {
1308    fn from(config: TransactionFetcherConfig) -> Self {
1309        let TransactionFetcherConfig {
1310            max_inflight_requests,
1311            max_inflight_requests_per_peer,
1312            soft_limit_byte_size_pooled_transactions_response,
1313            soft_limit_byte_size_pooled_transactions_response_on_pack_request,
1314            max_capacity_cache_txns_pending_fetch,
1315        } = config;
1316
1317        Self::new(
1318            max_inflight_requests as usize,
1319            max_inflight_requests_per_peer,
1320            soft_limit_byte_size_pooled_transactions_response_on_pack_request,
1321            soft_limit_byte_size_pooled_transactions_response,
1322            max_capacity_cache_txns_pending_fetch,
1323        )
1324    }
1325}
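
// A minimal conversion sketch, assuming a `TransactionFetcherConfig` can be built directly; the
// field values here are illustrative, not defaults:
//
//     let info: TransactionFetcherInfo = TransactionFetcherConfig {
//         max_inflight_requests: 130,
//         max_inflight_requests_per_peer: 1,
//         soft_limit_byte_size_pooled_transactions_response: 2 * 1024 * 1024,
//         soft_limit_byte_size_pooled_transactions_response_on_pack_request: 2 * 1024 * 1024,
//         max_capacity_cache_txns_pending_fetch: 12_800,
//     }
//     .into();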
1326
1327#[derive(Debug, Default)]
1328struct TxFetcherSearchDurations {
1329    find_idle_peer: Duration,
1330    fill_request: Duration,
1331}
1332
1333#[cfg(test)]
1334mod test {
1335    use super::*;
1336    use crate::transactions::tests::{default_cache, new_mock_session};
1337    use alloy_primitives::{hex, B256};
1338    use alloy_rlp::Decodable;
1339    use derive_more::IntoIterator;
1340    use reth_primitives::TransactionSigned;
1341    use std::{collections::HashSet, str::FromStr};
1342
1343    #[derive(IntoIterator)]
1344    struct TestValidAnnouncementData(Vec<(TxHash, Option<(u8, usize)>)>);
1345
1346    impl HandleMempoolData for TestValidAnnouncementData {
1347        fn is_empty(&self) -> bool {
1348            self.0.is_empty()
1349        }
1350
1351        fn len(&self) -> usize {
1352            self.0.len()
1353        }
1354
1355        fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
1356            self.0.retain(|(hash, _)| f(hash))
1357        }
1358    }
1359
1360    impl HandleVersionedMempoolData for TestValidAnnouncementData {
1361        fn msg_version(&self) -> EthVersion {
1362            EthVersion::Eth68
1363        }
1364    }
1365
1366    #[test]
1367    fn pack_eth68_request() {
1368        reth_tracing::init_test_tracing();
1369
1370        // RIG TEST
1371
1372        let tx_fetcher = &mut TransactionFetcher::<EthNetworkPrimitives>::default();
1373
1374        let eth68_hashes = [
1375            B256::from_slice(&[1; 32]),
1376            B256::from_slice(&[2; 32]),
1377            B256::from_slice(&[3; 32]),
1378            B256::from_slice(&[4; 32]),
1379            B256::from_slice(&[5; 32]),
1380        ];
1381        let eth68_sizes = [
1382            DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ - MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED - 1, // first will fit
1383            DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ, // second won't
1384            2, // free space > `MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED`, third will fit, no more after this
1385            9,
1386            0,
1387        ];
1388
1389        let expected_request_hashes =
1390            [eth68_hashes[0], eth68_hashes[2]].into_iter().collect::<HashSet<_>>();
1391
1392        let expected_surplus_hashes =
1393            [eth68_hashes[1], eth68_hashes[3], eth68_hashes[4]].into_iter().collect::<HashSet<_>>();
1394
1395        let mut eth68_hashes_to_request = RequestTxHashes::with_capacity(3);
1396
1397        let valid_announcement_data = TestValidAnnouncementData(
1398            eth68_hashes
1399                .into_iter()
1400                .zip(eth68_sizes)
1401                .map(|(hash, size)| (hash, Some((0u8, size))))
1402                .collect::<Vec<_>>(),
1403        );
1404
1405        // TEST
1406
1407        let surplus_eth68_hashes =
1408            tx_fetcher.pack_request_eth68(&mut eth68_hashes_to_request, valid_announcement_data);
1409
1410        let eth68_hashes_to_request = eth68_hashes_to_request.into_iter().collect::<HashSet<_>>();
1411        let surplus_eth68_hashes = surplus_eth68_hashes.into_iter().collect::<HashSet<_>>();
1412
1413        assert_eq!(expected_request_hashes, eth68_hashes_to_request);
1414        assert_eq!(expected_surplus_hashes, surplus_eth68_hashes);
1415    }
1416
1417    #[tokio::test]
1418    async fn test_on_fetch_pending_hashes() {
1419        reth_tracing::init_test_tracing();
1420
1421        let tx_fetcher = &mut TransactionFetcher::default();
1422
1423        // RIG TEST
1424
1425        // hashes that will be fetched because they are stored as pending fetch
1426        let seen_hashes = [
1427            B256::from_slice(&[1; 32]),
1428            B256::from_slice(&[2; 32]),
1429            B256::from_slice(&[3; 32]),
1430            B256::from_slice(&[4; 32]),
1431        ];
1432        //
1433        // txns 1-3 are small, so all of them will fit in the request. no metadata is available
1434        // for hash 4, since it has only been seen over an eth66 connection, so the average tx
1435        // size is assumed when filling the request.
1436        let seen_eth68_hashes_sizes = [120, 158, 116];
1437
1438        // peer from which the seen hashes will be requested, because they are pending fetch
1439        let peer_1 = PeerId::new([1; 64]);
1440        // second peer, won't do anything in this test
1441        let peer_2 = PeerId::new([2; 64]);
1442
1443        // add seen hashes to the peers' seen transactions
1444        //
1445        // get a handle for peer_1's session to receive the request for the pending hashes
1446        let (mut peer_1_data, mut peer_1_mock_session_rx) =
1447            new_mock_session(peer_1, EthVersion::Eth66);
1448        for hash in &seen_hashes {
1449            peer_1_data.seen_transactions.insert(*hash);
1450        }
1451        let (mut peer_2_data, _) = new_mock_session(peer_2, EthVersion::Eth66);
1452        for hash in &seen_hashes {
1453            peer_2_data.seen_transactions.insert(*hash);
1454        }
1455        let mut peers = HashMap::default();
1456        peers.insert(peer_1, peer_1_data);
1457        peers.insert(peer_2, peer_2_data);
1458
1459        let mut backups = default_cache();
1460        backups.insert(peer_2);
1461        // insert seen_hashes into tx fetcher
1462        for i in 0..3 {
1463            // insert peer_2 as fallback peer for seen_hashes
1464            let mut backups = default_cache();
1465            backups.insert(peer_2);
1466            let meta = TxFetchMetadata::new(0, backups, Some(seen_eth68_hashes_sizes[i]));
1467            tx_fetcher.hashes_fetch_inflight_and_pending_fetch.insert(seen_hashes[i], meta);
1468        }
1469        let meta = TxFetchMetadata::new(0, backups, None);
1470        tx_fetcher.hashes_fetch_inflight_and_pending_fetch.insert(seen_hashes[3], meta);
1471
1472        let mut backups = default_cache();
1473        backups.insert(peer_2);
1474        // insert pending hash without peer_1 as fallback peer, only with peer_2 as fallback peer
1475        let hash_other = B256::from_slice(&[5; 32]);
1476        tx_fetcher
1477            .hashes_fetch_inflight_and_pending_fetch
1478            .insert(hash_other, TxFetchMetadata::new(0, backups, None));
1479        tx_fetcher.hashes_pending_fetch.insert(hash_other);
1480
1481        // add peer_1 as lru fallback peer for seen hashes
1482        for hash in &seen_hashes {
1483            tx_fetcher
1484                .hashes_fetch_inflight_and_pending_fetch
1485                .get(hash)
1486                .unwrap()
1487                .fallback_peers_mut()
1488                .insert(peer_1);
1489        }
1490
1491        // mark seen hashes as pending fetch
1492        for hash in &seen_hashes {
1493            tx_fetcher.hashes_pending_fetch.insert(*hash);
1494        }
1495
1496        // the seen hashes and the extra hash known only to peer_2 are pending fetch
1497        assert_eq!(tx_fetcher.hashes_pending_fetch.len(), 5);
1498
1499        // TEST
1500
1501        tx_fetcher.on_fetch_pending_hashes(&peers, |_| true);
1502
1503        // mock session of peer_1 receives request
1504        let req = peer_1_mock_session_rx
1505            .recv()
1506            .await
1507            .expect("peer session should receive request with buffered hashes");
1508        let PeerRequest::GetPooledTransactions { request, .. } = req else { unreachable!() };
1509        let GetPooledTransactions(requested_hashes) = request;
1510
1511        assert_eq!(
1512            requested_hashes.into_iter().collect::<HashSet<_>>(),
1513            seen_hashes.into_iter().collect::<HashSet<_>>()
1514        )
1515    }
1516
1517    #[test]
1518    fn verify_response_hashes() {
1519        let input = hex!("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598daa");
1520        let signed_tx_1: PooledTransactionsElement =
1521            TransactionSigned::decode(&mut &input[..]).unwrap().try_into().unwrap();
1522        let input = hex!("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76");
1523        let signed_tx_2: PooledTransactionsElement =
1524            TransactionSigned::decode(&mut &input[..]).unwrap().try_into().unwrap();
1525
1526        // only tx 1 is requested
1527        let request_hashes = [
1528            B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e67890")
1529                .unwrap(),
1530            *signed_tx_1.hash(),
1531            B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e12345")
1532                .unwrap(),
1533            B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4edabe3")
1534                .unwrap(),
1535        ];
1536
1537        for hash in &request_hashes {
1538            assert_ne!(hash, signed_tx_2.hash())
1539        }
1540
1541        let request_hashes =
1542            RequestTxHashes::new(request_hashes.into_iter().collect::<HashSet<_>>());
1543
1544        // but response contains tx 1 + another tx
1545        let response_txns = PooledTransactions(vec![signed_tx_1.clone(), signed_tx_2]);
1546        let payload = UnverifiedPooledTransactions::new(response_txns);
1547
1548        let (outcome, verified_payload) = payload.verify(&request_hashes, &PeerId::ZERO);
1549
1550        assert_eq!(VerificationOutcome::ReportPeer, outcome);
1551        assert_eq!(1, verified_payload.len());
1552        assert!(verified_payload.contains(&signed_tx_1));
1553    }
1554}