reth_transaction_pool/pool/
pending.rs

1use crate::{
2    identifier::{SenderId, TransactionId},
3    pool::{
4        best::{BestTransactions, BestTransactionsWithFees},
5        size::SizeTracker,
6    },
7    Priority, SubPoolLimit, TransactionOrdering, ValidPoolTransaction,
8};
9use rustc_hash::{FxHashMap, FxHashSet};
10use std::{
11    cmp::Ordering,
12    collections::{hash_map::Entry, BTreeMap},
13    ops::Bound::Unbounded,
14    sync::Arc,
15};
16use tokio::sync::broadcast;
17
/// A pool of validated and gapless transactions that are ready to be executed on the current state
/// and are waiting to be included in a block.
///
/// This pool distinguishes between `independent` transactions and pending transactions. A
/// transaction is `independent`, if it is in the pending pool, and it has the current on chain
/// nonce of the sender. Meaning `independent` transactions can be executed right away, other
/// pending transactions depend on at least one `independent` transaction.
///
/// Once an `independent` transaction was executed it *unlocks* the next nonce, if this transaction
/// is also pending, then this will be moved to the `independent` queue.
#[derive(Debug, Clone)]
pub struct PendingPool<T: TransactionOrdering> {
    /// How to order transactions.
    ordering: T,
    /// Monotonic counter handed out to transactions as they are inserted into the pool.
    ///
    /// This way we can determine when transactions were submitted to the pool; it is used as the
    /// tie-breaker when two transactions have the same priority.
    submission_id: u64,
    /// _All_ Transactions that are currently inside the pool grouped by their identifier.
    by_id: BTreeMap<TransactionId, PendingTransaction<T>>,
    /// The highest nonce transactions for each sender - like the `independent` set, but the
    /// highest instead of lowest nonce.
    highest_nonces: FxHashMap<SenderId, PendingTransaction<T>>,
    /// Independent transactions that can be included directly and don't require other
    /// transactions. At most one entry per sender: its lowest-nonce transaction in this pool.
    independent_transactions: FxHashMap<SenderId, PendingTransaction<T>>,
    /// Keeps track of the size of this pool.
    ///
    /// See also [`reth_primitives_traits::InMemorySize::size`].
    size_of: SizeTracker,
    /// Used to broadcast new transactions that have been added to the `PendingPool` to the
    /// [`BestTransactions`] iterators that were previously created from this pool.
    new_transaction_notifier: broadcast::Sender<PendingTransaction<T>>,
}
52
53// === impl PendingPool ===
54
55impl<T: TransactionOrdering> PendingPool<T> {
56    /// Create a new pending pool instance.
57    pub fn new(ordering: T) -> Self {
58        Self::with_buffer(ordering, 200)
59    }
60
61    /// Create a new pool instance with the given buffer capacity.
62    pub fn with_buffer(ordering: T, buffer_capacity: usize) -> Self {
63        let (new_transaction_notifier, _) = broadcast::channel(buffer_capacity);
64        Self {
65            ordering,
66            submission_id: 0,
67            by_id: Default::default(),
68            independent_transactions: Default::default(),
69            highest_nonces: Default::default(),
70            size_of: Default::default(),
71            new_transaction_notifier,
72        }
73    }
74
75    /// Clear all transactions from the pool without resetting other values.
76    /// Used for atomic reordering during basefee update.
77    ///
78    /// # Returns
79    ///
80    /// Returns all transactions by id.
81    fn clear_transactions(&mut self) -> BTreeMap<TransactionId, PendingTransaction<T>> {
82        self.independent_transactions.clear();
83        self.highest_nonces.clear();
84        self.size_of.reset();
85        std::mem::take(&mut self.by_id)
86    }
87
88    /// Returns an iterator over all transactions that are _currently_ ready.
89    ///
90    /// 1. The iterator _always_ returns transactions in order: it never returns a transaction with
91    ///    an unsatisfied dependency and only returns them if dependency transaction were yielded
92    ///    previously. In other words: the nonces of transactions with the same sender will _always_
93    ///    increase by exactly 1.
94    ///
95    /// The order of transactions which satisfy (1.) is determined by their computed priority: a
96    /// transaction with a higher priority is returned before a transaction with a lower priority.
97    ///
98    /// If two transactions have the same priority score, then the transactions which spent more
99    /// time in pool (were added earlier) are returned first.
100    ///
101    /// NOTE: while this iterator returns transaction that pool considers valid at this point, they
102    /// could potentially be become invalid at point of execution. Therefore, this iterator
103    /// provides a way to mark transactions that the consumer of this iterator considers invalid. In
104    /// which case the transaction's subgraph is also automatically marked invalid, See (1.).
105    /// Invalid transactions are skipped.
106    pub fn best(&self) -> BestTransactions<T> {
107        BestTransactions {
108            all: self.by_id.clone(),
109            independent: self.independent_transactions.values().cloned().collect(),
110            invalid: Default::default(),
111            new_transaction_receiver: Some(self.new_transaction_notifier.subscribe()),
112            skip_blobs: false,
113        }
114    }
115
116    /// Same as `best` but only returns transactions that satisfy the given basefee and blobfee.
117    pub(crate) fn best_with_basefee_and_blobfee(
118        &self,
119        base_fee: u64,
120        base_fee_per_blob_gas: u64,
121    ) -> BestTransactionsWithFees<T> {
122        BestTransactionsWithFees { best: self.best(), base_fee, base_fee_per_blob_gas }
123    }
124
125    /// Same as `best` but also includes the given unlocked transactions.
126    ///
127    /// This mimics the [`Self::add_transaction`] method, but does not insert the transactions into
128    /// pool but only into the returned iterator.
129    ///
130    /// Note: this does not insert the unlocked transactions into the pool.
131    ///
132    /// # Panics
133    ///
134    /// if the transaction is already included
135    pub(crate) fn best_with_unlocked_and_attributes(
136        &self,
137        unlocked: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
138        base_fee: u64,
139        base_fee_per_blob_gas: u64,
140    ) -> BestTransactionsWithFees<T> {
141        let mut best = self.best();
142        let mut submission_id = self.submission_id;
143        for tx in unlocked {
144            submission_id += 1;
145            debug_assert!(!best.all.contains_key(tx.id()), "transaction already included");
146            let priority = self.ordering.priority(&tx.transaction, base_fee);
147            let tx_id = *tx.id();
148            let transaction = PendingTransaction { submission_id, transaction: tx, priority };
149            if best.ancestor(&tx_id).is_none() {
150                best.independent.insert(transaction.clone());
151            }
152            best.all.insert(tx_id, transaction);
153        }
154
155        BestTransactionsWithFees { best, base_fee, base_fee_per_blob_gas }
156    }
157
158    /// Returns an iterator over all transactions in the pool
159    pub(crate) fn all(
160        &self,
161    ) -> impl Iterator<Item = Arc<ValidPoolTransaction<T::Transaction>>> + '_ {
162        self.by_id.values().map(|tx| tx.transaction.clone())
163    }
164
165    /// Updates the pool with the new blob fee. Removes
166    /// from the subpool all transactions and their dependents that no longer satisfy the given
167    /// blob fee (`tx.max_blob_fee < blob_fee`).
168    ///
169    /// Note: the transactions are not returned in a particular order.
170    ///
171    /// # Returns
172    ///
173    /// Removed transactions that no longer satisfy the blob fee.
174    pub(crate) fn update_blob_fee(
175        &mut self,
176        blob_fee: u128,
177    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
178        // Create a collection for removed transactions.
179        let mut removed = Vec::new();
180
181        // Drain and iterate over all transactions.
182        let mut transactions_iter = self.clear_transactions().into_iter().peekable();
183        while let Some((id, tx)) = transactions_iter.next() {
184            if tx.transaction.max_fee_per_blob_gas() < Some(blob_fee) {
185                // Add this tx to the removed collection since it no longer satisfies the blob fee
186                // condition. Decrease the total pool size.
187                removed.push(Arc::clone(&tx.transaction));
188
189                // Remove all dependent transactions.
190                'this: while let Some((next_id, next_tx)) = transactions_iter.peek() {
191                    if next_id.sender != id.sender {
192                        break 'this
193                    }
194                    removed.push(Arc::clone(&next_tx.transaction));
195                    transactions_iter.next();
196                }
197            } else {
198                self.size_of += tx.transaction.size();
199                self.update_independents_and_highest_nonces(&tx);
200                self.by_id.insert(id, tx);
201            }
202        }
203
204        removed
205    }
206
207    /// Updates the pool with the new base fee. Reorders transactions by new priorities. Removes
208    /// from the subpool all transactions and their dependents that no longer satisfy the given
209    /// base fee (`tx.fee < base_fee`).
210    ///
211    /// Note: the transactions are not returned in a particular order.
212    ///
213    /// # Returns
214    ///
215    /// Removed transactions that no longer satisfy the base fee.
216    pub(crate) fn update_base_fee(
217        &mut self,
218        base_fee: u64,
219    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
220        // Create a collection for removed transactions.
221        let mut removed = Vec::new();
222
223        // Drain and iterate over all transactions.
224        let mut transactions_iter = self.clear_transactions().into_iter().peekable();
225        while let Some((id, mut tx)) = transactions_iter.next() {
226            if tx.transaction.max_fee_per_gas() < base_fee as u128 {
227                // Add this tx to the removed collection since it no longer satisfies the base fee
228                // condition. Decrease the total pool size.
229                removed.push(Arc::clone(&tx.transaction));
230
231                // Remove all dependent transactions.
232                'this: while let Some((next_id, next_tx)) = transactions_iter.peek() {
233                    if next_id.sender != id.sender {
234                        break 'this
235                    }
236                    removed.push(Arc::clone(&next_tx.transaction));
237                    transactions_iter.next();
238                }
239            } else {
240                // Re-insert the transaction with new priority.
241                tx.priority = self.ordering.priority(&tx.transaction.transaction, base_fee);
242
243                self.size_of += tx.transaction.size();
244                self.update_independents_and_highest_nonces(&tx);
245                self.by_id.insert(id, tx);
246            }
247        }
248
249        removed
250    }
251
252    /// Updates the independent transaction and highest nonces set, assuming the given transaction
253    /// is being _added_ to the pool.
254    fn update_independents_and_highest_nonces(&mut self, tx: &PendingTransaction<T>) {
255        match self.highest_nonces.entry(tx.transaction.sender_id()) {
256            Entry::Occupied(mut entry) => {
257                if entry.get().transaction.nonce() < tx.transaction.nonce() {
258                    *entry.get_mut() = tx.clone();
259                }
260            }
261            Entry::Vacant(entry) => {
262                entry.insert(tx.clone());
263            }
264        }
265        match self.independent_transactions.entry(tx.transaction.sender_id()) {
266            Entry::Occupied(mut entry) => {
267                if entry.get().transaction.nonce() > tx.transaction.nonce() {
268                    *entry.get_mut() = tx.clone();
269                }
270            }
271            Entry::Vacant(entry) => {
272                entry.insert(tx.clone());
273            }
274        }
275    }
276
277    /// Returns the ancestor the given transaction, the transaction with `nonce - 1`.
278    ///
279    /// Note: for a transaction with nonce higher than the current on chain nonce this will always
280    /// return an ancestor since all transaction in this pool are gapless.
281    fn ancestor(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
282        self.get(&id.unchecked_ancestor()?)
283    }
284
285    /// Adds a new transactions to the pending queue.
286    ///
287    /// # Panics
288    ///
289    /// if the transaction is already included
290    pub fn add_transaction(
291        &mut self,
292        tx: Arc<ValidPoolTransaction<T::Transaction>>,
293        base_fee: u64,
294    ) {
295        assert!(
296            !self.contains(tx.id()),
297            "transaction already included {:?}",
298            self.get(tx.id()).unwrap().transaction
299        );
300
301        // keep track of size
302        self.size_of += tx.size();
303
304        let tx_id = *tx.id();
305
306        let submission_id = self.next_id();
307        let priority = self.ordering.priority(&tx.transaction, base_fee);
308        let tx = PendingTransaction { submission_id, transaction: tx, priority };
309
310        self.update_independents_and_highest_nonces(&tx);
311
312        // send the new transaction to any existing pendingpool static file iterators
313        if self.new_transaction_notifier.receiver_count() > 0 {
314            let _ = self.new_transaction_notifier.send(tx.clone());
315        }
316
317        self.by_id.insert(tx_id, tx);
318    }
319
320    /// Removes the transaction from the pool.
321    ///
322    /// Note: If the transaction has a descendant transaction
323    /// it will advance it to the best queue.
324    pub(crate) fn remove_transaction(
325        &mut self,
326        id: &TransactionId,
327    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
328        if let Some(lowest) = self.independent_transactions.get(&id.sender) {
329            if lowest.transaction.nonce() == id.nonce {
330                self.independent_transactions.remove(&id.sender);
331                // mark the next as independent if it exists
332                if let Some(unlocked) = self.get(&id.descendant()) {
333                    self.independent_transactions.insert(id.sender, unlocked.clone());
334                }
335            }
336        }
337
338        let tx = self.by_id.remove(id)?;
339        self.size_of -= tx.transaction.size();
340
341        if let Some(highest) = self.highest_nonces.get(&id.sender) {
342            if highest.transaction.nonce() == id.nonce {
343                self.highest_nonces.remove(&id.sender);
344            }
345            if let Some(ancestor) = self.ancestor(id) {
346                self.highest_nonces.insert(id.sender, ancestor.clone());
347            }
348        }
349        Some(tx.transaction)
350    }
351
352    const fn next_id(&mut self) -> u64 {
353        let id = self.submission_id;
354        self.submission_id = self.submission_id.wrapping_add(1);
355        id
356    }
357
    /// Traverses the pool, starting at the highest nonce set, removing the transactions which
    /// would put the pool under the specified limits.
    ///
    /// This attempts to remove transactions by roughly the same amount for each sender. This is
    /// done by removing the highest-nonce transactions for each sender.
    ///
    /// If the `remove_locals` flag is unset, transactions will be removed per-sender until a
    /// local transaction is the highest nonce transaction for that sender. If all senders have a
    /// local highest-nonce transaction, the pool will not be truncated further.
    ///
    /// Otherwise, if the `remove_locals` flag is set, transactions will be removed per-sender
    /// until the pool is under the given limits.
    ///
    /// Any removed transactions will be added to the `end_removed` vector.
    pub fn remove_to_limit(
        &mut self,
        limit: &SubPoolLimit,
        remove_locals: bool,
        end_removed: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
    ) {
        // This serves as a termination condition for the loop - it represents the number of
        // _valid_ unique senders that might have descendants in the pool.
        //
        // If `remove_locals` is false, a value of zero means that there are no non-local txs in the
        // pool that can be removed.
        //
        // If `remove_locals` is true, a value of zero means that there are no txs in the pool that
        // can be removed.
        let mut non_local_senders = self.highest_nonces.len();

        // keeps track of unique senders from previous iterations, to understand how many unique
        // senders were removed in the last iteration
        let mut unique_senders = self.highest_nonces.len();

        // keeps track of which senders we've marked as local
        let mut local_senders = FxHashSet::default();

        // keep track of transactions to remove and how many have been removed so far
        let original_length = self.len();
        let mut removed = Vec::new();
        let mut total_removed = 0;

        // track total `size` of transactions to remove
        let original_size = self.size();
        let mut total_size = 0;

        // Each pass of this loop looks at the current highest-nonce transaction of every sender
        // and schedules at most one transaction per sender for removal.
        loop {
            // check how many unique senders were removed last iteration
            let unique_removed = unique_senders - self.highest_nonces.len();

            // the new number of unique senders
            unique_senders = self.highest_nonces.len();
            non_local_senders -= unique_removed;

            // we can reuse the temp array
            removed.clear();

            // we prefer removing transactions with lower ordering, so sort ascending and walk
            // from the worst transaction upwards
            let mut worst_transactions = self.highest_nonces.values().collect::<Vec<_>>();
            worst_transactions.sort();

            // loop through the highest nonces set, removing transactions until we reach the limit
            for tx in worst_transactions {
                // return early if the pool is under limits; the check uses the projected length
                // and size, since the scheduled transactions are still in the pool at this point
                if !limit.is_exceeded(original_length - total_removed, original_size - total_size) ||
                    non_local_senders == 0
                {
                    // need to remove remaining transactions before exiting
                    for id in &removed {
                        if let Some(tx) = self.remove_transaction(id) {
                            end_removed.push(tx);
                        }
                    }

                    return
                }

                // a local highest-nonce transaction shields its sender; count each such sender
                // only once
                if !remove_locals && tx.transaction.is_local() {
                    let sender_id = tx.transaction.sender_id();
                    if local_senders.insert(sender_id) {
                        non_local_senders -= 1;
                    }
                    continue
                }

                total_size += tx.transaction.size();
                total_removed += 1;
                removed.push(*tx.transaction.id());
            }

            // remove the transactions from this iteration
            for id in &removed {
                if let Some(tx) = self.remove_transaction(id) {
                    end_removed.push(tx);
                }
            }

            // return if either the pool is under limits or there are no more _eligible_
            // transactions to remove
            if !self.exceeds(limit) || non_local_senders == 0 {
                return
            }
        }
    }
