reth_network/transactions/
mod.rs

1//! Transactions management for the p2p network.
2
/// Aggregation of configurable parameters for [`TransactionsManager`].
4pub mod config;
5/// Default and spec'd bounds.
6pub mod constants;
7/// Component responsible for fetching transactions from [`NewPooledTransactionHashes`].
8pub mod fetcher;
9pub mod validation;
10
11pub use self::constants::{
12    tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
13    SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
14};
15pub use config::{TransactionFetcherConfig, TransactionPropagationMode, TransactionsManagerConfig};
16pub use validation::*;
17
18pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
19
20use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
21use crate::{
22    budget::{
23        DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
24        DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
25        DEFAULT_BUDGET_TRY_DRAIN_STREAM,
26    },
27    cache::LruCache,
28    duration_metered_exec, metered_poll_nested_stream_with_budget,
29    metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
30    NetworkHandle,
31};
32use alloy_primitives::{TxHash, B256};
33use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
34use futures::{stream::FuturesUnordered, Future, StreamExt};
35use reth_eth_wire::{
36    DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
37    HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
38    NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
39    RequestTxHashes, Transactions,
40};
41use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
42use reth_network_api::{
43    events::{PeerEvent, SessionInfo},
44    NetworkEvent, NetworkEventListenerProvider, PeerRequest, PeerRequestSender, Peers,
45};
46use reth_network_p2p::{
47    error::{RequestError, RequestResult},
48    sync::SyncStateProvider,
49};
50use reth_network_peers::PeerId;
51use reth_network_types::ReputationChangeKind;
52use reth_primitives::{transaction::SignedTransactionIntoRecoveredExt, TransactionSigned};
53use reth_primitives_traits::SignedTransaction;
54use reth_tokio_util::EventStream;
55use reth_transaction_pool::{
56    error::{PoolError, PoolResult},
57    GetPooledTransactionLimit, PoolTransaction, PropagateKind, PropagatedTransactions,
58    TransactionPool, ValidPoolTransaction,
59};
60use std::{
61    collections::{hash_map::Entry, HashMap, HashSet},
62    pin::Pin,
63    sync::{
64        atomic::{AtomicUsize, Ordering},
65        Arc,
66    },
67    task::{Context, Poll},
68    time::{Duration, Instant},
69};
70use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
71use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
72use tracing::{debug, trace};
73
/// The future for importing transactions into the pool.
///
/// Resolves with the result of each transaction import, i.e. a `Vec` holding one
/// [`PoolResult<TxHash>`](PoolResult) per imported transaction.
///
/// The future is boxed and pinned so that futures of different concrete types can be stored
/// together, see `TransactionsManager::pool_imports`.
pub type PoolImportFuture = Pin<Box<dyn Future<Output = Vec<PoolResult<TxHash>>> + Send + 'static>>;
78
/// Api to interact with [`TransactionsManager`] task.
///
/// This can be obtained via [`TransactionsManager::handle`] and can be used to manually interact
/// with the [`TransactionsManager`] task once it is spawned.
///
/// For example [`TransactionsHandle::get_peer_transaction_hashes`] returns the transaction hashes
/// known by a specific peer.
///
/// Every method works by sending a `TransactionsCommand` over an unbounded channel. Methods that
/// return data additionally await the answer on a oneshot channel.
#[derive(Debug, Clone)]
pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Command channel to the [`TransactionsManager`]
    manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
}
91
92/// Implementation of the `TransactionsHandle` API for use in testnet via type
93/// [`PeerHandle`](crate::test_utils::PeerHandle).
94impl<N: NetworkPrimitives> TransactionsHandle<N> {
95    fn send(&self, cmd: TransactionsCommand<N>) {
96        let _ = self.manager_tx.send(cmd);
97    }
98
99    /// Fetch the [`PeerRequestSender`] for the given peer.
100    async fn peer_handle(
101        &self,
102        peer_id: PeerId,
103    ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
104        let (tx, rx) = oneshot::channel();
105        self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
106        rx.await
107    }
108
109    /// Manually propagate the transaction that belongs to the hash.
110    pub fn propagate(&self, hash: TxHash) {
111        self.send(TransactionsCommand::PropagateHash(hash))
112    }
113
114    /// Manually propagate the transaction hash to a specific peer.
115    ///
116    /// Note: this only propagates if the pool contains the transaction.
117    pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
118        self.propagate_hashes_to(Some(hash), peer)
119    }
120
121    /// Manually propagate the transaction hashes to a specific peer.
122    ///
123    /// Note: this only propagates the transactions that are known to the pool.
124    pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
125        let hashes = hash.into_iter().collect::<Vec<_>>();
126        if hashes.is_empty() {
127            return
128        }
129        self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
130    }
131
132    /// Request the active peer IDs from the [`TransactionsManager`].
133    pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
134        let (tx, rx) = oneshot::channel();
135        self.send(TransactionsCommand::GetActivePeers(tx));
136        rx.await
137    }
138
139    /// Manually propagate full transactions to a specific peer.
140    ///
141    /// Do nothing if transactions are empty.
142    pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
143        if transactions.is_empty() {
144            return
145        }
146        self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
147    }
148
149    /// Manually propagate the given transactions to all peers.
150    ///
151    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
152    /// full.
153    ///
154    /// Do nothing if transactions are empty.
155    pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
156        if transactions.is_empty() {
157            return
158        }
159        self.send(TransactionsCommand::PropagateTransactions(transactions))
160    }
161
162    /// Request the transaction hashes known by specific peers.
163    pub async fn get_transaction_hashes(
164        &self,
165        peers: Vec<PeerId>,
166    ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
167        if peers.is_empty() {
168            return Ok(Default::default())
169        }
170        let (tx, rx) = oneshot::channel();
171        self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
172        rx.await
173    }
174
175    /// Request the transaction hashes known by a specific peer.
176    pub async fn get_peer_transaction_hashes(
177        &self,
178        peer: PeerId,
179    ) -> Result<HashSet<TxHash>, RecvError> {
180        let res = self.get_transaction_hashes(vec![peer]).await?;
181        Ok(res.into_values().next().unwrap_or_default())
182    }
183
184    /// Requests the transactions directly from the given peer.
185    ///
186    /// Returns `None` if the peer is not connected.
187    ///
188    /// **Note**: this returns the response from the peer as received.
189    pub async fn get_pooled_transactions_from(
190        &self,
191        peer_id: PeerId,
192        hashes: Vec<B256>,
193    ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
194        let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
195
196        let (tx, rx) = oneshot::channel();
197        let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
198        peer.try_send(request).ok();
199
200        rx.await?.map(|res| Some(res.0))
201    }
202}
203
/// Manages transactions on top of the p2p network.
///
/// This can be spawned to another task and is supposed to be run as background service.
/// [`TransactionsHandle`] can be used as frontend to programmatically send commands to it and
/// interact with it.
///
/// The [`TransactionsManager`] is responsible for:
///    - handling incoming eth messages for transactions.
///    - serving transaction requests.
///    - propagating transactions.
///
/// This type communicates with the [`NetworkManager`](crate::NetworkManager) in both directions.
///   - receives incoming network messages.
///   - sends messages to dispatch (responses, propagate tx)
///
/// It is directly connected to the [`TransactionPool`] to retrieve requested transactions and
/// propagate new transactions over the network.
#[derive(Debug)]
#[must_use = "Manager does nothing unless polled."]
pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Access to the transaction pool.
    pool: Pool,
    /// Network access.
    network: NetworkHandle<N>,
    /// Subscriptions to all network related events.
    ///
    /// From which we get all new incoming transaction related messages.
    network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
    /// Transaction fetcher to handle inflight and missing transaction requests.
    transaction_fetcher: TransactionFetcher<N>,
    /// All transactions that are currently pending import, keyed by transaction hash. The value
    /// is the set of peers associated with that transaction.
    ///
    /// This way we can track incoming transactions and prevent multiple pool imports for the same
    /// transaction. An entry is removed once the import of the transaction has finished; if the
    /// import turns out to be a bad transaction, the associated peers are penalized.
    transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
    /// Transactions that are currently imported into the `Pool`.
    ///
    /// The import process includes:
    ///  - validation of the transactions, e.g. transaction is well formed: valid tx type, fees are
    ///    valid, or for 4844 transaction the blobs are valid. See also
    ///    [`EthTransactionValidator`](reth_transaction_pool::validate::EthTransactionValidator)
    /// - if the transaction is valid, it is added into the pool.
    ///
    /// Once the new transaction reaches the __pending__ state it will be emitted by the pool via
    /// [`TransactionPool::pending_transactions_listener`] and arrive at the `pending_transactions`
    /// receiver.
    pool_imports: FuturesUnordered<PoolImportFuture>,
    /// Stats on pending pool imports that help the node self-monitor.
    pending_pool_imports_info: PendingPoolImportsInfo,
    /// Hashes of transactions whose import was rejected as a bad transaction.
    ///
    /// Kept in a bounded LRU cache so that announced hashes can be checked against it before
    /// they are fetched again.
    bad_imports: LruCache<TxHash>,
    /// All the connected peers, keyed by peer id.
    peers: HashMap<PeerId, PeerMetadata<N>>,
    /// Send half for the command channel.
    ///
    /// This is kept so that a new [`TransactionsHandle`] can be created at any time.
    command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
    /// Incoming commands from [`TransactionsHandle`].
    ///
    /// This will only receive commands if a user manually sends a command to the manager through
    /// the [`TransactionsHandle`] to interact with this type directly.
    command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
    /// A stream that yields new __pending__ transactions.
    ///
    /// A transaction is considered __pending__ if it is executable on the current state of the
    /// chain. In other words, this only yields transactions that satisfy all consensus
    /// requirements, these include:
    ///   - no nonce gaps
    ///   - all dynamic fee requirements are (currently) met
    ///   - account has enough balance to cover the transaction's gas
    pending_transactions: ReceiverStream<TxHash>,
    /// Incoming events from the [`NetworkManager`](crate::NetworkManager).
    transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
    /// How the `TransactionsManager` is configured.
    config: TransactionsManagerConfig,
    /// `TransactionsManager` metrics
    metrics: TransactionsManagerMetrics,
}
282
283impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
284    /// Sets up a new instance.
285    ///
286    /// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
287    pub fn new(
288        network: NetworkHandle<N>,
289        pool: Pool,
290        from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
291        transactions_manager_config: TransactionsManagerConfig,
292    ) -> Self {
293        let network_events = network.event_listener();
294
295        let (command_tx, command_rx) = mpsc::unbounded_channel();
296
297        let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
298            &transactions_manager_config.transaction_fetcher_config,
299        );
300
301        // install a listener for new __pending__ transactions that are allowed to be propagated
302        // over the network
303        let pending = pool.pending_transactions_listener();
304        let pending_pool_imports_info = PendingPoolImportsInfo::default();
305        let metrics = TransactionsManagerMetrics::default();
306        metrics
307            .capacity_pending_pool_imports
308            .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
309
310        Self {
311            pool,
312            network,
313            network_events,
314            transaction_fetcher,
315            transactions_by_peers: Default::default(),
316            pool_imports: Default::default(),
317            pending_pool_imports_info: PendingPoolImportsInfo::new(
318                DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS,
319            ),
320            bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
321            peers: Default::default(),
322            command_tx,
323            command_rx: UnboundedReceiverStream::new(command_rx),
324            pending_transactions: ReceiverStream::new(pending),
325            transaction_events: UnboundedMeteredReceiver::new(
326                from_network,
327                NETWORK_POOL_TRANSACTIONS_SCOPE,
328            ),
329            config: transactions_manager_config,
330            metrics,
331        }
332    }
333
334    /// Returns a new handle that can send commands to this type.
335    pub fn handle(&self) -> TransactionsHandle<N> {
336        TransactionsHandle { manager_tx: self.command_tx.clone() }
337    }
338
339    /// Returns `true` if [`TransactionsManager`] has capacity to request pending hashes. Returns
340    /// `false` if [`TransactionsManager`] is operating close to full capacity.
341    fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
342        self.pending_pool_imports_info
343            .has_capacity(self.pending_pool_imports_info.max_pending_pool_imports) &&
344            self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
345    }
346
347    fn report_peer_bad_transactions(&self, peer_id: PeerId) {
348        self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
349        self.metrics.reported_bad_transactions.increment(1);
350    }
351
352    fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
353        trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
354        self.network.reputation_change(peer_id, kind);
355    }
356
357    fn report_already_seen(&self, peer_id: PeerId) {
358        trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
359        self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
360    }
361
362    /// Clear the transaction
363    fn on_good_import(&mut self, hash: TxHash) {
364        self.transactions_by_peers.remove(&hash);
365    }
366
367    /// Penalize the peers that intentionally sent the bad transaction, and cache it to avoid
368    /// fetching or importing it again.
369    ///
370    /// Errors that count as bad transactions are:
371    ///
372    /// - intrinsic gas too low
373    /// - exceeds gas limit
374    /// - gas uint overflow
375    /// - exceeds max init code size
376    /// - oversized data
377    /// - signer account has bytecode
378    /// - chain id mismatch
379    /// - old legacy chain id
380    /// - tx type not supported
381    ///
382    /// (and additionally for blobs txns...)
383    ///
384    /// - no blobs
385    /// - too many blobs
386    /// - invalid kzg proof
387    /// - kzg error
388    /// - not blob transaction (tx type mismatch)
389    /// - wrong versioned kzg commitment hash
390    fn on_bad_import(&mut self, err: PoolError) {
391        let peers = self.transactions_by_peers.remove(&err.hash);
392
393        // if we're _currently_ syncing, we ignore a bad transaction
394        if !err.is_bad_transaction() || self.network.is_syncing() {
395            return
396        }
397        // otherwise we penalize the peer that sent the bad transaction, with the assumption that
398        // the peer should have known that this transaction is bad (e.g. violating consensus rules)
399        if let Some(peers) = peers {
400            for peer_id in peers {
401                self.report_peer_bad_transactions(peer_id);
402            }
403        }
404        self.metrics.bad_imports.increment(1);
405        self.bad_imports.insert(err.hash);
406    }
407
408    /// Runs an operation to fetch hashes that are cached in [`TransactionFetcher`].
409    fn on_fetch_hashes_pending_fetch(&mut self) {
410        // try drain transaction hashes pending fetch
411        let info = &self.pending_pool_imports_info;
412        let max_pending_pool_imports = info.max_pending_pool_imports;
413        let has_capacity_wrt_pending_pool_imports =
414            |divisor| info.has_capacity(max_pending_pool_imports / divisor);
415
416        self.transaction_fetcher
417            .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports);
418    }
419
420    fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
421        let kind = match req_err {
422            RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
423            RequestError::Timeout => ReputationChangeKind::Timeout,
424            RequestError::ChannelClosed | RequestError::ConnectionDropped => {
425                // peer is already disconnected
426                return
427            }
428            RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
429        };
430        self.report_peer(peer_id, kind);
431    }
432
433    #[inline]
434    fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
435        let metrics = &self.metrics;
436
437        let TxManagerPollDurations {
438            acc_network_events,
439            acc_pending_imports,
440            acc_tx_events,
441            acc_imported_txns,
442            acc_fetch_events,
443            acc_pending_fetch,
444            acc_cmds,
445        } = poll_durations;
446
447        // update metrics for whole poll function
448        metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
449        // update metrics for nested expressions
450        metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
451        metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
452        metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
453        metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
454        metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
455        metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
456        metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
457    }
458}
459
460impl<Pool, N> TransactionsManager<Pool, N>
461where
462    Pool: TransactionPool,
463    N: NetworkPrimitives,
464{
465    /// Processes a batch import results.
466    fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<TxHash>>) {
467        for res in batch_results {
468            match res {
469                Ok(hash) => {
470                    self.on_good_import(hash);
471                }
472                Err(err) => {
473                    self.on_bad_import(err);
474                }
475            }
476        }
477    }
478
    /// Request handler for an incoming `NewPooledTransactionHashes`
    ///
    /// The announcement is ignored while the node is initially syncing, if transaction gossip is
    /// disabled, or if the announcing peer has no active session.
    ///
    /// Otherwise the announced hashes are recorded as seen by the peer and then narrowed down in
    /// several steps:
    ///
    /// 1. spam is filtered out of the message,
    /// 2. hashes that are currently pending import into the pool are dropped,
    /// 3. hashes that are already known to the pool are dropped,
    /// 4. the entries are validated according to the message version (eth68 or eth66),
    /// 5. hashes that the transaction fetcher has already seen are dropped, the peer is passed
    ///    along to the fetcher as fallback for those.
    ///
    /// Whatever is left is requested from the peer if the fetcher considers the peer idle. If the
    /// peer is busy, if hashes don't fit in the request, or if sending the request fails, the
    /// affected hashes are buffered in the fetcher with the peer as fallback.
    ///
    /// Along the way the peer is penalized for announcing hashes that were already marked as seen
    /// by it, and for announcements that the filters flag with `FilterOutcome::ReportPeer`.
    fn on_new_pooled_transaction_hashes(
        &mut self,
        peer_id: PeerId,
        msg: NewPooledTransactionHashes,
    ) {
        // If the node is initially syncing, ignore transactions
        if self.network.is_initially_syncing() {
            return
        }
        if self.network.tx_gossip_disabled() {
            return
        }

        // get handle to peer's session, if the session is still active
        let Some(peer) = self.peers.get_mut(&peer_id) else {
            trace!(
                peer_id = format!("{peer_id:#}"),
                ?msg,
                "discarding announcement from inactive peer"
            );

            return
        };
        // cloned up front, so that logging below doesn't need to borrow the peer entry
        let client = peer.client_version.clone();

        // keep track of the transactions the peer knows
        let mut count_txns_already_seen_by_peer = 0;
        for tx in msg.iter_hashes().copied() {
            if !peer.seen_transactions.insert(tx) {
                count_txns_already_seen_by_peer += 1;
            }
        }
        if count_txns_already_seen_by_peer > 0 {
            // this may occur if transactions are sent or announced to a peer, at the same time as
            // the peer sends/announces those hashes to us. this is because, marking
            // txns as seen by a peer is done optimistically upon sending them to the
            // peer.
            self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
            self.metrics
                .occurrences_hash_already_seen_by_peer
                .increment(count_txns_already_seen_by_peer);

            trace!(target: "net::tx",
                %count_txns_already_seen_by_peer,
                peer_id=format!("{peer_id:#}"),
                ?client,
                "Peer sent hashes that have already been marked as seen by peer"
            );

            self.report_already_seen(peer_id);
        }

        // 1. filter out spam
        let (validation_outcome, mut partially_valid_msg) =
            self.transaction_fetcher.filter_valid_message.partially_filter_valid_entries(msg);

        if validation_outcome == FilterOutcome::ReportPeer {
            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
        }

        // 2. filter out transactions pending import to pool
        partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));

        // 3. filter out known hashes
        //
        // known txns have already been successfully fetched or received over gossip.
        //
        // most hashes will be filtered out here, since the mempool protocol is a gossip
        // protocol, healthy peers will send many of the same hashes.
        //
        let hashes_count_pre_pool_filter = partially_valid_msg.len();
        self.pool.retain_unknown(&mut partially_valid_msg);
        if hashes_count_pre_pool_filter > partially_valid_msg.len() {
            let already_known_hashes_count =
                hashes_count_pre_pool_filter - partially_valid_msg.len();
            self.metrics
                .occurrences_hashes_already_in_pool
                .increment(already_known_hashes_count as u64);
        }

        if partially_valid_msg.is_empty() {
            // nothing to request
            return
        }

        // 4. filter out invalid entries (spam)
        //
        // validates messages with respect to the given network, e.g. allowed tx types
        //
        let (validation_outcome, mut valid_announcement_data) = if partially_valid_msg
            .msg_version()
            .expect("partially valid announcement should have version")
            .is_eth68()
        {
            // validate eth68 announcement data
            self.transaction_fetcher
                .filter_valid_message
                .filter_valid_entries_68(partially_valid_msg)
        } else {
            // validate eth66 announcement data
            self.transaction_fetcher
                .filter_valid_message
                .filter_valid_entries_66(partially_valid_msg)
        };

        if validation_outcome == FilterOutcome::ReportPeer {
            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
        }

        if valid_announcement_data.is_empty() {
            // no valid announcement data
            return
        }

        // 5. filter out already seen unknown hashes
        //
        // seen hashes are already in the tx fetcher, pending fetch.
        //
        // for any seen hashes add the peer as fallback. unseen hashes are loaded into the tx
        // fetcher, hence they should be valid at this point.
        let bad_imports = &self.bad_imports;
        self.transaction_fetcher.filter_unseen_and_pending_hashes(
            &mut valid_announcement_data,
            |hash| bad_imports.contains(hash),
            &peer_id,
            |peer_id| self.peers.contains_key(&peer_id),
            &client,
        );

        if valid_announcement_data.is_empty() {
            // nothing to request
            return
        }

        trace!(target: "net::tx::propagation",
            peer_id=format!("{peer_id:#}"),
            hashes_len=valid_announcement_data.iter().count(),
            hashes=?valid_announcement_data.keys().collect::<Vec<_>>(),
            msg_version=%valid_announcement_data.msg_version(),
            client_version=%client,
            "received previously unseen and pending hashes in announcement from peer"
        );

        // only send request for hashes to idle peer, otherwise buffer hashes storing peer as
        // fallback
        if !self.transaction_fetcher.is_idle(&peer_id) {
            // load message version before announcement data is destructed in packing
            let msg_version = valid_announcement_data.msg_version();
            let (hashes, _version) = valid_announcement_data.into_request_hashes();

            trace!(target: "net::tx",
                peer_id=format!("{peer_id:#}"),
                hashes=?*hashes,
                %msg_version,
                %client,
                "buffering hashes announced by busy peer"
            );

            self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));

            return
        }

        // the request is allocated with room for a quarter of the announced hashes, packing
        // returns the hashes that didn't make it into the request
        let mut hashes_to_request =
            RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
        let surplus_hashes =
            self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);

        if !surplus_hashes.is_empty() {
            trace!(target: "net::tx",
                peer_id=format!("{peer_id:#}"),
                surplus_hashes=?*surplus_hashes,
                %client,
                "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
            );

            self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
        }

        trace!(target: "net::tx",
            peer_id=format!("{peer_id:#}"),
            hashes=?*hashes_to_request,
            %client,
            "sending hashes in `GetPooledTransactions` request to peer's session"
        );

        // request the missing transactions
        //
        // get handle to peer's session again, at this point we know it exists
        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
        if let Some(failed_to_request_hashes) =
            self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
        {
            let conn_eth_version = peer.version;

            trace!(target: "net::tx",
                peer_id=format!("{peer_id:#}"),
                failed_to_request_hashes=?*failed_to_request_hashes,
                %conn_eth_version,
                %client,
                "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
            );
            self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
        }
    }
