reth_transaction_pool/pool/
best.rs

1use crate::{
2    error::{Eip4844PoolTransactionError, InvalidPoolTransactionError},
3    identifier::{SenderId, TransactionId},
4    pool::pending::PendingTransaction,
5    PoolTransaction, TransactionOrdering, ValidPoolTransaction,
6};
7use alloy_primitives::Address;
8use core::fmt;
9use reth_payload_util::PayloadTransactions;
10use reth_primitives::{InvalidTransactionError, RecoveredTx};
11use std::{
12    collections::{BTreeMap, BTreeSet, HashSet, VecDeque},
13    sync::Arc,
14};
15use tokio::sync::broadcast::{error::TryRecvError, Receiver};
16use tracing::debug;
17
18/// An iterator that returns transactions that can be executed on the current state (*best*
19/// transactions).
20///
21/// This is a wrapper around [`BestTransactions`] that also enforces a specific basefee.
22///
23/// This iterator guarantees that all transaction it returns satisfy both the base fee and blob fee!
24pub(crate) struct BestTransactionsWithFees<T: TransactionOrdering> {
25    pub(crate) best: BestTransactions<T>,
26    pub(crate) base_fee: u64,
27    pub(crate) base_fee_per_blob_gas: u64,
28}
29
30impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactionsWithFees<T> {
31    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
32        BestTransactions::mark_invalid(&mut self.best, tx, kind)
33    }
34
35    fn no_updates(&mut self) {
36        self.best.no_updates()
37    }
38
39    fn skip_blobs(&mut self) {
40        self.set_skip_blobs(true)
41    }
42
43    fn set_skip_blobs(&mut self, skip_blobs: bool) {
44        self.best.set_skip_blobs(skip_blobs)
45    }
46}
47
48impl<T: TransactionOrdering> Iterator for BestTransactionsWithFees<T> {
49    type Item = Arc<ValidPoolTransaction<T::Transaction>>;
50
51    fn next(&mut self) -> Option<Self::Item> {
52        // find the next transaction that satisfies the base fee
53        loop {
54            let best = Iterator::next(&mut self.best)?;
55            // If both the base fee and blob fee (if applicable for EIP-4844) are satisfied, return
56            // the transaction
57            if best.transaction.max_fee_per_gas() >= self.base_fee as u128 &&
58                best.transaction
59                    .max_fee_per_blob_gas()
60                    .is_none_or(|fee| fee >= self.base_fee_per_blob_gas as u128)
61            {
62                return Some(best);
63            }
64            crate::traits::BestTransactions::mark_invalid(
65                self,
66                &best,
67                InvalidPoolTransactionError::Underpriced,
68            );
69        }
70    }
71}
72
73/// An iterator that returns transactions that can be executed on the current state (*best*
74/// transactions).
75///
76/// The [`PendingPool`](crate::pool::pending::PendingPool) contains transactions that *could* all
77/// be executed on the current state, but only yields transactions that are ready to be executed
78/// now. While it contains all gapless transactions of a sender, it _always_ only returns the
79/// transaction with the current on chain nonce.
80#[derive(Debug)]
81pub struct BestTransactions<T: TransactionOrdering> {
82    /// Contains a copy of _all_ transactions of the pending pool at the point in time this
83    /// iterator was created.
84    pub(crate) all: BTreeMap<TransactionId, PendingTransaction<T>>,
85    /// Transactions that can be executed right away: these have the expected nonce.
86    ///
87    /// Once an `independent` transaction with the nonce `N` is returned, it unlocks `N+1`, which
88    /// then can be moved from the `all` set to the `independent` set.
89    pub(crate) independent: BTreeSet<PendingTransaction<T>>,
90    /// There might be the case where a yielded transactions is invalid, this will track it.
91    pub(crate) invalid: HashSet<SenderId>,
92    /// Used to receive any new pending transactions that have been added to the pool after this
93    /// iterator was static fileted
94    ///
95    /// These new pending transactions are inserted into this iterator's pool before yielding the
96    /// next value
97    pub(crate) new_transaction_receiver: Option<Receiver<PendingTransaction<T>>>,
98    /// Flag to control whether to skip blob transactions (EIP4844).
99    pub(crate) skip_blobs: bool,
100}
101
102impl<T: TransactionOrdering> BestTransactions<T> {
103    /// Mark the transaction and it's descendants as invalid.
104    pub(crate) fn mark_invalid(
105        &mut self,
106        tx: &Arc<ValidPoolTransaction<T::Transaction>>,
107        _kind: InvalidPoolTransactionError,
108    ) {
109        self.invalid.insert(tx.sender_id());
110    }
111
112    /// Returns the ancestor the given transaction, the transaction with `nonce - 1`.
113    ///
114    /// Note: for a transaction with nonce higher than the current on chain nonce this will always
115    /// return an ancestor since all transaction in this pool are gapless.
116    pub(crate) fn ancestor(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
117        self.all.get(&id.unchecked_ancestor()?)
118    }
119
120    /// Non-blocking read on the new pending transactions subscription channel
121    fn try_recv(&mut self) -> Option<PendingTransaction<T>> {
122        loop {
123            match self.new_transaction_receiver.as_mut()?.try_recv() {
124                Ok(tx) => return Some(tx),
125                // note TryRecvError::Lagged can be returned here, which is an error that attempts
126                // to correct itself on consecutive try_recv() attempts
127
128                // the cost of ignoring this error is allowing old transactions to get
129                // overwritten after the chan buffer size is met
130                Err(TryRecvError::Lagged(_)) => {
131                    // Handle the case where the receiver lagged too far behind.
132                    // `num_skipped` indicates the number of messages that were skipped.
133                    continue
134                }
135
136                // this case is still better than the existing iterator behavior where no new
137                // pending txs are surfaced to consumers
138                Err(_) => return None,
139            }
140        }
141    }
142
143    /// Removes the currently best independent transaction from the independent set and the total
144    /// set.
145    fn pop_best(&mut self) -> Option<PendingTransaction<T>> {
146        self.independent.pop_last().inspect(|best| {
147            let removed = self.all.remove(best.transaction.id());
148            debug_assert!(removed.is_some(), "must be present in both sets");
149        })
150    }
151
152    /// Checks for new transactions that have come into the `PendingPool` after this iterator was
153    /// created and inserts them
154    fn add_new_transactions(&mut self) {
155        while let Some(pending_tx) = self.try_recv() {
156            //  same logic as PendingPool::add_transaction/PendingPool::best_with_unlocked
157            let tx_id = *pending_tx.transaction.id();
158            if self.ancestor(&tx_id).is_none() {
159                self.independent.insert(pending_tx.clone());
160            }
161            self.all.insert(tx_id, pending_tx);
162        }
163    }
164}
165
166impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactions<T> {
167    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
168        Self::mark_invalid(self, tx, kind)
169    }
170
171    fn no_updates(&mut self) {
172        self.new_transaction_receiver.take();
173    }
174
175    fn skip_blobs(&mut self) {
176        self.set_skip_blobs(true);
177    }
178
179    fn set_skip_blobs(&mut self, skip_blobs: bool) {
180        self.skip_blobs = skip_blobs;
181    }
182}
183
184impl<T: TransactionOrdering> Iterator for BestTransactions<T> {
185    type Item = Arc<ValidPoolTransaction<T::Transaction>>;
186
187    fn next(&mut self) -> Option<Self::Item> {
188        loop {
189            self.add_new_transactions();
190            // Remove the next independent tx with the highest priority
191            let best = self.pop_best()?;
192            let sender_id = best.transaction.sender_id();
193
194            // skip transactions for which sender was marked as invalid
195            if self.invalid.contains(&sender_id) {
196                debug!(
197                    target: "txpool",
198                    "[{:?}] skipping invalid transaction",
199                    best.transaction.hash()
200                );
201                continue
202            }
203
204            // Insert transactions that just got unlocked.
205            if let Some(unlocked) = self.all.get(&best.unlocks()) {
206                self.independent.insert(unlocked.clone());
207            }
208
209            if self.skip_blobs && best.transaction.transaction.is_eip4844() {
210                // blobs should be skipped, marking them as invalid will ensure that no dependent
211                // transactions are returned
212                self.mark_invalid(
213                    &best.transaction,
214                    InvalidPoolTransactionError::Eip4844(
215                        Eip4844PoolTransactionError::NoEip4844Blobs,
216                    ),
217                )
218            } else {
219                return Some(best.transaction)
220            }
221        }
222    }
223}
224
225/// Wrapper struct that allows to convert `BestTransactions` (used in tx pool) to
226/// `PayloadTransactions` (used in block composition).
227#[derive(Debug)]
228pub struct BestPayloadTransactions<T, I>
229where
230    T: PoolTransaction,
231    I: Iterator<Item = Arc<ValidPoolTransaction<T>>>,
232{
233    invalid: HashSet<Address>,
234    best: I,
235}
236
237impl<T, I> BestPayloadTransactions<T, I>
238where
239    T: PoolTransaction,
240    I: Iterator<Item = Arc<ValidPoolTransaction<T>>>,
241{
242    /// Create a new `BestPayloadTransactions` with the given iterator.
243    pub fn new(best: I) -> Self {
244        Self { invalid: Default::default(), best }
245    }
246}
247
248impl<T, I> PayloadTransactions for BestPayloadTransactions<T, I>
249where
250    T: PoolTransaction,
251    I: Iterator<Item = Arc<ValidPoolTransaction<T>>>,
252{
253    type Transaction = T::Consensus;
254
255    fn next(&mut self, _ctx: ()) -> Option<RecoveredTx<Self::Transaction>> {
256        loop {
257            let tx = self.best.next()?;
258            if self.invalid.contains(&tx.sender()) {
259                continue
260            }
261            return Some(tx.to_consensus())
262        }
263    }
264
265    fn mark_invalid(&mut self, sender: Address, _nonce: u64) {
266        self.invalid.insert(sender);
267    }
268}
269
270/// A [`BestTransactions`](crate::traits::BestTransactions) implementation that filters the
271/// transactions of iter with predicate.
272///
273/// Filter out transactions are marked as invalid:
274/// [`BestTransactions::mark_invalid`](crate::traits::BestTransactions::mark_invalid).
275pub struct BestTransactionFilter<I, P> {
276    pub(crate) best: I,
277    pub(crate) predicate: P,
278}
279
280impl<I, P> BestTransactionFilter<I, P> {
281    /// Create a new [`BestTransactionFilter`] with the given predicate.
282    pub const fn new(best: I, predicate: P) -> Self {
283        Self { best, predicate }
284    }
285}
286
287impl<I, P> Iterator for BestTransactionFilter<I, P>
288where
289    I: crate::traits::BestTransactions,
290    P: FnMut(&<I as Iterator>::Item) -> bool,
291{
292    type Item = <I as Iterator>::Item;
293
294    fn next(&mut self) -> Option<Self::Item> {
295        loop {
296            let best = self.best.next()?;
297            if (self.predicate)(&best) {
298                return Some(best)
299            }
300            self.best.mark_invalid(
301                &best,
302                InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
303            );
304        }
305    }
306}
307
308impl<I, P> crate::traits::BestTransactions for BestTransactionFilter<I, P>
309where
310    I: crate::traits::BestTransactions,
311    P: FnMut(&<I as Iterator>::Item) -> bool + Send,
312{
313    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
314        crate::traits::BestTransactions::mark_invalid(&mut self.best, tx, kind)
315    }
316
317    fn no_updates(&mut self) {
318        self.best.no_updates()
319    }
320
321    fn skip_blobs(&mut self) {
322        self.set_skip_blobs(true)
323    }
324
325    fn set_skip_blobs(&mut self, skip_blobs: bool) {
326        self.best.set_skip_blobs(skip_blobs)
327    }
328}
329
330impl<I: fmt::Debug, P> fmt::Debug for BestTransactionFilter<I, P> {
331    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
332        f.debug_struct("BestTransactionFilter").field("best", &self.best).finish()
333    }
334}
335
336/// Wrapper over [`crate::traits::BestTransactions`] that prioritizes transactions of certain
337/// senders capping total gas used by such transactions.
338#[derive(Debug)]
339pub struct BestTransactionsWithPrioritizedSenders<I: Iterator> {
340    /// Inner iterator
341    inner: I,
342    /// A set of senders which transactions should be prioritized
343    prioritized_senders: HashSet<Address>,
344    /// Maximum total gas limit of prioritized transactions
345    max_prioritized_gas: u64,
346    /// Buffer with transactions that are not being prioritized. Those will be the first to be
347    /// included after the prioritized transactions
348    buffer: VecDeque<I::Item>,
349    /// Tracker of total gas limit of prioritized transactions. Once it reaches
350    /// `max_prioritized_gas` no more transactions will be prioritized
351    prioritized_gas: u64,
352}
353
354impl<I: Iterator> BestTransactionsWithPrioritizedSenders<I> {
355    /// Constructs a new [`BestTransactionsWithPrioritizedSenders`].
356    pub fn new(prioritized_senders: HashSet<Address>, max_prioritized_gas: u64, inner: I) -> Self {
357        Self {
358            inner,
359            prioritized_senders,
360            max_prioritized_gas,
361            buffer: Default::default(),
362            prioritized_gas: Default::default(),
363        }
364    }
365}
366
367impl<I, T> Iterator for BestTransactionsWithPrioritizedSenders<I>
368where
369    I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
370    T: PoolTransaction,
371{
372    type Item = <I as Iterator>::Item;
373
374    fn next(&mut self) -> Option<Self::Item> {
375        // If we have space, try prioritizing transactions
376        if self.prioritized_gas < self.max_prioritized_gas {
377            for item in &mut self.inner {
378                if self.prioritized_senders.contains(&item.transaction.sender()) &&
379                    self.prioritized_gas + item.transaction.gas_limit() <=
380                        self.max_prioritized_gas
381                {
382                    self.prioritized_gas += item.transaction.gas_limit();
383                    return Some(item)
384                }
385                self.buffer.push_back(item);
386            }
387        }
388
389        if let Some(item) = self.buffer.pop_front() {
390            Some(item)
391        } else {
392            self.inner.next()
393        }
394    }
395}
396
397impl<I, T> crate::traits::BestTransactions for BestTransactionsWithPrioritizedSenders<I>
398where
399    I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
400    T: PoolTransaction,
401{
402    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
403        self.inner.mark_invalid(tx, kind)
404    }
405
406    fn no_updates(&mut self) {
407        self.inner.no_updates()
408    }
409
410    fn set_skip_blobs(&mut self, skip_blobs: bool) {
411        if skip_blobs {
412            self.buffer.retain(|tx| !tx.transaction.is_eip4844())
413        }
414        self.inner.set_skip_blobs(skip_blobs)
415    }
416}
417
418#[cfg(test)]
419mod tests {
420    use super::*;
421    use crate::{
422        pool::pending::PendingPool,
423        test_utils::{MockOrdering, MockTransaction, MockTransactionFactory},
424        BestTransactions, Priority,
425    };
426    use alloy_primitives::U256;
427    use reth_payload_util::{PayloadTransactionsChain, PayloadTransactionsFixed};
428
429    #[test]
430    fn test_best_iter() {
431        let mut pool = PendingPool::new(MockOrdering::default());
432        let mut f = MockTransactionFactory::default();
433
434        let num_tx = 10;
435        // insert 10 gapless tx
436        let tx = MockTransaction::eip1559();
437        for nonce in 0..num_tx {
438            let tx = tx.clone().rng_hash().with_nonce(nonce);
439            let valid_tx = f.validated(tx);
440            pool.add_transaction(Arc::new(valid_tx), 0);
441        }
442
443        let mut best = pool.best();
444        assert_eq!(best.all.len(), num_tx as usize);
445        assert_eq!(best.independent.len(), 1);
446
447        // check tx are returned in order
448        for nonce in 0..num_tx {
449            assert_eq!(best.independent.len(), 1);
450            let tx = best.next().unwrap();
451            assert_eq!(tx.nonce(), nonce);
452        }
453    }
454
455    #[test]
456    fn test_best_iter_invalid() {
457        let mut pool = PendingPool::new(MockOrdering::default());
458        let mut f = MockTransactionFactory::default();
459
460        let num_tx = 10;
461        // insert 10 gapless tx
462        let tx = MockTransaction::eip1559();
463        for nonce in 0..num_tx {
464            let tx = tx.clone().rng_hash().with_nonce(nonce);
465            let valid_tx = f.validated(tx);
466            pool.add_transaction(Arc::new(valid_tx), 0);
467        }
468
469        let mut best = pool.best();
470
471        // mark the first tx as invalid
472        let invalid = best.independent.iter().next().unwrap();
473        best.mark_invalid(
474            &invalid.transaction.clone(),
475            InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
476        );
477
478        // iterator is empty
479        assert!(best.next().is_none());
480    }
481
482    #[test]
483    fn test_best_transactions_iter_invalid() {
484        let mut pool = PendingPool::new(MockOrdering::default());
485        let mut f = MockTransactionFactory::default();
486
487        let num_tx = 10;
488        // insert 10 gapless tx
489        let tx = MockTransaction::eip1559();
490        for nonce in 0..num_tx {
491            let tx = tx.clone().rng_hash().with_nonce(nonce);
492            let valid_tx = f.validated(tx);
493            pool.add_transaction(Arc::new(valid_tx), 0);
494        }
495
496        let mut best: Box<
497            dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<MockTransaction>>>,
498        > = Box::new(pool.best());
499
500        let tx = Iterator::next(&mut best).unwrap();
501        crate::traits::BestTransactions::mark_invalid(
502            &mut *best,
503            &tx,
504            InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
505        );
506        assert!(Iterator::next(&mut best).is_none());
507    }
508
509    #[test]
510    fn test_best_with_fees_iter_base_fee_satisfied() {
511        let mut pool = PendingPool::new(MockOrdering::default());
512        let mut f = MockTransactionFactory::default();
513
514        let num_tx = 5;
515        let base_fee: u64 = 10;
516        let base_fee_per_blob_gas: u64 = 15;
517
518        // Insert transactions with a max_fee_per_gas greater than or equal to the base fee
519        // Without blob fee
520        for nonce in 0..num_tx {
521            let tx = MockTransaction::eip1559()
522                .rng_hash()
523                .with_nonce(nonce)
524                .with_max_fee(base_fee as u128 + 5);
525            let valid_tx = f.validated(tx);
526            pool.add_transaction(Arc::new(valid_tx), 0);
527        }
528
529        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
530
531        for nonce in 0..num_tx {
532            let tx = best.next().expect("Transaction should be returned");
533            assert_eq!(tx.nonce(), nonce);
534            assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
535        }
536    }
537
538    #[test]
539    fn test_best_with_fees_iter_base_fee_violated() {
540        let mut pool = PendingPool::new(MockOrdering::default());
541        let mut f = MockTransactionFactory::default();
542
543        let num_tx = 5;
544        let base_fee: u64 = 20;
545        let base_fee_per_blob_gas: u64 = 15;
546
547        // Insert transactions with a max_fee_per_gas less than the base fee
548        for nonce in 0..num_tx {
549            let tx = MockTransaction::eip1559()
550                .rng_hash()
551                .with_nonce(nonce)
552                .with_max_fee(base_fee as u128 - 5);
553            let valid_tx = f.validated(tx);
554            pool.add_transaction(Arc::new(valid_tx), 0);
555        }
556
557        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
558
559        // No transaction should be returned since all violate the base fee
560        assert!(best.next().is_none());
561    }
562
563    #[test]
564    fn test_best_with_fees_iter_blob_fee_satisfied() {
565        let mut pool = PendingPool::new(MockOrdering::default());
566        let mut f = MockTransactionFactory::default();
567
568        let num_tx = 5;
569        let base_fee: u64 = 10;
570        let base_fee_per_blob_gas: u64 = 20;
571
572        // Insert transactions with a max_fee_per_blob_gas greater than or equal to the base fee per
573        // blob gas
574        for nonce in 0..num_tx {
575            let tx = MockTransaction::eip4844()
576                .rng_hash()
577                .with_nonce(nonce)
578                .with_max_fee(base_fee as u128 + 5)
579                .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
580            let valid_tx = f.validated(tx);
581            pool.add_transaction(Arc::new(valid_tx), 0);
582        }
583
584        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
585
586        // All transactions should be returned in order since they satisfy both base fee and blob
587        // fee
588        for nonce in 0..num_tx {
589            let tx = best.next().expect("Transaction should be returned");
590            assert_eq!(tx.nonce(), nonce);
591            assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
592            assert!(
593                tx.transaction.max_fee_per_blob_gas().unwrap() >= base_fee_per_blob_gas as u128
594            );
595        }
596
597        // No more transactions should be returned
598        assert!(best.next().is_none());
599    }
600
601    #[test]
602    fn test_best_with_fees_iter_blob_fee_violated() {
603        let mut pool = PendingPool::new(MockOrdering::default());
604        let mut f = MockTransactionFactory::default();
605
606        let num_tx = 5;
607        let base_fee: u64 = 10;
608        let base_fee_per_blob_gas: u64 = 20;
609
610        // Insert transactions with a max_fee_per_blob_gas less than the base fee per blob gas
611        for nonce in 0..num_tx {
612            let tx = MockTransaction::eip4844()
613                .rng_hash()
614                .with_nonce(nonce)
615                .with_max_fee(base_fee as u128 + 5)
616                .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
617            let valid_tx = f.validated(tx);
618            pool.add_transaction(Arc::new(valid_tx), 0);
619        }
620
621        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
622
623        // No transaction should be returned since all violate the blob fee
624        assert!(best.next().is_none());
625    }
626
627    #[test]
628    fn test_best_with_fees_iter_mixed_fees() {
629        let mut pool = PendingPool::new(MockOrdering::default());
630        let mut f = MockTransactionFactory::default();
631
632        let base_fee: u64 = 10;
633        let base_fee_per_blob_gas: u64 = 20;
634
635        // Insert transactions with varying max_fee_per_gas and max_fee_per_blob_gas
636        let tx1 =
637            MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(base_fee as u128 + 5);
638        let tx2 = MockTransaction::eip4844()
639            .rng_hash()
640            .with_nonce(1)
641            .with_max_fee(base_fee as u128 + 5)
642            .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
643        let tx3 = MockTransaction::eip4844()
644            .rng_hash()
645            .with_nonce(2)
646            .with_max_fee(base_fee as u128 + 5)
647            .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
648        let tx4 =
649            MockTransaction::eip1559().rng_hash().with_nonce(3).with_max_fee(base_fee as u128 - 5);
650
651        pool.add_transaction(Arc::new(f.validated(tx1.clone())), 0);
652        pool.add_transaction(Arc::new(f.validated(tx2.clone())), 0);
653        pool.add_transaction(Arc::new(f.validated(tx3)), 0);
654        pool.add_transaction(Arc::new(f.validated(tx4)), 0);
655
656        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
657
658        let expected_order = vec![tx1, tx2];
659        for expected_tx in expected_order {
660            let tx = best.next().expect("Transaction should be returned");
661            assert_eq!(tx.transaction, expected_tx);
662        }
663
664        // No more transactions should be returned
665        assert!(best.next().is_none());
666    }
667
668    #[test]
669    fn test_best_add_transaction_with_next_nonce() {
670        let mut pool = PendingPool::new(MockOrdering::default());
671        let mut f = MockTransactionFactory::default();
672
673        // Add 5 transactions with increasing nonces to the pool
674        let num_tx = 5;
675        let tx = MockTransaction::eip1559();
676        for nonce in 0..num_tx {
677            let tx = tx.clone().rng_hash().with_nonce(nonce);
678            let valid_tx = f.validated(tx);
679            pool.add_transaction(Arc::new(valid_tx), 0);
680        }
681
682        // Create a BestTransactions iterator from the pool
683        let mut best = pool.best();
684
685        // Use a broadcast channel for transaction updates
686        let (tx_sender, tx_receiver) =
687            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
688        best.new_transaction_receiver = Some(tx_receiver);
689
690        // Create a new transaction with nonce 5 and validate it
691        let new_tx = MockTransaction::eip1559().rng_hash().with_nonce(5);
692        let valid_new_tx = f.validated(new_tx);
693
694        // Send the new transaction through the broadcast channel
695        let pending_tx = PendingTransaction {
696            submission_id: 10,
697            transaction: Arc::new(valid_new_tx.clone()),
698            priority: Priority::Value(U256::from(1000)),
699        };
700        tx_sender.send(pending_tx.clone()).unwrap();
701
702        // Add new transactions to the iterator
703        best.add_new_transactions();
704
705        // Verify that the new transaction has been added to the 'all' map
706        assert_eq!(best.all.len(), 6);
707        assert!(best.all.contains_key(valid_new_tx.id()));
708
709        // Verify that the new transaction has been added to the 'independent' set
710        assert_eq!(best.independent.len(), 2);
711        assert!(best.independent.contains(&pending_tx));
712    }
713
714    #[test]
715    fn test_best_add_transaction_with_ancestor() {
716        // Initialize a new PendingPool with default MockOrdering and MockTransactionFactory
717        let mut pool = PendingPool::new(MockOrdering::default());
718        let mut f = MockTransactionFactory::default();
719
720        // Add 5 transactions with increasing nonces to the pool
721        let num_tx = 5;
722        let tx = MockTransaction::eip1559();
723        for nonce in 0..num_tx {
724            let tx = tx.clone().rng_hash().with_nonce(nonce);
725            let valid_tx = f.validated(tx);
726            pool.add_transaction(Arc::new(valid_tx), 0);
727        }
728
729        // Create a BestTransactions iterator from the pool
730        let mut best = pool.best();
731
732        // Use a broadcast channel for transaction updates
733        let (tx_sender, tx_receiver) =
734            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
735        best.new_transaction_receiver = Some(tx_receiver);
736
737        // Create a new transaction with nonce 5 and validate it
738        let base_tx1 = MockTransaction::eip1559().rng_hash().with_nonce(5);
739        let valid_new_tx1 = f.validated(base_tx1.clone());
740
741        // Send the new transaction through the broadcast channel
742        let pending_tx1 = PendingTransaction {
743            submission_id: 10,
744            transaction: Arc::new(valid_new_tx1.clone()),
745            priority: Priority::Value(U256::from(1000)),
746        };
747        tx_sender.send(pending_tx1.clone()).unwrap();
748
749        // Add new transactions to the iterator
750        best.add_new_transactions();
751
752        // Verify that the new transaction has been added to the 'all' map
753        assert_eq!(best.all.len(), 6);
754        assert!(best.all.contains_key(valid_new_tx1.id()));
755
756        // Verify that the new transaction has been added to the 'independent' set
757        assert_eq!(best.independent.len(), 2);
758        assert!(best.independent.contains(&pending_tx1));
759
760        // Attempt to add a new transaction with a different nonce (not a duplicate)
761        let base_tx2 = base_tx1.with_nonce(6);
762        let valid_new_tx2 = f.validated(base_tx2);
763
764        // Send the new transaction through the broadcast channel
765        let pending_tx2 = PendingTransaction {
766            submission_id: 11, // Different submission ID
767            transaction: Arc::new(valid_new_tx2.clone()),
768            priority: Priority::Value(U256::from(1000)),
769        };
770        tx_sender.send(pending_tx2.clone()).unwrap();
771
772        // Add new transactions to the iterator
773        best.add_new_transactions();
774
775        // Verify that the new transaction has been added to 'all'
776        assert_eq!(best.all.len(), 7);
777        assert!(best.all.contains_key(valid_new_tx2.id()));
778
779        // Verify that the new transaction has not been added to the 'independent' set
780        assert_eq!(best.independent.len(), 2);
781        assert!(!best.independent.contains(&pending_tx2));
782    }
783
784    #[test]
785    fn test_best_transactions_filter_trait_object() {
786        // Initialize a new PendingPool with default MockOrdering and MockTransactionFactory
787        let mut pool = PendingPool::new(MockOrdering::default());
788        let mut f = MockTransactionFactory::default();
789
790        // Add 5 transactions with increasing nonces to the pool
791        let num_tx = 5;
792        let tx = MockTransaction::eip1559();
793        for nonce in 0..num_tx {
794            let tx = tx.clone().rng_hash().with_nonce(nonce);
795            let valid_tx = f.validated(tx);
796            pool.add_transaction(Arc::new(valid_tx), 0);
797        }
798
799        // Create a trait object of BestTransactions iterator from the pool
800        let best: Box<dyn crate::traits::BestTransactions<Item = _>> = Box::new(pool.best());
801
802        // Create a filter that only returns transactions with even nonces
803        let filter =
804            BestTransactionFilter::new(best, |tx: &Arc<ValidPoolTransaction<MockTransaction>>| {
805                tx.nonce() % 2 == 0
806            });
807
808        // Verify that the filter only returns transactions with even nonces
809        for tx in filter {
810            assert_eq!(tx.nonce() % 2, 0);
811        }
812    }
813
814    #[test]
815    fn test_best_transactions_prioritized_senders() {
816        let mut pool = PendingPool::new(MockOrdering::default());
817        let mut f = MockTransactionFactory::default();
818
819        // Add 5 plain transactions from different senders with increasing gas price
820        for gas_price in 0..5 {
821            let tx = MockTransaction::eip1559().with_gas_price(gas_price);
822            let valid_tx = f.validated(tx);
823            pool.add_transaction(Arc::new(valid_tx), 0);
824        }
825
826        // Add another transaction with 0 gas price that's going to be prioritized by sender
827        let prioritized_tx = MockTransaction::eip1559().with_gas_price(0);
828        let valid_prioritized_tx = f.validated(prioritized_tx.clone());
829        pool.add_transaction(Arc::new(valid_prioritized_tx), 0);
830
831        let prioritized_senders = HashSet::from([prioritized_tx.sender()]);
832        let best =
833            BestTransactionsWithPrioritizedSenders::new(prioritized_senders, 200, pool.best());
834
835        // Verify that the prioritized transaction is returned first
836        // and the rest are returned in the reverse order of gas price
837        let mut iter = best.into_iter();
838        let top_of_block_tx = iter.next().unwrap();
839        assert_eq!(top_of_block_tx.max_fee_per_gas(), 0);
840        assert_eq!(top_of_block_tx.sender(), prioritized_tx.sender());
841        for gas_price in (0..5).rev() {
842            assert_eq!(iter.next().unwrap().max_fee_per_gas(), gas_price);
843        }
844
845        // TODO: Test that gas limits for prioritized transactions are respected
846    }
847
848    #[test]
849    fn test_best_transactions_chained_iterators() {
850        let mut priority_pool = PendingPool::new(MockOrdering::default());
851        let mut pool = PendingPool::new(MockOrdering::default());
852        let mut f = MockTransactionFactory::default();
853
854        // Block composition
855        // ===
856        // (1) up to 100 gas: custom top-of-block transaction
857        // (2) up to 100 gas: transactions from the priority pool
858        // (3) up to 200 gas: only transactions from address A
859        // (4) up to 200 gas: only transactions from address B
860        // (5) until block gas limit: all transactions from the main pool
861
862        // Notes:
863        // - If prioritized addresses overlap, a single transaction will be prioritized twice and
864        //   therefore use the per-segment gas limit twice.
865        // - Priority pool and main pool must synchronize between each other to make sure there are
866        //   no conflicts for the same nonce. For example, in this scenario, pools can't reject
867        //   transactions with seemingly incorrect nonces, because previous transactions might be in
868        //   the other pool.
869
870        let address_top_of_block = Address::random();
871        let address_in_priority_pool = Address::random();
872        let address_a = Address::random();
873        let address_b = Address::random();
874        let address_regular = Address::random();
875
876        // Add transactions to the main pool
877        {
878            let prioritized_tx_a =
879                MockTransaction::eip1559().with_gas_price(5).with_sender(address_a);
880            // without our custom logic, B would be prioritized over A due to gas price:
881            let prioritized_tx_b =
882                MockTransaction::eip1559().with_gas_price(10).with_sender(address_b);
883            let regular_tx =
884                MockTransaction::eip1559().with_gas_price(15).with_sender(address_regular);
885            pool.add_transaction(Arc::new(f.validated(prioritized_tx_a)), 0);
886            pool.add_transaction(Arc::new(f.validated(prioritized_tx_b)), 0);
887            pool.add_transaction(Arc::new(f.validated(regular_tx)), 0);
888        }
889
890        // Add transactions to the priority pool
891        {
892            let prioritized_tx =
893                MockTransaction::eip1559().with_gas_price(0).with_sender(address_in_priority_pool);
894            let valid_prioritized_tx = f.validated(prioritized_tx);
895            priority_pool.add_transaction(Arc::new(valid_prioritized_tx), 0);
896        }
897
898        let mut block = PayloadTransactionsChain::new(
899            PayloadTransactionsFixed::single(
900                MockTransaction::eip1559().with_sender(address_top_of_block).into(),
901            ),
902            Some(100),
903            PayloadTransactionsChain::new(
904                BestPayloadTransactions::new(priority_pool.best()),
905                Some(100),
906                BestPayloadTransactions::new(BestTransactionsWithPrioritizedSenders::new(
907                    HashSet::from([address_a]),
908                    200,
909                    BestTransactionsWithPrioritizedSenders::new(
910                        HashSet::from([address_b]),
911                        200,
912                        pool.best(),
913                    ),
914                )),
915                None,
916            ),
917            None,
918        );
919
920        assert_eq!(block.next(()).unwrap().signer(), address_top_of_block);
921        assert_eq!(block.next(()).unwrap().signer(), address_in_priority_pool);
922        assert_eq!(block.next(()).unwrap().signer(), address_a);
923        assert_eq!(block.next(()).unwrap().signer(), address_b);
924        assert_eq!(block.next(()).unwrap().signer(), address_regular);
925    }
926
927    #[test]
928    fn test_best_with_fees_iter_no_blob_fee_required() {
929        // Tests transactions without blob fees where base fees are checked.
930        let mut pool = PendingPool::new(MockOrdering::default());
931        let mut f = MockTransactionFactory::default();
932
933        let base_fee: u64 = 10;
934        let base_fee_per_blob_gas: u64 = 0; // No blob fee requirement
935
936        // Insert transactions with max_fee_per_gas above the base fee
937        for nonce in 0..5 {
938            let tx = MockTransaction::eip1559()
939                .rng_hash()
940                .with_nonce(nonce)
941                .with_max_fee(base_fee as u128 + 5);
942            let valid_tx = f.validated(tx);
943            pool.add_transaction(Arc::new(valid_tx), 0);
944        }
945
946        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
947
948        // All transactions should be returned as no blob fee requirement is imposed
949        for nonce in 0..5 {
950            let tx = best.next().expect("Transaction should be returned");
951            assert_eq!(tx.nonce(), nonce);
952        }
953
954        // Ensure no more transactions are left
955        assert!(best.next().is_none());
956    }
957
958    #[test]
959    fn test_best_with_fees_iter_mix_of_blob_and_non_blob_transactions() {
960        // Tests mixed scenarios with both blob and non-blob transactions.
961        let mut pool = PendingPool::new(MockOrdering::default());
962        let mut f = MockTransactionFactory::default();
963
964        let base_fee: u64 = 10;
965        let base_fee_per_blob_gas: u64 = 15;
966
967        // Add a non-blob transaction that satisfies the base fee
968        let tx_non_blob =
969            MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(base_fee as u128 + 5);
970        pool.add_transaction(Arc::new(f.validated(tx_non_blob.clone())), 0);
971
972        // Add a blob transaction that satisfies both base fee and blob fee
973        let tx_blob = MockTransaction::eip4844()
974            .rng_hash()
975            .with_nonce(1)
976            .with_max_fee(base_fee as u128 + 5)
977            .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
978        pool.add_transaction(Arc::new(f.validated(tx_blob.clone())), 0);
979
980        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
981
982        // Verify both transactions are returned
983        let tx = best.next().expect("Transaction should be returned");
984        assert_eq!(tx.transaction, tx_non_blob);
985
986        let tx = best.next().expect("Transaction should be returned");
987        assert_eq!(tx.transaction, tx_blob);
988
989        // Ensure no more transactions are left
990        assert!(best.next().is_none());
991    }
992
993    #[test]
994    fn test_best_transactions_with_skipping_blobs() {
995        // Tests the skip_blobs functionality to ensure blob transactions are skipped.
996        let mut pool = PendingPool::new(MockOrdering::default());
997        let mut f = MockTransactionFactory::default();
998
999        // Add a blob transaction
1000        let tx_blob = MockTransaction::eip4844().rng_hash().with_nonce(0).with_blob_fee(100);
1001        let valid_blob_tx = f.validated(tx_blob);
1002        pool.add_transaction(Arc::new(valid_blob_tx), 0);
1003
1004        // Add a non-blob transaction
1005        let tx_non_blob = MockTransaction::eip1559().rng_hash().with_nonce(1).with_max_fee(200);
1006        let valid_non_blob_tx = f.validated(tx_non_blob.clone());
1007        pool.add_transaction(Arc::new(valid_non_blob_tx), 0);
1008
1009        let mut best = pool.best();
1010        best.skip_blobs();
1011
1012        // Only the non-blob transaction should be returned
1013        let tx = best.next().expect("Transaction should be returned");
1014        assert_eq!(tx.transaction, tx_non_blob);
1015
1016        // Ensure no more transactions are left
1017        assert!(best.next().is_none());
1018    }
1019
1020    #[test]
1021    fn test_best_transactions_no_updates() {
1022        // Tests the no_updates functionality to ensure it properly clears the
1023        // new_transaction_receiver.
1024        let mut pool = PendingPool::new(MockOrdering::default());
1025        let mut f = MockTransactionFactory::default();
1026
1027        // Add a transaction
1028        let tx = MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(100);
1029        let valid_tx = f.validated(tx);
1030        pool.add_transaction(Arc::new(valid_tx), 0);
1031
1032        let mut best = pool.best();
1033
1034        // Use a broadcast channel for transaction updates
1035        let (_tx_sender, tx_receiver) =
1036            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
1037        best.new_transaction_receiver = Some(tx_receiver);
1038
1039        // Ensure receiver is set
1040        assert!(best.new_transaction_receiver.is_some());
1041
1042        // Call no_updates to clear the receiver
1043        best.no_updates();
1044
1045        // Ensure receiver is cleared
1046        assert!(best.new_transaction_receiver.is_none());
1047    }
1048
1049    // TODO: Same nonce test
1050}