1pub mod config;
5pub mod constants;
7pub mod fetcher;
9pub mod validation;
10
11pub use self::constants::{
12 tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
13 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
14};
15pub use config::{TransactionFetcherConfig, TransactionPropagationMode, TransactionsManagerConfig};
16pub use validation::*;
17
18pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
19
20use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
21use crate::{
22 budget::{
23 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
24 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
25 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
26 },
27 cache::LruCache,
28 duration_metered_exec, metered_poll_nested_stream_with_budget,
29 metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
30 NetworkHandle,
31};
32use alloy_primitives::{TxHash, B256};
33use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
34use futures::{stream::FuturesUnordered, Future, StreamExt};
35use reth_eth_wire::{
36 DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
37 HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
38 NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
39 RequestTxHashes, Transactions,
40};
41use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
42use reth_network_api::{
43 events::{PeerEvent, SessionInfo},
44 NetworkEvent, NetworkEventListenerProvider, PeerRequest, PeerRequestSender, Peers,
45};
46use reth_network_p2p::{
47 error::{RequestError, RequestResult},
48 sync::SyncStateProvider,
49};
50use reth_network_peers::PeerId;
51use reth_network_types::ReputationChangeKind;
52use reth_primitives::{transaction::SignedTransactionIntoRecoveredExt, TransactionSigned};
53use reth_primitives_traits::SignedTransaction;
54use reth_tokio_util::EventStream;
55use reth_transaction_pool::{
56 error::{PoolError, PoolResult},
57 GetPooledTransactionLimit, PoolTransaction, PropagateKind, PropagatedTransactions,
58 TransactionPool, ValidPoolTransaction,
59};
60use std::{
61 collections::{hash_map::Entry, HashMap, HashSet},
62 pin::Pin,
63 sync::{
64 atomic::{AtomicUsize, Ordering},
65 Arc,
66 },
67 task::{Context, Poll},
68 time::{Duration, Instant},
69};
70use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
71use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
72use tracing::{debug, trace};
73
/// Boxed, sendable future used for pool imports.
///
/// It resolves to a list of [`PoolResult`]s, each carrying the hash of a transaction on success.
pub type PoolImportFuture = Pin<Box<dyn Future<Output = Vec<PoolResult<TxHash>>> + Send + 'static>>;
78
/// Cloneable handle used to send [`TransactionsCommand`]s to a [`TransactionsManager`].
///
/// A handle is created by [`TransactionsManager::handle`], which clones the manager's command
/// sender into it.
#[derive(Debug, Clone)]
pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Sending half of the unbounded command channel read by the manager.
    manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
}
91
92impl<N: NetworkPrimitives> TransactionsHandle<N> {
95 fn send(&self, cmd: TransactionsCommand<N>) {
96 let _ = self.manager_tx.send(cmd);
97 }
98
99 async fn peer_handle(
101 &self,
102 peer_id: PeerId,
103 ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
104 let (tx, rx) = oneshot::channel();
105 self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
106 rx.await
107 }
108
109 pub fn propagate(&self, hash: TxHash) {
111 self.send(TransactionsCommand::PropagateHash(hash))
112 }
113
114 pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
118 self.propagate_hashes_to(Some(hash), peer)
119 }
120
121 pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
125 let hashes = hash.into_iter().collect::<Vec<_>>();
126 if hashes.is_empty() {
127 return
128 }
129 self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
130 }
131
132 pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
134 let (tx, rx) = oneshot::channel();
135 self.send(TransactionsCommand::GetActivePeers(tx));
136 rx.await
137 }
138
139 pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
143 if transactions.is_empty() {
144 return
145 }
146 self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
147 }
148
149 pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
156 if transactions.is_empty() {
157 return
158 }
159 self.send(TransactionsCommand::PropagateTransactions(transactions))
160 }
161
162 pub async fn get_transaction_hashes(
164 &self,
165 peers: Vec<PeerId>,
166 ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
167 if peers.is_empty() {
168 return Ok(Default::default())
169 }
170 let (tx, rx) = oneshot::channel();
171 self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
172 rx.await
173 }
174
175 pub async fn get_peer_transaction_hashes(
177 &self,
178 peer: PeerId,
179 ) -> Result<HashSet<TxHash>, RecvError> {
180 let res = self.get_transaction_hashes(vec![peer]).await?;
181 Ok(res.into_values().next().unwrap_or_default())
182 }
183
184 pub async fn get_pooled_transactions_from(
190 &self,
191 peer_id: PeerId,
192 hashes: Vec<B256>,
193 ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
194 let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
195
196 let (tx, rx) = oneshot::channel();
197 let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
198 peer.try_send(request).ok();
199
200 rx.await?.map(|res| Some(res.0))
201 }
202}
203
/// Manages transactions on top of the p2p network.
///
/// The manager reacts to network events, transaction-related network messages, commands sent
/// through a [`TransactionsHandle`], newly pending pool transactions and finished pool imports.
/// It must be polled to make progress.
#[derive(Debug)]
#[must_use = "Manager does nothing unless polled."]
pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Access to the transaction pool.
    pool: Pool,
    /// Handle to the network; used for reputation changes and for sending messages to peers.
    network: NetworkHandle<N>,
    /// Stream of network events, obtained from the network handle's event listener.
    network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
    /// Fetcher used to request announced transactions from peers and to buffer hashes.
    transaction_fetcher: TransactionFetcher<N>,
    /// Transactions currently being imported into the pool, keyed by hash, together with the
    /// peers that sent them. Entries are removed once the import result is known.
    transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
    /// In-flight pool import futures.
    pool_imports: FuturesUnordered<PoolImportFuture>,
    /// Bookkeeping for the number of pending pool imports and its configured maximum.
    pending_pool_imports_info: PendingPoolImportsInfo,
    /// Bounded cache of hashes of transactions whose import failed as bad transactions.
    bad_imports: LruCache<TxHash>,
    /// Metadata of all peers with a tracked session.
    peers: HashMap<PeerId, PeerMetadata<N>>,
    /// Sending half of the command channel; cloned into every [`TransactionsHandle`].
    command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
    /// Receiving half of the command channel.
    command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
    /// Hashes of transactions reported by the pool's pending-transactions listener.
    pending_transactions: ReceiverStream<TxHash>,
    /// Transaction-related events forwarded from the network.
    transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
    /// Configuration of the manager.
    config: TransactionsManagerConfig,
    /// Metrics of the manager.
    metrics: TransactionsManagerMetrics,
}
282
283impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
284 pub fn new(
288 network: NetworkHandle<N>,
289 pool: Pool,
290 from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
291 transactions_manager_config: TransactionsManagerConfig,
292 ) -> Self {
293 let network_events = network.event_listener();
294
295 let (command_tx, command_rx) = mpsc::unbounded_channel();
296
297 let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
298 &transactions_manager_config.transaction_fetcher_config,
299 );
300
301 let pending = pool.pending_transactions_listener();
304 let pending_pool_imports_info = PendingPoolImportsInfo::default();
305 let metrics = TransactionsManagerMetrics::default();
306 metrics
307 .capacity_pending_pool_imports
308 .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
309
310 Self {
311 pool,
312 network,
313 network_events,
314 transaction_fetcher,
315 transactions_by_peers: Default::default(),
316 pool_imports: Default::default(),
317 pending_pool_imports_info: PendingPoolImportsInfo::new(
318 DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS,
319 ),
320 bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
321 peers: Default::default(),
322 command_tx,
323 command_rx: UnboundedReceiverStream::new(command_rx),
324 pending_transactions: ReceiverStream::new(pending),
325 transaction_events: UnboundedMeteredReceiver::new(
326 from_network,
327 NETWORK_POOL_TRANSACTIONS_SCOPE,
328 ),
329 config: transactions_manager_config,
330 metrics,
331 }
332 }
333
334 pub fn handle(&self) -> TransactionsHandle<N> {
336 TransactionsHandle { manager_tx: self.command_tx.clone() }
337 }
338
339 fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
342 self.pending_pool_imports_info
343 .has_capacity(self.pending_pool_imports_info.max_pending_pool_imports) &&
344 self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
345 }
346
347 fn report_peer_bad_transactions(&self, peer_id: PeerId) {
348 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
349 self.metrics.reported_bad_transactions.increment(1);
350 }
351
352 fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
353 trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
354 self.network.reputation_change(peer_id, kind);
355 }
356
357 fn report_already_seen(&self, peer_id: PeerId) {
358 trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
359 self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
360 }
361
362 fn on_good_import(&mut self, hash: TxHash) {
364 self.transactions_by_peers.remove(&hash);
365 }
366
367 fn on_bad_import(&mut self, err: PoolError) {
391 let peers = self.transactions_by_peers.remove(&err.hash);
392
393 if !err.is_bad_transaction() || self.network.is_syncing() {
395 return
396 }
397 if let Some(peers) = peers {
400 for peer_id in peers {
401 self.report_peer_bad_transactions(peer_id);
402 }
403 }
404 self.metrics.bad_imports.increment(1);
405 self.bad_imports.insert(err.hash);
406 }
407
408 fn on_fetch_hashes_pending_fetch(&mut self) {
410 let info = &self.pending_pool_imports_info;
412 let max_pending_pool_imports = info.max_pending_pool_imports;
413 let has_capacity_wrt_pending_pool_imports =
414 |divisor| info.has_capacity(max_pending_pool_imports / divisor);
415
416 self.transaction_fetcher
417 .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports);
418 }
419
420 fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
421 let kind = match req_err {
422 RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
423 RequestError::Timeout => ReputationChangeKind::Timeout,
424 RequestError::ChannelClosed | RequestError::ConnectionDropped => {
425 return
427 }
428 RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
429 };
430 self.report_peer(peer_id, kind);
431 }
432
433 #[inline]
434 fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
435 let metrics = &self.metrics;
436
437 let TxManagerPollDurations {
438 acc_network_events,
439 acc_pending_imports,
440 acc_tx_events,
441 acc_imported_txns,
442 acc_fetch_events,
443 acc_pending_fetch,
444 acc_cmds,
445 } = poll_durations;
446
447 metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
449 metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
451 metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
452 metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
453 metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
454 metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
455 metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
456 metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
457 }
458}
459
460impl<Pool, N> TransactionsManager<Pool, N>
461where
462 Pool: TransactionPool,
463 N: NetworkPrimitives,
464{
465 fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<TxHash>>) {
467 for res in batch_results {
468 match res {
469 Ok(hash) => {
470 self.on_good_import(hash);
471 }
472 Err(err) => {
473 self.on_bad_import(err);
474 }
475 }
476 }
477 }
478
/// Handles a transaction hash announcement received from a peer.
///
/// The announcement is ignored while the node is initially syncing, when transaction gossip is
/// disabled, or when the peer is not tracked. Otherwise the announced hashes are recorded as
/// seen by the peer and passed through several filters. What remains is either requested from
/// the peer right away or buffered in the transaction fetcher.
fn on_new_pooled_transaction_hashes(
    &mut self,
    peer_id: PeerId,
    msg: NewPooledTransactionHashes,
) {
    // Announcements are not processed during initial sync or with gossip disabled.
    if self.network.is_initially_syncing() {
        return
    }
    if self.network.tx_gossip_disabled() {
        return
    }

    // Only announcements from peers that are still tracked are processed.
    let Some(peer) = self.peers.get_mut(&peer_id) else {
        trace!(
            peer_id = format!("{peer_id:#}"),
            ?msg,
            "discarding announcement from inactive peer"
        );

        return
    };
    let client = peer.client_version.clone();

    // Record every announced hash as seen by this peer and count the ones that were already
    // recorded before.
    let mut count_txns_already_seen_by_peer = 0;
    for tx in msg.iter_hashes().copied() {
        if !peer.seen_transactions.insert(tx) {
            count_txns_already_seen_by_peer += 1;
        }
    }
    if count_txns_already_seen_by_peer > 0 {
        // One count per message, plus the number of repeated hashes.
        self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
        self.metrics
            .occurrences_hash_already_seen_by_peer
            .increment(count_txns_already_seen_by_peer);

        trace!(target: "net::tx",
            %count_txns_already_seen_by_peer,
            peer_id=format!("{peer_id:#}"),
            ?client,
            "Peer sent hashes that have already been marked as seen by peer"
        );

        self.report_already_seen(peer_id);
    }

    // First validation pass over the raw announcement; the peer is penalized if the filter
    // asks for it.
    let (validation_outcome, mut partially_valid_msg) =
        self.transaction_fetcher.filter_valid_message.partially_filter_valid_entries(msg);

    if validation_outcome == FilterOutcome::ReportPeer {
        self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
    }

    // Drop hashes of transactions that are currently being imported into the pool.
    partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));

    // Drop hashes the pool already knows, and record how many were dropped.
    let hashes_count_pre_pool_filter = partially_valid_msg.len();
    self.pool.retain_unknown(&mut partially_valid_msg);
    if hashes_count_pre_pool_filter > partially_valid_msg.len() {
        let already_known_hashes_count =
            hashes_count_pre_pool_filter - partially_valid_msg.len();
        self.metrics
            .occurrences_hashes_already_in_pool
            .increment(already_known_hashes_count as u64);
    }

    if partially_valid_msg.is_empty() {
        return
    }

    // Second validation pass, selected by the announcement's message version.
    let (validation_outcome, mut valid_announcement_data) = if partially_valid_msg
        .msg_version()
        .expect("partially valid announcement should have version")
        .is_eth68()
    {
        self.transaction_fetcher
            .filter_valid_message
            .filter_valid_entries_68(partially_valid_msg)
    } else {
        self.transaction_fetcher
            .filter_valid_message
            .filter_valid_entries_66(partially_valid_msg)
    };

    if validation_outcome == FilterOutcome::ReportPeer {
        self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
    }

    if valid_announcement_data.is_empty() {
        return
    }

    // Let the fetcher filter the announcement further; it is given a check against the
    // bad-imports cache and a check whether a peer is still tracked.
    let bad_imports = &self.bad_imports;
    self.transaction_fetcher.filter_unseen_and_pending_hashes(
        &mut valid_announcement_data,
        |hash| bad_imports.contains(hash),
        &peer_id,
        |peer_id| self.peers.contains_key(&peer_id),
        &client,
    );

    if valid_announcement_data.is_empty() {
        return
    }

    trace!(target: "net::tx::propagation",
        peer_id=format!("{peer_id:#}"),
        hashes_len=valid_announcement_data.iter().count(),
        hashes=?valid_announcement_data.keys().collect::<Vec<_>>(),
        msg_version=%valid_announcement_data.msg_version(),
        client_version=%client,
        "received previously unseen and pending hashes in announcement from peer"
    );

    // A request is only sent to an idle peer; hashes announced by a busy peer are buffered.
    if !self.transaction_fetcher.is_idle(&peer_id) {
        // Read the version before the announcement data is consumed below.
        let msg_version = valid_announcement_data.msg_version();
        let (hashes, _version) = valid_announcement_data.into_request_hashes();

        trace!(target: "net::tx",
            peer_id=format!("{peer_id:#}"),
            hashes=?*hashes,
            %msg_version,
            %client,
            "buffering hashes announced by busy peer"
        );

        self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));

        return
    }

    // Pack a request; hashes that do not fit are returned as surplus and buffered.
    let mut hashes_to_request =
        RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
    let surplus_hashes =
        self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);

    if !surplus_hashes.is_empty() {
        trace!(target: "net::tx",
            peer_id=format!("{peer_id:#}"),
            surplus_hashes=?*surplus_hashes,
            %client,
            "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
        );

        self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
    }

    trace!(target: "net::tx",
        peer_id=format!("{peer_id:#}"),
        hashes=?*hashes_to_request,
        %client,
        "sending hashes in `GetPooledTransactions` request to peer's session"
    );

    // The peer is looked up again because the earlier mutable borrow could not be held across
    // the calls above.
    let Some(peer) = self.peers.get_mut(&peer_id) else { return };
    // If the request could not be sent, the fetcher hands the hashes back and they are
    // buffered for a later attempt.
    if let Some(failed_to_request_hashes) =
        self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
    {
        let conn_eth_version = peer.version;

        trace!(target: "net::tx",
            peer_id=format!("{peer_id:#}"),
            failed_to_request_hashes=?*failed_to_request_hashes,
            %conn_eth_version,
            %client,
            "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
        );
        self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
    }
}
685}
686
687impl<Pool, N> TransactionsManager<Pool, N>
688where
689 Pool: TransactionPool + 'static,
690 N: NetworkPrimitives<
691 BroadcastedTransaction: SignedTransaction,
692 PooledTransaction: SignedTransaction,
693 >,
694 Pool::Transaction:
695 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
696{
697 fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
709 if self.network.is_initially_syncing() {
711 return
712 }
713 if self.network.tx_gossip_disabled() {
714 return
715 }
716
717 trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
718
719 self.propagate_all(hashes);
720 }
721
722 fn propagate_full_transactions_to_peer(
726 &mut self,
727 txs: Vec<TxHash>,
728 peer_id: PeerId,
729 propagation_mode: PropagationMode,
730 ) -> Option<PropagatedTransactions> {
731 trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
732
733 let peer = self.peers.get_mut(&peer_id)?;
734 let mut propagated = PropagatedTransactions::default();
735
736 let mut full_transactions = FullTransactionsBuilder::new(peer.version);
738
739 let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::new);
740
741 if propagation_mode.is_forced() {
742 full_transactions.extend(to_propagate);
744 } else {
745 for tx in to_propagate {
748 if !peer.seen_transactions.contains(tx.tx_hash()) {
749 full_transactions.push(&tx);
751 }
752 }
753 }
754
755 if full_transactions.is_empty() {
756 return None
758 }
759
760 let PropagateTransactions { pooled, full } = full_transactions.build();
761
762 if let Some(new_pooled_hashes) = pooled {
764 for hash in new_pooled_hashes.iter_hashes().copied() {
765 propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
766 peer.seen_transactions.insert(hash);
768 }
769
770 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
772 }
773
774 if let Some(new_full_transactions) = full {
776 for tx in &new_full_transactions {
777 propagated.0.entry(*tx.tx_hash()).or_default().push(PropagateKind::Full(peer_id));
778 peer.seen_transactions.insert(*tx.tx_hash());
780 }
781
782 self.network.send_transactions(peer_id, new_full_transactions);
784 }
785
786 self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
788
789 Some(propagated)
790 }
791
792 fn propagate_hashes_to(
796 &mut self,
797 hashes: Vec<TxHash>,
798 peer_id: PeerId,
799 propagation_mode: PropagationMode,
800 ) {
801 trace!(target: "net::tx", "Start propagating transactions as hashes");
802
803 let propagated = {
806 let Some(peer) = self.peers.get_mut(&peer_id) else {
807 return
809 };
810
811 let to_propagate = self
812 .pool
813 .get_all(hashes)
814 .into_iter()
815 .map(PropagateTransaction::new)
816 .collect::<Vec<_>>();
817
818 let mut propagated = PropagatedTransactions::default();
819
820 let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
822
823 if propagation_mode.is_forced() {
824 hashes.extend(to_propagate)
825 } else {
826 for tx in to_propagate {
827 if !peer.seen_transactions.contains(tx.tx_hash()) {
828 hashes.push(&tx);
830 }
831 }
832 }
833
834 let new_pooled_hashes = hashes.build();
835
836 if new_pooled_hashes.is_empty() {
837 return
839 }
840
841 for hash in new_pooled_hashes.iter_hashes().copied() {
842 propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
843 }
844
845 trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
846
847 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
849
850 self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
852
853 propagated
854 };
855
856 self.pool.on_propagated(propagated);
858 }
859
/// Propagates the given transactions to all tracked peers.
///
/// Depending on its position in the iteration over the peers map, a peer gets a builder for
/// full transactions or for hashes only. Unless the mode is forced, transactions already
/// recorded as seen by a peer are not sent to it again. Everything that is sent is recorded as
/// seen by the receiving peer.
///
/// Returns the record of which transaction was propagated to which peer and how; the record
/// is empty if transaction gossip is disabled.
fn propagate_transactions(
    &mut self,
    to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
    propagation_mode: PropagationMode,
) -> PropagatedTransactions {
    let mut propagated = PropagatedTransactions::default();
    if self.network.tx_gossip_disabled() {
        return propagated
    }

    // Number of peers that should receive full transactions, as decided by the configured
    // propagation mode for the current peer count.
    let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());

    for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
        // NOTE(review): with `>` the peers at indices `0..=max_num_full` get the full builder,
        // i.e. up to `max_num_full + 1` peers — confirm whether this is intended.
        let mut builder = if peer_idx > max_num_full {
            PropagateTransactionsBuilder::pooled(peer.version)
        } else {
            PropagateTransactionsBuilder::full(peer.version)
        };

        if propagation_mode.is_forced() {
            builder.extend(to_propagate.iter());
        } else {
            for tx in &to_propagate {
                // Skip transactions already recorded as seen by this peer.
                if !peer.seen_transactions.contains(tx.tx_hash()) {
                    builder.push(tx);
                }
            }
        }

        if builder.is_empty() {
            trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
            continue
        }

        let PropagateTransactions { pooled, full } = builder.build();

        if let Some(mut new_pooled_hashes) = pooled {
            // Cap the announcement at the soft limit for the number of hashes per message.
            new_pooled_hashes
                .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);

            for hash in new_pooled_hashes.iter_hashes().copied() {
                propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id));
                // Record the hash as seen by the peer.
                peer.seen_transactions.insert(hash);
            }

            trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");

            self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
        }

        if let Some(new_full_transactions) = full {
            for tx in &new_full_transactions {
                propagated
                    .0
                    .entry(*tx.tx_hash())
                    .or_default()
                    .push(PropagateKind::Full(*peer_id));
                // Record the transaction as seen by the peer.
                peer.seen_transactions.insert(*tx.tx_hash());
            }

            trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");

            self.network.send_transactions(*peer_id, new_full_transactions);
        }
    }

    // The metric counts distinct propagated transactions, not individual sends.
    self.metrics.propagated_transactions.increment(propagated.0.len() as u64);

    propagated
}
953
954 fn propagate_all(&mut self, hashes: Vec<TxHash>) {
959 let propagated = self.propagate_transactions(
960 self.pool.get_all(hashes).into_iter().map(PropagateTransaction::new).collect(),
961 PropagationMode::Basic,
962 );
963
964 self.pool.on_propagated(propagated);
966 }
967
968 fn on_get_pooled_transactions(
970 &mut self,
971 peer_id: PeerId,
972 request: GetPooledTransactions,
973 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
974 ) {
975 if let Some(peer) = self.peers.get_mut(&peer_id) {
976 if self.network.tx_gossip_disabled() {
977 let _ = response.send(Ok(PooledTransactions::default()));
978 return
979 }
980 let transactions = self.pool.get_pooled_transaction_elements(
981 request.0,
982 GetPooledTransactionLimit::ResponseSizeSoftLimit(
983 self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
984 ),
985 );
986 trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
987
988 peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
991
992 let resp = PooledTransactions(transactions);
993 let _ = response.send(Ok(resp));
994 }
995 }
996
997 fn on_command(&mut self, cmd: TransactionsCommand<N>) {
999 match cmd {
1000 TransactionsCommand::PropagateHash(hash) => {
1001 self.on_new_pending_transactions(vec![hash])
1002 }
1003 TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1004 self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1005 }
1006 TransactionsCommand::GetActivePeers(tx) => {
1007 let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1008 tx.send(peers).ok();
1009 }
1010 TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1011 if let Some(propagated) =
1012 self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1013 {
1014 self.pool.on_propagated(propagated);
1015 }
1016 }
1017 TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1018 TransactionsCommand::GetTransactionHashes { peers, tx } => {
1019 let mut res = HashMap::with_capacity(peers.len());
1020 for peer_id in peers {
1021 let hashes = self
1022 .peers
1023 .get(&peer_id)
1024 .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
1025 .unwrap_or_default();
1026 res.insert(peer_id, hashes);
1027 }
1028 tx.send(res).ok();
1029 }
1030 TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1031 let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1032 peer_request_sender.send(sender).ok();
1033 }
1034 }
1035 }
1036
1037 fn handle_peer_session(
1039 &mut self,
1040 info: SessionInfo,
1041 messages: PeerRequestSender<PeerRequest<N>>,
1042 ) {
1043 let SessionInfo { peer_id, client_version, version, .. } = info;
1044
1045 let peer = PeerMetadata::<N>::new(
1047 messages,
1048 version,
1049 client_version,
1050 self.config.max_transactions_seen_by_peer_history,
1051 );
1052 let peer = match self.peers.entry(peer_id) {
1053 Entry::Occupied(mut entry) => {
1054 entry.insert(peer);
1055 entry.into_mut()
1056 }
1057 Entry::Vacant(entry) => entry.insert(peer),
1058 };
1059
1060 if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1064 trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1065 return
1066 }
1067
1068 let pooled_txs = self.pool.pooled_transactions_max(
1070 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1071 );
1072 if pooled_txs.is_empty() {
1073 trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1074 return;
1075 }
1076
1077 let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1079 for pooled_tx in pooled_txs {
1080 peer.seen_transactions.insert(*pooled_tx.hash());
1081 msg_builder.push_pooled(pooled_tx);
1082 }
1083
1084 debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.is_empty(), "Broadcasting transaction hashes");
1085 let msg = msg_builder.build();
1086 self.network.send_transactions_hashes(peer_id, msg);
1087 }
1088
1089 fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1091 match event_result {
1092 NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1093 self.peers.remove(&peer_id);
1095 self.transaction_fetcher.remove_peer(&peer_id);
1096 }
1097 NetworkEvent::ActivePeerSession { info, messages } => {
1098 self.handle_peer_session(info, messages);
1100 }
1101 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1102 let peer_id = info.peer_id;
1103 let messages = match self.peers.get(&peer_id) {
1105 Some(p) => p.request_tx.clone(),
1106 None => {
1107 debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1108 return;
1109 }
1110 };
1111 self.handle_peer_session(info, messages);
1112 }
1113 _ => {}
1114 }
1115 }
1116
1117 fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1119 match event {
1120 NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1121 let has_blob_txs = msg.has_eip4844();
1125
1126 let non_blob_txs = msg
1127 .0
1128 .into_iter()
1129 .map(N::PooledTransaction::try_from)
1130 .filter_map(Result::ok)
1131 .collect();
1132
1133 self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1134
1135 if has_blob_txs {
1136 debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1137 self.report_peer_bad_transactions(peer_id);
1138 }
1139 }
1140 NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1141 self.on_new_pooled_transaction_hashes(peer_id, msg)
1142 }
1143 NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1144 self.on_get_pooled_transactions(peer_id, request, response)
1145 }
1146 NetworkTransactionEvent::GetTransactionsHandle(response) => {
1147 let _ = response.send(Some(self.handle()));
1148 }
1149 }
1150 }
1151
/// Starts importing the transactions received from a peer into the pool.
///
/// Nothing is imported while the node is initially syncing, when transaction gossip is
/// disabled, or when the peer is not tracked. Transactions already known to the pool, already
/// pending import, failing sender recovery, or known as bad imports are not imported. The peer
/// is penalized for bad transactions and, for broadcasts, for repeating transactions it was
/// already recorded as having seen.
fn import_transactions(
    &mut self,
    peer_id: PeerId,
    transactions: PooledTransactions<N::PooledTransaction>,
    source: TransactionSource,
) {
    // Transactions are not imported during initial sync or with gossip disabled.
    if self.network.is_initially_syncing() {
        return
    }
    if self.network.tx_gossip_disabled() {
        return
    }

    let Some(peer) = self.peers.get_mut(&peer_id) else { return };
    let mut transactions = transactions.0;

    // The received hashes no longer need to be tracked by the fetcher.
    self.transaction_fetcher
        .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| *tx.tx_hash()));

    // Only for broadcasts: record each transaction as seen by the peer and count those that
    // were already recorded. For other sources the `&&` short-circuits and nothing is
    // inserted.
    let mut num_already_seen_by_peer = 0;
    for tx in &transactions {
        if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
            num_already_seen_by_peer += 1;
        }
    }

    // Drop transactions the pool already knows, and record how many were dropped.
    let txns_count_pre_pool_filter = transactions.len();
    self.pool.retain_unknown(&mut transactions);
    if txns_count_pre_pool_filter > transactions.len() {
        let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
        self.metrics
            .occurrences_transactions_already_in_pool
            .increment(already_known_txns_count as u64);
    }

    // Set when at least one transaction fails recovery or is a known bad import.
    let mut has_bad_transactions = false;

    // NOTE(review): the peer was already looked up above and nothing in between removes it, so
    // this second lookup appears to always succeed — confirm.
    if let Some(peer) = self.peers.get_mut(&peer_id) {
        let mut new_txs = Vec::with_capacity(transactions.len());
        for tx in transactions {
            // Recover the signer; transactions for which this fails are skipped.
            let tx = match tx.try_into_ecrecovered() {
                Ok(tx) => tx,
                Err(badtx) => {
                    trace!(target: "net::tx",
                        peer_id=format!("{peer_id:#}"),
                        hash=%badtx.tx_hash(),
                        client_version=%peer.client_version,
                        "failed ecrecovery for transaction"
                    );
                    has_bad_transactions = true;
                    continue
                }
            };

            match self.transactions_by_peers.entry(*tx.tx_hash()) {
                Entry::Occupied(mut entry) => {
                    // Already pending import: only remember this peer as an additional sender.
                    entry.get_mut().insert(peer_id);
                }
                Entry::Vacant(entry) => {
                    if self.bad_imports.contains(tx.tx_hash()) {
                        trace!(target: "net::tx",
                            peer_id=format!("{peer_id:#}"),
                            hash=%tx.tx_hash(),
                            client_version=%peer.client_version,
                            "received a known bad transaction from peer"
                        );
                        has_bad_transactions = true;
                    } else {
                        // New transaction: queue it for import and track its sender.
                        let pool_transaction = Pool::Transaction::from_pooled(tx);
                        new_txs.push(pool_transaction);

                        entry.insert(HashSet::from([peer_id]));
                    }
                }
            }
        }
        new_txs.shrink_to_fit();

        if !new_txs.is_empty() {
            let pool = self.pool.clone();
            // The pending-imports gauge and counter are raised now and lowered again inside
            // the import future once the pool has processed the batch.
            let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
            metric_pending_pool_imports.increment(new_txs.len() as f64);

            self.pending_pool_imports_info
                .pending_pool_imports
                .fetch_add(new_txs.len(), Ordering::Relaxed);
            let tx_manager_info_pending_pool_imports =
                self.pending_pool_imports_info.pending_pool_imports.clone();

            trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
            let import = Box::pin(async move {
                let added = new_txs.len();
                let res = pool.add_external_transactions(new_txs).await;

                metric_pending_pool_imports.decrement(added as f64);
                tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);

                res
            });

            self.pool_imports.push(import);
        }

        if num_already_seen_by_peer > 0 {
            // One count per message, plus the number of repeated transactions.
            self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
            self.metrics
                .occurrences_of_transaction_already_seen_by_peer
                .increment(num_already_seen_by_peer);
            trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=?peer.client_version, "Peer sent already seen transactions");
        }
    }

    // Reputation changes are applied after the mutable borrow of the peer has ended.
    if has_bad_transactions {
        self.report_peer_bad_transactions(peer_id)
    }

    if num_already_seen_by_peer > 0 {
        self.report_already_seen(peer_id);
    }
}
1294
1295 fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1297 match fetch_event {
1298 FetchEvent::TransactionsFetched { peer_id, transactions } => {
1299 self.import_transactions(peer_id, transactions, TransactionSource::Response);
1300 }
1301 FetchEvent::FetchError { peer_id, error } => {
1302 trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1303 self.on_request_error(peer_id, error);
1304 }
1305 FetchEvent::EmptyResponse { peer_id } => {
1306 trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1307 }
1308 }
1309 }
1310}
1311
1312impl<Pool, N> Future for TransactionsManager<Pool, N>
1320where
1321 Pool: TransactionPool + Unpin + 'static,
1322 N: NetworkPrimitives<
1323 BroadcastedTransaction: SignedTransaction,
1324 PooledTransaction: SignedTransaction,
1325 >,
1326 Pool::Transaction:
1327 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
1328{
1329 type Output = ();
1330
1331 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1332 let start = Instant::now();
1333 let mut poll_durations = TxManagerPollDurations::default();
1334
1335 let this = self.get_mut();
1336
1337 let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
1343 poll_durations.acc_network_events,
1344 "net::tx",
1345 "Network events stream",
1346 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1347 this.network_events.poll_next_unpin(cx),
1348 |event| this.on_network_event(event)
1349 );
1350
1351 let mut new_txs = Vec::new();
1360 let maybe_more_pending_txns = metered_poll_nested_stream_with_budget!(
1361 poll_durations.acc_imported_txns,
1362 "net::tx",
1363 "Pending transactions stream",
1364 DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
1365 this.pending_transactions.poll_next_unpin(cx),
1366 |hash| new_txs.push(hash)
1367 );
1368 if !new_txs.is_empty() {
1369 this.on_new_pending_transactions(new_txs);
1370 }
1371
1372 let maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
1383 poll_durations.acc_fetch_events,
1384 "net::tx",
1385 "Transaction fetch events stream",
1386 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1387 this.transaction_fetcher.poll_next_unpin(cx),
1388 |event| this.on_fetch_event(event),
1389 );
1390
1391 let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
1406 poll_durations.acc_tx_events,
1407 "net::tx",
1408 "Network transaction events stream",
1409 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
1410 this.transaction_events.poll_next_unpin(cx),
1411 |event| this.on_network_tx_event(event),
1412 );
1413
1414 let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
1429 poll_durations.acc_pending_imports,
1430 "net::tx",
1431 "Batched pool imports stream",
1432 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
1433 this.pool_imports.poll_next_unpin(cx),
1434 |batch_results| this.on_batch_import_result(batch_results)
1435 );
1436
1437 duration_metered_exec!(
1442 {
1443 if this.has_capacity_for_fetching_pending_hashes() {
1444 this.on_fetch_hashes_pending_fetch();
1445 }
1446 },
1447 poll_durations.acc_pending_fetch
1448 );
1449
1450 let maybe_more_commands = metered_poll_nested_stream_with_budget!(
1452 poll_durations.acc_cmds,
1453 "net::tx",
1454 "Commands channel",
1455 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1456 this.command_rx.poll_next_unpin(cx),
1457 |cmd| this.on_command(cmd)
1458 );
1459
1460 this.transaction_fetcher.update_metrics();
1461
1462 if maybe_more_network_events ||
1464 maybe_more_commands ||
1465 maybe_more_tx_events ||
1466 maybe_more_tx_fetch_events ||
1467 maybe_more_pool_imports ||
1468 maybe_more_pending_txns
1469 {
1470 cx.waker().wake_by_ref();
1472 return Poll::Pending
1473 }
1474
1475 this.update_poll_metrics(start, poll_durations);
1476
1477 Poll::Pending
1478 }
1479}
1480
/// Selects how transactions are propagated.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum PropagationMode {
    /// The regular propagation mode.
    Basic,
    /// The forced propagation mode; see [`PropagationMode::is_forced`].
    Forced,
}