685}
686
687impl<Pool, N> TransactionsManager<Pool, N>
688where
689    Pool: TransactionPool + 'static,
690    N: NetworkPrimitives<
691        BroadcastedTransaction: SignedTransaction,
692        PooledTransaction: SignedTransaction,
693    >,
694    Pool::Transaction:
695        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
696{
697    /// Invoked when transactions in the local mempool are considered __pending__.
698    ///
699    /// When a transaction in the local mempool is moved to the pending pool, we propagate them to
700    /// connected peers over network using the `Transactions` and `NewPooledTransactionHashes`
701    /// messages. The Transactions message relays complete transaction objects and is typically
702    /// sent to a small, random fraction of connected peers.
703    ///
704    /// All other peers receive a notification of the transaction hash and can request the
705    /// complete transaction object if it is unknown to them. The dissemination of complete
706    /// transactions to a fraction of peers usually ensures that all nodes receive the transaction
707    /// and won't need to request it.
708    fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
709        // Nothing to propagate while initially syncing
710        if self.network.is_initially_syncing() {
711            return
712        }
713        if self.network.tx_gossip_disabled() {
714            return
715        }
716
717        trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
718
719        self.propagate_all(hashes);
720    }
721
722    /// Propagate the full transactions to a specific peer.
723    ///
724    /// Returns the propagated transactions.
725    fn propagate_full_transactions_to_peer(
726        &mut self,
727        txs: Vec<TxHash>,
728        peer_id: PeerId,
729        propagation_mode: PropagationMode,
730    ) -> Option<PropagatedTransactions> {
731        trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
732
733        let peer = self.peers.get_mut(&peer_id)?;
734        let mut propagated = PropagatedTransactions::default();
735
736        // filter all transactions unknown to the peer
737        let mut full_transactions = FullTransactionsBuilder::new(peer.version);
738
739        let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::new);
740
741        if propagation_mode.is_forced() {
742            // skip cache check if forced
743            full_transactions.extend(to_propagate);
744        } else {
745            // Iterate through the transactions to propagate and fill the hashes and full
746            // transaction
747            for tx in to_propagate {
748                if !peer.seen_transactions.contains(tx.tx_hash()) {
749                    // Only include if the peer hasn't seen the transaction
750                    full_transactions.push(&tx);
751                }
752            }
753        }
754
755        if full_transactions.is_empty() {
756            // nothing to propagate
757            return None
758        }
759
760        let PropagateTransactions { pooled, full } = full_transactions.build();
761
762        // send hashes if any
763        if let Some(new_pooled_hashes) = pooled {
764            for hash in new_pooled_hashes.iter_hashes().copied() {
765                propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
766                // mark transaction as seen by peer
767                peer.seen_transactions.insert(hash);
768            }
769
770            // send hashes of transactions
771            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
772        }
773
774        // send full transactions, if any
775        if let Some(new_full_transactions) = full {
776            for tx in &new_full_transactions {
777                propagated.0.entry(*tx.tx_hash()).or_default().push(PropagateKind::Full(peer_id));
778                // mark transaction as seen by peer
779                peer.seen_transactions.insert(*tx.tx_hash());
780            }
781
782            // send full transactions
783            self.network.send_transactions(peer_id, new_full_transactions);
784        }
785
786        // Update propagated transactions metrics
787        self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
788
789        Some(propagated)
790    }
791
792    /// Propagate the transaction hashes to the given peer
793    ///
794    /// Note: This will only send the hashes for transactions that exist in the pool.
795    fn propagate_hashes_to(
796        &mut self,
797        hashes: Vec<TxHash>,
798        peer_id: PeerId,
799        propagation_mode: PropagationMode,
800    ) {
801        trace!(target: "net::tx", "Start propagating transactions as hashes");
802
803        // This fetches a transactions from the pool, including the blob transactions, which are
804        // only ever sent as hashes.
805        let propagated = {
806            let Some(peer) = self.peers.get_mut(&peer_id) else {
807                // no such peer
808                return
809            };
810
811            let to_propagate = self
812                .pool
813                .get_all(hashes)
814                .into_iter()
815                .map(PropagateTransaction::new)
816                .collect::<Vec<_>>();
817
818            let mut propagated = PropagatedTransactions::default();
819
820            // check if transaction is known to peer
821            let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
822
823            if propagation_mode.is_forced() {
824                hashes.extend(to_propagate)
825            } else {
826                for tx in to_propagate {
827                    if !peer.seen_transactions.contains(tx.tx_hash()) {
828                        // Include if the peer hasn't seen it
829                        hashes.push(&tx);
830                    }
831                }
832            }
833
834            let new_pooled_hashes = hashes.build();
835
836            if new_pooled_hashes.is_empty() {
837                // nothing to propagate
838                return
839            }
840
841            for hash in new_pooled_hashes.iter_hashes().copied() {
842                propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
843            }
844
845            trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
846
847            // send hashes of transactions
848            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
849
850            // Update propagated transactions metrics
851            self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
852
853            propagated
854        };
855
856        // notify pool so events get fired
857        self.pool.on_propagated(propagated);
858    }
859
860    /// Propagate the transactions to all connected peers either as full objects or hashes.
861    ///
862    /// The message for new pooled hashes depends on the negotiated version of the stream.
863    /// See [`NewPooledTransactionHashes`]
864    ///
865    /// Note: EIP-4844 are disallowed from being broadcast in full and are only ever sent as hashes, see also <https://eips.ethereum.org/EIPS/eip-4844#networking>.
866    fn propagate_transactions(
867        &mut self,
868        to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
869        propagation_mode: PropagationMode,
870    ) -> PropagatedTransactions {
871        let mut propagated = PropagatedTransactions::default();
872        if self.network.tx_gossip_disabled() {
873            return propagated
874        }
875
876        // send full transactions to a set of the connected peers based on the configured mode
877        let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());
878
879        // Note: Assuming ~random~ order due to random state of the peers map hasher
880        for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
881            // determine whether to send full tx objects or hashes.
882            let mut builder = if peer_idx > max_num_full {
883                PropagateTransactionsBuilder::pooled(peer.version)
884            } else {
885                PropagateTransactionsBuilder::full(peer.version)
886            };
887
888            if propagation_mode.is_forced() {
889                builder.extend(to_propagate.iter());
890            } else {
891                // Iterate through the transactions to propagate and fill the hashes and full
892                // transaction lists, before deciding whether or not to send full transactions to
893                // the peer.
894                for tx in &to_propagate {
895                    // Only proceed if the transaction is not in the peer's list of seen
896                    // transactions
897                    if !peer.seen_transactions.contains(tx.tx_hash()) {
898                        builder.push(tx);
899                    }
900                }
901            }
902
903            if builder.is_empty() {
904                trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
905                continue
906            }
907
908            let PropagateTransactions { pooled, full } = builder.build();
909
910            // send hashes if any
911            if let Some(mut new_pooled_hashes) = pooled {
912                // enforce tx soft limit per message for the (unlikely) event the number of
913                // hashes exceeds it
914                new_pooled_hashes
915                    .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);
916
917                for hash in new_pooled_hashes.iter_hashes().copied() {
918                    propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id));
919                    // mark transaction as seen by peer
920                    peer.seen_transactions.insert(hash);
921                }
922
923                trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
924
925                // send hashes of transactions
926                self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
927            }
928
929            // send full transactions, if any
930            if let Some(new_full_transactions) = full {
931                for tx in &new_full_transactions {
932                    propagated
933                        .0
934                        .entry(*tx.tx_hash())
935                        .or_default()
936                        .push(PropagateKind::Full(*peer_id));
937                    // mark transaction as seen by peer
938                    peer.seen_transactions.insert(*tx.tx_hash());
939                }
940
941                trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
942
943                // send full transactions
944                self.network.send_transactions(*peer_id, new_full_transactions);
945            }
946        }
947
948        // Update propagated transactions metrics
949        self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
950
951        propagated
952    }
953
954    /// Propagates the given transactions to the peers
955    ///
956    /// This fetches all transaction from the pool, including the 4844 blob transactions but
957    /// __without__ their sidecar, because 4844 transactions are only ever announced as hashes.
958    fn propagate_all(&mut self, hashes: Vec<TxHash>) {
959        let propagated = self.propagate_transactions(
960            self.pool.get_all(hashes).into_iter().map(PropagateTransaction::new).collect(),
961            PropagationMode::Basic,
962        );
963
964        // notify pool so events get fired
965        self.pool.on_propagated(propagated);
966    }
967
968    /// Request handler for an incoming request for transactions
969    fn on_get_pooled_transactions(
970        &mut self,
971        peer_id: PeerId,
972        request: GetPooledTransactions,
973        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
974    ) {
975        if let Some(peer) = self.peers.get_mut(&peer_id) {
976            if self.network.tx_gossip_disabled() {
977                let _ = response.send(Ok(PooledTransactions::default()));
978                return
979            }
980            let transactions = self.pool.get_pooled_transaction_elements(
981                request.0,
982                GetPooledTransactionLimit::ResponseSizeSoftLimit(
983                    self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
984                ),
985            );
986            trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
987
988            // we sent a response at which point we assume that the peer is aware of the
989            // transactions
990            peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
991
992            let resp = PooledTransactions(transactions);
993            let _ = response.send(Ok(resp));
994        }
995    }
996
997    /// Handles a command received from a detached [`TransactionsHandle`]
998    fn on_command(&mut self, cmd: TransactionsCommand<N>) {
999        match cmd {
1000            TransactionsCommand::PropagateHash(hash) => {
1001                self.on_new_pending_transactions(vec![hash])
1002            }
1003            TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1004                self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1005            }
1006            TransactionsCommand::GetActivePeers(tx) => {
1007                let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1008                tx.send(peers).ok();
1009            }
1010            TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1011                if let Some(propagated) =
1012                    self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1013                {
1014                    self.pool.on_propagated(propagated);
1015                }
1016            }
1017            TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1018            TransactionsCommand::GetTransactionHashes { peers, tx } => {
1019                let mut res = HashMap::with_capacity(peers.len());
1020                for peer_id in peers {
1021                    let hashes = self
1022                        .peers
1023                        .get(&peer_id)
1024                        .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
1025                        .unwrap_or_default();
1026                    res.insert(peer_id, hashes);
1027                }
1028                tx.send(res).ok();
1029            }
1030            TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1031                let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1032                peer_request_sender.send(sender).ok();
1033            }
1034        }
1035    }
1036
1037    /// Handles session establishment and peer transactions initialization.
1038    fn handle_peer_session(
1039        &mut self,
1040        info: SessionInfo,
1041        messages: PeerRequestSender<PeerRequest<N>>,
1042    ) {
1043        let SessionInfo { peer_id, client_version, version, .. } = info;
1044
1045        // Insert a new peer into the peerset.
1046        let peer = PeerMetadata::<N>::new(
1047            messages,
1048            version,
1049            client_version,
1050            self.config.max_transactions_seen_by_peer_history,
1051        );
1052        let peer = match self.peers.entry(peer_id) {
1053            Entry::Occupied(mut entry) => {
1054                entry.insert(peer);
1055                entry.into_mut()
1056            }
1057            Entry::Vacant(entry) => entry.insert(peer),
1058        };
1059
1060        // Send a `NewPooledTransactionHashes` to the peer with up to
1061        // `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE`
1062        // transactions in the pool.
1063        if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1064            trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1065            return
1066        }
1067
1068        // Get transactions to broadcast
1069        let pooled_txs = self.pool.pooled_transactions_max(
1070            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1071        );
1072        if pooled_txs.is_empty() {
1073            trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1074            return;
1075        }
1076
1077        // Build and send transaction hashes message
1078        let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1079        for pooled_tx in pooled_txs {
1080            peer.seen_transactions.insert(*pooled_tx.hash());
1081            msg_builder.push_pooled(pooled_tx);
1082        }
1083
1084        debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.is_empty(), "Broadcasting transaction hashes");
1085        let msg = msg_builder.build();
1086        self.network.send_transactions_hashes(peer_id, msg);
1087    }
1088
1089    /// Handles a received event related to common network events.
1090    fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1091        match event_result {
1092            NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1093                // remove the peer
1094                self.peers.remove(&peer_id);
1095                self.transaction_fetcher.remove_peer(&peer_id);
1096            }
1097            NetworkEvent::ActivePeerSession { info, messages } => {
1098                // process active peer session and broadcast available transaction from the pool
1099                self.handle_peer_session(info, messages);
1100            }
1101            NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1102                let peer_id = info.peer_id;
1103                // get messages from existing peer
1104                let messages = match self.peers.get(&peer_id) {
1105                    Some(p) => p.request_tx.clone(),
1106                    None => {
1107                        debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1108                        return;
1109                    }
1110                };
1111                self.handle_peer_session(info, messages);
1112            }
1113            _ => {}
1114        }
1115    }
1116
1117    /// Handles dedicated transaction events related to the `eth` protocol.
1118    fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1119        match event {
1120            NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1121                // ensure we didn't receive any blob transactions as these are disallowed to be
1122                // broadcasted in full
1123
1124                let has_blob_txs = msg.has_eip4844();
1125
1126                let non_blob_txs = msg
1127                    .0
1128                    .into_iter()
1129                    .map(N::PooledTransaction::try_from)
1130                    .filter_map(Result::ok)
1131                    .collect();
1132
1133                self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1134
1135                if has_blob_txs {
1136                    debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1137                    self.report_peer_bad_transactions(peer_id);
1138                }
1139            }
1140            NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1141                self.on_new_pooled_transaction_hashes(peer_id, msg)
1142            }
1143            NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1144                self.on_get_pooled_transactions(peer_id, request, response)
1145            }
1146            NetworkTransactionEvent::GetTransactionsHandle(response) => {
1147                let _ = response.send(Some(self.handle()));
1148            }
1149        }
1150    }
1151
    /// Starts the import process for the given transactions.
    ///
    /// The transactions received from `peer_id` go through the following stages:
    ///
    /// 1. Their hashes are removed from the transaction fetcher. For broadcasts, each hash is also
    ///    recorded as seen by the peer, counting those the peer had already been recorded for.
    /// 2. Transactions already known to the pool are dropped.
    /// 3. Signers are recovered. Transactions that fail recovery or that are known bad imports
    ///    mark the message as bad. Transactions already tracked in `transactions_by_peers` only
    ///    get this peer added as an additional sender.
    /// 4. The remaining new transactions are queued as one batch import future.
    ///
    /// Finally the peer is reported if it sent bad transactions and/or already seen transactions.
    ///
    /// Nothing happens if the node is initially syncing, transaction gossip is disabled, or the
    /// peer is not tracked.
    fn import_transactions(
        &mut self,
        peer_id: PeerId,
        transactions: PooledTransactions<N::PooledTransaction>,
        source: TransactionSource,
    ) {
        // If the node is pipeline syncing, ignore transactions
        if self.network.is_initially_syncing() {
            return
        }
        if self.network.tx_gossip_disabled() {
            return
        }

        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
        let mut transactions = transactions.0;

        // mark the transactions as received
        self.transaction_fetcher
            .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| *tx.tx_hash()));

        // track that the peer knows these transaction, but only if this is a new broadcast.
        // If we received the transactions as the response to our `GetPooledTransactions`
        // requests (based on received `NewPooledTransactionHashes`) then we already
        // recorded the hashes as seen by this peer in `Self::on_new_pooled_transaction_hashes`.
        let mut num_already_seen_by_peer = 0;
        for tx in &transactions {
            // `insert` returning `false` is counted as "the peer was already recorded for it"
            if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
                num_already_seen_by_peer += 1;
            }
        }

        // 1. filter out txns already inserted into pool
        let txns_count_pre_pool_filter = transactions.len();
        self.pool.retain_unknown(&mut transactions);
        if txns_count_pre_pool_filter > transactions.len() {
            let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
            self.metrics
                .occurrences_transactions_already_in_pool
                .increment(already_known_txns_count as u64);
        }

        // tracks the quality of the given transactions
        let mut has_bad_transactions = false;

        // 2. filter out transactions that are invalid or already pending import
        if let Some(peer) = self.peers.get_mut(&peer_id) {
            // pre-size to avoid reallocations
            let mut new_txs = Vec::with_capacity(transactions.len());
            for tx in transactions {
                // recover transaction
                let tx = match tx.try_into_ecrecovered() {
                    Ok(tx) => tx,
                    Err(badtx) => {
                        trace!(target: "net::tx",
                            peer_id=format!("{peer_id:#}"),
                            hash=%badtx.tx_hash(),
                            client_version=%peer.client_version,
                            "failed ecrecovery for transaction"
                        );
                        has_bad_transactions = true;
                        continue
                    }
                };

                match self.transactions_by_peers.entry(*tx.tx_hash()) {
                    Entry::Occupied(mut entry) => {
                        // transaction was already inserted
                        entry.get_mut().insert(peer_id);
                    }
                    Entry::Vacant(entry) => {
                        if self.bad_imports.contains(tx.tx_hash()) {
                            trace!(target: "net::tx",
                                peer_id=format!("{peer_id:#}"),
                                hash=%tx.tx_hash(),
                                client_version=%peer.client_version,
                                "received a known bad transaction from peer"
                            );
                            has_bad_transactions = true;
                        } else {
                            // this is a new transaction that should be imported into the pool

                            let pool_transaction = Pool::Transaction::from_pooled(tx);
                            new_txs.push(pool_transaction);

                            // start tracking which peers sent this transaction
                            entry.insert(HashSet::from([peer_id]));
                        }
                    }
                }
            }
            // release the capacity reserved for transactions that were filtered out
            new_txs.shrink_to_fit();

            // 3. import new transactions as a batch to minimize lock contention on the underlying
            // pool
            if !new_txs.is_empty() {
                let pool = self.pool.clone();
                // update metrics
                let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
                metric_pending_pool_imports.increment(new_txs.len() as f64);

                // update self-monitoring info
                self.pending_pool_imports_info
                    .pending_pool_imports
                    .fetch_add(new_txs.len(), Ordering::Relaxed);
                let tx_manager_info_pending_pool_imports =
                    self.pending_pool_imports_info.pending_pool_imports.clone();

                trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
                // The future adds the batch to the pool and, once that completes, undoes the
                // pending-import accounting done above.
                let import = Box::pin(async move {
                    let added = new_txs.len();
                    let res = pool.add_external_transactions(new_txs).await;

                    // update metrics
                    metric_pending_pool_imports.decrement(added as f64);
                    // update self-monitoring info
                    tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);

                    res
                });

                self.pool_imports.push(import);
            }

            if num_already_seen_by_peer > 0 {
                self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
                self.metrics
                    .occurrences_of_transaction_already_seen_by_peer
                    .increment(num_already_seen_by_peer);
                trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=?peer.client_version, "Peer sent already seen transactions");
            }
        }

        if has_bad_transactions {
            // peer sent us invalid transactions
            self.report_peer_bad_transactions(peer_id)
        }

        if num_already_seen_by_peer > 0 {
            // peer re-sent transactions it was already recorded as knowing
            self.report_already_seen(peer_id);
        }
    }
