reth_transaction_pool/
traits.rs

1use crate::{
2    blobstore::BlobStoreError,
3    error::{InvalidPoolTransactionError, PoolResult},
4    pool::{
5        state::SubPool, BestTransactionFilter, NewTransactionEvent, TransactionEvents,
6        TransactionListenerKind,
7    },
8    validate::ValidPoolTransaction,
9    AllTransactionsEvents,
10};
11use alloy_consensus::{error::ValueError, BlockHeader, Signed, Typed2718};
12use alloy_eips::{
13    eip2718::{Encodable2718, WithEncoded},
14    eip2930::AccessList,
15    eip4844::{
16        env_settings::KzgSettings, BlobAndProofV1, BlobAndProofV2, BlobTransactionValidationError,
17    },
18    eip7594::BlobTransactionSidecarVariant,
19    eip7702::SignedAuthorization,
20};
21use alloy_primitives::{Address, Bytes, TxHash, TxKind, B256, U256};
22use futures_util::{ready, Stream};
23use reth_eth_wire_types::HandleMempoolData;
24use reth_ethereum_primitives::{PooledTransactionVariant, TransactionSigned};
25use reth_execution_types::ChangedAccount;
26use reth_primitives_traits::{Block, InMemorySize, Recovered, SealedBlock, SignedTransaction};
27#[cfg(feature = "serde")]
28use serde::{Deserialize, Serialize};
29use std::{
30    collections::{HashMap, HashSet},
31    fmt,
32    fmt::Debug,
33    future::Future,
34    pin::Pin,
35    sync::Arc,
36    task::{Context, Poll},
37};
38use tokio::sync::mpsc::Receiver;
39
/// Identifier of a network peer, represented as an [`alloy_primitives::B512`].
pub type PeerId = alloy_primitives::B512;

/// Helper type alias to access the [`PoolTransaction`] type (the `Transaction` associated type)
/// of a given [`TransactionPool`].
pub type PoolTx<P> = <P as TransactionPool>::Transaction;
/// Helper type alias to access [`PoolTransaction::Consensus`] for the transaction type of a given
/// [`TransactionPool`].
pub type PoolConsensusTx<P> = <<P as TransactionPool>::Transaction as PoolTransaction>::Consensus;

/// Helper type alias to access [`PoolTransaction::Pooled`] for the transaction type of a given
/// [`TransactionPool`].
pub type PoolPooledTx<P> = <<P as TransactionPool>::Transaction as PoolTransaction>::Pooled;
50
/// General purpose abstraction of a transaction-pool.
///
/// This is intended to be used by API-consumers such as RPC that need to inject new incoming,
/// unverified transactions. And by block production that needs to get transactions to execute in a
/// new block.
///
/// Note: This requires `Clone` for convenience, since it is assumed that this will be implemented
/// for a wrapped `Arc` type, see also [`Pool`](crate::Pool).
#[auto_impl::auto_impl(&, Arc)]
pub trait TransactionPool: Clone + Debug + Send + Sync {
    /// The transaction type of the pool
    type Transaction: EthPoolTransaction;

    /// Returns stats about the pool and all sub-pools.
    fn pool_size(&self) -> PoolSize;

    /// Returns the block the pool is currently tracking.
    ///
    /// This tracks the block that the pool has last seen.
    fn block_info(&self) -> BlockInfo;

    /// Imports an _external_ transaction.
    ///
    /// This is intended to be used by the network to insert incoming transactions received over the
    /// p2p network.
    ///
    /// The default implementation forwards to [`TransactionPool::add_transaction`] with
    /// [`TransactionOrigin::External`].
    ///
    /// Consumer: P2P
    fn add_external_transaction(
        &self,
        transaction: Self::Transaction,
    ) -> impl Future<Output = PoolResult<TxHash>> + Send {
        self.add_transaction(TransactionOrigin::External, transaction)
    }

    /// Imports all _external_ transactions.
    ///
    /// The default implementation forwards to [`TransactionPool::add_transactions`] with
    /// [`TransactionOrigin::External`].
    ///
    /// Consumer: Utility
    fn add_external_transactions(
        &self,
        transactions: Vec<Self::Transaction>,
    ) -> impl Future<Output = Vec<PoolResult<TxHash>>> + Send {
        self.add_transactions(TransactionOrigin::External, transactions)
    }

    /// Adds an _unvalidated_ transaction into the pool and subscribes to its state changes.
    ///
    /// This is the same as [`TransactionPool::add_transaction`] but returns an event stream for the
    /// given transaction.
    ///
    /// Consumer: Custom
    fn add_transaction_and_subscribe(
        &self,
        origin: TransactionOrigin,
        transaction: Self::Transaction,
    ) -> impl Future<Output = PoolResult<TransactionEvents>> + Send;

    /// Adds an _unvalidated_ transaction into the pool.
    ///
    /// Consumer: RPC
    fn add_transaction(
        &self,
        origin: TransactionOrigin,
        transaction: Self::Transaction,
    ) -> impl Future<Output = PoolResult<TxHash>> + Send;

    /// Adds the given _unvalidated_ transactions into the pool.
    ///
    /// Returns a list of results.
    ///
    /// Consumer: RPC
    fn add_transactions(
        &self,
        origin: TransactionOrigin,
        transactions: Vec<Self::Transaction>,
    ) -> impl Future<Output = Vec<PoolResult<TxHash>>> + Send;

    /// Returns a new transaction change event stream for the given transaction.
    ///
    /// Returns `None` if the transaction is not in the pool.
    fn transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents>;

    /// Returns a new transaction change event stream for _all_ transactions in the pool.
    fn all_transactions_event_listener(&self) -> AllTransactionsEvents<Self::Transaction>;

    /// Returns a new Stream that yields transaction hashes for new __pending__ transactions
    /// inserted into the pool that are allowed to be propagated.
    ///
    /// Note: This is intended for networking and will __only__ yield transactions that are allowed
    /// to be propagated over the network, see also [TransactionListenerKind].
    ///
    /// The default implementation forwards to [`Self::pending_transactions_listener_for`] with
    /// [`TransactionListenerKind::PropagateOnly`].
    ///
    /// Consumer: RPC/P2P
    fn pending_transactions_listener(&self) -> Receiver<TxHash> {
        self.pending_transactions_listener_for(TransactionListenerKind::PropagateOnly)
    }

    /// Returns a new [Receiver] that yields transaction hashes for new __pending__ transactions
    /// inserted into the pending pool depending on the given [TransactionListenerKind] argument.
    fn pending_transactions_listener_for(&self, kind: TransactionListenerKind) -> Receiver<TxHash>;

    /// Returns a new stream that yields new valid transactions added to the pool.
    ///
    /// The default implementation forwards to [`Self::new_transactions_listener_for`] with
    /// [`TransactionListenerKind::PropagateOnly`].
    fn new_transactions_listener(&self) -> Receiver<NewTransactionEvent<Self::Transaction>> {
        self.new_transactions_listener_for(TransactionListenerKind::PropagateOnly)
    }

    /// Returns a new [Receiver] that yields blob "sidecars" (blobs w/ assoc. kzg
    /// commitments/proofs) for eip-4844 transactions inserted into the pool
    fn blob_transaction_sidecars_listener(&self) -> Receiver<NewBlobSidecar>;

    /// Returns a new stream that yields new valid transactions added to the pool
    /// depending on the given [TransactionListenerKind] argument.
    fn new_transactions_listener_for(
        &self,
        kind: TransactionListenerKind,
    ) -> Receiver<NewTransactionEvent<Self::Transaction>>;

    /// Returns a new Stream that yields new transactions added to the pending sub-pool.
    ///
    /// This is a convenience wrapper around [Self::new_transactions_listener] that filters for
    /// [SubPool::Pending](crate::SubPool).
    fn new_pending_pool_transactions_listener(
        &self,
    ) -> NewSubpoolTransactionStream<Self::Transaction> {
        NewSubpoolTransactionStream::new(
            self.new_transactions_listener_for(TransactionListenerKind::PropagateOnly),
            SubPool::Pending,
        )
    }