462
463    /// Truncates the pool to the given [`SubPoolLimit`], removing transactions until the subpool
464    /// limits are met.
465    ///
466    /// This attempts to remove transactions by roughly the same amount for each sender. For more
467    /// information on this exact process see docs for
468    /// [`remove_to_limit`](PendingPool::remove_to_limit).
469    ///
470    /// This first truncates all of the non-local transactions in the pool. If the subpool is still
471    /// not under the limit, this truncates the entire pool, including non-local transactions. The
472    /// removed transactions are returned.
473    pub fn truncate_pool(
474        &mut self,
475        limit: SubPoolLimit,
476    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
477        let mut removed = Vec::new();
478        // return early if the pool is already under the limits
479        if !self.exceeds(&limit) {
480            return removed
481        }
482
483        // first truncate only non-local transactions, returning if the pool end up under the limit
484        self.remove_to_limit(&limit, false, &mut removed);
485        if !self.exceeds(&limit) {
486            return removed
487        }
488
489        // now repeat for local transactions, since local transactions must be removed now for the
490        // pool to be under the limit
491        self.remove_to_limit(&limit, true, &mut removed);
492
493        removed
494    }
495
496    /// Returns true if the pool exceeds the given limit
497    #[inline]
498    pub(crate) fn exceeds(&self, limit: &SubPoolLimit) -> bool {
499        limit.is_exceeded(self.len(), self.size())
500    }
501
    /// The reported size of all transactions in this pool, i.e. the current value of the pool's
    /// size tracker.
    pub(crate) fn size(&self) -> usize {
        self.size_of.into()
    }