1294
1295    /// Processes a [`FetchEvent`].
1296    fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1297        match fetch_event {
1298            FetchEvent::TransactionsFetched { peer_id, transactions } => {
1299                self.import_transactions(peer_id, transactions, TransactionSource::Response);
1300            }
1301            FetchEvent::FetchError { peer_id, error } => {
1302                trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1303                self.on_request_error(peer_id, error);
1304            }
1305            FetchEvent::EmptyResponse { peer_id } => {
1306                trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1307            }
1308        }
1309    }
1310}
1311
/// An endless future. Preemption ensures that the future is non-blocking, nonetheless. See
/// [`crate::NetworkManager`] for more context on the design pattern.
///
/// Every call to `poll` returns [`Poll::Pending`]; the future never resolves.
///
/// This should be spawned or used as part of `tokio::select!`.
//
// spawned in `NodeConfig::start_network`(reth_node_core::NodeConfig) and
// `NetworkConfig::start_network`(reth_network::NetworkConfig)
impl<Pool, N> Future for TransactionsManager<Pool, N>
where
    Pool: TransactionPool + Unpin + 'static,
    N: NetworkPrimitives<
        BroadcastedTransaction: SignedTransaction,
        PooledTransaction: SignedTransaction,
    >,
    Pool::Transaction:
        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
{
    type Output = ();

    /// Drives the manager: polls each event source with its budget, then either requests an
    /// immediate re-poll or records poll metrics. Always returns [`Poll::Pending`].
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let start = Instant::now();
        let mut poll_durations = TxManagerPollDurations::default();

        let this = self.get_mut();

        // All streams are polled until their corresponding budget is exhausted, then we manually
        // yield back control to tokio. See `NetworkManager` for more context on the design
        // pattern.

        // Advance network/peer related events (update peers map).
        let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_network_events,
            "net::tx",
            "Network events stream",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.network_events.poll_next_unpin(cx),
            |event| this.on_network_event(event)
        );

        // Advances new __pending__ transactions, transactions that were successfully inserted into
        // pending set in pool (are valid), and propagates them (inform peers which
        // transactions we have seen).
        //
        // We try to drain this to batch the transactions in a single message.
        //
        // We don't expect this buffer to be large, since only pending transactions are
        // emitted here.
        let mut new_txs = Vec::new();
        let maybe_more_pending_txns = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_imported_txns,
            "net::tx",
            "Pending transactions stream",
            DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
            this.pending_transactions.poll_next_unpin(cx),
            |hash| new_txs.push(hash)
        );
        // Propagate everything collected above as one batch.
        if !new_txs.is_empty() {
            this.on_new_pending_transactions(new_txs);
        }

        // Advance inflight fetch requests (flush transaction fetcher and queue for
        // import to pool).
        //
        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
        // (2 MiB / 10 bytes > 200k transactions).
        //
        // Since transactions aren't validated until they are inserted into the pool,
        // this can potentially queue >200k transactions for insertion to pool. More
        // if the message size is bigger than the soft limit on a `PooledTransactions`
        // response which is 2 MiB.
        let maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_fetch_events,
            "net::tx",
            "Transaction fetch events stream",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.transaction_fetcher.poll_next_unpin(cx),
            |event| this.on_fetch_event(event),
        );

        // Advance incoming transaction events (stream new txns/announcements from
        // network manager and queue for import to pool/fetch txns).
        //
        // This will potentially remove hashes from hashes pending fetch, if the event
        // is an announcement (if same hashes are announced that didn't fit into a
        // previous request).
        //
        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
        // (128 KiB / 10 bytes > 13k transactions).
        //
        // If this is an event with `Transactions` message, since transactions aren't
        // validated until they are inserted into the pool, this can potentially queue
        // >13k transactions for insertion to pool. More if the message size is bigger
        // than the soft limit on a `Transactions` broadcast message, which is 128 KiB.
        let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_tx_events,
            "net::tx",
            "Network transaction events stream",
            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
            this.transaction_events.poll_next_unpin(cx),
            |event| this.on_network_tx_event(event),
        );

        // Advance pool imports (flush txns to pool).
        //
        // Note, this is done in batches. A batch is filled from one `Transactions`
        // broadcast messages or one `PooledTransactions` response at a time. The
        // minimum batch size is 1 transaction (and might often be the case with blob
        // transactions).
        //
        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
        // (2 MiB / 10 bytes > 200k transactions).
        //
        // Since transactions aren't validated until they are inserted into the pool,
        // this can potentially validate >200k transactions. More if the message size
        // is bigger than the soft limit on a `PooledTransactions` response which is
        // 2 MiB (`Transactions` broadcast messages is smaller, 128 KiB).
        let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_pending_imports,
            "net::tx",
            "Batched pool imports stream",
            DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
            this.pool_imports.poll_next_unpin(cx),
            |batch_results| this.on_batch_import_result(batch_results)
        );

        // Tries to drain hashes pending fetch cache if the tx manager currently has
        // capacity for this (fetch txns).
        //
        // Sends at most one request.
        duration_metered_exec!(
            {
                if this.has_capacity_for_fetching_pending_hashes() {
                    this.on_fetch_hashes_pending_fetch();
                }
            },
            poll_durations.acc_pending_fetch
        );

        // Advance commands (propagate/fetch/serve txns).
        let maybe_more_commands = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_cmds,
            "net::tx",
            "Commands channel",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.command_rx.poll_next_unpin(cx),
            |cmd| this.on_command(cmd)
        );

        this.transaction_fetcher.update_metrics();

        // If any source may still have items ready (presumably because its budget ran out before
        // it was drained), yield now but request an immediate re-poll. Poll metrics are not
        // updated on this path.
        if maybe_more_network_events ||
            maybe_more_commands ||
            maybe_more_tx_events ||
            maybe_more_tx_fetch_events ||
            maybe_more_pool_imports ||
            maybe_more_pending_txns
        {
            // make sure we're woken up again
            cx.waker().wake_by_ref();
            return Poll::Pending
        }

        // No source reported more work: record how long this poll took.
        this.update_poll_metrics(start, poll_durations);

        Poll::Pending
    }
}
1480
/// Selects how transactions are propagated to the peers in the network.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum PropagationMode {
    /// Regular (default) propagation.
    ///
    /// A transaction is only sent to peers that haven't seen it yet.
    Basic,
    /// Unconditional propagation.
    ///
    /// A transaction is sent to every peer, no matter whether it was already sent to or received
    /// from that peer before.
    Forced,
}