impl PropagationMode {
    /// Returns `true` only for [`PropagationMode::Forced`].
    const fn is_forced(self) -> bool {
        match self {
            Self::Forced => true,
            Self::Basic => false,
        }
    }
}
1503
1504#[derive(Debug, Clone)]
1506struct PropagateTransaction<T = TransactionSigned> {
1507 size: usize,
1508 transaction: Arc<T>,
1509}
1510
1511impl<T: SignedTransaction> PropagateTransaction<T> {
1512 fn new<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1514 where
1515 P: PoolTransaction<Consensus = T>,
1516 {
1517 let size = tx.encoded_length();
1518 let transaction = tx.transaction.clone_into_consensus();
1519 let transaction = Arc::new(transaction.into_signed());
1520 Self { size, transaction }
1521 }
1522
1523 fn tx_hash(&self) -> &TxHash {
1524 self.transaction.tx_hash()
1525 }
1526}
1527
1528#[derive(Debug, Clone)]
1531enum PropagateTransactionsBuilder<T> {
1532 Pooled(PooledTransactionsHashesBuilder),
1533 Full(FullTransactionsBuilder<T>),
1534}
1535
1536impl<T> PropagateTransactionsBuilder<T> {
1537 fn pooled(version: EthVersion) -> Self {
1539 Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1540 }
1541
1542 fn full(version: EthVersion) -> Self {
1544 Self::Full(FullTransactionsBuilder::new(version))
1545 }
1546
1547 fn is_empty(&self) -> bool {
1549 match self {
1550 Self::Pooled(builder) => builder.is_empty(),
1551 Self::Full(builder) => builder.is_empty(),
1552 }
1553 }
1554
1555 fn build(self) -> PropagateTransactions<T> {
1557 match self {
1558 Self::Pooled(pooled) => {
1559 PropagateTransactions { pooled: Some(pooled.build()), full: None }
1560 }
1561 Self::Full(full) => full.build(),
1562 }
1563 }
1564}
1565
1566impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1567 fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1569 for tx in txs {
1570 self.push(tx);
1571 }
1572 }
1573
1574 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1576 match self {
1577 Self::Pooled(builder) => builder.push(transaction),
1578 Self::Full(builder) => builder.push(transaction),
1579 }
1580 }
1581}
1582
1583struct PropagateTransactions<T> {
1585 pooled: Option<NewPooledTransactionHashes>,
1587 full: Option<Vec<Arc<T>>>,
1589}
1590
1591#[derive(Debug, Clone)]
1596struct FullTransactionsBuilder<T> {
1597 total_size: usize,
1599 transactions: Vec<Arc<T>>,
1601 pooled: PooledTransactionsHashesBuilder,
1603}
1604
1605impl<T> FullTransactionsBuilder<T> {
1606 fn new(version: EthVersion) -> Self {
1608 Self {
1609 total_size: 0,
1610 pooled: PooledTransactionsHashesBuilder::new(version),
1611 transactions: vec![],
1612 }
1613 }
1614
1615 fn is_empty(&self) -> bool {
1617 self.transactions.is_empty() && self.pooled.is_empty()
1618 }
1619
1620 fn build(self) -> PropagateTransactions<T> {
1622 let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1623 let full = Some(self.transactions).filter(|full| !full.is_empty());
1624 PropagateTransactions { pooled, full }
1625 }
1626}
1627
1628impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1629 fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1631 for tx in txs {
1632 self.push(&tx)
1633 }
1634 }
1635
1636 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1646 if !transaction.transaction.is_broadcastable_in_full() {
1655 self.pooled.push(transaction);
1656 return
1657 }
1658
1659 let new_size = self.total_size + transaction.size;
1660 if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1661 self.total_size > 0
1662 {
1663 self.pooled.push(transaction);
1665 return
1666 }
1667
1668 self.total_size = new_size;
1669 self.transactions.push(Arc::clone(&transaction.transaction));
1670 }
1671}
1672
1673#[derive(Debug, Clone)]
1676enum PooledTransactionsHashesBuilder {
1677 Eth66(NewPooledTransactionHashes66),
1678 Eth68(NewPooledTransactionHashes68),
1679}
1680
1681impl PooledTransactionsHashesBuilder {
1684 fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1686 match self {
1687 Self::Eth66(msg) => msg.0.push(*pooled_tx.hash()),
1688 Self::Eth68(msg) => {
1689 msg.hashes.push(*pooled_tx.hash());
1690 msg.sizes.push(pooled_tx.encoded_length());
1691 msg.types.push(pooled_tx.transaction.tx_type());
1692 }
1693 }
1694 }
1695
1696 fn is_empty(&self) -> bool {
1698 match self {
1699 Self::Eth66(hashes) => hashes.is_empty(),
1700 Self::Eth68(hashes) => hashes.is_empty(),
1701 }
1702 }
1703
1704 fn extend<T: SignedTransaction>(
1706 &mut self,
1707 txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1708 ) {
1709 for tx in txs {
1710 self.push(&tx);
1711 }
1712 }
1713
1714 fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1715 match self {
1716 Self::Eth66(msg) => msg.0.push(*tx.tx_hash()),
1717 Self::Eth68(msg) => {
1718 msg.hashes.push(*tx.tx_hash());
1719 msg.sizes.push(tx.size);
1720 msg.types.push(tx.transaction.ty());
1721 }
1722 }
1723 }
1724
1725 fn new(version: EthVersion) -> Self {
1727 match version {
1728 EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1729 EthVersion::Eth68 | EthVersion::Eth69 => Self::Eth68(Default::default()),
1730 }
1731 }
1732
1733 fn build(self) -> NewPooledTransactionHashes {
1734 match self {
1735 Self::Eth66(msg) => msg.into(),
1736 Self::Eth68(msg) => msg.into(),
1737 }
1738 }
1739}
1740
/// Describes how a batch of transactions was received.
enum TransactionSource {
    /// Received as a broadcast.
    Broadcast,
    /// Received as a response.
    Response,
}

