1use super::{
29 config::TransactionFetcherConfig,
30 constants::{tx_fetcher::*, SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST},
31 MessageFilter, PeerMetadata, PooledTransactions,
32 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
33};
34use crate::{
35 cache::{LruCache, LruMap},
36 duration_metered_exec,
37 metrics::TransactionFetcherMetrics,
38 transactions::{validation, PartiallyFilterMessage},
39};
40use alloy_primitives::TxHash;
41use derive_more::{Constructor, Deref};
42use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
43use pin_project::pin_project;
44use reth_eth_wire::{
45 DedupPayload, EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData,
46 PartiallyValidData, RequestTxHashes, ValidAnnouncementData,
47};
48use reth_eth_wire_types::{EthNetworkPrimitives, NetworkPrimitives};
49use reth_network_api::PeerRequest;
50use reth_network_p2p::error::{RequestError, RequestResult};
51use reth_network_peers::PeerId;
52use reth_primitives::PooledTransactionsElement;
53use reth_primitives_traits::SignedTransaction;
54use schnellru::ByLength;
55#[cfg(debug_assertions)]
56use smallvec::{smallvec, SmallVec};
57use std::{
58 collections::HashMap,
59 pin::Pin,
60 task::{ready, Context, Poll},
61 time::Duration,
62};
63use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
64use tracing::{debug, trace};
65use validation::FilterOutcome;
66
67#[derive(Debug)]
72#[pin_project]
73pub struct TransactionFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
74 pub active_peers: LruMap<PeerId, u8, ByLength>,
76 #[pin]
82 pub inflight_requests: FuturesUnordered<GetPooledTxRequestFut<N::PooledTransaction>>,
83 pub hashes_pending_fetch: LruCache<TxHash>,
88 pub(super) hashes_fetch_inflight_and_pending_fetch: LruMap<TxHash, TxFetchMetadata, ByLength>,
90 pub(super) filter_valid_message: MessageFilter,
92 pub info: TransactionFetcherInfo,
94 #[doc(hidden)]
95 metrics: TransactionFetcherMetrics,
96}
97
98impl<N: NetworkPrimitives> TransactionFetcher<N> {
99 pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) {
101 self.active_peers.remove(peer_id);
102 }
103
104 #[inline]
106 pub fn update_metrics(&self) {
107 let metrics = &self.metrics;
108
109 metrics.inflight_transaction_requests.set(self.inflight_requests.len() as f64);
110
111 let hashes_pending_fetch = self.hashes_pending_fetch.len() as f64;
112 let total_hashes = self.hashes_fetch_inflight_and_pending_fetch.len() as f64;
113
114 metrics.hashes_pending_fetch.set(hashes_pending_fetch);
115 metrics.hashes_inflight_transaction_requests.set(total_hashes - hashes_pending_fetch);
116 }
117
118 #[inline]
119 fn update_pending_fetch_cache_search_metrics(&self, durations: TxFetcherSearchDurations) {
120 let metrics = &self.metrics;
121
122 let TxFetcherSearchDurations { find_idle_peer, fill_request } = durations;
123 metrics
124 .duration_find_idle_fallback_peer_for_any_pending_hash
125 .set(find_idle_peer.as_secs_f64());
126 metrics.duration_fill_request_from_hashes_pending_fetch.set(fill_request.as_secs_f64());
127 }
128
129 pub fn with_transaction_fetcher_config(config: &TransactionFetcherConfig) -> Self {
131 let TransactionFetcherConfig {
132 max_inflight_requests,
133 max_capacity_cache_txns_pending_fetch,
134 ..
135 } = *config;
136
137 let info = config.clone().into();
138
139 let metrics = TransactionFetcherMetrics::default();
140 metrics.capacity_inflight_requests.increment(max_inflight_requests as u64);
141
142 Self {
143 active_peers: LruMap::new(max_inflight_requests),
144 hashes_pending_fetch: LruCache::new(max_capacity_cache_txns_pending_fetch),
145 hashes_fetch_inflight_and_pending_fetch: LruMap::new(
146 max_inflight_requests + max_capacity_cache_txns_pending_fetch,
147 ),
148 info,
149 metrics,
150 ..Default::default()
151 }
152 }
153
154 #[inline]
156 pub fn remove_hashes_from_transaction_fetcher<I>(&mut self, hashes: I)
157 where
158 I: IntoIterator<Item = TxHash>,
159 {
160 for hash in hashes {
161 self.hashes_fetch_inflight_and_pending_fetch.remove(&hash);
162 self.hashes_pending_fetch.remove(&hash);
163 }
164 }
165
166 fn decrement_inflight_request_count_for(&mut self, peer_id: &PeerId) {
168 let remove = || -> bool {
169 if let Some(inflight_count) = self.active_peers.get(peer_id) {
170 *inflight_count = inflight_count.saturating_sub(1);
171 if *inflight_count == 0 {
172 return true
173 }
174 }
175 false
176 }();
177
178 if remove {
179 self.active_peers.remove(peer_id);
180 }
181 }
182
183 pub fn is_idle(&self, peer_id: &PeerId) -> bool {
185 let Some(inflight_count) = self.active_peers.peek(peer_id) else { return true };
186 if *inflight_count < self.info.max_inflight_requests_per_peer {
187 return true
188 }
189 false
190 }
191
192 pub fn get_idle_peer_for(
194 &self,
195 hash: TxHash,
196 is_session_active: impl Fn(&PeerId) -> bool,
197 ) -> Option<&PeerId> {
198 let TxFetchMetadata { fallback_peers, .. } =
199 self.hashes_fetch_inflight_and_pending_fetch.peek(&hash)?;
200
201 for peer_id in fallback_peers.iter() {
202 if self.is_idle(peer_id) && is_session_active(peer_id) {
203 return Some(peer_id)
204 }
205 }
206
207 None
208 }
209
210 pub fn find_any_idle_fallback_peer_for_any_pending_hash(
216 &mut self,
217 hashes_to_request: &mut RequestTxHashes,
218 is_session_active: impl Fn(&PeerId) -> bool,
219 mut budget: Option<usize>, ) -> Option<PeerId> {
221 let mut hashes_pending_fetch_iter = self.hashes_pending_fetch.iter();
222
223 let idle_peer = loop {
224 let &hash = hashes_pending_fetch_iter.next()?;
225
226 let idle_peer = self.get_idle_peer_for(hash, &is_session_active);
227
228 if idle_peer.is_some() {
229 hashes_to_request.insert(hash);
230 break idle_peer.copied()
231 }
232
233 if let Some(ref mut bud) = budget {
234 *bud = bud.saturating_sub(1);
235 if *bud == 0 {
236 return None
237 }
238 }
239 };
240 let hash = hashes_to_request.iter().next()?;
241
242 drop(hashes_pending_fetch_iter);
244 _ = self.hashes_pending_fetch.remove(hash);
245
246 idle_peer
247 }
248
249 pub fn pack_request(
254 &self,
255 hashes_to_request: &mut RequestTxHashes,
256 hashes_from_announcement: ValidAnnouncementData,
257 ) -> RequestTxHashes {
258 if hashes_from_announcement.msg_version().is_eth68() {
259 return self.pack_request_eth68(hashes_to_request, hashes_from_announcement)
260 }
261 self.pack_request_eth66(hashes_to_request, hashes_from_announcement)
262 }
263
264 pub fn pack_request_eth68(
275 &self,
276 hashes_to_request: &mut RequestTxHashes,
277 hashes_from_announcement: impl HandleMempoolData
278 + IntoIterator<Item = (TxHash, Option<(u8, usize)>)>,
279 ) -> RequestTxHashes {
280 let mut acc_size_response = 0;
281
282 let mut hashes_from_announcement_iter = hashes_from_announcement.into_iter();
283
284 if let Some((hash, Some((_ty, size)))) = hashes_from_announcement_iter.next() {
285 hashes_to_request.insert(hash);
286
287 if size >= self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request {
289 return hashes_from_announcement_iter.collect()
290 }
291 acc_size_response = size;
292 }
293
294 let mut surplus_hashes = RequestTxHashes::default();
295
296 loop {
299 let Some((hash, metadata)) = hashes_from_announcement_iter.next() else { break };
300
301 let Some((_ty, size)) = metadata else {
302 unreachable!("this method is called upon reception of an eth68 announcement")
303 };
304
305 let next_acc_size = acc_size_response + size;
306
307 if next_acc_size <=
308 self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request
309 {
310 acc_size_response = next_acc_size;
313 _ = hashes_to_request.insert(hash)
314 } else {
315 _ = surplus_hashes.insert(hash)
316 }
317
318 let free_space =
319 self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request -
320 acc_size_response;
321
322 if free_space < MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED {
323 break
324 }
325 }
326
327 surplus_hashes.extend(hashes_from_announcement_iter.map(|(hash, _metadata)| hash));
328
329 surplus_hashes
330 }
331
332 pub fn pack_request_eth66(
339 &self,
340 hashes_to_request: &mut RequestTxHashes,
341 hashes_from_announcement: ValidAnnouncementData,
342 ) -> RequestTxHashes {
343 let (mut hashes, _version) = hashes_from_announcement.into_request_hashes();
344 if hashes.len() <= SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST {
345 *hashes_to_request = hashes;
346 hashes_to_request.shrink_to_fit();
347
348 RequestTxHashes::default()
349 } else {
350 let surplus_hashes =
351 hashes.retain_count(SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST);
352 *hashes_to_request = hashes;
353 hashes_to_request.shrink_to_fit();
354
355 surplus_hashes
356 }
357 }
358
359 pub fn try_buffer_hashes_for_retry(
361 &mut self,
362 mut hashes: RequestTxHashes,
363 peer_failed_to_serve: &PeerId,
364 ) {
365 hashes.retain(|hash| {
368 if let Some(entry) = self.hashes_fetch_inflight_and_pending_fetch.get(hash) {
369 entry.fallback_peers_mut().remove(peer_failed_to_serve);
370 return true
371 }
372 false
374 });
375
376 self.buffer_hashes(hashes, None)
377 }
378
379 pub fn buffer_hashes(&mut self, hashes: RequestTxHashes, fallback_peer: Option<PeerId>) {
384 let mut max_retried_and_evicted_hashes = vec![];
385
386 for hash in hashes {
387 if self.hashes_fetch_inflight_and_pending_fetch.peek(&hash).is_none() {
389 continue
390 }
391
392 let Some(TxFetchMetadata { retries, fallback_peers, .. }) =
393 self.hashes_fetch_inflight_and_pending_fetch.get(&hash)
394 else {
395 return
396 };
397
398 if let Some(peer_id) = fallback_peer {
399 fallback_peers.insert(peer_id);
401 } else {
402 if *retries >= DEFAULT_MAX_RETRIES {
403 trace!(target: "net::tx",
404 %hash,
405 retries,
406 "retry limit for `GetPooledTransactions` requests reached for hash, dropping hash"
407 );
408
409 max_retried_and_evicted_hashes.push(hash);
410 continue
411 }
412 *retries += 1;
413 }
414 if let (_, Some(evicted_hash)) = self.hashes_pending_fetch.insert_and_get_evicted(hash)
415 {
416 max_retried_and_evicted_hashes.push(evicted_hash);
417 }
418 }
419
420 self.remove_hashes_from_transaction_fetcher(max_retried_and_evicted_hashes);
421 }
422
423 pub fn on_fetch_pending_hashes(
428 &mut self,
429 peers: &HashMap<PeerId, PeerMetadata<N>>,
430 has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
431 ) {
432 let mut hashes_to_request = RequestTxHashes::default();
433 let is_session_active = |peer_id: &PeerId| peers.contains_key(peer_id);
434
435 let mut search_durations = TxFetcherSearchDurations::default();
436
437 let budget_find_idle_fallback_peer = self
439 .search_breadth_budget_find_idle_fallback_peer(&has_capacity_wrt_pending_pool_imports);
440
441 let peer_id = duration_metered_exec!(
442 {
443 let Some(peer_id) = self.find_any_idle_fallback_peer_for_any_pending_hash(
444 &mut hashes_to_request,
445 is_session_active,
446 budget_find_idle_fallback_peer,
447 ) else {
448 return
450 };
451
452 peer_id
453 },
454 search_durations.find_idle_peer
455 );
456
457 let Some(peer) = peers.get(&peer_id) else { return };
459 let conn_eth_version = peer.version;
460
461 let budget_fill_request = self
466 .search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
467 &has_capacity_wrt_pending_pool_imports,
468 );
469
470 duration_metered_exec!(
471 {
472 self.fill_request_from_hashes_pending_fetch(
473 &mut hashes_to_request,
474 &peer.seen_transactions,
475 budget_fill_request,
476 )
477 },
478 search_durations.fill_request
479 );
480
481 self.update_pending_fetch_cache_search_metrics(search_durations);
482
483 trace!(target: "net::tx",
484 peer_id=format!("{peer_id:#}"),
485 hashes=?*hashes_to_request,
486 %conn_eth_version,
487 "requesting hashes that were stored pending fetch from peer"
488 );
489
490 if let Some(failed_to_request_hashes) =
492 self.request_transactions_from_peer(hashes_to_request, peer)
493 {
494 trace!(target: "net::tx",
495 peer_id=format!("{peer_id:#}"),
496 ?failed_to_request_hashes,
497 %conn_eth_version,
498 "failed sending request to peer's session, buffering hashes"
499 );
500
501 self.buffer_hashes(failed_to_request_hashes, Some(peer_id));
502 }
503 }
504
505 pub fn filter_unseen_and_pending_hashes(
508 &mut self,
509 new_announced_hashes: &mut ValidAnnouncementData,
510 is_tx_bad_import: impl Fn(&TxHash) -> bool,
511 peer_id: &PeerId,
512 is_session_active: impl Fn(PeerId) -> bool,
513 client_version: &str,
514 ) {
515 #[cfg(not(debug_assertions))]
516 let mut previously_unseen_hashes_count = 0;
517 #[cfg(debug_assertions)]
518 let mut previously_unseen_hashes = Vec::with_capacity(new_announced_hashes.len() / 4);
519
520 let msg_version = new_announced_hashes.msg_version();
521
522 new_announced_hashes.retain(|hash, metadata| {
524
525 if let Some(TxFetchMetadata{ref mut fallback_peers, tx_encoded_length: ref mut previously_seen_size, ..}) = self.hashes_fetch_inflight_and_pending_fetch.peek_mut(hash) {
528 if let Some((_ty, size)) = metadata {
530 if let Some(prev_size) = previously_seen_size {
531 if size != prev_size {
533 trace!(target: "net::tx",
534 peer_id=format!("{peer_id:#}"),
535 %hash,
536 size,
537 previously_seen_size,
538 %client_version,
539 "peer announced a different size for tx, this is especially worrying if one size is much bigger..."
540 );
541 }
542 }
543 *previously_seen_size = Some(*size);
545 }
546
547 if self.hashes_pending_fetch.remove(hash) {
549 return true
550 }
551 let mut ended_sessions = vec![];
556 for &peer_id in fallback_peers.iter() {
557 if is_session_active(peer_id) {
558 ended_sessions.push(peer_id);
559 }
560 }
561 for peer_id in ended_sessions {
562 fallback_peers.remove(&peer_id);
563 }
564
565 return false
566 }
567
568 if is_tx_bad_import(hash) {
571 return false
572 }
573
574 #[cfg(not(debug_assertions))]
575 {
576 previously_unseen_hashes_count += 1;
577 }
578 #[cfg(debug_assertions)]
579 previously_unseen_hashes.push(*hash);
580
581 if self.hashes_fetch_inflight_and_pending_fetch.get_or_insert(*hash, ||
582 TxFetchMetadata{retries: 0, fallback_peers: LruCache::new(DEFAULT_MAX_COUNT_FALLBACK_PEERS as u32), tx_encoded_length: None}
583 ).is_none() {
584
585 debug!(target: "net::tx",
586 peer_id=format!("{peer_id:#}"),
587 %hash,
588 ?msg_version,
589 %client_version,
590 "failed to cache new announced hash from peer in schnellru::LruMap, dropping hash"
591 );
592
593 return false
594 }
595 true
596 });
597
598 #[cfg(not(debug_assertions))]
599 trace!(target: "net::tx",
600 peer_id=format!("{peer_id:#}"),
601 previously_unseen_hashes_count=previously_unseen_hashes_count,
602 msg_version=?msg_version,
603 client_version=%client_version,
604 "received previously unseen hashes in announcement from peer"
605 );
606
607 #[cfg(debug_assertions)]
608 trace!(target: "net::tx",
609 peer_id=format!("{peer_id:#}"),
610 ?msg_version,
611 %client_version,
612 previously_unseen_hashes_len=previously_unseen_hashes.len(),
613 ?previously_unseen_hashes,
614 "received previously unseen hashes in announcement from peer"
615 );
616 }
617
618 pub fn request_transactions_from_peer(
626 &mut self,
627 new_announced_hashes: RequestTxHashes,
628 peer: &PeerMetadata<N>,
629 ) -> Option<RequestTxHashes> {
630 let peer_id: PeerId = peer.request_tx.peer_id;
631 let conn_eth_version = peer.version;
632
633 if self.active_peers.len() >= self.info.max_inflight_requests {
634 trace!(target: "net::tx",
635 peer_id=format!("{peer_id:#}"),
636 hashes=?*new_announced_hashes,
637 %conn_eth_version,
638 max_inflight_transaction_requests=self.info.max_inflight_requests,
639 "limit for concurrent `GetPooledTransactions` requests reached, dropping request for hashes to peer"
640 );
641 return Some(new_announced_hashes)
642 }
643
644 let Some(inflight_count) = self.active_peers.get_or_insert(peer_id, || 0) else {
645 debug!(target: "net::tx",
646 peer_id=format!("{peer_id:#}"),
647 hashes=?*new_announced_hashes,
648 conn_eth_version=%conn_eth_version,
649 "failed to cache active peer in schnellru::LruMap, dropping request to peer"
650 );
651 return Some(new_announced_hashes)
652 };
653
654 if *inflight_count >= self.info.max_inflight_requests_per_peer {
655 trace!(target: "net::tx",
656 peer_id=format!("{peer_id:#}"),
657 hashes=?*new_announced_hashes,
658 %conn_eth_version,
659 max_concurrent_tx_reqs_per_peer=self.info.max_inflight_requests_per_peer,
660 "limit for concurrent `GetPooledTransactions` requests per peer reached"
661 );
662 return Some(new_announced_hashes)
663 }
664
665 #[cfg(debug_assertions)]
666 {
667 for hash in &new_announced_hashes {
668 if self.hashes_pending_fetch.contains(hash) {
669 debug!(target: "net::tx", "`{}` should have been taken out of buffer before packing in a request, breaks invariant `@hashes_pending_fetch` and `@inflight_requests`, `@hashes_fetch_inflight_and_pending_fetch` for `{}`: {:?}",
670 format!("{:?}", new_announced_hashes), format!("{:?}", new_announced_hashes),
672 new_announced_hashes.iter().map(|hash| {
673 let metadata = self.hashes_fetch_inflight_and_pending_fetch.get(hash);
674 (*hash, metadata.map(|m| (m.retries, m.tx_encoded_length)))
676 }).collect::<Vec<(TxHash, Option<(u8, Option<usize>)>)>>())
677 }
678 }
679 }
680
681 let (response, rx) = oneshot::channel();
682 let req = PeerRequest::GetPooledTransactions {
683 request: GetPooledTransactions(new_announced_hashes.iter().copied().collect()),
684 response,
685 };
686
687 if let Err(err) = peer.request_tx.try_send(req) {
689 return match err {
691 TrySendError::Full(_) | TrySendError::Closed(_) => {
692 self.metrics.egress_peer_channel_full.increment(1);
693 Some(new_announced_hashes)
694 }
695 }
696 }
697
698 *inflight_count += 1;
699 self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, new_announced_hashes, rx));
701
702 None
703 }
704
705 pub fn fill_request_from_hashes_pending_fetch(
725 &mut self,
726 hashes_to_request: &mut RequestTxHashes,
727 seen_hashes: &LruCache<TxHash>,
728 mut budget_fill_request: Option<usize>, ) {
730 let Some(hash) = hashes_to_request.iter().next() else { return };
731
732 let mut acc_size_response = self
733 .hashes_fetch_inflight_and_pending_fetch
734 .get(hash)
735 .and_then(|entry| entry.tx_encoded_len())
736 .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED);
737
738 if acc_size_response >=
740 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES
741 {
742 return
743 }
744
745 for hash in self.hashes_pending_fetch.iter() {
748 if !seen_hashes.contains(hash) {
750 continue
751 };
752
753 hashes_to_request.insert(*hash);
755
756 let size = self
758 .hashes_fetch_inflight_and_pending_fetch
759 .get(hash)
760 .and_then(|entry| entry.tx_encoded_len())
761 .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED);
762
763 acc_size_response += size;
764
765 if acc_size_response >=
769 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES ||
770 hashes_to_request.len() >
771 DEFAULT_SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST_ON_FETCH_PENDING_HASHES
772 {
773 break
774 }
775
776 if let Some(ref mut bud) = budget_fill_request {
777 *bud = bud.saturating_sub(1);
778 if *bud == 0 {
779 return
780 }
781 }
782 }
783
784 for hash in hashes_to_request.iter() {
786 self.hashes_pending_fetch.remove(hash);
787 }
788 }
789
790 pub fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
793 let info = &self.info;
794
795 self.has_capacity(info.max_inflight_requests)
796 }
797
798 fn has_capacity(&self, max_inflight_requests: usize) -> bool {
800 self.inflight_requests.len() <= max_inflight_requests
801 }
802
803 pub fn search_breadth_budget_find_idle_fallback_peer(
809 &self,
810 has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
811 ) -> Option<usize> {
812 let info = &self.info;
813
814 let tx_fetcher_has_capacity = self.has_capacity(
815 info.max_inflight_requests /
816 DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_IDLE_PEER,
817 );
818 let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports(
819 DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_IDLE_PEER,
820 );
821
822 if tx_fetcher_has_capacity && tx_pool_has_capacity {
823 None
825 } else {
826 let limit = DEFAULT_BUDGET_FIND_IDLE_FALLBACK_PEER;
828
829 trace!(target: "net::tx",
830 inflight_requests=self.inflight_requests.len(),
831 max_inflight_transaction_requests=info.max_inflight_requests,
832 hashes_pending_fetch=self.hashes_pending_fetch.len(),
833 limit,
834 "search breadth limited in search for idle fallback peer for some hash pending fetch"
835 );
836
837 Some(limit)
838 }
839 }
840
841 pub fn search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
848 &self,
849 has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
850 ) -> Option<usize> {
851 let info = &self.info;
852
853 let tx_fetcher_has_capacity = self.has_capacity(
854 info.max_inflight_requests /
855 DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_INTERSECTION,
856 );
857 let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports(
858 DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_INTERSECTION,
859 );
860
861 if tx_fetcher_has_capacity && tx_pool_has_capacity {
862 None
864 } else {
865 let limit = DEFAULT_BUDGET_FIND_INTERSECTION_ANNOUNCED_BY_PEER_AND_PENDING_FETCH;
867
868 trace!(target: "net::tx",
869 inflight_requests=self.inflight_requests.len(),
870 max_inflight_transaction_requests=self.info.max_inflight_requests,
871 hashes_pending_fetch=self.hashes_pending_fetch.len(),
872 limit=limit,
873 "search breadth limited in search for intersection of hashes announced by peer and hashes pending fetch"
874 );
875
876 Some(limit)
877 }
878 }
879
880 pub const fn approx_capacity_get_pooled_transactions_req(
883 &self,
884 announcement_version: EthVersion,
885 ) -> usize {
886 if announcement_version.is_eth68() {
887 approx_capacity_get_pooled_transactions_req_eth68(&self.info)
888 } else {
889 approx_capacity_get_pooled_transactions_req_eth66()
890 }
891 }
892
893 pub fn on_resolved_get_pooled_transactions_request_fut(
897 &mut self,
898 response: GetPooledTxResponse<N::PooledTransaction>,
899 ) -> FetchEvent<N::PooledTransaction> {
900 let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response;
903
904 self.decrement_inflight_request_count_for(&peer_id);
905
906 match result {
907 Ok(Ok(transactions)) => {
908 if transactions.is_empty() {
913 trace!(target: "net::tx",
914 peer_id=format!("{peer_id:#}"),
915 requested_hashes_len=requested_hashes.len(),
916 "received empty `PooledTransactions` response from peer, peer failed to serve hashes it announced"
917 );
918
919 return FetchEvent::EmptyResponse { peer_id }
920 }
921
922 let payload = UnverifiedPooledTransactions::new(transactions);
926
927 let unverified_len = payload.len();
928 let (verification_outcome, verified_payload) =
929 payload.verify(&requested_hashes, &peer_id);
930
931 let unsolicited = unverified_len - verified_payload.len();
932 if unsolicited > 0 {
933 self.metrics.unsolicited_transactions.increment(unsolicited as u64);
934 }
935 if verification_outcome == VerificationOutcome::ReportPeer {
936 trace!(target: "net::tx",
938 peer_id=format!("{peer_id:#}"),
939 unverified_len,
940 verified_payload_len=verified_payload.len(),
941 "received `PooledTransactions` response from peer with entries that didn't verify against request, filtered out transactions"
942 );
943 }
944 if verified_payload.is_empty() {
946 return FetchEvent::FetchError { peer_id, error: RequestError::BadResponse }
947 }
948
949 let unvalidated_payload_len = verified_payload.len();
953
954 let (validation_outcome, valid_payload) =
955 self.filter_valid_message.partially_filter_valid_entries(verified_payload);
956
957 if validation_outcome == FilterOutcome::ReportPeer {
963 trace!(target: "net::tx",
964 peer_id=format!("{peer_id:#}"),
965 unvalidated_payload_len,
966 valid_payload_len=valid_payload.len(),
967 "received invalid `PooledTransactions` response from peer, filtered out duplicate entries"
968 );
969 }
970 let requested_hashes_len = requested_hashes.len();
978 let mut fetched = Vec::with_capacity(valid_payload.len());
979 requested_hashes.retain(|requested_hash| {
980 if valid_payload.contains_key(requested_hash) {
981 fetched.push(*requested_hash);
983 return false
984 }
985 true
986 });
987 fetched.shrink_to_fit();
988 self.metrics.fetched_transactions.increment(fetched.len() as u64);
989
990 if fetched.len() < requested_hashes_len {
991 trace!(target: "net::tx",
992 peer_id=format!("{peer_id:#}"),
993 requested_hashes_len=requested_hashes_len,
994 fetched_len=fetched.len(),
995 "peer failed to serve hashes it announced"
996 );
997 }
998
999 self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
1003
1004 let transactions = valid_payload.into_data().into_values().collect();
1005
1006 FetchEvent::TransactionsFetched { peer_id, transactions }
1007 }
1008 Ok(Err(req_err)) => {
1009 self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
1010 FetchEvent::FetchError { peer_id, error: req_err }
1011 }
1012 Err(_) => {
1013 self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
1014 FetchEvent::FetchError { peer_id, error: RequestError::ChannelClosed }
1016 }
1017 }
1018 }
1019}
1020
1021impl<N: NetworkPrimitives> Stream for TransactionFetcher<N> {
1022 type Item = FetchEvent<N::PooledTransaction>;
1023
1024 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1026 if self.inflight_requests.is_empty() {
1029 return Poll::Pending
1030 }
1031
1032 if let Some(resp) = ready!(self.inflight_requests.poll_next_unpin(cx)) {
1033 return Poll::Ready(Some(self.on_resolved_get_pooled_transactions_request_fut(resp)))
1034 }
1035
1036 Poll::Pending
1037 }
1038}
1039
1040impl<T: NetworkPrimitives> Default for TransactionFetcher<T> {
1041 fn default() -> Self {
1042 Self {
1043 active_peers: LruMap::new(DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS),
1044 inflight_requests: Default::default(),
1045 hashes_pending_fetch: LruCache::new(DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH),
1046 hashes_fetch_inflight_and_pending_fetch: LruMap::new(
1047 DEFAULT_MAX_CAPACITY_CACHE_INFLIGHT_AND_PENDING_FETCH,
1048 ),
1049 filter_valid_message: Default::default(),
1050 info: TransactionFetcherInfo::default(),
1051 metrics: Default::default(),
1052 }
1053 }
1054}
1055
1056#[derive(Debug, Constructor)]
1058pub struct TxFetchMetadata {
1059 retries: u8,
1061 fallback_peers: LruCache<PeerId>,
1063 tx_encoded_length: Option<usize>,
1068}
1069
1070impl TxFetchMetadata {
1071 pub fn fallback_peers_mut(&mut self) -> &mut LruCache<PeerId> {
1073 &mut self.fallback_peers
1074 }
1075
1076 pub const fn tx_encoded_len(&self) -> Option<usize> {
1081 self.tx_encoded_length
1082 }
1083}
1084
1085#[derive(Debug)]
1087pub enum FetchEvent<T = PooledTransactionsElement> {
1088 TransactionsFetched {
1090 peer_id: PeerId,
1092 transactions: PooledTransactions<T>,
1094 },
1095 FetchError {
1097 peer_id: PeerId,
1099 error: RequestError,
1101 },
1102 EmptyResponse {
1104 peer_id: PeerId,
1106 },
1107}
1108
1109#[derive(Debug)]
1111pub struct GetPooledTxRequest<T = PooledTransactionsElement> {
1112 peer_id: PeerId,
1113 requested_hashes: RequestTxHashes,
1115 response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
1116}
1117
1118#[derive(Debug)]
1121pub struct GetPooledTxResponse<T = PooledTransactionsElement> {
1122 peer_id: PeerId,
1123 requested_hashes: RequestTxHashes,
1126 result: Result<RequestResult<PooledTransactions<T>>, RecvError>,
1127}
1128
1129#[must_use = "futures do nothing unless polled"]
1132#[pin_project::pin_project]
1133#[derive(Debug)]
1134pub struct GetPooledTxRequestFut<T = PooledTransactionsElement> {
1135 #[pin]
1136 inner: Option<GetPooledTxRequest<T>>,
1137}
1138
1139impl<T> GetPooledTxRequestFut<T> {
1140 #[inline]
1141 const fn new(
1142 peer_id: PeerId,
1143 requested_hashes: RequestTxHashes,
1144 response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
1145 ) -> Self {
1146 Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) }
1147 }
1148}
1149
1150impl<T> Future for GetPooledTxRequestFut<T> {
1151 type Output = GetPooledTxResponse<T>;
1152
1153 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1154 let mut req = self.as_mut().project().inner.take().expect("polled after completion");
1155 match req.response.poll_unpin(cx) {
1156 Poll::Ready(result) => Poll::Ready(GetPooledTxResponse {
1157 peer_id: req.peer_id,
1158 requested_hashes: req.requested_hashes,
1159 result,
1160 }),
1161 Poll::Pending => {
1162 self.project().inner.set(Some(req));
1163 Poll::Pending
1164 }
1165 }
1166 }
1167}
1168
1169#[derive(Debug, Constructor, Deref)]
1171pub struct UnverifiedPooledTransactions<T> {
1172 txns: PooledTransactions<T>,
1173}
1174
1175#[derive(Debug, Constructor, Deref)]
1177pub struct VerifiedPooledTransactions<T> {
1178 txns: PooledTransactions<T>,
1179}
1180
1181impl<T: SignedTransaction> DedupPayload for VerifiedPooledTransactions<T> {
1182 type Value = T;
1183
1184 fn is_empty(&self) -> bool {
1185 self.txns.is_empty()
1186 }
1187
1188 fn len(&self) -> usize {
1189 self.txns.len()
1190 }
1191
1192 fn dedup(self) -> PartiallyValidData<Self::Value> {
1193 PartiallyValidData::from_raw_data(
1194 self.txns.into_iter().map(|tx| (*tx.tx_hash(), tx)).collect(),
1195 None,
1196 )
1197 }
1198}
1199
1200trait VerifyPooledTransactionsResponse {
1201 type Transaction: SignedTransaction;
1202
1203 fn verify(
1204 self,
1205 requested_hashes: &RequestTxHashes,
1206 peer_id: &PeerId,
1207 ) -> (VerificationOutcome, VerifiedPooledTransactions<Self::Transaction>);
1208}
1209
1210impl<T: SignedTransaction> VerifyPooledTransactionsResponse for UnverifiedPooledTransactions<T> {
1211 type Transaction = T;
1212
1213 fn verify(
1214 self,
1215 requested_hashes: &RequestTxHashes,
1216 _peer_id: &PeerId,
1217 ) -> (VerificationOutcome, VerifiedPooledTransactions<T>) {
1218 let mut verification_outcome = VerificationOutcome::Ok;
1219
1220 let Self { mut txns } = self;
1221
1222 #[cfg(debug_assertions)]
1223 let mut tx_hashes_not_requested: SmallVec<[TxHash; 16]> = smallvec!();
1224 #[cfg(not(debug_assertions))]
1225 let mut tx_hashes_not_requested_count = 0;
1226
1227 txns.0.retain(|tx| {
1228 if !requested_hashes.contains(tx.tx_hash()) {
1229 verification_outcome = VerificationOutcome::ReportPeer;
1230
1231 #[cfg(debug_assertions)]
1232 tx_hashes_not_requested.push(*tx.tx_hash());
1233 #[cfg(not(debug_assertions))]
1234 {
1235 tx_hashes_not_requested_count += 1;
1236 }
1237
1238 return false
1239 }
1240 true
1241 });
1242
1243 #[cfg(debug_assertions)]
1244 if !tx_hashes_not_requested.is_empty() {
1245 trace!(target: "net::tx",
1246 peer_id=format!("{_peer_id:#}"),
1247 ?tx_hashes_not_requested,
1248 "transactions in `PooledTransactions` response from peer were not requested"
1249 );
1250 }
1251 #[cfg(not(debug_assertions))]
1252 if tx_hashes_not_requested_count != 0 {
1253 trace!(target: "net::tx",
1254 peer_id=format!("{_peer_id:#}"),
1255 tx_hashes_not_requested_count,
1256 "transactions in `PooledTransactions` response from peer were not requested"
1257 );
1258 }
1259
1260 (verification_outcome, VerifiedPooledTransactions::new(txns))
1261 }
1262}
1263
/// Result of verifying a `PooledTransactions` response against its request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerificationOutcome {
    /// Every transaction in the response was requested.
    Ok,
    /// The response contained at least one transaction that was not requested.
    ReportPeer,
}
1274
1275#[derive(Debug, Constructor)]
1277pub struct TransactionFetcherInfo {
1278 pub max_inflight_requests: usize,
1280 pub max_inflight_requests_per_peer: u8,
1282 pub soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
1286 pub soft_limit_byte_size_pooled_transactions_response: usize,
1289 pub max_capacity_cache_txns_pending_fetch: u32,
1293}
1294
1295impl Default for TransactionFetcherInfo {
1296 fn default() -> Self {
1297 Self::new(
1298 DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS as usize,
1299 DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
1300 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
1301 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
1302 DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
1303 )
1304 }
1305}
1306
1307impl From<TransactionFetcherConfig> for TransactionFetcherInfo {
1308 fn from(config: TransactionFetcherConfig) -> Self {
1309 let TransactionFetcherConfig {
1310 max_inflight_requests,
1311 max_inflight_requests_per_peer,
1312 soft_limit_byte_size_pooled_transactions_response,
1313 soft_limit_byte_size_pooled_transactions_response_on_pack_request,
1314 max_capacity_cache_txns_pending_fetch,
1315 } = config;
1316
1317 Self::new(
1318 max_inflight_requests as usize,
1319 max_inflight_requests_per_peer,
1320 soft_limit_byte_size_pooled_transactions_response_on_pack_request,
1321 soft_limit_byte_size_pooled_transactions_response,
1322 max_capacity_cache_txns_pending_fetch,
1323 )
1324 }
1325}
1326
/// Pair of durations recorded for the fetcher's search steps; both start at zero via
/// [`Default`].
// NOTE(review): the code that fills these in is outside this chunk; the field descriptions
// below follow the field names and should be confirmed against the use site.
#[derive(Debug, Default)]
struct TxFetcherSearchDurations {
    /// Time attributed to finding an idle peer.
    find_idle_peer: Duration,
    /// Time attributed to filling a request.
    fill_request: Duration,
}
1332
#[cfg(test)]
mod test {
    use super::*;
    use crate::transactions::tests::{default_cache, new_mock_session};
    use alloy_primitives::{hex, B256};
    use alloy_rlp::Decodable;
    use derive_more::IntoIterator;
    use reth_primitives::TransactionSigned;
    use std::{collections::HashSet, str::FromStr};