impl PropagationMode {
    /// Whether this mode is [`PropagationMode::Forced`].
    const fn is_forced(self) -> bool {
        match self {
            Self::Forced => true,
            Self::Basic => false,
        }
    }
}
1503
1504/// A transaction that's about to be propagated to multiple peers.
1505#[derive(Debug, Clone)]
1506struct PropagateTransaction<T = TransactionSigned> {
1507    size: usize,
1508    transaction: Arc<T>,
1509}
1510
1511impl<T: SignedTransaction> PropagateTransaction<T> {
1512    /// Create a new instance from a pooled transaction
1513    fn new<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1514    where
1515        P: PoolTransaction<Consensus = T>,
1516    {
1517        let size = tx.encoded_length();
1518        let transaction = tx.transaction.clone_into_consensus();
1519        let transaction = Arc::new(transaction.into_signed());
1520        Self { size, transaction }
1521    }
1522
1523    fn tx_hash(&self) -> &TxHash {
1524        self.transaction.tx_hash()
1525    }
1526}
1527
1528/// Helper type to construct the appropriate message to send to the peer based on whether the peer
1529/// should receive them in full or as pooled
1530#[derive(Debug, Clone)]
1531enum PropagateTransactionsBuilder<T> {
1532    Pooled(PooledTransactionsHashesBuilder),
1533    Full(FullTransactionsBuilder<T>),
1534}
1535
1536impl<T> PropagateTransactionsBuilder<T> {
1537    /// Create a builder for pooled transactions
1538    fn pooled(version: EthVersion) -> Self {
1539        Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1540    }
1541
1542    /// Create a builder that sends transactions in full and records transactions that don't fit.
1543    fn full(version: EthVersion) -> Self {
1544        Self::Full(FullTransactionsBuilder::new(version))
1545    }
1546
1547    /// Returns true if no transactions are recorded.
1548    fn is_empty(&self) -> bool {
1549        match self {
1550            Self::Pooled(builder) => builder.is_empty(),
1551            Self::Full(builder) => builder.is_empty(),
1552        }
1553    }
1554
1555    /// Consumes the type and returns the built messages that should be sent to the peer.
1556    fn build(self) -> PropagateTransactions<T> {
1557        match self {
1558            Self::Pooled(pooled) => {
1559                PropagateTransactions { pooled: Some(pooled.build()), full: None }
1560            }
1561            Self::Full(full) => full.build(),
1562        }
1563    }
1564}
1565
1566impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1567    /// Appends all transactions
1568    fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1569        for tx in txs {
1570            self.push(tx);
1571        }
1572    }
1573
1574    /// Appends a transaction to the list.
1575    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1576        match self {
1577            Self::Pooled(builder) => builder.push(transaction),
1578            Self::Full(builder) => builder.push(transaction),
1579        }
1580    }
1581}
1582
1583/// Represents how the transactions should be sent to a peer if any.
1584struct PropagateTransactions<T> {
1585    /// The pooled transaction hashes to send.
1586    pooled: Option<NewPooledTransactionHashes>,
1587    /// The transactions to send in full.
1588    full: Option<Vec<Arc<T>>>,
1589}
1590
1591/// Helper type for constructing the full transaction message that enforces the
1592/// [`DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE`] for full transaction broadcast
1593/// and enforces other propagation rules for EIP-4844 and tracks those transactions that can't be
1594/// broadcasted in full.
1595#[derive(Debug, Clone)]
1596struct FullTransactionsBuilder<T> {
1597    /// The soft limit to enforce for a single broadcast message of full transactions.
1598    total_size: usize,
1599    /// All transactions to be broadcasted.
1600    transactions: Vec<Arc<T>>,
1601    /// Transactions that didn't fit into the broadcast message
1602    pooled: PooledTransactionsHashesBuilder,
1603}
1604
1605impl<T> FullTransactionsBuilder<T> {
1606    /// Create a builder for the negotiated version of the peer's session
1607    fn new(version: EthVersion) -> Self {
1608        Self {
1609            total_size: 0,
1610            pooled: PooledTransactionsHashesBuilder::new(version),
1611            transactions: vec![],
1612        }
1613    }
1614
1615    /// Returns whether or not any transactions are in the [`FullTransactionsBuilder`].
1616    fn is_empty(&self) -> bool {
1617        self.transactions.is_empty() && self.pooled.is_empty()
1618    }
1619
1620    /// Returns the messages that should be propagated to the peer.
1621    fn build(self) -> PropagateTransactions<T> {
1622        let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1623        let full = Some(self.transactions).filter(|full| !full.is_empty());
1624        PropagateTransactions { pooled, full }
1625    }
1626}
1627
1628impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1629    /// Appends all transactions.
1630    fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1631        for tx in txs {
1632            self.push(&tx)
1633        }
1634    }
1635
1636    /// Append a transaction to the list of full transaction if the total message bytes size doesn't
1637    /// exceed the soft maximum target byte size. The limit is soft, meaning if one single
1638    /// transaction goes over the limit, it will be broadcasted in its own [`Transactions`]
1639    /// message. The same pattern is followed in filling a [`GetPooledTransactions`] request in
1640    /// [`TransactionFetcher::fill_request_from_hashes_pending_fetch`].
1641    ///
1642    /// If the transaction is unsuitable for broadcast or would exceed the softlimit, it is appended
1643    /// to list of pooled transactions, (e.g. 4844 transactions).
1644    /// See also [`SignedTransaction::is_broadcastable_in_full`].
1645    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1646        // Do not send full 4844 transaction hashes to peers.
1647        //
1648        //  Nodes MUST NOT automatically broadcast blob transactions to their peers.
1649        //  Instead, those transactions are only announced using
1650        //  `NewPooledTransactionHashes` messages, and can then be manually requested
1651        //  via `GetPooledTransactions`.
1652        //
1653        // From: <https://eips.ethereum.org/EIPS/eip-4844#networking>
1654        if !transaction.transaction.is_broadcastable_in_full() {
1655            self.pooled.push(transaction);
1656            return
1657        }
1658
1659        let new_size = self.total_size + transaction.size;
1660        if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1661            self.total_size > 0
1662        {
1663            // transaction does not fit into the message
1664            self.pooled.push(transaction);
1665            return
1666        }
1667
1668        self.total_size = new_size;
1669        self.transactions.push(Arc::clone(&transaction.transaction));
1670    }
1671}
1672
1673/// A helper type to create the pooled transactions message based on the negotiated version of the
1674/// session with the peer
1675#[derive(Debug, Clone)]
1676enum PooledTransactionsHashesBuilder {
1677    Eth66(NewPooledTransactionHashes66),
1678    Eth68(NewPooledTransactionHashes68),
1679}
1680
1681// === impl PooledTransactionsHashesBuilder ===
1682
1683impl PooledTransactionsHashesBuilder {
1684    /// Push a transaction from the pool to the list.
1685    fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1686        match self {
1687            Self::Eth66(msg) => msg.0.push(*pooled_tx.hash()),
1688            Self::Eth68(msg) => {
1689                msg.hashes.push(*pooled_tx.hash());
1690                msg.sizes.push(pooled_tx.encoded_length());
1691                msg.types.push(pooled_tx.transaction.tx_type());
1692            }
1693        }
1694    }
1695
1696    /// Returns whether or not any transactions are in the [`PooledTransactionsHashesBuilder`].
1697    fn is_empty(&self) -> bool {
1698        match self {
1699            Self::Eth66(hashes) => hashes.is_empty(),
1700            Self::Eth68(hashes) => hashes.is_empty(),
1701        }
1702    }
1703
1704    /// Appends all hashes
1705    fn extend<T: SignedTransaction>(
1706        &mut self,
1707        txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1708    ) {
1709        for tx in txs {
1710            self.push(&tx);
1711        }
1712    }
1713
1714    fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1715        match self {
1716            Self::Eth66(msg) => msg.0.push(*tx.tx_hash()),
1717            Self::Eth68(msg) => {
1718                msg.hashes.push(*tx.tx_hash());
1719                msg.sizes.push(tx.size);
1720                msg.types.push(tx.transaction.ty());
1721            }
1722        }
1723    }
1724
1725    /// Create a builder for the negotiated version of the peer's session
1726    fn new(version: EthVersion) -> Self {
1727        match version {
1728            EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1729            EthVersion::Eth68 | EthVersion::Eth69 => Self::Eth68(Default::default()),
1730        }
1731    }
1732
1733    fn build(self) -> NewPooledTransactionHashes {
1734        match self {
1735            Self::Eth66(msg) => msg.into(),
1736            Self::Eth68(msg) => msg.into(),
1737        }
1738    }
1739}
1740
/// How we received the transactions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TransactionSource {
    /// Transactions were broadcast to us via [`Transactions`] message.
    Broadcast,
    /// Transactions were sent as the response of [`fetcher::GetPooledTxRequest`] issued by us.
    Response,
}