impl TransactionSource {
    /// Returns `true` only for [`TransactionSource::Broadcast`].
    const fn is_broadcast(&self) -> bool {
        match self {
            Self::Broadcast => true,
            Self::Response => false,
        }
    }
}
1757
1758#[derive(Debug)]
1760pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
1761 seen_transactions: LruCache<TxHash>,
1765 request_tx: PeerRequestSender<PeerRequest<N>>,
1767 version: EthVersion,
1769 client_version: Arc<str>,
1771}
1772
1773impl<N: NetworkPrimitives> PeerMetadata<N> {
1774 fn new(
1776 request_tx: PeerRequestSender<PeerRequest<N>>,
1777 version: EthVersion,
1778 client_version: Arc<str>,
1779 max_transactions_seen_by_peer: u32,
1780 ) -> Self {
1781 Self {
1782 seen_transactions: LruCache::new(max_transactions_seen_by_peer),
1783 request_tx,
1784 version,
1785 client_version,
1786 }
1787 }
1788}
1789
1790#[derive(Debug)]
1792enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
1793 PropagateHash(B256),
1795 PropagateHashesTo(Vec<B256>, PeerId),
1797 GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
1799 PropagateTransactionsTo(Vec<TxHash>, PeerId),
1801 PropagateTransactions(Vec<TxHash>),
1803 GetTransactionHashes {
1805 peers: Vec<PeerId>,
1806 tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
1807 },
1808 GetPeerSender {
1810 peer_id: PeerId,
1811 peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
1812 },
1813}
1814
1815#[derive(Debug)]
1817pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
1818 IncomingTransactions {
1822 peer_id: PeerId,
1824 msg: Transactions<N::BroadcastedTransaction>,
1826 },
1827 IncomingPooledTransactionHashes {
1829 peer_id: PeerId,
1831 msg: NewPooledTransactionHashes,
1833 },
1834 GetPooledTransactions {
1836 peer_id: PeerId,
1838 request: GetPooledTransactions,
1840 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1842 },
1843 GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
1845}
1846
/// Bookkeeping for the number of transactions whose pool import is still pending.
#[derive(Debug)]
pub struct PendingPoolImportsInfo {
    /// Shared counter of pending pool imports.
    pending_pool_imports: Arc<AtomicUsize>,
    /// Configured limit; note that [`Self::has_capacity`] compares against its argument, not
    /// against this field.
    max_pending_pool_imports: usize,
}