    /// Test double for announcement data: a list of `(hash, Some((tx type, size)))` entries
    /// that always reports itself as an eth68 message.
    #[derive(IntoIterator)]
    struct TestValidAnnouncementData(Vec<(TxHash, Option<(u8, usize)>)>);

    impl HandleMempoolData for TestValidAnnouncementData {
        fn is_empty(&self) -> bool {
            let Self(entries) = self;
            entries.is_empty()
        }

        fn len(&self) -> usize {
            let Self(entries) = self;
            entries.len()
        }

        fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
            // Only the hash takes part in the decision; the metadata is ignored.
            self.0.retain(|entry| f(&entry.0))
        }
    }

    impl HandleVersionedMempoolData for TestValidAnnouncementData {
        fn msg_version(&self) -> EthVersion {
            EthVersion::Eth68
        }
    }

    /// Packing an eth68 request must pick the first and third hash and return the remaining
    /// three as surplus.
    #[test]
    fn pack_eth68_request() {
        reth_tracing::init_test_tracing();

        let tx_fetcher = &mut TransactionFetcher::<EthNetworkPrimitives>::default();

        // Five distinct hashes, each made of a single repeated byte.
        let eth68_hashes = [1u8, 2, 3, 4, 5].map(|byte| B256::from_slice(&[byte; 32]));
        let eth68_sizes = [
            // hash 1: expected to be packed into the request
            DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ - MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED - 1,
            // hash 2: expected to end up in the surplus
            DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
            // hash 3: expected to be packed into the request
            2,
            // hashes 4 and 5: expected to end up in the surplus
            9,
            0,
        ];

        let expected_request_hashes = HashSet::from([eth68_hashes[0], eth68_hashes[2]]);
        let expected_surplus_hashes =
            HashSet::from([eth68_hashes[1], eth68_hashes[3], eth68_hashes[4]]);

        let mut eth68_hashes_to_request = RequestTxHashes::with_capacity(3);

        let announced: Vec<_> = eth68_hashes
            .into_iter()
            .zip(eth68_sizes)
            .map(|(hash, size)| (hash, Some((0u8, size))))
            .collect();
        let valid_announcement_data = TestValidAnnouncementData(announced);

        let surplus_eth68_hashes =
            tx_fetcher.pack_request_eth68(&mut eth68_hashes_to_request, valid_announcement_data);

        let packed: HashSet<_> = eth68_hashes_to_request.into_iter().collect();
        let surplus: HashSet<_> = surplus_eth68_hashes.into_iter().collect();

        assert_eq!(expected_request_hashes, packed);
        assert_eq!(expected_surplus_hashes, surplus);
    }

    /// Hashes buffered as pending fetch must be requested from peer 1 once
    /// `on_fetch_pending_hashes` runs.
    #[tokio::test]
    async fn test_on_fetch_pending_hashes() {
        reth_tracing::init_test_tracing();

        let tx_fetcher = &mut TransactionFetcher::default();

        // Hashes that both peers have seen.
        let seen_hashes = [1u8, 2, 3, 4].map(|byte| B256::from_slice(&[byte; 32]));
        // Sizes recorded for the first three hashes; the fourth is stored without a size.
        let seen_eth68_hashes_sizes = [120, 158, 116];

        let peer_1 = PeerId::new([1; 64]);
        let peer_2 = PeerId::new([2; 64]);

        let (mut peer_1_data, mut peer_1_mock_session_rx) =
            new_mock_session(peer_1, EthVersion::Eth66);
        for hash in seen_hashes {
            peer_1_data.seen_transactions.insert(hash);
        }
        // The receiver half of peer 2's session is deliberately discarded right away.
        let (mut peer_2_data, _) = new_mock_session(peer_2, EthVersion::Eth66);
        for hash in seen_hashes {
            peer_2_data.seen_transactions.insert(hash);
        }
        let peers = HashMap::from([(peer_1, peer_1_data), (peer_2, peer_2_data)]);

        // Every metadata entry starts with its own fallback-peer cache holding only peer 2.
        let fallback_with_peer_2 = || {
            let mut backups = default_cache();
            backups.insert(peer_2);
            backups
        };

        // The first three hashes carry a size ...
        for (hash, size) in seen_hashes.iter().zip(seen_eth68_hashes_sizes) {
            let meta = TxFetchMetadata::new(0, fallback_with_peer_2(), Some(size));
            tx_fetcher.hashes_fetch_inflight_and_pending_fetch.insert(*hash, meta);
        }
        // ... the fourth one does not.
        let meta = TxFetchMetadata::new(0, fallback_with_peer_2(), None);
        tx_fetcher.hashes_fetch_inflight_and_pending_fetch.insert(seen_hashes[3], meta);

        // One more pending hash that is not in either peer's seen set.
        let hash_other = B256::from_slice(&[5; 32]);
        let other_meta = TxFetchMetadata::new(0, fallback_with_peer_2(), None);
        tx_fetcher.hashes_fetch_inflight_and_pending_fetch.insert(hash_other, other_meta);
        tx_fetcher.hashes_pending_fetch.insert(hash_other);

        // Register peer 1 as an additional fallback peer for every seen hash.
        for hash in &seen_hashes {
            let metadata = tx_fetcher.hashes_fetch_inflight_and_pending_fetch.get(hash).unwrap();
            metadata.fallback_peers_mut().insert(peer_1);
        }

        // Buffer every seen hash as pending fetch.
        for hash in seen_hashes {
            tx_fetcher.hashes_pending_fetch.insert(hash);
        }

        assert_eq!(tx_fetcher.hashes_pending_fetch.len(), 5);

        tx_fetcher.on_fetch_pending_hashes(&peers, |_| true);

        let req = peer_1_mock_session_rx
            .recv()
            .await
            .expect("peer session should receive request with buffered hashes");
        let PeerRequest::GetPooledTransactions {
            request: GetPooledTransactions(requested_hashes),
            ..
        } = req
        else {
            unreachable!()
        };

        let requested: HashSet<_> = requested_hashes.into_iter().collect();
        let expected: HashSet<_> = seen_hashes.into_iter().collect();
        assert_eq!(requested, expected)
    }

    /// A response containing one requested and one unrequested transaction must be flagged
    /// with `ReportPeer` and reduced to the requested transaction only.
    #[test]
    fn verify_response_hashes() {
        /// Decodes a raw signed transaction and converts it into its pooled form.
        fn decode_pooled_tx(mut raw: &[u8]) -> PooledTransactionsElement {
            TransactionSigned::decode(&mut raw).unwrap().try_into().unwrap()
        }

        let signed_tx_1 = decode_pooled_tx(&hex!("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598daa"));
        let signed_tx_2 = decode_pooled_tx(&hex!("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"));

        let parse_hash = |text: &str| B256::from_str(text).unwrap();

        // Only the hash of the first transaction is part of the request.
        let request_hashes = [
            parse_hash("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e67890"),
            *signed_tx_1.hash(),
            parse_hash("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e12345"),
            parse_hash("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4edabe3"),
        ];

        // Guard the premise of the test: the second transaction was never requested.
        request_hashes.iter().for_each(|hash| assert_ne!(hash, signed_tx_2.hash()));

        let unique_hashes: HashSet<_> = request_hashes.into_iter().collect();
        let request_hashes = RequestTxHashes::new(unique_hashes);

        let response_txns = PooledTransactions(vec![signed_tx_1.clone(), signed_tx_2]);
        let payload = UnverifiedPooledTransactions::new(response_txns);

        let (outcome, verified_payload) = payload.verify(&request_hashes, &PeerId::ZERO);

        assert_eq!(VerificationOutcome::ReportPeer, outcome);
        assert_eq!(1, verified_payload.len());
        assert!(verified_payload.contains(&signed_tx_1));
    }
}