// === impl TransactionSource ===

impl TransactionSource {
    /// Whether the transactions were sent as a broadcast.
    const fn is_broadcast(&self) -> bool {
        matches!(self, Self::Broadcast)
    }
}
1757
1758/// Tracks a single peer in the context of [`TransactionsManager`].
1759#[derive(Debug)]
1760pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
1761    /// Optimistically keeps track of transactions that we know the peer has seen. Optimistic, in
1762    /// the sense that transactions are preemptively marked as seen by peer when they are sent to
1763    /// the peer.
1764    seen_transactions: LruCache<TxHash>,
1765    /// A communication channel directly to the peer's session task.
1766    request_tx: PeerRequestSender<PeerRequest<N>>,
1767    /// negotiated version of the session.
1768    version: EthVersion,
1769    /// The peer's client version.
1770    client_version: Arc<str>,
1771}
1772
1773impl<N: NetworkPrimitives> PeerMetadata<N> {
1774    /// Returns a new instance of [`PeerMetadata`].
1775    fn new(
1776        request_tx: PeerRequestSender<PeerRequest<N>>,
1777        version: EthVersion,
1778        client_version: Arc<str>,
1779        max_transactions_seen_by_peer: u32,
1780    ) -> Self {
1781        Self {
1782            seen_transactions: LruCache::new(max_transactions_seen_by_peer),
1783            request_tx,
1784            version,
1785            client_version,
1786        }
1787    }
1788}
1789
/// Commands to send to the [`TransactionsManager`]
#[derive(Debug)]
enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Propagate a transaction hash to the network.
    PropagateHash(B256),
    /// Propagate transaction hashes to a specific peer.
    PropagateHashesTo(Vec<B256>, PeerId),
    /// Request the list of active peer IDs from the [`TransactionsManager`].
    ///
    /// The set of peer IDs is delivered through the contained oneshot sender.
    GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
    /// Propagate a collection of full transactions to a specific peer.
    ///
    /// The transactions are identified by their hashes.
    PropagateTransactionsTo(Vec<TxHash>, PeerId),
    /// Propagate a collection of full transactions to all peers.
    ///
    /// The transactions are identified by their hashes.
    PropagateTransactions(Vec<TxHash>),
    /// Request transaction hashes known by specific peers from the [`TransactionsManager`].
    GetTransactionHashes {
        /// The peers to report the known transaction hashes for.
        peers: Vec<PeerId>,
        /// Channel on which the hashes, keyed by peer, are sent back.
        tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
    },
    /// Requests a clone of the sender channel to the peer.
    GetPeerSender {
        /// The peer whose request sender is asked for.
        peer_id: PeerId,
        /// Channel on which the sender is returned; it carries an `Option`, so the answer may
        /// be `None`.
        peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
    },
}
1814
/// All events related to transactions emitted by the network.
#[derive(Debug)]
pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Represents the event of receiving a list of transactions from a peer.
    ///
    /// This indicates transactions that were broadcasted to us from the peer.
    IncomingTransactions {
        /// The ID of the peer from which the transactions were received.
        peer_id: PeerId,
        /// The received transactions.
        msg: Transactions<N::BroadcastedTransaction>,
    },
    /// Represents the event of receiving a list of transaction hashes from a peer.
    IncomingPooledTransactionHashes {
        /// The ID of the peer from which the transaction hashes were received.
        peer_id: PeerId,
        /// The received new pooled transaction hashes.
        msg: NewPooledTransactionHashes,
    },
    /// Represents the event of receiving a `GetPooledTransactions` request from a peer.
    GetPooledTransactions {
        /// The ID of the peer from which the request was received.
        peer_id: PeerId,
        /// The received `GetPooledTransactions` request.
        request: GetPooledTransactions,
        /// The sender for responding to the request with a result of `PooledTransactions`.
        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
    },
    /// Represents the event of receiving a `GetTransactionsHandle` request.
    ///
    /// The contained oneshot sender is used to answer the request; it carries an `Option`, so the
    /// answer may be `None` instead of a [`TransactionsHandle`].
    GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
}
1846
/// Bookkeeping for the pool imports that the [`TransactionsManager`] has in flight.
#[derive(Debug)]
pub struct PendingPoolImportsInfo {
    /// How many transactions are currently about to be inserted into the pool.
    pending_pool_imports: Arc<AtomicUsize>,
    /// Upper bound on the number of transactions that may be imported concurrently.
    max_pending_pool_imports: usize,
}