506
    /// Number of transactions in the entire pool (independent and dependent ones).
    pub(crate) fn len(&self) -> usize {
        self.by_id.len()
    }
511
    /// Whether the pool contains no transactions at all. Only compiled for tests.
    #[cfg(test)]
    pub(crate) fn is_empty(&self) -> bool {
        self.by_id.is_empty()
    }
517
518    /// Returns `true` if the transaction with the given id is already included in this pool.
519    pub(crate) fn contains(&self, id: &TransactionId) -> bool {
520        self.by_id.contains_key(id)
521    }
522
    /// Collects the ids of all transactions of the given sender that are in this pool, in the
    /// order they are stored in the id-ordered map.
    pub(crate) fn get_txs_by_sender(&self, sender: SenderId) -> Vec<TransactionId> {
        self.iter_txs_by_sender(sender).copied().collect()
    }
527
528    /// Returns an iterator over all transaction with the sender id
529    pub(crate) fn iter_txs_by_sender(
530        &self,
531        sender: SenderId,
532    ) -> impl Iterator<Item = &TransactionId> + '_ {
533        self.by_id
534            .range((sender.start_bound(), Unbounded))
535            .take_while(move |(other, _)| sender == other.sender)
536            .map(|(tx_id, _)| tx_id)
537    }
538
    /// Retrieves a transaction with the given ID from the pool, if it exists.
    ///
    /// Returns the pool's internal wrapper, which also carries the submission id and priority.
    fn get(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
        self.by_id.get(id)
    }