impl PendingPoolImportsInfo {
    /// Creates a new instance with a counter of zero and the given limit.
    pub fn new(max_pending_pool_imports: usize) -> Self {
        let pending_pool_imports = Arc::new(AtomicUsize::new(0));
        Self { pending_pool_imports, max_pending_pool_imports }
    }

    /// Returns `true` if the current counter value is strictly below the
    /// `max_pending_pool_imports` argument.
    pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
        let currently_pending = self.pending_pool_imports.load(Ordering::Relaxed);
        currently_pending < max_pending_pool_imports
    }
}
1867
1868impl Default for PendingPoolImportsInfo {
1869 fn default() -> Self {
1870 Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
1871 }
1872}
1873
/// Durations accumulated for the individual stages of one call to the manager's `poll`.
#[derive(Debug, Default)]
struct TxManagerPollDurations {
    /// Accumulator for the "Network events stream" stage.
    acc_network_events: Duration,
    /// Accumulator for the "Batched pool imports stream" stage.
    acc_pending_imports: Duration,
    /// Accumulator for the "Network transaction events stream" stage.
    acc_tx_events: Duration,
    /// Accumulator for the "Pending transactions stream" stage.
    acc_imported_txns: Duration,
    /// Accumulator for the "Transaction fetch events stream" stage.
    acc_fetch_events: Duration,
    /// Accumulator for requesting hashes that are pending fetch.
    acc_pending_fetch: Duration,
    /// Accumulator for the "Commands channel" stage.
    acc_cmds: Duration,
}
1884
1885#[cfg(test)]
1886mod tests {
1887 use super::*;
1888 use crate::{test_utils::Testnet, NetworkConfigBuilder, NetworkManager};
1889 use alloy_primitives::hex;
1890 use alloy_rlp::Decodable;
1891 use constants::tx_fetcher::DEFAULT_MAX_COUNT_FALLBACK_PEERS;
1892 use futures::FutureExt;
1893 use reth_network_api::NetworkInfo;
1894 use reth_network_p2p::{
1895 error::{RequestError, RequestResult},
1896 sync::{NetworkSyncUpdater, SyncState},
1897 };
1898 use reth_storage_api::noop::NoopProvider;
1899 use reth_transaction_pool::test_utils::{
1900 testing_pool, MockTransaction, MockTransactionFactory, TestPool,
1901 };
1902 use secp256k1::SecretKey;
1903 use std::{
1904 fmt,
1905 future::poll_fn,
1906 hash,
1907 net::{IpAddr, Ipv4Addr, SocketAddr},
1908 };
1909 use tests::fetcher::TxFetchMetadata;
1910 use tracing::error;
1911
1912 async fn new_tx_manager(
1913 ) -> (TransactionsManager<TestPool, EthNetworkPrimitives>, NetworkManager<EthNetworkPrimitives>)
1914 {
1915 let secret_key = SecretKey::new(&mut rand::thread_rng());
1916 let client = NoopProvider::default();
1917
1918 let config = NetworkConfigBuilder::new(secret_key)
1919 .listener_port(0)
1921 .disable_discovery()
1922 .build(client);
1923
1924 let pool = testing_pool();
1925
1926 let transactions_manager_config = config.transactions_manager_config.clone();
1927 let (_network_handle, network, transactions, _) = NetworkManager::new(config)
1928 .await
1929 .unwrap()
1930 .into_builder()
1931 .transactions(pool.clone(), transactions_manager_config)
1932 .split_with_handle();
1933
1934 (transactions, network)
1935 }
1936
1937 pub(super) fn default_cache<T: hash::Hash + Eq + fmt::Debug>() -> LruCache<T> {
1938 LruCache::new(DEFAULT_MAX_COUNT_FALLBACK_PEERS as u32)
1939 }
1940
1941 pub(super) fn new_mock_session(
1943 peer_id: PeerId,
1944 version: EthVersion,
1945 ) -> (PeerMetadata<EthNetworkPrimitives>, mpsc::Receiver<PeerRequest>) {
1946 let (to_mock_session_tx, to_mock_session_rx) = mpsc::channel(1);
1947
1948 (
1949 PeerMetadata::new(
1950 PeerRequestSender::new(peer_id, to_mock_session_tx),
1951 version,
1952 Arc::from(""),
1953 DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
1954 ),
1955 to_mock_session_rx,
1956 )
1957 }
1958
1959 #[tokio::test(flavor = "multi_thread")]
1960 async fn test_ignored_tx_broadcasts_while_initially_syncing() {
1961 reth_tracing::init_test_tracing();
1962 let net = Testnet::create(3).await;
1963
1964 let mut handles = net.handles();
1965 let handle0 = handles.next().unwrap();
1966 let handle1 = handles.next().unwrap();
1967
1968 drop(handles);
1969 let handle = net.spawn();
1970
1971 let listener0 = handle0.event_listener();
1972 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
1973 let secret_key = SecretKey::new(&mut rand::thread_rng());
1974
1975 let client = NoopProvider::default();
1976 let pool = testing_pool();
1977 let config = NetworkConfigBuilder::<EthNetworkPrimitives>::new(secret_key)
1978 .disable_discovery()
1979 .listener_port(0)
1980 .build(client);
1981 let transactions_manager_config = config.transactions_manager_config.clone();
1982 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
1983 .await
1984 .unwrap()
1985 .into_builder()
1986 .transactions(pool.clone(), transactions_manager_config)
1987 .split_with_handle();
1988
1989 tokio::task::spawn(network);
1990
1991 network_handle.update_sync_state(SyncState::Syncing);
1993 assert!(NetworkInfo::is_syncing(&network_handle));
1994 assert!(NetworkInfo::is_initially_syncing(&network_handle));
1995
1996 let mut established = listener0.take(2);
1998 while let Some(ev) = established.next().await {
1999 match ev {
2000 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
2001 transactions
2003 .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
2004 }
2005 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue,
2006 ev => {
2007 error!("unexpected event {ev:?}")
2008 }
2009 }
2010 }
2011 let input = hex!("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76");
2013 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2014 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2015 peer_id: *handle1.peer_id(),
2016 msg: Transactions(vec![signed_tx.clone()]),
2017 });
2018 poll_fn(|cx| {
2019 let _ = transactions.poll_unpin(cx);
2020 Poll::Ready(())
2021 })
2022 .await;
2023 assert!(pool.is_empty());
2024 handle.terminate().await;
2025 }
2026
2027 #[tokio::test(flavor = "multi_thread")]
2028 async fn test_tx_broadcasts_through_two_syncs() {
2029 reth_tracing::init_test_tracing();
2030 let net = Testnet::create(3).await;
2031
2032 let mut handles = net.handles();
2033 let handle0 = handles.next().unwrap();
2034 let handle1 = handles.next().unwrap();
2035
2036 drop(handles);
2037 let handle = net.spawn();
2038
2039 let listener0 = handle0.event_listener();
2040 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2041 let secret_key = SecretKey::new(&mut rand::thread_rng());
2042
2043 let client = NoopProvider::default();
2044 let pool = testing_pool();
2045 let config = NetworkConfigBuilder::new(secret_key)
2046 .disable_discovery()
2047 .listener_port(0)
2048 .build(client);
2049 let transactions_manager_config = config.transactions_manager_config.clone();
2050 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2051 .await
2052 .unwrap()
2053 .into_builder()
2054 .transactions(pool.clone(), transactions_manager_config)
2055 .split_with_handle();
2056
2057 tokio::task::spawn(network);
2058
2059 network_handle.update_sync_state(SyncState::Syncing);
2061 assert!(NetworkInfo::is_syncing(&network_handle));
2062 network_handle.update_sync_state(SyncState::Idle);
2063 assert!(!NetworkInfo::is_syncing(&network_handle));
2064 network_handle.update_sync_state(SyncState::Syncing);
2065 assert!(NetworkInfo::is_syncing(&network_handle));
2066
2067 let mut established = listener0.take(2);
2069 while let Some(ev) = established.next().await {
2070 match ev {
2071 NetworkEvent::ActivePeerSession { .. } |
2072 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2073 transactions.on_network_event(ev);
2075 }
2076 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue,
2077 _ => {
2078 error!("unexpected event {ev:?}")
2079 }
2080 }
2081 }
2082 let input = hex!("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76");
2084 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2085 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2086 peer_id: *handle1.peer_id(),
2087 msg: Transactions(vec![signed_tx.clone()]),
2088 });
2089 poll_fn(|cx| {
2090 let _ = transactions.poll_unpin(cx);
2091 Poll::Ready(())
2092 })
2093 .await;
2094 assert!(!NetworkInfo::is_initially_syncing(&network_handle));
2095 assert!(NetworkInfo::is_syncing(&network_handle));
2096 assert!(!pool.is_empty());
2097 handle.terminate().await;
2098 }
2099
2100 #[tokio::test(flavor = "multi_thread")]
2101 async fn test_handle_incoming_transactions() {
2102 reth_tracing::init_test_tracing();
2103 let net = Testnet::create(3).await;
2104
2105 let mut handles = net.handles();
2106 let handle0 = handles.next().unwrap();
2107 let handle1 = handles.next().unwrap();
2108
2109 drop(handles);
2110 let handle = net.spawn();
2111
2112 let listener0 = handle0.event_listener();
2113
2114 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2115 let secret_key = SecretKey::new(&mut rand::thread_rng());
2116
2117 let client = NoopProvider::default();
2118 let pool = testing_pool();
2119 let config = NetworkConfigBuilder::new(secret_key)
2120 .disable_discovery()
2121 .listener_port(0)
2122 .build(client);
2123 let transactions_manager_config = config.transactions_manager_config.clone();
2124 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2125 .await
2126 .unwrap()
2127 .into_builder()
2128 .transactions(pool.clone(), transactions_manager_config)
2129 .split_with_handle();
2130 tokio::task::spawn(network);
2131
2132 network_handle.update_sync_state(SyncState::Idle);
2133
2134 assert!(!NetworkInfo::is_syncing(&network_handle));
2135
2136 let mut established = listener0.take(2);
2138 while let Some(ev) = established.next().await {
2139 match ev {
2140 NetworkEvent::ActivePeerSession { .. } |
2141 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2142 transactions.on_network_event(ev);
2144 }
2145 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue,
2146 ev => {
2147 error!("unexpected event {ev:?}")
2148 }
2149 }
2150 }
2151 let input = hex!("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76");
2153 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2154 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2155 peer_id: *handle1.peer_id(),
2156 msg: Transactions(vec![signed_tx.clone()]),
2157 });
2158 assert!(transactions
2159 .transactions_by_peers
2160 .get(&signed_tx.hash())
2161 .unwrap()
2162 .contains(handle1.peer_id()));
2163
2164 poll_fn(|cx| {
2166 let _ = transactions.poll_unpin(cx);
2167 Poll::Ready(())
2168 })
2169 .await;
2170
2171 assert!(!pool.is_empty());
2172 assert!(pool.get(signed_tx.tx_hash()).is_some());
2173 handle.terminate().await;
2174 }
2175
2176 #[tokio::test(flavor = "multi_thread")]
2177 async fn test_on_get_pooled_transactions_network() {
2178 reth_tracing::init_test_tracing();
2179 let net = Testnet::create(2).await;
2180
2181 let mut handles = net.handles();
2182 let handle0 = handles.next().unwrap();
2183 let handle1 = handles.next().unwrap();
2184
2185 drop(handles);
2186 let handle = net.spawn();
2187
2188 let listener0 = handle0.event_listener();
2189
2190 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2191 let secret_key = SecretKey::new(&mut rand::thread_rng());
2192
2193 let client = NoopProvider::default();
2194 let pool = testing_pool();
2195 let config = NetworkConfigBuilder::new(secret_key)
2196 .disable_discovery()
2197 .listener_port(0)
2198 .build(client);
2199 let transactions_manager_config = config.transactions_manager_config.clone();
2200 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2201 .await
2202 .unwrap()
2203 .into_builder()
2204 .transactions(pool.clone(), transactions_manager_config)
2205 .split_with_handle();
2206 tokio::task::spawn(network);
2207
2208 network_handle.update_sync_state(SyncState::Idle);
2209
2210 assert!(!NetworkInfo::is_syncing(&network_handle));
2211
2212 let mut established = listener0.take(2);
2214 while let Some(ev) = established.next().await {
2215 match ev {
2216 NetworkEvent::ActivePeerSession { .. } |
2217 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2218 transactions.on_network_event(ev);
2219 }
2220 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue,
2221 ev => {
2222 error!("unexpected event {ev:?}")
2223 }
2224 }
2225 }
2226 handle.terminate().await;
2227
2228 let tx = MockTransaction::eip1559();
2229 let _ = transactions
2230 .pool
2231 .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2232 .await;
2233
2234 let request = GetPooledTransactions(vec![*tx.get_hash()]);
2235
2236 let (send, receive) = oneshot::channel::<RequestResult<PooledTransactions>>();
2237
2238 transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2239 peer_id: *handle1.peer_id(),
2240 request,
2241 response: send,
2242 });
2243
2244 match receive.await.unwrap() {
2245 Ok(PooledTransactions(transactions)) => {
2246 assert_eq!(transactions.len(), 1);
2247 }
2248 Err(e) => {
2249 panic!("error: {e:?}");
2250 }
2251 }
2252 }
2253
2254 #[tokio::test]
2255 async fn test_max_retries_tx_request() {
2256 reth_tracing::init_test_tracing();
2257
2258 let mut tx_manager = new_tx_manager().await.0;
2259 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2260
2261 let peer_id_1 = PeerId::new([1; 64]);
2262 let peer_id_2 = PeerId::new([2; 64]);
2263 let eth_version = EthVersion::Eth66;
2264 let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2265
2266 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2267 peer_1.seen_transactions.insert(seen_hashes[0]);
2270 peer_1.seen_transactions.insert(seen_hashes[1]);
2271 tx_manager.peers.insert(peer_id_1, peer_1);
2272
2273 let retries = 1;
2276 let mut backups = default_cache();
2277 backups.insert(peer_id_1);
2278
2279 let mut backups1 = default_cache();
2280 backups1.insert(peer_id_1);
2281 tx_fetcher
2282 .hashes_fetch_inflight_and_pending_fetch
2283 .insert(seen_hashes[1], TxFetchMetadata::new(retries, backups, None));
2284 tx_fetcher
2285 .hashes_fetch_inflight_and_pending_fetch
2286 .insert(seen_hashes[0], TxFetchMetadata::new(retries, backups1, None));
2287 tx_fetcher.hashes_pending_fetch.insert(seen_hashes[1]);
2288 tx_fetcher.hashes_pending_fetch.insert(seen_hashes[0]);
2289
2290 assert!(tx_fetcher.is_idle(&peer_id_1));
2292 assert_eq!(tx_fetcher.active_peers.len(), 0);
2293
2294 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2296
2297 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2298
2299 assert!(tx_fetcher.hashes_pending_fetch.is_empty());
2300 assert!(!tx_fetcher.is_idle(&peer_id_1));
2302 assert_eq!(tx_fetcher.active_peers.len(), 1);
2303
2304 let req = to_mock_session_rx
2306 .recv()
2307 .await
2308 .expect("peer_1 session should receive request with buffered hashes");
2309 let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2310 let GetPooledTransactions(hashes) = request;
2311
2312 let hashes = hashes.into_iter().collect::<HashSet<_>>();
2313
2314 assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());
2315
2316 response
2318 .send(Err(RequestError::BadResponse))
2319 .expect("should send peer_1 response to tx manager");
2320 let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2321 unreachable!()
2322 };
2323
2324 assert!(tx_fetcher.is_idle(&peer_id));
2326 assert_eq!(tx_fetcher.active_peers.len(), 0);
2327 assert_eq!(tx_fetcher.hashes_pending_fetch.len(), 2);
2329
2330 let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2331 tx_manager.peers.insert(peer_id_2, peer_2);
2332
2333 let msg =
2335 NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2336 tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2337
2338 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2339
2340 assert_eq!(tx_fetcher.active_peers.len(), 1);
2342
2343 assert_eq!(tx_fetcher.hashes_fetch_inflight_and_pending_fetch.len(), 2);
2345 assert!(tx_fetcher.hashes_pending_fetch.is_empty());
2347
2348 let req = to_mock_session_rx
2350 .recv()
2351 .await
2352 .expect("peer_2 session should receive request with buffered hashes");
2353 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2354
2355 response
2357 .send(Err(RequestError::BadResponse))
2358 .expect("should send peer_2 response to tx manager");
2359 let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
2360
2361 assert!(tx_fetcher.hashes_pending_fetch.is_empty());
2364 assert_eq!(tx_fetcher.active_peers.len(), 0);
2365 }
2366
/// A hashes-only builder starts empty and yields a single announced hash after one push.
#[test]
fn test_transaction_builder_empty() {
    let mut builder =
        PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
    assert!(builder.is_empty());

    let mut factory = MockTransactionFactory::default();
    let pool_tx = Arc::new(factory.create_eip1559());
    builder.push(&PropagateTransaction::new(pool_tx));
    assert!(!builder.is_empty());

    let PropagateTransactions { pooled, full } = builder.build();
    assert!(full.is_none());
    let announced = pooled.unwrap();
    assert_eq!(announced.len(), 1);
}
2383
/// The first oversized transaction is still sent in full; a second one is only announced.
#[test]
fn test_transaction_builder_large() {
    let mut builder =
        PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
    assert!(builder.is_empty());

    // A transaction that alone exceeds the soft limit of a broadcast message.
    let mut factory = MockTransactionFactory::default();
    let mut oversized = factory.create_eip1559();
    oversized
        .transaction
        .set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
    let tx = PropagateTransaction::new(Arc::new(oversized));

    builder.push(&tx);
    assert!(!builder.is_empty());

    let after_first = builder.clone().build();
    assert!(after_first.pooled.is_none());
    let full = after_first.full.unwrap();
    assert_eq!(full.len(), 1);

    builder.push(&tx);

    let after_second = builder.clone().build();
    let pooled = after_second.pooled.unwrap();
    assert_eq!(pooled.len(), 1);
    let full = after_second.full.unwrap();
    assert_eq!(full.len(), 1);
}
2412
/// An EIP-4844 transaction is only announced by hash, while a following EIP-1559 transaction
/// is added in full.
#[test]
fn test_transaction_builder_eip4844() {
    let mut builder =
        PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
    assert!(builder.is_empty());

    let mut factory = MockTransactionFactory::default();
    let blob_tx = PropagateTransaction::new(Arc::new(factory.create_eip4844()));
    builder.push(&blob_tx);
    assert!(!builder.is_empty());

    let after_blob = builder.clone().build();
    assert!(after_blob.full.is_none());
    let announced = after_blob.pooled.unwrap();
    assert_eq!(announced.len(), 1);

    let dynamic_fee_tx = PropagateTransaction::new(Arc::new(factory.create_eip1559()));
    builder.push(&dynamic_fee_tx);

    let after_both = builder.clone().build();
    let pooled = after_both.pooled.unwrap();
    assert_eq!(pooled.len(), 1);
    let full = after_both.full.unwrap();
    assert_eq!(full.len(), 1);
}
2438
2439 #[tokio::test]
2440 async fn test_propagate_full() {
2441 reth_tracing::init_test_tracing();
2442
2443 let (mut tx_manager, network) = new_tx_manager().await;
2444 let peer_id = PeerId::random();
2445
2446 network.handle().update_sync_state(SyncState::Idle);
2448
2449 let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
2451 let session_info = SessionInfo {
2452 peer_id,
2453 remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
2454 client_version: Arc::from(""),
2455 capabilities: Arc::new(vec![].into()),
2456 status: Arc::new(Default::default()),
2457 version: EthVersion::Eth68,
2458 };
2459 let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
2460 tx_manager
2461 .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
2462 let mut propagate = vec![];
2463 let mut factory = MockTransactionFactory::default();
2464 let eip1559_tx = Arc::new(factory.create_eip1559());
2465 propagate.push(PropagateTransaction::new(eip1559_tx.clone()));
2466 let eip4844_tx = Arc::new(factory.create_eip4844());
2467 propagate.push(PropagateTransaction::new(eip4844_tx.clone()));
2468
2469 let propagated =
2470 tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
2471 assert_eq!(propagated.0.len(), 2);
2472 let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap();
2473 assert_eq!(prop_txs.len(), 1);
2474 assert!(prop_txs[0].is_full());
2475
2476 let prop_txs = propagated.0.get(eip4844_tx.transaction.hash()).unwrap();
2477 assert_eq!(prop_txs.len(), 1);
2478 assert!(prop_txs[0].is_hash());
2479
2480 let peer = tx_manager.peers.get(&peer_id).unwrap();
2481 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2482 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2483 peer.seen_transactions.contains(eip4844_tx.transaction.hash());
2484
2485 let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
2487 assert!(propagated.0.is_empty());
2488 }
2489}