impl PendingPoolImportsInfo {
    /// Creates a new [`PendingPoolImportsInfo`] with a zeroed counter and the given maximum.
    pub fn new(max_pending_pool_imports: usize) -> Self {
        let pending_pool_imports = Arc::new(AtomicUsize::new(0));
        Self { pending_pool_imports, max_pending_pool_imports }
    }

    /// Returns `true` while the number of pending pool imports is strictly below the given
    /// tolerated maximum.
    ///
    /// Note that the comparison uses the `max_pending_pool_imports` argument, not the value
    /// stored in the struct.
    pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
        let in_flight = self.pending_pool_imports.load(Ordering::Relaxed);
        in_flight < max_pending_pool_imports
    }
}
1867
1868impl Default for PendingPoolImportsInfo {
1869    fn default() -> Self {
1870        Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
1871    }
1872}
1873
/// Accumulated time spent in the individual stages of one poll of the transactions manager.
///
/// All durations start at zero (`Default`). The filled-in value is handed to
/// `update_poll_metrics` at the end of the poll.
#[derive(Debug, Default)]
struct TxManagerPollDurations {
    // NOTE(review): presumably the time spent on the network events stream; the code that
    // fills this field is not visible here, confirm.
    acc_network_events: Duration,
    /// Time spent polling the batched pool imports stream.
    acc_pending_imports: Duration,
    /// Time spent polling the network transaction events stream.
    acc_tx_events: Duration,
    // NOTE(review): presumably the time spent on transactions imported into the pool; the code
    // that fills this field is not visible here, confirm.
    acc_imported_txns: Duration,
    // NOTE(review): presumably the time spent on events from the transaction fetcher; the code
    // that fills this field is not visible here, confirm.
    acc_fetch_events: Duration,
    /// Time spent trying to fetch the hashes that are pending fetch.
    acc_pending_fetch: Duration,
    /// Time spent polling the commands channel.
    acc_cmds: Duration,
}
1884
1885#[cfg(test)]
1886mod tests {
1887    use super::*;
1888    use crate::{test_utils::Testnet, NetworkConfigBuilder, NetworkManager};
1889    use alloy_primitives::hex;
1890    use alloy_rlp::Decodable;
1891    use constants::tx_fetcher::DEFAULT_MAX_COUNT_FALLBACK_PEERS;
1892    use futures::FutureExt;
1893    use reth_network_api::NetworkInfo;
1894    use reth_network_p2p::{
1895        error::{RequestError, RequestResult},
1896        sync::{NetworkSyncUpdater, SyncState},
1897    };
1898    use reth_storage_api::noop::NoopProvider;
1899    use reth_transaction_pool::test_utils::{
1900        testing_pool, MockTransaction, MockTransactionFactory, TestPool,
1901    };
1902    use secp256k1::SecretKey;
1903    use std::{
1904        fmt,
1905        future::poll_fn,
1906        hash,
1907        net::{IpAddr, Ipv4Addr, SocketAddr},
1908    };
1909    use tests::fetcher::TxFetchMetadata;
1910    use tracing::error;
1911
1912    async fn new_tx_manager(
1913    ) -> (TransactionsManager<TestPool, EthNetworkPrimitives>, NetworkManager<EthNetworkPrimitives>)
1914    {
1915        let secret_key = SecretKey::new(&mut rand::thread_rng());
1916        let client = NoopProvider::default();
1917
1918        let config = NetworkConfigBuilder::new(secret_key)
1919            // let OS choose port
1920            .listener_port(0)
1921            .disable_discovery()
1922            .build(client);
1923
1924        let pool = testing_pool();
1925
1926        let transactions_manager_config = config.transactions_manager_config.clone();
1927        let (_network_handle, network, transactions, _) = NetworkManager::new(config)
1928            .await
1929            .unwrap()
1930            .into_builder()
1931            .transactions(pool.clone(), transactions_manager_config)
1932            .split_with_handle();
1933
1934        (transactions, network)
1935    }
1936
1937    pub(super) fn default_cache<T: hash::Hash + Eq + fmt::Debug>() -> LruCache<T> {
1938        LruCache::new(DEFAULT_MAX_COUNT_FALLBACK_PEERS as u32)
1939    }
1940
1941    // Returns (peer, channel-to-send-get-pooled-tx-response-on).
1942    pub(super) fn new_mock_session(
1943        peer_id: PeerId,
1944        version: EthVersion,
1945    ) -> (PeerMetadata<EthNetworkPrimitives>, mpsc::Receiver<PeerRequest>) {
1946        let (to_mock_session_tx, to_mock_session_rx) = mpsc::channel(1);
1947
1948        (
1949            PeerMetadata::new(
1950                PeerRequestSender::new(peer_id, to_mock_session_tx),
1951                version,
1952                Arc::from(""),
1953                DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
1954            ),
1955            to_mock_session_rx,
1956        )
1957    }
1958
    // Checks that a transaction broadcast to us while the node is still initially syncing does
    // not end up in the pool.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_ignored_tx_broadcasts_while_initially_syncing() {
        reth_tracing::init_test_tracing();
        let net = Testnet::create(3).await;

        // only the first two handles of the testnet are used
        let mut handles = net.handles();
        let handle0 = handles.next().unwrap();
        let handle1 = handles.next().unwrap();

        drop(handles);
        let handle = net.spawn();

        // subscribe to the events of the first peer before connecting it to the second one
        let listener0 = handle0.event_listener();
        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
        let secret_key = SecretKey::new(&mut rand::thread_rng());

        // separate network + transactions manager under test, backed by its own test pool
        let client = NoopProvider::default();
        let pool = testing_pool();
        let config = NetworkConfigBuilder::<EthNetworkPrimitives>::new(secret_key)
            .disable_discovery()
            .listener_port(0)
            .build(client);
        let transactions_manager_config = config.transactions_manager_config.clone();
        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();

        tokio::task::spawn(network);

        // go to syncing (pipeline sync)
        network_handle.update_sync_state(SyncState::Syncing);
        assert!(NetworkInfo::is_syncing(&network_handle));
        assert!(NetworkInfo::is_initially_syncing(&network_handle));

        // wait for all initiator connections
        let mut established = listener0.take(2);
        while let Some(ev) = established.next().await {
            match ev {
                NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
                    // to insert a new peer in transactions peerset
                    transactions
                        .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
                }
                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue,
                ev => {
                    error!("unexpected event {ev:?}")
                }
            }
        }
        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
        let input = hex!("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76");
        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
        // hand the transaction to the manager as if the second peer had broadcast it
        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
            peer_id: *handle1.peer_id(),
            msg: Transactions(vec![signed_tx.clone()]),
        });
        // poll the manager exactly once; the poll result itself is not of interest
        poll_fn(|cx| {
            let _ = transactions.poll_unpin(cx);
            Poll::Ready(())
        })
        .await;
        // nothing must have been imported
        assert!(pool.is_empty());
        handle.terminate().await;
    }