    /// Returns a new Stream that yields new transactions added to the basefee sub-pool.
    ///
    /// This is a convenience wrapper around [Self::new_transactions_listener] that filters for
    /// [SubPool::BaseFee](crate::SubPool).
    fn new_basefee_pool_transactions_listener(
        &self,
    ) -> NewSubpoolTransactionStream<Self::Transaction> {
        NewSubpoolTransactionStream::new(self.new_transactions_listener(), SubPool::BaseFee)
    }

    /// Returns a new Stream that yields new transactions added to the queued-pool.
    ///
    /// This is a convenience wrapper around [Self::new_transactions_listener] that filters for
    /// [SubPool::Queued](crate::SubPool).
    fn new_queued_transactions_listener(&self) -> NewSubpoolTransactionStream<Self::Transaction> {
        NewSubpoolTransactionStream::new(self.new_transactions_listener(), SubPool::Queued)
    }

    /// Returns the _hashes_ of all transactions in the pool.
    ///
    /// Note: This returns a `Vec` but should guarantee that all hashes are unique.
    ///
    /// Consumer: P2P
    fn pooled_transaction_hashes(&self) -> Vec<TxHash>;

    /// Returns only the first `max` hashes of transactions in the pool.
    ///
    /// Consumer: P2P
    fn pooled_transaction_hashes_max(&self, max: usize) -> Vec<TxHash>;