543
    /// Returns a reference to the independent transactions in the pool, keyed by sender. Only
    /// compiled for tests.
    #[cfg(test)]
    pub(crate) const fn independent(&self) -> &FxHashMap<SenderId, PendingTransaction<T>> {
        &self.independent_transactions
    }
549
550    /// Asserts that the bijection between `by_id` and `all` is valid.
551    #[cfg(any(test, feature = "test-utils"))]
552    pub(crate) fn assert_invariants(&self) {
553        assert!(
554            self.independent_transactions.len() <= self.by_id.len(),
555            "independent.len() > all.len()"
556        );
557        assert!(
558            self.highest_nonces.len() <= self.by_id.len(),
559            "independent_descendants.len() > all.len()"
560        );
561        assert_eq!(
562            self.highest_nonces.len(),
563            self.independent_transactions.len(),
564            "independent.len() = independent_descendants.len()"
565        );
566    }
567}
568
/// A transaction that is ready to be included in a block.
///
/// Wraps the validated transaction together with the data the pool orders it by: the priority
/// and, as a tie-breaker, the submission id.
#[derive(Debug)]
pub(crate) struct PendingTransaction<T: TransactionOrdering> {
    /// Identifier that tags when transaction was submitted in the pool.
    pub(crate) submission_id: u64,
    /// Actual transaction.
    pub(crate) transaction: Arc<ValidPoolTransaction<T::Transaction>>,
    /// The priority value assigned by the used `Ordering` function.
    pub(crate) priority: Priority<T::PriorityValue>,
}
579
impl<T: TransactionOrdering> PendingTransaction<T> {
    /// The next transaction of the sender: `nonce + 1`
    ///
    /// Only the id is computed here; whether such a transaction exists is not checked.
    pub(crate) fn unlocks(&self) -> TransactionId {
        self.transaction.transaction_id.descendant()
    }
}
586
587impl<T: TransactionOrdering> Clone for PendingTransaction<T> {
588    fn clone(&self) -> Self {
589        Self {
590            submission_id: self.submission_id,
591            transaction: Arc::clone(&self.transaction),
592            priority: self.priority.clone(),
593        }
594    }
595}
596
// Equality is defined through `Ord::cmp` (priority, then submission id), see `PartialEq` below.
impl<T: TransactionOrdering> Eq for PendingTransaction<T> {}
598
599impl<T: TransactionOrdering> PartialEq<Self> for PendingTransaction<T> {
600    fn eq(&self, other: &Self) -> bool {
601        self.cmp(other) == Ordering::Equal
602    }
603}
604
605impl<T: TransactionOrdering> PartialOrd<Self> for PendingTransaction<T> {
606    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
607        Some(self.cmp(other))
608    }
609}
610
611impl<T: TransactionOrdering> Ord for PendingTransaction<T> {
612    fn cmp(&self, other: &Self) -> Ordering {
613        // This compares by `priority` and only if two tx have the exact same priority this compares
614        // the unique `submission_id`. This ensures that transactions with same priority are not
615        // equal, so they're not replaced in the set
616        self.priority
617            .cmp(&other.priority)
618            .then_with(|| other.submission_id.cmp(&self.submission_id))
619    }
620}
621
622#[cfg(test)]
623mod tests {
624    use super::*;
625    use crate::{
626        test_utils::{MockOrdering, MockTransaction, MockTransactionFactory, MockTransactionSet},
627        PoolTransaction,
628    };
629    use alloy_consensus::{Transaction, TxType};
630    use alloy_primitives::address;
631    use std::collections::HashSet;
632
    // A single transaction stays in the pool while the base fee is within its fee cap and is
    // evicted once the base fee exceeds it.
    #[test]
    fn test_enforce_basefee() {
        let mut f = MockTransactionFactory::default();
        let mut pool = PendingPool::new(MockOrdering::default());
        let tx = f.validated_arc(MockTransaction::eip1559().inc_price());
        pool.add_transaction(tx.clone(), 0);

        assert!(pool.contains(tx.id()));
        assert_eq!(pool.len(), 1);

        // a base fee of zero cannot evict anything
        let removed = pool.update_base_fee(0);
        assert!(removed.is_empty());

        // one above the transaction's fee cap evicts it
        let removed = pool.update_base_fee((tx.max_fee_per_gas() + 1) as u64);
        assert_eq!(removed.len(), 1);
        assert!(pool.is_empty());
    }
650
    // Evicting a transaction via the base fee also evicts the same sender's later transactions,
    // while evicting only the descendant leaves the root in place.
    #[test]
    fn test_enforce_basefee_descendant() {
        let mut f = MockTransactionFactory::default();
        let mut pool = PendingPool::new(MockOrdering::default());
        let t = MockTransaction::eip1559().inc_price_by(10);
        let root_tx = f.validated_arc(t.clone());
        pool.add_transaction(root_tx.clone(), 0);

        // same sender, next nonce, lower price than the root
        let descendant_tx = f.validated_arc(t.inc_nonce().decr_price());
        pool.add_transaction(descendant_tx.clone(), 0);

        assert!(pool.contains(root_tx.id()));
        assert!(pool.contains(descendant_tx.id()));
        assert_eq!(pool.len(), 2);

        // one sender, so one entry in each per-sender map
        assert_eq!(pool.independent_transactions.len(), 1);
        assert_eq!(pool.highest_nonces.len(), 1);

        let removed = pool.update_base_fee(0);
        assert!(removed.is_empty());

        // two dependent tx in the pool with decreasing fee

        {
            // work on a copy so the original pool can be reused below
            let mut pool2 = pool.clone();
            let removed = pool2.update_base_fee((descendant_tx.max_fee_per_gas() + 1) as u64);
            assert_eq!(removed.len(), 1);
            assert_eq!(pool2.len(), 1);
            // descendant got popped
            assert!(pool2.contains(root_tx.id()));
            assert!(!pool2.contains(descendant_tx.id()));
        }

        // remove root transaction via fee
        let removed = pool.update_base_fee((root_tx.max_fee_per_gas() + 1) as u64);
        assert_eq!(removed.len(), 2);
        assert!(pool.is_empty());
        pool.assert_invariants();
    }
690
    // Truncating the pool removes the transaction that sorts lowest first.
    #[test]
    fn evict_worst() {
        let mut f = MockTransactionFactory::default();
        let mut pool = PendingPool::new(MockOrdering::default());

        let t = MockTransaction::eip1559();
        pool.add_transaction(f.validated_arc(t.clone()), 0);

        let t2 = MockTransaction::eip1559().inc_price_by(10);
        pool.add_transaction(f.validated_arc(t2), 0);

        // First transaction should be evicted.
        assert_eq!(
            pool.highest_nonces.values().min().map(|tx| *tx.transaction.hash()),
            Some(*t.hash())
        );

        // truncate pool to at most one transaction, ensure it's the same transaction
        let removed = pool.truncate_pool(SubPoolLimit { max_txs: 1, max_size: usize::MAX });
        assert_eq!(removed.len(), 1);
        assert_eq!(removed[0].hash(), t.hash());
    }
713
    #[test]
    fn correct_independent_descendants() {
        // this test ensures that we set the right highest nonces set for each sender
        let mut f = MockTransactionFactory::default();
        let mut pool = PendingPool::new(MockOrdering::default());

        let a_sender = address!("0x000000000000000000000000000000000000000a");
        let b_sender = address!("0x000000000000000000000000000000000000000b");
        let c_sender = address!("0x000000000000000000000000000000000000000c");
        let d_sender = address!("0x000000000000000000000000000000000000000d");

        // create a chain of transactions by sender A, B, C and D
        let mut tx_set = MockTransactionSet::dependent(a_sender, 0, 4, TxType::Eip1559);
        let a = tx_set.clone().into_vec();

        let b = MockTransactionSet::dependent(b_sender, 0, 3, TxType::Eip1559).into_vec();
        tx_set.extend(b.clone());

        // C has the same number of txs as B
        let c = MockTransactionSet::dependent(c_sender, 0, 3, TxType::Eip1559).into_vec();
        tx_set.extend(c.clone());

        let d = MockTransactionSet::dependent(d_sender, 0, 1, TxType::Eip1559).into_vec();
        tx_set.extend(d.clone());

        // add all the transactions to the pool
        let all_txs = tx_set.into_vec();
        for tx in all_txs {
            pool.add_transaction(f.validated_arc(tx), 0);
        }

        pool.assert_invariants();

        // the highest-nonce set is expected to hold the last transaction of each of these tx
        // chains, compared by (sender, nonce)
        let expected_highest_nonces = vec![d[0].clone(), c[2].clone(), b[2].clone(), a[3].clone()]
            .iter()
            .map(|tx| (tx.sender(), tx.nonce()))
            .collect::<HashSet<_>>();
        let actual_highest_nonces = pool
            .highest_nonces
            .values()
            .map(|tx| (tx.transaction.sender(), tx.transaction.nonce()))
            .collect::<HashSet<_>>();
        assert_eq!(expected_highest_nonces, actual_highest_nonces);
        pool.assert_invariants();
    }
761
#[test]
fn truncate_by_sender() {
    // Checks which (sender, nonce) pairs `truncate_pool` evicts, and which stay pending, when
    // the pool holds more transactions than the configured `max_txs`.
    let mut pool = PendingPool::new(MockOrdering::default());
    let mut f = MockTransactionFactory::default();

    // Simulated senders A, B, C, and D.
    let a = address!("0x000000000000000000000000000000000000000a");
    let b = address!("0x000000000000000000000000000000000000000b");
    let c = address!("0x000000000000000000000000000000000000000c");
    let d = address!("0x000000000000000000000000000000000000000d");

    // Transaction chains: 4 for A, 3 for B, 3 for C, and 1 for D.
    let a_txs = MockTransactionSet::sequential_transactions_by_sender(a, 4, TxType::Eip1559);
    let b_txs = MockTransactionSet::sequential_transactions_by_sender(b, 3, TxType::Eip1559);
    let c_txs = MockTransactionSet::sequential_transactions_by_sender(c, 3, TxType::Eip1559);
    let d_txs = MockTransactionSet::sequential_transactions_by_sender(d, 1, TxType::Eip1559);

    // (sender, nonce) pairs expected to survive truncation: a1, b1, c1, a2.
    let expected_pending: HashSet<_> = [
        &a_txs.transactions[0],
        &b_txs.transactions[0],
        &c_txs.transactions[0],
        &a_txs.transactions[1],
    ]
    .into_iter()
    .map(|tx| (tx.sender(), tx.nonce()))
    .collect();

    // (sender, nonce) pairs expected to be evicted: d1, c3, b3, a4, then c2, b2, a3.
    // Only set membership is compared below, so the eviction order itself is not verified.
    let expected_removed: HashSet<_> = [
        &d_txs.transactions[0],
        &c_txs.transactions[2],
        &b_txs.transactions[2],
        &a_txs.transactions[3],
        &c_txs.transactions[1],
        &b_txs.transactions[1],
        &a_txs.transactions[2],
    ]
    .into_iter()
    .map(|tx| (tx.sender(), tx.nonce()))
    .collect();

    // Feed every chain into the pool, one sender after the other (A, B, C, D).
    let all_txs = a_txs
        .into_vec()
        .into_iter()
        .chain(b_txs.into_vec())
        .chain(c_txs.into_vec())
        .chain(d_txs.into_vec());
    for tx in all_txs {
        pool.add_transaction(f.validated_arc(tx), 0);
    }

    // The pool must be internally consistent before truncation.
    pool.assert_invariants();

    // Cap the pool at 4 transactions with no effective size limit; 11 were inserted, so 7
    // have to go and a1, a2, b1, c1 are expected to remain.
    let removed = pool.truncate_pool(SubPoolLimit { max_txs: 4, max_size: usize::MAX });
    pool.assert_invariants();
    assert_eq!(removed.len(), expected_removed.len());

    // The evicted transactions must be exactly the expected ones.
    let removed_keys: HashSet<_> =
        removed.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect();
    assert_eq!(removed_keys, expected_removed);

    // What is left in the pool must be exactly the expected survivors.
    let still_pending = pool.all().collect::<Vec<_>>();
    assert_eq!(still_pending.len(), expected_pending.len());

    let pending_keys: HashSet<_> =
        still_pending.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect();
    assert_eq!(pending_keys, expected_pending);
}
847
// <https://github.com/paradigmxyz/reth/issues/12340>
#[test]
fn test_eligible_updates_promoted() {
    // A `best()` iterator that was created (and fully drained) before follow-up transactions
    // were added to the pool is expected to yield those follow-ups afterwards.
    let mut f = MockTransactionFactory::default();
    let mut pool = PendingPool::new(MockOrdering::default());

    let num_senders = 10;

    // `num_senders` freshly generated EIP-1559 mock transactions; the assertions below expect
    // each of them to carry nonce 0.
    let first_txs: Vec<_> =
        std::iter::repeat_with(MockTransaction::eip1559).take(num_senders).collect();

    // For every first transaction, a copy with a new random hash and the next nonce.
    let second_txs: Vec<_> =
        first_txs.iter().cloned().map(|tx| tx.rng_hash().inc_nonce()).collect();

    for tx in first_txs {
        pool.add_transaction(Arc::new(f.validated(tx)), 0);
    }

    // The iterator is created while only the first batch is in the pool.
    let mut best = pool.best();

    for _ in 0..num_senders {
        let tx = best.next().expect("cannot read one of first_txs");
        assert_eq!(tx.nonce(), 0);
    }

    // Insert the follow-ups while `best` is still alive.
    for tx in second_txs {
        pool.add_transaction(Arc::new(f.validated(tx)), 0);
    }

    // The pre-existing iterator must now hand out every follow-up transaction.
    for _ in 0..num_senders {
        let tx = best.next().expect("cannot read one of second_txs");
        assert_eq!(tx.nonce(), 1);
    }
}
890
#[test]
fn test_empty_pool_behavior() {
    let mut pool: PendingPool<MockOrdering> = PendingPool::new(MockOrdering::default());

    // A freshly constructed pool reports no transactions and a size of zero.
    assert!(pool.is_empty());
    assert_eq!(pool.len(), 0);
    assert_eq!(pool.size(), 0);

    // Truncating an empty pool must not panic and must not report any removed transaction.
    let limit = SubPoolLimit { max_txs: 10, max_size: 1000 };
    assert!(pool.truncate_pool(limit).is_empty());

    // Iterating over all transactions of an empty pool yields nothing.
    assert_eq!(pool.all().count(), 0);
}
908
#[test]
fn test_add_remove_transaction() {
    let mut pool = PendingPool::new(MockOrdering::default());
    let mut f = MockTransactionFactory::default();

    // Insert a single transaction; the pool must report it as present.
    let tx = f.validated_arc(MockTransaction::eip1559());
    let tx_id = tx.id();
    pool.add_transaction(tx.clone(), 0);
    assert!(pool.contains(tx_id));
    assert_eq!(pool.len(), 1);

    // Removing it by id must return that same transaction and leave the pool empty.
    let removed_tx = pool.remove_transaction(tx_id).unwrap();
    assert_eq!(removed_tx.id(), tx_id);
    assert!(!pool.contains(tx_id));
    assert_eq!(pool.len(), 0);
}
926
#[test]
fn test_reorder_on_basefee_update() {
    let mut pool = PendingPool::new(MockOrdering::default());
    let mut f = MockTransactionFactory::default();

    // Two transactions whose prices were bumped by different amounts; tx2 gets the larger bump.
    let tx1 = f.validated_arc(MockTransaction::eip1559().inc_price());
    let tx2 = f.validated_arc(MockTransaction::eip1559().inc_price_by(20));
    pool.add_transaction(tx1.clone(), 0);
    pool.add_transaction(tx2.clone(), 0);

    // The best-transactions iterator must yield tx2 first, then tx1.
    let mut best = pool.best();
    let (first, second) = (best.next().unwrap(), best.next().unwrap());
    assert_eq!(first.hash(), tx2.hash());
    assert_eq!(second.hash(), tx1.hash());

    // Raise the base fee to just above tx1's max fee; exactly tx1 is expected to be removed.
    let new_base_fee = (tx1.max_fee_per_gas() + 1) as u64;
    let evicted = pool.update_base_fee(new_base_fee);
    assert_eq!(evicted.len(), 1);
    assert_eq!(evicted[0].hash(), tx1.hash());

    // Only tx2 is left in the pool.
    assert_eq!(pool.len(), 1);
    assert!(pool.contains(tx2.id()));
    assert!(!pool.contains(tx1.id()));
}
953
#[test]
#[should_panic(expected = "transaction already included")]
fn test_handle_duplicates() {
    let mut pool = PendingPool::new(MockOrdering::default());
    let mut f = MockTransactionFactory::default();

    // The first insertion succeeds and the pool holds exactly this one transaction.
    let tx = f.validated_arc(MockTransaction::eip1559());
    let duplicate = tx.clone();
    pool.add_transaction(duplicate, 0);
    assert!(pool.contains(tx.id()));
    assert_eq!(pool.len(), 1);

    // Inserting the very same transaction a second time is not silently ignored: the test
    // expects a panic whose message contains "transaction already included".
    pool.add_transaction(tx, 0);
}
969
#[test]
fn test_update_blob_fee() {
    let mut pool = PendingPool::new(MockOrdering::default());
    let mut f = MockTransactionFactory::default();

    // Two EIP-4844 transactions: tx1 with a blob fee of 50, tx2 with a blob fee of 150.
    let mut low_fee = MockTransaction::eip4844();
    low_fee.set_blob_fee(50);
    let tx1 = f.validated_arc(low_fee);

    let mut high_fee = MockTransaction::eip4844();
    high_fee.set_blob_fee(150);
    let tx2 = f.validated_arc(high_fee);

    pool.add_transaction(tx1.clone(), 0);
    pool.add_transaction(tx2.clone(), 0);

    // A blob fee of 100 lies between the two, so exactly tx1 is expected to be removed.
    let evicted = pool.update_blob_fee(100);
    assert_eq!(evicted.len(), 1);
    assert_eq!(evicted[0].hash(), tx1.hash());

    // tx2 is still in the pool, tx1 is gone.
    assert!(pool.contains(tx2.id()));
    assert!(!pool.contains(tx1.id()));
}
990
#[test]
fn local_senders_tracking() {
    // Truncation with a mix of local and external senders: all external transactions are
    // expected to be dropped, while the local sender keeps as many as the limit allows.
    let mut pool = PendingPool::new(MockOrdering::default());
    let mut f = MockTransactionFactory::default();

    // Simulated senders A, B, and C.
    let a = address!("0x000000000000000000000000000000000000000a");
    let b = address!("0x000000000000000000000000000000000000000b");
    let c = address!("0x000000000000000000000000000000000000000c");

    // * sender A (local)    - 11 transactions, one more than the limit used below
    // * sender B (external) - 2 transactions
    // * sender C (external) - 2 transactions
    let a_txs = MockTransactionSet::sequential_transactions_by_sender(a, 11, TxType::Eip1559);
    let b_txs = MockTransactionSet::sequential_transactions_by_sender(b, 2, TxType::Eip1559);
    let c_txs = MockTransactionSet::sequential_transactions_by_sender(c, 2, TxType::Eip1559);

    // Sender A's transactions are validated with a local origin.
    for tx in a_txs.into_vec() {
        let local_tx = f.validated_with_origin(crate::TransactionOrigin::Local, tx);
        pool.add_transaction(Arc::new(local_tx), 0);
    }

    // Senders B and C go through the factory's default validation path.
    for tx in b_txs.into_vec().into_iter().chain(c_txs.into_vec()) {
        pool.add_transaction(f.validated_arc(tx), 0);
    }

    // The pool must be internally consistent before truncation.
    pool.assert_invariants();

    // Cap the pool at 10 transactions with no effective size limit.
    let pool_limit = SubPoolLimit { max_txs: 10, max_size: usize::MAX };
    pool.truncate_pool(pool_limit);

    let [sender_a, sender_b, sender_c] = [a, b, c].map(|addr| f.ids.sender_id(&addr).unwrap());

    // A keeps 10 transactions; nothing from B or C is left.
    assert_eq!(pool.get_txs_by_sender(sender_a).len(), 10);
    assert!(pool.get_txs_by_sender(sender_b).is_empty());
    assert!(pool.get_txs_by_sender(sender_c).is_empty());
}
1039}