2026
    // Checks that once the node has been idle in between two syncs, it is no longer "initially
    // syncing" and a transaction broadcast to us during the second sync ends up in the pool.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_tx_broadcasts_through_two_syncs() {
        reth_tracing::init_test_tracing();
        let net = Testnet::create(3).await;

        // only the first two handles of the testnet are used
        let mut handles = net.handles();
        let handle0 = handles.next().unwrap();
        let handle1 = handles.next().unwrap();

        drop(handles);
        let handle = net.spawn();

        // subscribe to the events of the first peer before connecting it to the second one
        let listener0 = handle0.event_listener();
        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
        let secret_key = SecretKey::new(&mut rand::thread_rng());

        // separate network + transactions manager under test, backed by its own test pool
        let client = NoopProvider::default();
        let pool = testing_pool();
        let config = NetworkConfigBuilder::new(secret_key)
            .disable_discovery()
            .listener_port(0)
            .build(client);
        let transactions_manager_config = config.transactions_manager_config.clone();
        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();

        tokio::task::spawn(network);

        // go to syncing (pipeline sync) to idle and then to syncing (live)
        network_handle.update_sync_state(SyncState::Syncing);
        assert!(NetworkInfo::is_syncing(&network_handle));
        network_handle.update_sync_state(SyncState::Idle);
        assert!(!NetworkInfo::is_syncing(&network_handle));
        network_handle.update_sync_state(SyncState::Syncing);
        assert!(NetworkInfo::is_syncing(&network_handle));

        // wait for all initiator connections
        let mut established = listener0.take(2);
        while let Some(ev) = established.next().await {
            match ev {
                NetworkEvent::ActivePeerSession { .. } |
                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
                    // to insert a new peer in transactions peerset
                    transactions.on_network_event(ev);
                }
                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue,
                _ => {
                    error!("unexpected event {ev:?}")
                }
            }
        }
        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
        let input = hex!("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76");
        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
        // hand the transaction to the manager as if the second peer had broadcast it
        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
            peer_id: *handle1.peer_id(),
            msg: Transactions(vec![signed_tx.clone()]),
        });
        // poll the manager exactly once; the poll result itself is not of interest
        poll_fn(|cx| {
            let _ = transactions.poll_unpin(cx);
            Poll::Ready(())
        })
        .await;
        // still syncing, but not initially syncing anymore, so the transaction was imported
        assert!(!NetworkInfo::is_initially_syncing(&network_handle));
        assert!(NetworkInfo::is_syncing(&network_handle));
        assert!(!pool.is_empty());
        handle.terminate().await;
    }
2099
    // Checks that a transaction broadcast to us while the node is idle is attributed to the
    // sending peer and, after polling the manager, can be found in the pool.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_handle_incoming_transactions() {
        reth_tracing::init_test_tracing();
        let net = Testnet::create(3).await;

        // only the first two handles of the testnet are used
        let mut handles = net.handles();
        let handle0 = handles.next().unwrap();
        let handle1 = handles.next().unwrap();

        drop(handles);
        let handle = net.spawn();

        // subscribe to the events of the first peer before connecting it to the second one
        let listener0 = handle0.event_listener();

        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
        let secret_key = SecretKey::new(&mut rand::thread_rng());

        // separate network + transactions manager under test, backed by its own test pool
        let client = NoopProvider::default();
        let pool = testing_pool();
        let config = NetworkConfigBuilder::new(secret_key)
            .disable_discovery()
            .listener_port(0)
            .build(client);
        let transactions_manager_config = config.transactions_manager_config.clone();
        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();
        tokio::task::spawn(network);

        // the node is not syncing in this test
        network_handle.update_sync_state(SyncState::Idle);

        assert!(!NetworkInfo::is_syncing(&network_handle));

        // wait for all initiator connections
        let mut established = listener0.take(2);
        while let Some(ev) = established.next().await {
            match ev {
                NetworkEvent::ActivePeerSession { .. } |
                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
                    // to insert a new peer in transactions peerset
                    transactions.on_network_event(ev);
                }
                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue,
                ev => {
                    error!("unexpected event {ev:?}")
                }
            }
        }
        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
        let input = hex!("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76");
        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
        // hand the transaction to the manager as if the second peer had broadcast it
        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
            peer_id: *handle1.peer_id(),
            msg: Transactions(vec![signed_tx.clone()]),
        });
        // the sending peer is recorded for the transaction hash right away, before any poll
        assert!(transactions
            .transactions_by_peers
            .get(&signed_tx.hash())
            .unwrap()
            .contains(handle1.peer_id()));

        // advance the transaction manager future
        poll_fn(|cx| {
            let _ = transactions.poll_unpin(cx);
            Poll::Ready(())
        })
        .await;

        // the transaction was imported and can be looked up by its hash
        assert!(!pool.is_empty());
        assert!(pool.get(signed_tx.tx_hash()).is_some());
        handle.terminate().await;
    }