    /// Returns the _full_ transaction objects of all transactions in the pool.
    ///
    /// This is intended to be used by the network for the initial exchange of pooled transaction
    /// _hashes_
    ///
    /// Note: This returns a `Vec` but should guarantee that all transactions are unique.
    ///
    /// Caution: In case of blob transactions, this does not include the sidecar.
    ///
    /// Consumer: P2P
    fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns only the first `max` transactions in the pool.
    ///
    /// Consumer: P2P
    fn pooled_transactions_max(
        &self,
        max: usize,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns converted [PooledTransactionVariant] for the given transaction hashes.
    ///
    /// This adheres to the expected behavior of
    /// [`GetPooledTransactions`](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getpooledtransactions-0x09):
    ///
    /// The transactions must be in same order as in the request, but it is OK to skip transactions
    /// which are not available.
    ///
    /// If the transaction is a blob transaction, the sidecar will be included.
    ///
    /// Consumer: P2P
    fn get_pooled_transaction_elements(
        &self,
        tx_hashes: Vec<TxHash>,
        limit: GetPooledTransactionLimit,
    ) -> Vec<<Self::Transaction as PoolTransaction>::Pooled>;

    /// Returns the pooled transaction variant for the given transaction hash.
    ///
    /// This adheres to the expected behavior of
    /// [`GetPooledTransactions`](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getpooledtransactions-0x09):
    ///
    /// If the transaction is a blob transaction, the sidecar will be included.
    ///
    /// It is expected that this variant represents the valid p2p format for full transactions.
    /// E.g. for EIP-4844 transactions this is the consensus transaction format with the blob
    /// sidecar.
    ///
    /// Consumer: P2P
    fn get_pooled_transaction_element(
        &self,
        tx_hash: TxHash,
    ) -> Option<Recovered<<Self::Transaction as PoolTransaction>::Pooled>>;

    /// Returns an iterator that yields transactions that are ready for block production.
    ///
    /// Consumer: Block production
    fn best_transactions(
        &self,
    ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>>;

    /// Returns an iterator that yields transactions that are ready for block production with the
    /// given base fee and optional blob fee attributes.
    ///
    /// Consumer: Block production
    fn best_transactions_with_attributes(
        &self,
        best_transactions_attributes: BestTransactionsAttributes,
    ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>>;

    /// Returns all transactions that can be included in the next block.
    ///
    /// This is primarily used for the `txpool_` RPC namespace:
    /// <https://geth.ethereum.org/docs/interacting-with-geth/rpc/ns-txpool> which distinguishes
    /// between `pending` and `queued` transactions, where `pending` are transactions ready for
    /// inclusion in the next block and `queued` are transactions that are ready for inclusion in
    /// future blocks.
    ///
    /// Consumer: RPC
    fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns the first `max` transactions that can be included in the next block.
    /// See <https://github.com/paradigmxyz/reth/issues/12767#issuecomment-2493223579>
    ///
    /// Consumer: Block production
    fn pending_transactions_max(
        &self,
        max: usize,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns all transactions that can be included in _future_ blocks.
    ///
    /// This and [Self::pending_transactions] are mutually exclusive.
    ///
    /// Consumer: RPC
    fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns all transactions that are currently in the pool grouped by whether they are ready
    /// for inclusion in the next block or not.
    ///
    /// This is primarily used for the `txpool_` namespace: <https://geth.ethereum.org/docs/interacting-with-geth/rpc/ns-txpool>
    ///
    /// Consumer: RPC
    fn all_transactions(&self) -> AllPoolTransactions<Self::Transaction>;

    /// Removes all transactions corresponding to the given hashes.
    ///
    /// Note: This removes the transactions as if they got discarded (_not_ mined).
    ///
    /// Consumer: Utility
    fn remove_transactions(
        &self,
        hashes: Vec<TxHash>,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Removes all transactions corresponding to the given hashes.
    ///
    /// Also removes all _dependent_ transactions.
    ///
    /// Consumer: Utility
    fn remove_transactions_and_descendants(
        &self,
        hashes: Vec<TxHash>,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Removes all transactions from the given sender.
    ///
    /// Consumer: Utility
    fn remove_transactions_by_sender(
        &self,
        sender: Address,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Retains only those hashes that are unknown to the pool.
    ///
    /// In other words, removes all transactions from the given set that are currently present in
    /// the pool. The announcement is passed by mutable reference and this method has no return
    /// value.
    ///
    /// Consumer: P2P
    fn retain_unknown<A>(&self, announcement: &mut A)
    where
        A: HandleMempoolData;

    /// Returns if the transaction for the given hash is already included in this pool.
    ///
    /// The default implementation checks whether [`Self::get`] returns `Some`.
    fn contains(&self, tx_hash: &TxHash) -> bool {
        self.get(tx_hash).is_some()
    }

    /// Returns the transaction for the given hash.
    fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns all transaction objects for the given hashes.
    ///
    /// Caution: In case of blob transactions, this does not include the sidecar.
    fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Notify the pool about transactions that are propagated to peers.
    ///
    /// Consumer: P2P
    fn on_propagated(&self, txs: PropagatedTransactions);

    /// Returns all transactions sent by a given user
    fn get_transactions_by_sender(
        &self,
        sender: Address,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns all pending transactions filtered by predicate
    fn get_pending_transactions_with_predicate(
        &self,
        predicate: impl FnMut(&ValidPoolTransaction<Self::Transaction>) -> bool,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns all pending transactions sent by a given user
    fn get_pending_transactions_by_sender(
        &self,
        sender: Address,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns all queued transactions sent by a given user
    fn get_queued_transactions_by_sender(
        &self,
        sender: Address,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns the highest transaction sent by a given user
    fn get_highest_transaction_by_sender(
        &self,
        sender: Address,
    ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns the transaction with the highest nonce that is executable given the on chain nonce.
    /// In other words the highest non nonce gapped transaction.
    ///
    /// Note: The next pending pooled transaction must have the on chain nonce.
    ///
    /// For example, for a given on chain nonce of `5`, the next transaction must have that nonce.
    /// If the pool contains txs `[5,6,7]` this returns tx `7`.
    /// If the pool contains txs `[6,7]` this returns `None` because the next valid nonce (5) is
    /// missing, which means txs `[6,7]` are nonce gapped.
    fn get_highest_consecutive_transaction_by_sender(
        &self,
        sender: Address,
        on_chain_nonce: u64,
    ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns a transaction sent by a given user and a nonce
    fn get_transaction_by_sender_and_nonce(
        &self,
        sender: Address,
        nonce: u64,
    ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns all transactions that were submitted with the given [TransactionOrigin]
    fn get_transactions_by_origin(
        &self,
        origin: TransactionOrigin,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns all pending transactions filtered by [`TransactionOrigin`]
    fn get_pending_transactions_by_origin(
        &self,
        origin: TransactionOrigin,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns all transactions that were submitted as [TransactionOrigin::Local]
    fn get_local_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.get_transactions_by_origin(TransactionOrigin::Local)
    }

    /// Returns all transactions that were submitted as [TransactionOrigin::Private]
    fn get_private_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.get_transactions_by_origin(TransactionOrigin::Private)
    }

    /// Returns all transactions that were submitted as [TransactionOrigin::External]
    fn get_external_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.get_transactions_by_origin(TransactionOrigin::External)
    }

    /// Returns all pending transactions that were submitted as [TransactionOrigin::Local]
    fn get_local_pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.get_pending_transactions_by_origin(TransactionOrigin::Local)
    }

    /// Returns all pending transactions that were submitted as [TransactionOrigin::Private]
    fn get_private_pending_transactions(
        &self,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.get_pending_transactions_by_origin(TransactionOrigin::Private)
    }

    /// Returns all pending transactions that were submitted as [TransactionOrigin::External]
    fn get_external_pending_transactions(
        &self,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.get_pending_transactions_by_origin(TransactionOrigin::External)
    }

    /// Returns a set of all senders of transactions in the pool
    fn unique_senders(&self) -> HashSet<Address>;

    /// Returns the [BlobTransactionSidecarVariant] for the given transaction hash if it exists in
    /// the blob store.
    fn get_blob(
        &self,
        tx_hash: TxHash,
    ) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError>;

    /// Returns all [BlobTransactionSidecarVariant] for the given transaction hashes if they exist
    /// in the blob store.
    ///
    /// This only returns the blobs that were found in the store.
    /// If there's no blob it will not be returned.
    fn get_all_blobs(
        &self,
        tx_hashes: Vec<TxHash>,
    ) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError>;

    /// Returns the exact [BlobTransactionSidecarVariant] for the given transaction hashes in the
    /// order they were requested.
    ///
    /// Returns an error if any of the blobs are not found in the blob store.
    fn get_all_blobs_exact(
        &self,
        tx_hashes: Vec<TxHash>,
    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError>;

    /// Return the [`BlobAndProofV1`]s for a list of blob versioned hashes.
    fn get_blobs_for_versioned_hashes_v1(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError>;

    /// Return the [`BlobAndProofV2`]s for a list of blob versioned hashes.
    /// Blobs and proofs are returned only if they are present for _all_ of the requested versioned
    /// hashes.
    fn get_blobs_for_versioned_hashes_v2(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError>;
}
510
/// Extension of the [TransactionPool] trait that adds maintenance operations: setting the current
/// block info, reacting to canonical state changes, updating accounts and managing stored blobs.
#[auto_impl::auto_impl(&, Arc)]
pub trait TransactionPoolExt: TransactionPool {
    /// Sets the current block info for the pool.
    fn set_block_info(&self, info: BlockInfo);

    /// Event listener for when the pool needs to be updated.
    ///
    /// Implementers need to update the pool accordingly:
    ///
    /// ## Fee changes
    ///
    /// The [`CanonicalStateUpdate`] includes the base and blob fee of the pending block, which
    /// affects the dynamic fee requirement of pending transactions in the pool.
    ///
    /// ## EIP-4844 Blob transactions
    ///
    /// Mined blob transactions need to be removed from the pool, but from the pool only. The blob
    /// sidecar must not be removed from the blob store. Only after a blob transaction is
    /// finalized, its sidecar is removed from the blob store. This ensures that in case of a reorg,
    /// the sidecar is still available.
    fn on_canonical_state_change<B>(&self, update: CanonicalStateUpdate<'_, B>)
    where
        B: Block;

    /// Updates the accounts in the pool with the given changed accounts.
    fn update_accounts(&self, accounts: Vec<ChangedAccount>);

    /// Deletes the blob sidecar for the given transaction from the blob store.
    fn delete_blob(&self, tx: B256);

    /// Deletes multiple blob sidecars from the blob store.
    fn delete_blobs(&self, txs: Vec<B256>);

    /// Maintenance function to clean up blobs that are no longer needed.
    fn cleanup_blobs(&self);
}
548
/// A helper type that bundles all transactions in the pool, split into `pending` and `queued`.
#[derive(Debug, Clone)]
pub struct AllPoolTransactions<T: PoolTransaction> {
    /// Transactions that are ready for inclusion in the next block.
    pub pending: Vec<Arc<ValidPoolTransaction<T>>>,
    /// Transactions that are ready for inclusion in _future_ blocks, but are currently parked,
    /// because they depend on other transactions that are not yet included in the pool (nonce gap)
    /// or otherwise blocked.
    pub queued: Vec<Arc<ValidPoolTransaction<T>>>,
}
559
560// === impl AllPoolTransactions ===
561
562impl<T: PoolTransaction> AllPoolTransactions<T> {
563    /// Returns an iterator over all pending [`Recovered`] transactions.
564    pub fn pending_recovered(&self) -> impl Iterator<Item = Recovered<T::Consensus>> + '_ {
565        self.pending.iter().map(|tx| tx.transaction.clone().into_consensus())
566    }
567
568    /// Returns an iterator over all queued [`Recovered`] transactions.
569    pub fn queued_recovered(&self) -> impl Iterator<Item = Recovered<T::Consensus>> + '_ {
570        self.queued.iter().map(|tx| tx.transaction.clone().into_consensus())
571    }
572
573    /// Returns an iterator over all transactions, both pending and queued.
574    pub fn all(&self) -> impl Iterator<Item = Recovered<T::Consensus>> + '_ {
575        self.pending
576            .iter()
577            .chain(self.queued.iter())
578            .map(|tx| tx.transaction.clone().into_consensus())
579    }
580}
581
582impl<T: PoolTransaction> Default for AllPoolTransactions<T> {
583    fn default() -> Self {
584        Self { pending: Default::default(), queued: Default::default() }
585    }
586}
587
/// Represents transactions that were propagated over the network.
///
/// Maps each transaction hash to the list of [`PropagateKind`]s describing how, and to which
/// peer, the transaction was sent.
#[derive(Debug, Clone, Eq, PartialEq, Default)]
pub struct PropagatedTransactions(pub HashMap<TxHash, Vec<PropagateKind>>);
591
/// Represents how a transaction was propagated over the network.
///
/// Both variants carry the [`PeerId`] of the peer the transaction was sent to.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum PropagateKind {
    /// The full transaction object was sent to the peer.
    ///
    /// This is equivalent to the `Transaction` message
    Full(PeerId),
    /// Only the hash was propagated to the peer.
    Hash(PeerId),
}
603
604// === impl PropagateKind ===
605
606impl PropagateKind {
607    /// Returns the peer the transaction was sent to
608    pub const fn peer(&self) -> &PeerId {
609        match self {
610            Self::Full(peer) | Self::Hash(peer) => peer,
611        }
612    }
613
614    /// Returns true if the transaction was sent as a full transaction
615    pub const fn is_full(&self) -> bool {
616        matches!(self, Self::Full(_))
617    }
618
619    /// Returns true if the transaction was sent as a hash
620    pub const fn is_hash(&self) -> bool {
621        matches!(self, Self::Hash(_))
622    }
623}
624
625impl From<PropagateKind> for PeerId {
626    fn from(value: PropagateKind) -> Self {
627        match value {
628            PropagateKind::Full(peer) | PropagateKind::Hash(peer) => peer,
629        }
630    }
631}
632
/// This type represents a new blob sidecar that has been stored in the transaction pool's
/// blobstore; it includes the `TransactionHash` of the blob transaction along with the associated
/// sidecar (blobs, commitments, proofs).
#[derive(Debug, Clone)]
pub struct NewBlobSidecar {
    /// Hash of the EIP-4844 transaction.
    pub tx_hash: TxHash,
    /// The blob transaction sidecar.
    pub sidecar: Arc<BlobTransactionSidecarVariant>,
}
643
/// Where the transaction originates from.
///
/// Depending on where the transaction was picked up, it affects how the transaction is handled
/// internally, e.g. limits for simultaneous transactions of one sender.
///
/// The default origin is [`TransactionOrigin::Local`].
#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)]
pub enum TransactionOrigin {
    /// Transaction is coming from a local source.
    #[default]
    Local,
    /// Transaction has been received externally.
    ///
    /// This is usually considered an "untrusted" source, for example received from another node
    /// in the network.
    External,
    /// Transaction is originated locally and is intended to remain private.
    ///
    /// This type of transaction should not be propagated to the network. It's meant for
    /// private usage within the local node only.
    Private,
}
664
665// === impl TransactionOrigin ===
666
667impl TransactionOrigin {
668    /// Whether the transaction originates from a local source.
669    pub const fn is_local(&self) -> bool {
670        matches!(self, Self::Local)
671    }
672
673    /// Whether the transaction originates from an external source.
674    pub const fn is_external(&self) -> bool {
675        matches!(self, Self::External)
676    }
677    /// Whether the transaction originates from a private source.
678    pub const fn is_private(&self) -> bool {
679        matches!(self, Self::Private)
680    }
681}
682
/// Represents the kind of update to the canonical state that triggered a pool update.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PoolUpdateKind {
    /// The update was due to a block commit.
    Commit,
    /// The update was due to a reorganization.
    Reorg,
}
691
/// Represents changes after a new canonical block or range of canonical blocks was added to the
/// chain.
///
/// It is expected that this is only used if the added blocks are canonical to the pool's last known
/// block hash. In other words, the first added block of the range must be the child of the last
/// known block hash.
///
/// This is used to update the pool state accordingly.
#[derive(Clone, Debug)]
pub struct CanonicalStateUpdate<'a, B: Block> {
    /// The new tip block (a borrowed sealed block, not just its hash).
    pub new_tip: &'a SealedBlock<B>,
    /// EIP-1559 Base fee of the _next_ (pending) block
    ///
    /// The base fee of a block depends on the utilization of the last block and its base fee.
    pub pending_block_base_fee: u64,
    /// EIP-4844 blob fee of the _next_ (pending) block
    ///
    /// Only after Cancun
    pub pending_block_blob_fee: Option<u128>,
    /// A set of changed accounts across a range of blocks.
    pub changed_accounts: Vec<ChangedAccount>,
    /// All mined transactions in the block range.
    pub mined_transactions: Vec<B256>,
    /// The kind of update to the canonical state.
    pub update_kind: PoolUpdateKind,
}
719
720impl<B> CanonicalStateUpdate<'_, B>
721where
722    B: Block,
723{
724    /// Returns the number of the tip block.
725    pub fn number(&self) -> u64 {
726        self.new_tip.number()
727    }
728
729    /// Returns the hash of the tip block.
730    pub fn hash(&self) -> B256 {
731        self.new_tip.hash()
732    }
733
734    /// Timestamp of the latest chain update
735    pub fn timestamp(&self) -> u64 {
736        self.new_tip.timestamp()
737    }
738
739    /// Returns the block info for the tip block.
740    pub fn block_info(&self) -> BlockInfo {
741        BlockInfo {
742            block_gas_limit: self.new_tip.gas_limit(),
743            last_seen_block_hash: self.hash(),
744            last_seen_block_number: self.number(),
745            pending_basefee: self.pending_block_base_fee,
746            pending_blob_fee: self.pending_block_blob_fee,
747        }
748    }
749}
750
751impl<B> fmt::Display for CanonicalStateUpdate<'_, B>
752where
753    B: Block,
754{
755    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
756        f.debug_struct("CanonicalStateUpdate")
757            .field("hash", &self.hash())
758            .field("number", &self.number())
759            .field("pending_block_base_fee", &self.pending_block_base_fee)
760            .field("pending_block_blob_fee", &self.pending_block_blob_fee)
761            .field("changed_accounts", &self.changed_accounts.len())
762            .field("mined_transactions", &self.mined_transactions.len())
763            .finish()
764    }
765}
766
/// Alias for a boxed [`BestTransactions`] iterator whose items are restricted to the given
/// pool's transaction type, i.e. `Arc<ValidPoolTransaction<Pool::Transaction>>`.
pub type BestTransactionsFor<Pool> = Box<
    dyn BestTransactions<Item = Arc<ValidPoolTransaction<<Pool as TransactionPool>::Transaction>>>,
>;
771
/// An `Iterator` that only returns transactions that are ready to be executed.
///
/// This makes no assumptions about the order of the transactions, but expects that _all_
/// transactions are valid (no nonce gaps) for the tracked state of the pool.
///
/// Note: this iterator will always return the best transaction that it currently knows.
/// There is no guarantee transactions will be returned sequentially in decreasing
/// priority order.
pub trait BestTransactions: Iterator + Send {
    /// Mark the transaction as invalid.
    ///
    /// Implementers must ensure that all subsequent transactions _don't_ depend on this
    /// transaction. In other words, this must remove the given transaction _and_ drain all
    /// transactions that depend on it.
    fn mark_invalid(&mut self, transaction: &Self::Item, kind: InvalidPoolTransactionError);

    /// An iterator may be able to receive additional pending transactions that weren't present in
    /// the pool when it was created.
    ///
    /// Calling this ensures that the iterator only returns the best transaction that it currently
    /// knows and no longer listens to pool updates.
    fn no_updates(&mut self);

    /// Convenience function for [`Self::no_updates`] that returns the iterator again.
    fn without_updates(mut self) -> Self
    where
        Self: Sized,
    {
        self.no_updates();
        self
    }

    /// Skip all blob transactions.
    ///
    /// There's only limited blob space available in a block; once exhausted, EIP-4844 transactions
    /// can no longer be included.
    ///
    /// If called then the iterator will no longer yield blob transactions.
    ///
    /// Note: this will also exclude any transactions that depend on blob transactions.
    ///
    /// The default implementation is `self.set_skip_blobs(true)`.
    fn skip_blobs(&mut self) {
        self.set_skip_blobs(true);
    }

    /// Controls whether the iterator skips blob transactions or not.
    ///
    /// If set to true, no blob transactions will be returned.
    fn set_skip_blobs(&mut self, skip_blobs: bool);

    /// Convenience function for [`Self::skip_blobs`] that returns the iterator again.
    fn without_blobs(mut self) -> Self
    where
        Self: Sized,
    {
        self.skip_blobs();
        self
    }

    /// Creates an iterator which uses a closure to determine whether a transaction should be
    /// returned by the iterator.
    ///
    /// All items the closure returns false for are marked as invalid via [`Self::mark_invalid`] and
    /// descendant transactions will be skipped.
    ///
    /// The returned adapter is a [`BestTransactionFilter`] wrapping `self` and `predicate`.
    fn filter_transactions<P>(self, predicate: P) -> BestTransactionFilter<Self, P>
    where
        P: FnMut(&Self::Item) -> bool,
        Self: Sized,
    {
        BestTransactionFilter::new(self, predicate)
    }
}
843
844impl<T> BestTransactions for Box<T>
845where
846    T: BestTransactions + ?Sized,
847{
848    fn mark_invalid(&mut self, transaction: &Self::Item, kind: InvalidPoolTransactionError) {
849        (**self).mark_invalid(transaction, kind)
850    }
851
852    fn no_updates(&mut self) {
853        (**self).no_updates();
854    }
855
856    fn skip_blobs(&mut self) {
857        (**self).skip_blobs();
858    }
859
860    fn set_skip_blobs(&mut self, skip_blobs: bool) {
861        (**self).set_skip_blobs(skip_blobs);
862    }
863}
864
865/// A no-op implementation that yields no transactions.
866impl<T> BestTransactions for std::iter::Empty<T> {
867    fn mark_invalid(&mut self, _tx: &T, _kind: InvalidPoolTransactionError) {}
868
869    fn no_updates(&mut self) {}
870
871    fn skip_blobs(&mut self) {}
872
873    fn set_skip_blobs(&mut self, _skip_blobs: bool) {}
874}
875
/// A predicate that decides whether a transaction satisfies a set of conditions.
pub trait TransactionFilter {
    /// The transaction type this filter inspects.
    type Transaction;

    /// Returns `true` if `transaction` satisfies the filter's conditions.
    fn is_valid(&self, transaction: &Self::Transaction) -> bool;
}

/// A [`TransactionFilter`] that performs no filtering: every transaction is reported as valid.
#[derive(Debug, Clone)]
pub struct NoopTransactionFilter<T>(std::marker::PhantomData<T>);

// `Default` is written by hand: deriving it would add a `T: Default` bound that the filter
// does not need.
impl<T> Default for NoopTransactionFilter<T> {
    fn default() -> Self {
        Self(Default::default())
    }
}

impl<T> TransactionFilter for NoopTransactionFilter<T> {
    type Transaction = T;

    /// Always `true`; the transaction itself is never inspected.
    fn is_valid(&self, _tx: &Self::Transaction) -> bool {
        true
    }
}
905
/// Bundles the fee attributes used when requesting the best transactions.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct BestTransactionsAttributes {
    /// The base fee attribute for best transactions.
    pub basefee: u64,
    /// The blob fee attribute for best transactions, if any.
    pub blob_fee: Option<u64>,
}

// === impl BestTransactionsAttributes ===

impl BestTransactionsAttributes {
    /// Creates new attributes from a base fee and an optional blob fee.
    pub const fn new(basefee: u64, blob_fee: Option<u64>) -> Self {
        Self { basefee, blob_fee }
    }

    /// Creates new attributes with only a base fee; the blob fee is left unset.
    pub const fn base_fee(basefee: u64) -> Self {
        Self { basefee, blob_fee: None }
    }

    /// Returns the attributes with the blob fee set to `blob_fee`, keeping the base fee.
    pub const fn with_blob_fee(self, blob_fee: u64) -> Self {
        Self { basefee: self.basefee, blob_fee: Some(blob_fee) }
    }
}
934
/// Trait for transaction types used inside the pool.
///
/// This supports two transaction formats
/// - Consensus format: the form the transaction takes when it is included in a block.
/// - Pooled format: the form the transaction takes when it is gossiping around the network.
///
/// This distinction is necessary for the EIP-4844 blob transactions, which require an additional
/// sidecar when they are gossiped around the network. It is expected that the `Consensus` format is
/// a subset of the `Pooled` format.
///
/// The assumption is that fallible conversion from `Consensus` to `Pooled` will encapsulate
/// handling of all valid `Consensus` transactions that can't be pooled (e.g. Deposit transactions
/// or blob-less EIP-4844 transactions).
pub trait PoolTransaction:
    alloy_consensus::Transaction + InMemorySize + Debug + Send + Sync + Clone
{
    /// Associated error type for the `try_from_consensus` method.
    ///
    /// This is also the error of the `Consensus` -> `Pooled` conversion, see [`Self::Pooled`].
    type TryFromConsensusError: fmt::Display;

    /// Associated type representing the raw consensus variant of the transaction.
    type Consensus: SignedTransaction + From<Self::Pooled>;

    /// Associated type representing the recovered pooled variant of the transaction.
    type Pooled: TryFrom<Self::Consensus, Error = Self::TryFromConsensusError> + SignedTransaction;

    /// Define a method to convert from the `Consensus` type to `Self`
    ///
    /// The default implementation converts the consensus transaction into [`Self::Pooled`] and
    /// passes it, together with the already recovered signer, to [`Self::from_pooled`].
    ///
    /// Note: this _must_ fail on any transactions that cannot be pooled (e.g. OP Deposit
    /// transactions).
    fn try_from_consensus(
        tx: Recovered<Self::Consensus>,
    ) -> Result<Self, Self::TryFromConsensusError> {
        let (tx, signer) = tx.into_parts();
        // The signer is reused as-is; only the transaction itself is converted.
        Ok(Self::from_pooled(Recovered::new_unchecked(tx.try_into()?, signer)))
    }

    /// Clone the transaction into a consensus variant.
    ///
    /// This method is preferred when the [`PoolTransaction`] already wraps the consensus variant.
    /// The default implementation clones all of `self` and then calls [`Self::into_consensus`].
    fn clone_into_consensus(&self) -> Recovered<Self::Consensus> {
        self.clone().into_consensus()
    }

    /// Define a method to convert from the `Self` type to `Consensus`
    fn into_consensus(self) -> Recovered<Self::Consensus>;

    /// Converts the transaction into consensus format while preserving the EIP-2718 encoded bytes.
    /// This is used to optimize transaction execution by reusing cached encoded bytes instead of
    /// re-encoding the transaction. The cached bytes are particularly useful in payload building
    /// where the same transaction may be executed multiple times.
    ///
    /// The default implementation calls `into_encoded` on the result of
    /// [`Self::into_consensus`].
    fn into_consensus_with2718(self) -> WithEncoded<Recovered<Self::Consensus>> {
        self.into_consensus().into_encoded()
    }

    /// Define a method to convert from the `Pooled` type to `Self`
    fn from_pooled(pooled: Recovered<Self::Pooled>) -> Self;

    /// Tries to convert `self` into the `Pooled` type.
    ///
    /// The default implementation first converts into the `Consensus` type via
    /// [`Self::into_consensus`] and then attempts the `Consensus` -> `Pooled` conversion, keeping
    /// the signer.
    fn try_into_pooled(self) -> Result<Recovered<Self::Pooled>, Self::TryFromConsensusError> {
        let consensus = self.into_consensus();
        let (tx, signer) = consensus.into_parts();
        Ok(Recovered::new_unchecked(tx.try_into()?, signer))
    }

    /// Converts the `Pooled` type into the `Consensus` type.
    fn pooled_into_consensus(tx: Self::Pooled) -> Self::Consensus {
        tx.into()
    }

    /// Hash of the transaction.
    fn hash(&self) -> &TxHash;

    /// The Sender of the transaction.
    fn sender(&self) -> Address;

    /// Reference to the Sender of the transaction.
    fn sender_ref(&self) -> &Address;

    /// Returns the cost that this transaction is allowed to consume:
    ///
    /// For EIP-1559 transactions: `max_fee_per_gas * gas_limit + tx_value`.
    /// For legacy transactions: `gas_price * gas_limit + tx_value`.
    /// For EIP-4844 blob transactions: `max_fee_per_gas * gas_limit + tx_value +
    /// max_blob_fee_per_gas * blob_gas_used`.
    fn cost(&self) -> &U256;

    /// Returns the length of the rlp encoded transaction object
    ///
    /// Note: Implementations should cache this value.
    fn encoded_length(&self) -> usize;

    /// Ensures that the transaction's code size does not exceed the provided `max_init_code_size`.
    ///
    /// This is specifically relevant for contract creation transactions ([`TxKind::Create`]),
    /// where the input data contains the initialization code. If the input code size exceeds
    /// the configured limit, an [`InvalidPoolTransactionError::ExceedsMaxInitCodeSize`] error is
    /// returned.
    ///
    /// Transactions that are not contract creations always pass, and an input of exactly
    /// `max_init_code_size` bytes is accepted.
    fn ensure_max_init_code_size(
        &self,
        max_init_code_size: usize,
    ) -> Result<(), InvalidPoolTransactionError> {
        let input_len = self.input().len();
        if self.is_create() && input_len > max_init_code_size {
            Err(InvalidPoolTransactionError::ExceedsMaxInitCodeSize(input_len, max_init_code_size))
        } else {
            Ok(())
        }
    }
}
1044
/// Super trait for transactions that can be converted to and from Eth transactions intended for the
/// ethereum style pool.
///
/// This extends the [`PoolTransaction`] trait with additional methods that are specific to the
/// Ethereum pool.
pub trait EthPoolTransaction: PoolTransaction {
    /// Extracts the blob sidecar from the transaction.
    fn take_blob(&mut self) -> EthBlobTransactionSidecar;

    /// A specialization for the EIP-4844 transaction type.
    /// Tries to reattach the blob sidecar to the transaction.
    ///
    /// This returns an option, but callers should ensure that the transaction is an EIP-4844
    /// transaction: [`Typed2718::is_eip4844`].
    fn try_into_pooled_eip4844(
        self,
        sidecar: Arc<BlobTransactionSidecarVariant>,
    ) -> Option<Recovered<Self::Pooled>>;

    /// Tries to build `Self` from the `Consensus` type and a blob sidecar.
    ///
    /// Returns `None` if the passed transaction is not a blob transaction.
    fn try_from_eip4844(
        tx: Recovered<Self::Consensus>,
        sidecar: BlobTransactionSidecarVariant,
    ) -> Option<Self>;

    /// Validates the blob sidecar of the transaction with the given settings.
    fn validate_blob(
        &self,
        blob: &BlobTransactionSidecarVariant,
        settings: &KzgSettings,
    ) -> Result<(), BlobTransactionValidationError>;
}
1079
/// The default [`PoolTransaction`] for the [Pool](crate::Pool) for Ethereum.
///
/// This type is essentially a wrapper around [`Recovered`] with additional
/// fields derived from the transaction that are frequently used by the pools for ordering.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EthPooledTransaction<T = TransactionSigned> {
    /// The [`Recovered`] transaction (transaction plus signer), in consensus format.
    pub transaction: Recovered<T>,

    /// The cost that this transaction is allowed to consume:
    ///
    /// For EIP-1559 transactions: `max_fee_per_gas * gas_limit + tx_value`.
    /// For legacy transactions: `gas_price * gas_limit + tx_value`.
    /// For EIP-4844 blob transactions: `max_fee_per_gas * gas_limit + tx_value +
    /// max_blob_fee_per_gas * blob_gas_used`.
    pub cost: U256,

    /// This is the RLP length of the transaction, computed when the transaction is added to the
    /// pool.
    pub encoded_length: usize,

    /// The blob sidecar for this transaction.
    pub blob_sidecar: EthBlobTransactionSidecar,
}
1102
1103impl<T: SignedTransaction> EthPooledTransaction<T> {
1104    /// Create new instance of [Self].
1105    ///
1106    /// Caution: In case of blob transactions, this does marks the blob sidecar as
1107    /// [`EthBlobTransactionSidecar::Missing`]
1108    pub fn new(transaction: Recovered<T>, encoded_length: usize) -> Self {
1109        let mut blob_sidecar = EthBlobTransactionSidecar::None;
1110
1111        let gas_cost = U256::from(transaction.max_fee_per_gas())
1112            .saturating_mul(U256::from(transaction.gas_limit()));
1113
1114        let mut cost = gas_cost.saturating_add(transaction.value());
1115
1116        if let (Some(blob_gas_used), Some(max_fee_per_blob_gas)) =
1117            (transaction.blob_gas_used(), transaction.max_fee_per_blob_gas())
1118        {
1119            // Add max blob cost using saturating math to avoid overflow
1120            cost = cost.saturating_add(U256::from(
1121                max_fee_per_blob_gas.saturating_mul(blob_gas_used as u128),
1122            ));
1123
1124            // because the blob sidecar is not included in this transaction variant, mark it as
1125            // missing
1126            blob_sidecar = EthBlobTransactionSidecar::Missing;
1127        }
1128
1129        Self { transaction, cost, encoded_length, blob_sidecar }
1130    }
1131
1132    /// Return the reference to the underlying transaction.
1133    pub const fn transaction(&self) -> &Recovered<T> {
1134        &self.transaction
1135    }
1136}
1137
1138impl PoolTransaction for EthPooledTransaction {
1139    type TryFromConsensusError = ValueError<TransactionSigned>;
1140
1141    type Consensus = TransactionSigned;
1142
1143    type Pooled = PooledTransactionVariant;
1144
1145    fn clone_into_consensus(&self) -> Recovered<Self::Consensus> {
1146        self.transaction().clone()
1147    }
1148
1149    fn into_consensus(self) -> Recovered<Self::Consensus> {
1150        self.transaction
1151    }
1152
1153    fn from_pooled(tx: Recovered<Self::Pooled>) -> Self {
1154        let encoded_length = tx.encode_2718_len();
1155        let (tx, signer) = tx.into_parts();
1156        match tx {
1157            PooledTransactionVariant::Eip4844(tx) => {
1158                // include the blob sidecar
1159                let (tx, sig, hash) = tx.into_parts();
1160                let (tx, blob) = tx.into_parts();
1161                let tx = Signed::new_unchecked(tx, sig, hash);
1162                let tx = TransactionSigned::from(tx);
1163                let tx = Recovered::new_unchecked(tx, signer);
1164                let mut pooled = Self::new(tx, encoded_length);
1165                pooled.blob_sidecar = EthBlobTransactionSidecar::Present(blob);
1166                pooled
1167            }
1168            tx => {
1169                // no blob sidecar
1170                let tx = Recovered::new_unchecked(tx.into(), signer);
1171                Self::new(tx, encoded_length)
1172            }
1173        }
1174    }
1175
1176    /// Returns hash of the transaction.
1177    fn hash(&self) -> &TxHash {
1178        self.transaction.tx_hash()
1179    }
1180
1181    /// Returns the Sender of the transaction.
1182    fn sender(&self) -> Address {
1183        self.transaction.signer()
1184    }
1185
1186    /// Returns a reference to the Sender of the transaction.
1187    fn sender_ref(&self) -> &Address {
1188        self.transaction.signer_ref()
1189    }
1190
1191    /// Returns the cost that this transaction is allowed to consume:
1192    ///
1193    /// For EIP-1559 transactions: `max_fee_per_gas * gas_limit + tx_value`.
1194    /// For legacy transactions: `gas_price * gas_limit + tx_value`.
1195    /// For EIP-4844 blob transactions: `max_fee_per_gas * gas_limit + tx_value +
1196    /// max_blob_fee_per_gas * blob_gas_used`.
1197    fn cost(&self) -> &U256 {
1198        &self.cost
1199    }
1200
1201    /// Returns the length of the rlp encoded object
1202    fn encoded_length(&self) -> usize {
1203        self.encoded_length
1204    }
1205}
1206
impl<T: Typed2718> Typed2718 for EthPooledTransaction<T> {
    /// Returns the transaction type byte of the wrapped transaction.
    fn ty(&self) -> u8 {
        self.transaction.ty()
    }
}
1212
impl<T: InMemorySize> InMemorySize for EthPooledTransaction<T> {
    /// Returns the size reported by the wrapped transaction.
    ///
    /// Note: the `cost`, `encoded_length` and `blob_sidecar` fields are not added to this value.
    fn size(&self) -> usize {
        self.transaction.size()
    }
}
1218
// Every method below forwards unchanged to the wrapped `transaction`; none of the derived fields
// (`cost`, `encoded_length`, `blob_sidecar`) is consulted.
impl<T: alloy_consensus::Transaction> alloy_consensus::Transaction for EthPooledTransaction<T> {
    fn chain_id(&self) -> Option<alloy_primitives::ChainId> {
        self.transaction.chain_id()
    }

    fn nonce(&self) -> u64 {
        self.transaction.nonce()
    }

    fn gas_limit(&self) -> u64 {
        self.transaction.gas_limit()
    }

    fn gas_price(&self) -> Option<u128> {
        self.transaction.gas_price()
    }

    fn max_fee_per_gas(&self) -> u128 {
        self.transaction.max_fee_per_gas()
    }

    fn max_priority_fee_per_gas(&self) -> Option<u128> {
        self.transaction.max_priority_fee_per_gas()
    }

    fn max_fee_per_blob_gas(&self) -> Option<u128> {
        self.transaction.max_fee_per_blob_gas()
    }

    fn priority_fee_or_price(&self) -> u128 {
        self.transaction.priority_fee_or_price()
    }

    fn effective_gas_price(&self, base_fee: Option<u64>) -> u128 {
        self.transaction.effective_gas_price(base_fee)
    }

    fn is_dynamic_fee(&self) -> bool {
        self.transaction.is_dynamic_fee()
    }

    fn kind(&self) -> TxKind {
        self.transaction.kind()
    }

    fn is_create(&self) -> bool {
        self.transaction.is_create()
    }

    fn value(&self) -> U256 {
        self.transaction.value()
    }

    fn input(&self) -> &Bytes {
        self.transaction.input()
    }

    fn access_list(&self) -> Option<&AccessList> {
        self.transaction.access_list()
    }

    fn blob_versioned_hashes(&self) -> Option<&[B256]> {
        self.transaction.blob_versioned_hashes()
    }

    fn authorization_list(&self) -> Option<&[SignedAuthorization]> {
        self.transaction.authorization_list()
    }
}
1288
1289impl EthPoolTransaction for EthPooledTransaction {
1290    fn take_blob(&mut self) -> EthBlobTransactionSidecar {
1291        if self.is_eip4844() {
1292            std::mem::replace(&mut self.blob_sidecar, EthBlobTransactionSidecar::Missing)
1293        } else {
1294            EthBlobTransactionSidecar::None
1295        }
1296    }
1297
1298    fn try_into_pooled_eip4844(
1299        self,
1300        sidecar: Arc<BlobTransactionSidecarVariant>,
1301    ) -> Option<Recovered<Self::Pooled>> {
1302        let (signed_transaction, signer) = self.into_consensus().into_parts();
1303        let pooled_transaction =
1304            signed_transaction.try_into_pooled_eip4844(Arc::unwrap_or_clone(sidecar)).ok()?;
1305
1306        Some(Recovered::new_unchecked(pooled_transaction, signer))
1307    }
1308
1309    fn try_from_eip4844(
1310        tx: Recovered<Self::Consensus>,
1311        sidecar: BlobTransactionSidecarVariant,
1312    ) -> Option<Self> {
1313        let (tx, signer) = tx.into_parts();
1314        tx.try_into_pooled_eip4844(sidecar)
1315            .ok()
1316            .map(|tx| tx.with_signer(signer))
1317            .map(Self::from_pooled)
1318    }
1319
1320    fn validate_blob(
1321        &self,
1322        sidecar: &BlobTransactionSidecarVariant,
1323        settings: &KzgSettings,
1324    ) -> Result<(), BlobTransactionValidationError> {
1325        match self.transaction.inner().as_eip4844() {
1326            Some(tx) => tx.tx().validate_blob(sidecar, settings),
1327            _ => Err(BlobTransactionValidationError::NotBlobTransaction(self.ty())),
1328        }
1329    }
1330}
1331
/// Represents the blob sidecar of the [`EthPooledTransaction`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EthBlobTransactionSidecar {
    /// This transaction does not have a blob sidecar
    None,
    /// This transaction has a blob sidecar (EIP-4844) but it is missing
    ///
    /// It was either extracted after being inserted into the pool or re-injected after a reorg
    /// without the blob sidecar
    Missing,
    /// The EIP-4844 transaction was pulled from the network and still has its blob sidecar
    Present(BlobTransactionSidecarVariant),
}
1345
1346impl EthBlobTransactionSidecar {
1347    /// Returns the blob sidecar if it is present
1348    pub const fn maybe_sidecar(&self) -> Option<&BlobTransactionSidecarVariant> {
1349        match self {
1350            Self::Present(sidecar) => Some(sidecar),
1351            _ => None,
1352        }
1353    }
1354}
1355
/// Represents the current status of the pool: per sub-pool transaction counts and reported sizes.
#[derive(Debug, Clone, Copy, Default)]
pub struct PoolSize {
    /// Number of transactions in the _pending_ sub-pool.
    pub pending: usize,
    /// Reported size of transactions in the _pending_ sub-pool.
    pub pending_size: usize,
    /// Number of transactions in the _blob_ pool.
    pub blob: usize,
    /// Reported size of transactions in the _blob_ pool.
    pub blob_size: usize,
    /// Number of transactions in the _basefee_ pool.
    pub basefee: usize,
    /// Reported size of transactions in the _basefee_ sub-pool.
    pub basefee_size: usize,
    /// Number of transactions in the _queued_ sub-pool.
    pub queued: usize,
    /// Reported size of transactions in the _queued_ sub-pool.
    pub queued_size: usize,
    /// Number of all transactions of all sub-pools
    ///
    /// Note: this is the sum of `pending + basefee + queued + blob`
    pub total: usize,
}

// === impl PoolSize ===

impl PoolSize {
    /// Asserts that `total` equals the sum of the four per-sub-pool transaction counts.
    #[cfg(test)]
    pub(crate) fn assert_invariants(&self) {
        let Self { pending, basefee, queued, blob, total, .. } = *self;
        let expected_total = pending + basefee + queued + blob;
        assert_eq!(total, expected_total);
    }
}
1390
/// Information about the block the pool is currently tracking, together with the fees that are
/// enforced for the _next_ block.
#[derive(Default, Debug, Clone, Copy, Eq, PartialEq)]
pub struct BlockInfo {
    /// Hash for the currently tracked block.
    pub last_seen_block_hash: B256,
    /// Currently tracked block.
    pub last_seen_block_number: u64,
    /// Current block gas limit for the latest block.
    pub block_gas_limit: u64,
    /// Currently enforced base fee: the threshold for the basefee sub-pool.
    ///
    /// Note: this is the derived base fee of the _next_ block that builds on the block the pool is
    /// currently tracking.
    pub pending_basefee: u64,
    /// Currently enforced blob fee: the threshold for EIP-4844 blob transactions.
    ///
    /// Note: this is the derived blob fee of the _next_ block that builds on the block the pool is
    /// currently tracking.
    pub pending_blob_fee: Option<u128>,
}
1411
/// The limit to enforce for [`TransactionPool::get_pooled_transaction_elements`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum GetPooledTransactionLimit {
    /// No limit, return all transactions.
    None,
    /// Enforce a size limit on the returned transactions, for example 2MB
    ResponseSizeSoftLimit(usize),
}

impl GetPooledTransactionLimit {
    /// Returns true if `size` is strictly greater than the configured limit.
    ///
    /// [`Self::None`] is never exceeded.
    #[inline]
    pub const fn exceeds(&self, size: usize) -> bool {
        if let Self::ResponseSizeSoftLimit(limit) = *self {
            size > limit
        } else {
            false
        }
    }
}
1431
/// A Stream that yields the full transactions that were added to a specific subpool.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct NewSubpoolTransactionStream<Tx: PoolTransaction> {
    /// Receiver of new transaction events; events for other subpools are filtered out.
    st: Receiver<NewTransactionEvent<Tx>>,
    /// The subpool whose events this stream yields.
    subpool: SubPool,
}
1439
1440// === impl NewSubpoolTransactionStream ===
1441
1442impl<Tx: PoolTransaction> NewSubpoolTransactionStream<Tx> {
1443    /// Create a new stream that yields full transactions from the subpool
1444    pub const fn new(st: Receiver<NewTransactionEvent<Tx>>, subpool: SubPool) -> Self {
1445        Self { st, subpool }
1446    }
1447
1448    /// Tries to receive the next value for this stream.
1449    pub fn try_recv(
1450        &mut self,
1451    ) -> Result<NewTransactionEvent<Tx>, tokio::sync::mpsc::error::TryRecvError> {
1452        loop {
1453            match self.st.try_recv() {
1454                Ok(event) => {
1455                    if event.subpool == self.subpool {
1456                        return Ok(event)
1457                    }
1458                }
1459                Err(e) => return Err(e),
1460            }
1461        }
1462    }
1463}
1464
1465impl<Tx: PoolTransaction> Stream for NewSubpoolTransactionStream<Tx> {
1466    type Item = NewTransactionEvent<Tx>;
1467
1468    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1469        loop {
1470            match ready!(self.st.poll_recv(cx)) {
1471                Some(event) => {
1472                    if event.subpool == self.subpool {
1473                        return Poll::Ready(Some(event))
1474                    }
1475                }
1476                None => return Poll::Ready(None),
1477            }
1478        }
1479    }
1480}
1481
1482#[cfg(test)]
1483mod tests {
1484    use super::*;
1485    use alloy_consensus::{
1486        EthereumTxEnvelope, SignableTransaction, TxEip1559, TxEip2930, TxEip4844, TxEip7702,
1487        TxEnvelope, TxLegacy,
1488    };
1489    use alloy_eips::eip4844::DATA_GAS_PER_BLOB;
1490    use alloy_primitives::Signature;
1491
1492    #[test]
1493    fn test_pool_size_invariants() {
1494        let pool_size = PoolSize {
1495            pending: 10,
1496            pending_size: 1000,
1497            blob: 5,
1498            blob_size: 500,
1499            basefee: 8,
1500            basefee_size: 800,
1501            queued: 7,
1502            queued_size: 700,
1503            total: 10 + 5 + 8 + 7, // Correct total
1504        };
1505
1506        // Call the assert_invariants method to check if the invariants are correct
1507        pool_size.assert_invariants();
1508    }
1509
1510    #[test]
1511    #[should_panic]
1512    fn test_pool_size_invariants_fail() {
1513        let pool_size = PoolSize {
1514            pending: 10,
1515            pending_size: 1000,
1516            blob: 5,
1517            blob_size: 500,
1518            basefee: 8,
1519            basefee_size: 800,
1520            queued: 7,
1521            queued_size: 700,
1522            total: 10 + 5 + 8, // Incorrect total
1523        };
1524
1525        // Call the assert_invariants method, which should panic
1526        pool_size.assert_invariants();
1527    }
1528
1529    #[test]
1530    fn test_eth_pooled_transaction_new_legacy() {
1531        // Create a legacy transaction with specific parameters
1532        let tx = TxEnvelope::Legacy(
1533            TxLegacy {
1534                gas_price: 10,
1535                gas_limit: 1000,
1536                value: U256::from(100),
1537                ..Default::default()
1538            }
1539            .into_signed(Signature::test_signature()),
1540        );
1541        let transaction = Recovered::new_unchecked(tx, Default::default());
1542        let pooled_tx = EthPooledTransaction::new(transaction.clone(), 200);
1543
1544        // Check that the pooled transaction is created correctly
1545        assert_eq!(pooled_tx.transaction, transaction);
1546        assert_eq!(pooled_tx.encoded_length, 200);
1547        assert_eq!(pooled_tx.blob_sidecar, EthBlobTransactionSidecar::None);
1548        assert_eq!(pooled_tx.cost, U256::from(100) + U256::from(10 * 1000));
1549    }
1550
1551    #[test]
1552    fn test_eth_pooled_transaction_new_eip2930() {
1553        // Create an EIP-2930 transaction with specific parameters
1554        let tx = TxEnvelope::Eip2930(
1555            TxEip2930 {
1556                gas_price: 10,
1557                gas_limit: 1000,
1558                value: U256::from(100),
1559                ..Default::default()
1560            }
1561            .into_signed(Signature::test_signature()),
1562        );
1563        let transaction = Recovered::new_unchecked(tx, Default::default());
1564        let pooled_tx = EthPooledTransaction::new(transaction.clone(), 200);
1565        let expected_cost = U256::from(100) + (U256::from(10 * 1000));
1566
1567        assert_eq!(pooled_tx.transaction, transaction);
1568        assert_eq!(pooled_tx.encoded_length, 200);
1569        assert_eq!(pooled_tx.blob_sidecar, EthBlobTransactionSidecar::None);
1570        assert_eq!(pooled_tx.cost, expected_cost);
1571    }
1572
1573    #[test]
1574    fn test_eth_pooled_transaction_new_eip1559() {
1575        // Create an EIP-1559 transaction with specific parameters
1576        let tx = TxEnvelope::Eip1559(
1577            TxEip1559 {
1578                max_fee_per_gas: 10,
1579                gas_limit: 1000,
1580                value: U256::from(100),
1581                ..Default::default()
1582            }
1583            .into_signed(Signature::test_signature()),
1584        );
1585        let transaction = Recovered::new_unchecked(tx, Default::default());
1586        let pooled_tx = EthPooledTransaction::new(transaction.clone(), 200);
1587
1588        // Check that the pooled transaction is created correctly
1589        assert_eq!(pooled_tx.transaction, transaction);
1590        assert_eq!(pooled_tx.encoded_length, 200);
1591        assert_eq!(pooled_tx.blob_sidecar, EthBlobTransactionSidecar::None);
1592        assert_eq!(pooled_tx.cost, U256::from(100) + U256::from(10 * 1000));
1593    }
1594
#[test]
fn test_eth_pooled_transaction_new_eip4844() {
    // Sign an EIP-4844 payload carrying a single versioned hash, with a blob fee
    // cap of 5 on top of the regular fee cap (10), gas limit (1000) and value (100).
    let signed = TxEip4844 {
        max_fee_per_gas: 10,
        gas_limit: 1000,
        value: U256::from(100),
        max_fee_per_blob_gas: 5,
        blob_versioned_hashes: vec![B256::default()],
        ..Default::default()
    }
    .into_signed(Signature::test_signature());
    let recovered =
        Recovered::new_unchecked(EthereumTxEnvelope::Eip4844(signed), Default::default());

    let pooled = EthPooledTransaction::new(recovered.clone(), 300);

    // Expected cost: value + execution fee cap + blob fee cap for the one blob hash.
    let expected_cost =
        U256::from(100) + U256::from(10 * 1000) + U256::from(5 * DATA_GAS_PER_BLOB);

    // The transaction and length are stored as given; since no sidecar was
    // supplied, the pooled transaction is expected to flag it as `Missing`.
    assert_eq!(pooled.transaction, recovered);
    assert_eq!(pooled.encoded_length, 300);
    assert_eq!(pooled.blob_sidecar, EthBlobTransactionSidecar::Missing);
    assert_eq!(pooled.cost, expected_cost);
}
1620
#[test]
fn test_eth_pooled_transaction_new_eip7702() {
    // Sign an EIP-7702 payload (value 100, fee cap 10, gas limit 1000).
    let signed = TxEip7702 {
        max_fee_per_gas: 10,
        gas_limit: 1000,
        value: U256::from(100),
        ..Default::default()
    }
    .into_signed(Signature::test_signature());
    // The envelope's 4844 type parameter is not implied by this variant, so it is
    // pinned explicitly.
    let envelope = EthereumTxEnvelope::<TxEip4844>::Eip7702(signed);
    let recovered = Recovered::new_unchecked(envelope, Default::default());

    let pooled = EthPooledTransaction::new(recovered.clone(), 200);

    // The expected cost is the value plus `max_fee_per_gas * gas_limit`.
    let expected_cost = U256::from(100) + U256::from(10 * 1000);

    // The constructor must keep the transaction and length as given, attach no
    // blob sidecar, and report the expected cost.
    assert_eq!(pooled.transaction, recovered);
    assert_eq!(pooled.encoded_length, 200);
    assert_eq!(pooled.blob_sidecar, EthBlobTransactionSidecar::None);
    assert_eq!(pooled.cost, expected_cost);
}
1642
#[test]
fn test_pooled_transaction_limit() {
    // Without a limit, no size is reported as exceeding.
    let unlimited = GetPooledTransactionLimit::None;
    assert!(!unlimited.exceeds(1000));

    // Soft limit of 2MB (2 * 1024 * 1024 bytes).
    let soft_limit = GetPooledTransactionLimit::ResponseSizeSoftLimit(2 * 1024 * 1024);

    // (size, expected result of `exceeds`) pairs around the limit.
    let cases = [
        // 1MB is below the limit.
        (1024 * 1024, false),
        // Exactly 2MB: sitting on the limit does not count as exceeding it.
        (2 * 1024 * 1024, false),
        // 3MB is above the limit.
        (3 * 1024 * 1024, true),
    ];

    for (size, expected) in cases {
        assert_eq!(soft_limit.exceeds(size), expected, "unexpected result for size {size}");
    }
}
1665}