2175
2176    #[tokio::test(flavor = "multi_thread")]
2177    async fn test_on_get_pooled_transactions_network() {
2178        reth_tracing::init_test_tracing();
2179        let net = Testnet::create(2).await;
2180
2181        let mut handles = net.handles();
2182        let handle0 = handles.next().unwrap();
2183        let handle1 = handles.next().unwrap();
2184
2185        drop(handles);
2186        let handle = net.spawn();
2187
2188        let listener0 = handle0.event_listener();
2189
2190        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2191        let secret_key = SecretKey::new(&mut rand::thread_rng());
2192
2193        let client = NoopProvider::default();
2194        let pool = testing_pool();
2195        let config = NetworkConfigBuilder::new(secret_key)
2196            .disable_discovery()
2197            .listener_port(0)
2198            .build(client);
2199        let transactions_manager_config = config.transactions_manager_config.clone();
2200        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2201            .await
2202            .unwrap()
2203            .into_builder()
2204            .transactions(pool.clone(), transactions_manager_config)
2205            .split_with_handle();
2206        tokio::task::spawn(network);
2207
2208        network_handle.update_sync_state(SyncState::Idle);
2209
2210        assert!(!NetworkInfo::is_syncing(&network_handle));
2211
2212        // wait for all initiator connections
2213        let mut established = listener0.take(2);
2214        while let Some(ev) = established.next().await {
2215            match ev {
2216                NetworkEvent::ActivePeerSession { .. } |
2217                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2218                    transactions.on_network_event(ev);
2219                }
2220                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue,
2221                ev => {
2222                    error!("unexpected event {ev:?}")
2223                }
2224            }
2225        }
2226        handle.terminate().await;
2227
2228        let tx = MockTransaction::eip1559();
2229        let _ = transactions
2230            .pool
2231            .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2232            .await;
2233
2234        let request = GetPooledTransactions(vec![*tx.get_hash()]);
2235
2236        let (send, receive) = oneshot::channel::<RequestResult<PooledTransactions>>();
2237
2238        transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2239            peer_id: *handle1.peer_id(),
2240            request,
2241            response: send,
2242        });
2243
2244        match receive.await.unwrap() {
2245            Ok(PooledTransactions(transactions)) => {
2246                assert_eq!(transactions.len(), 1);
2247            }
2248            Err(e) => {
2249                panic!("error: {e:?}");
2250            }
2251        }
2252    }
2253
2254    #[tokio::test]
2255    async fn test_max_retries_tx_request() {
2256        reth_tracing::init_test_tracing();
2257
2258        let mut tx_manager = new_tx_manager().await.0;
2259        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2260
2261        let peer_id_1 = PeerId::new([1; 64]);
2262        let peer_id_2 = PeerId::new([2; 64]);
2263        let eth_version = EthVersion::Eth66;
2264        let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2265
2266        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2267        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
2268        // fetch
2269        peer_1.seen_transactions.insert(seen_hashes[0]);
2270        peer_1.seen_transactions.insert(seen_hashes[1]);
2271        tx_manager.peers.insert(peer_id_1, peer_1);
2272
2273        // hashes are seen and currently not inflight, with one fallback peer, and are buffered
2274        // for first retry in reverse order to make index 0 lru
2275        let retries = 1;
2276        let mut backups = default_cache();
2277        backups.insert(peer_id_1);
2278
2279        let mut backups1 = default_cache();
2280        backups1.insert(peer_id_1);
2281        tx_fetcher
2282            .hashes_fetch_inflight_and_pending_fetch
2283            .insert(seen_hashes[1], TxFetchMetadata::new(retries, backups, None));
2284        tx_fetcher
2285            .hashes_fetch_inflight_and_pending_fetch
2286            .insert(seen_hashes[0], TxFetchMetadata::new(retries, backups1, None));
2287        tx_fetcher.hashes_pending_fetch.insert(seen_hashes[1]);
2288        tx_fetcher.hashes_pending_fetch.insert(seen_hashes[0]);
2289
2290        // peer_1 is idle
2291        assert!(tx_fetcher.is_idle(&peer_id_1));
2292        assert_eq!(tx_fetcher.active_peers.len(), 0);
2293
2294        // sends request for buffered hashes to peer_1
2295        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2296
2297        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2298
2299        assert!(tx_fetcher.hashes_pending_fetch.is_empty());
2300        // as long as request is in inflight peer_1 is not idle
2301        assert!(!tx_fetcher.is_idle(&peer_id_1));
2302        assert_eq!(tx_fetcher.active_peers.len(), 1);
2303
2304        // mock session of peer_1 receives request
2305        let req = to_mock_session_rx
2306            .recv()
2307            .await
2308            .expect("peer_1 session should receive request with buffered hashes");
2309        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2310        let GetPooledTransactions(hashes) = request;
2311
2312        let hashes = hashes.into_iter().collect::<HashSet<_>>();
2313
2314        assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());
2315
2316        // fail request to peer_1
2317        response
2318            .send(Err(RequestError::BadResponse))
2319            .expect("should send peer_1 response to tx manager");
2320        let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2321            unreachable!()
2322        };
2323
2324        // request has resolved, peer_1 is idle again
2325        assert!(tx_fetcher.is_idle(&peer_id));
2326        assert_eq!(tx_fetcher.active_peers.len(), 0);
2327        // failing peer_1's request buffers requested hashes for retry
2328        assert_eq!(tx_fetcher.hashes_pending_fetch.len(), 2);
2329
2330        let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2331        tx_manager.peers.insert(peer_id_2, peer_2);
2332
2333        // peer_2 announces same hashes as peer_1
2334        let msg =
2335            NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2336        tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2337
2338        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2339
2340        // peer_2 should be in active_peers.
2341        assert_eq!(tx_fetcher.active_peers.len(), 1);
2342
2343        // since hashes are already seen, no changes to length of unknown hashes
2344        assert_eq!(tx_fetcher.hashes_fetch_inflight_and_pending_fetch.len(), 2);
2345        // but hashes are taken out of buffer and packed into request to peer_2
2346        assert!(tx_fetcher.hashes_pending_fetch.is_empty());
2347
2348        // mock session of peer_2 receives request
2349        let req = to_mock_session_rx
2350            .recv()
2351            .await
2352            .expect("peer_2 session should receive request with buffered hashes");
2353        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2354
2355        // report failed request to tx manager
2356        response
2357            .send(Err(RequestError::BadResponse))
2358            .expect("should send peer_2 response to tx manager");
2359        let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
2360
2361        // `MAX_REQUEST_RETRIES_PER_TX_HASH`, 2, for hashes reached so this time won't be buffered
2362        // for retry
2363        assert!(tx_fetcher.hashes_pending_fetch.is_empty());
2364        assert_eq!(tx_fetcher.active_peers.len(), 0);
2365    }
2366
2367    #[test]
2368    fn test_transaction_builder_empty() {
2369        let mut builder =
2370            PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
2371        assert!(builder.is_empty());
2372
2373        let mut factory = MockTransactionFactory::default();
2374        let tx = PropagateTransaction::new(Arc::new(factory.create_eip1559()));
2375        builder.push(&tx);
2376        assert!(!builder.is_empty());
2377
2378        let txs = builder.build();
2379        assert!(txs.full.is_none());
2380        let txs = txs.pooled.unwrap();
2381        assert_eq!(txs.len(), 1);
2382    }
2383
2384    #[test]
2385    fn test_transaction_builder_large() {
2386        let mut builder =
2387            PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2388        assert!(builder.is_empty());
2389
2390        let mut factory = MockTransactionFactory::default();
2391        let mut tx = factory.create_eip1559();
2392        // create a transaction that still fits
2393        tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
2394        let tx = Arc::new(tx);
2395        let tx = PropagateTransaction::new(tx);
2396        builder.push(&tx);
2397        assert!(!builder.is_empty());
2398
2399        let txs = builder.clone().build();
2400        assert!(txs.pooled.is_none());
2401        let txs = txs.full.unwrap();
2402        assert_eq!(txs.len(), 1);
2403
2404        builder.push(&tx);
2405
2406        let txs = builder.clone().build();
2407        let pooled = txs.pooled.unwrap();
2408        assert_eq!(pooled.len(), 1);
2409        let txs = txs.full.unwrap();
2410        assert_eq!(txs.len(), 1);
2411    }
2412
2413    #[test]
2414    fn test_transaction_builder_eip4844() {
2415        let mut builder =
2416            PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2417        assert!(builder.is_empty());
2418
2419        let mut factory = MockTransactionFactory::default();
2420        let tx = PropagateTransaction::new(Arc::new(factory.create_eip4844()));
2421        builder.push(&tx);
2422        assert!(!builder.is_empty());
2423
2424        let txs = builder.clone().build();
2425        assert!(txs.full.is_none());
2426        let txs = txs.pooled.unwrap();
2427        assert_eq!(txs.len(), 1);
2428
2429        let tx = PropagateTransaction::new(Arc::new(factory.create_eip1559()));
2430        builder.push(&tx);
2431
2432        let txs = builder.clone().build();
2433        let pooled = txs.pooled.unwrap();
2434        assert_eq!(pooled.len(), 1);
2435        let txs = txs.full.unwrap();
2436        assert_eq!(txs.len(), 1);
2437    }
2438
2439    #[tokio::test]
2440    async fn test_propagate_full() {
2441        reth_tracing::init_test_tracing();
2442
2443        let (mut tx_manager, network) = new_tx_manager().await;
2444        let peer_id = PeerId::random();
2445
2446        // ensure not syncing
2447        network.handle().update_sync_state(SyncState::Idle);
2448
2449        // mock a peer
2450        let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
2451        let session_info = SessionInfo {
2452            peer_id,
2453            remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
2454            client_version: Arc::from(""),
2455            capabilities: Arc::new(vec![].into()),
2456            status: Arc::new(Default::default()),
2457            version: EthVersion::Eth68,
2458        };
2459        let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
2460        tx_manager
2461            .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
2462        let mut propagate = vec![];
2463        let mut factory = MockTransactionFactory::default();
2464        let eip1559_tx = Arc::new(factory.create_eip1559());
2465        propagate.push(PropagateTransaction::new(eip1559_tx.clone()));
2466        let eip4844_tx = Arc::new(factory.create_eip4844());
2467        propagate.push(PropagateTransaction::new(eip4844_tx.clone()));
2468
2469        let propagated =
2470            tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
2471        assert_eq!(propagated.0.len(), 2);
2472        let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap();
2473        assert_eq!(prop_txs.len(), 1);
2474        assert!(prop_txs[0].is_full());
2475
2476        let prop_txs = propagated.0.get(eip4844_tx.transaction.hash()).unwrap();
2477        assert_eq!(prop_txs.len(), 1);
2478        assert!(prop_txs[0].is_hash());
2479
2480        let peer = tx_manager.peers.get(&peer_id).unwrap();
2481        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2482        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2483        peer.seen_transactions.contains(eip4844_tx.transaction.hash());
2484
2485        // propagate again
2486        let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
2487        assert!(propagated.0.is_empty());
2488    }
2489}