reth_transaction_pool/traits.rs
1use crate::{
2 blobstore::BlobStoreError,
3 error::{InvalidPoolTransactionError, PoolResult},
4 pool::{
5 state::SubPool, BestTransactionFilter, NewTransactionEvent, TransactionEvents,
6 TransactionListenerKind,
7 },
8 validate::ValidPoolTransaction,
9 AllTransactionsEvents,
10};
11use alloy_consensus::{error::ValueError, BlockHeader, Signed, Typed2718};
12use alloy_eips::{
13 eip2718::{Encodable2718, WithEncoded},
14 eip2930::AccessList,
15 eip4844::{
16 env_settings::KzgSettings, BlobAndProofV1, BlobAndProofV2, BlobTransactionValidationError,
17 },
18 eip7594::BlobTransactionSidecarVariant,
19 eip7702::SignedAuthorization,
20};
21use alloy_primitives::{Address, Bytes, TxHash, TxKind, B256, U256};
22use futures_util::{ready, Stream};
23use reth_eth_wire_types::HandleMempoolData;
24use reth_ethereum_primitives::{PooledTransactionVariant, TransactionSigned};
25use reth_execution_types::ChangedAccount;
26use reth_primitives_traits::{Block, InMemorySize, Recovered, SealedBlock, SignedTransaction};
27#[cfg(feature = "serde")]
28use serde::{Deserialize, Serialize};
29use std::{
30 collections::{HashMap, HashSet},
31 fmt,
32 fmt::Debug,
33 future::Future,
34 pin::Pin,
35 sync::Arc,
36 task::{Context, Poll},
37};
38use tokio::sync::mpsc::Receiver;
39
40/// The `PeerId` type.
41pub type PeerId = alloy_primitives::B512;
42
43/// Helper type alias to access [`PoolTransaction`] for a given [`TransactionPool`].
44pub type PoolTx<P> = <P as TransactionPool>::Transaction;
45/// Helper type alias to access [`PoolTransaction::Consensus`] for a given [`TransactionPool`].
46pub type PoolConsensusTx<P> = <<P as TransactionPool>::Transaction as PoolTransaction>::Consensus;
47
48/// Helper type alias to access [`PoolTransaction::Pooled`] for a given [`TransactionPool`].
49pub type PoolPooledTx<P> = <<P as TransactionPool>::Transaction as PoolTransaction>::Pooled;
50
51/// General purpose abstraction of a transaction-pool.
52///
53/// This is intended to be used by API-consumers such as RPC that need inject new incoming,
54/// unverified transactions. And by block production that needs to get transactions to execute in a
55/// new block.
56///
57/// Note: This requires `Clone` for convenience, since it is assumed that this will be implemented
58/// for a wrapped `Arc` type, see also [`Pool`](crate::Pool).
59#[auto_impl::auto_impl(&, Arc)]
60pub trait TransactionPool: Clone + Debug + Send + Sync {
61 /// The transaction type of the pool
62 type Transaction: EthPoolTransaction;
63
64 /// Returns stats about the pool and all sub-pools.
65 fn pool_size(&self) -> PoolSize;
66
67 /// Returns the block the pool is currently tracking.
68 ///
69 /// This tracks the block that the pool has last seen.
70 fn block_info(&self) -> BlockInfo;
71
72 /// Imports an _external_ transaction.
73 ///
74 /// This is intended to be used by the network to insert incoming transactions received over the
75 /// p2p network.
76 ///
77 /// Consumer: P2P
78 fn add_external_transaction(
79 &self,
80 transaction: Self::Transaction,
81 ) -> impl Future<Output = PoolResult<TxHash>> + Send {
82 self.add_transaction(TransactionOrigin::External, transaction)
83 }
84
85 /// Imports all _external_ transactions
86 ///
87 /// Consumer: Utility
88 fn add_external_transactions(
89 &self,
90 transactions: Vec<Self::Transaction>,
91 ) -> impl Future<Output = Vec<PoolResult<TxHash>>> + Send {
92 self.add_transactions(TransactionOrigin::External, transactions)
93 }
94
95 /// Adds an _unvalidated_ transaction into the pool and subscribe to state changes.
96 ///
97 /// This is the same as [`TransactionPool::add_transaction`] but returns an event stream for the
98 /// given transaction.
99 ///
100 /// Consumer: Custom
101 fn add_transaction_and_subscribe(
102 &self,
103 origin: TransactionOrigin,
104 transaction: Self::Transaction,
105 ) -> impl Future<Output = PoolResult<TransactionEvents>> + Send;
106
107 /// Adds an _unvalidated_ transaction into the pool.
108 ///
109 /// Consumer: RPC
110 fn add_transaction(
111 &self,
112 origin: TransactionOrigin,
113 transaction: Self::Transaction,
114 ) -> impl Future<Output = PoolResult<TxHash>> + Send;
115
116 /// Adds the given _unvalidated_ transaction into the pool.
117 ///
118 /// Returns a list of results.
119 ///
120 /// Consumer: RPC
121 fn add_transactions(
122 &self,
123 origin: TransactionOrigin,
124 transactions: Vec<Self::Transaction>,
125 ) -> impl Future<Output = Vec<PoolResult<TxHash>>> + Send;
126
127 /// Returns a new transaction change event stream for the given transaction.
128 ///
129 /// Returns `None` if the transaction is not in the pool.
130 fn transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents>;
131
132 /// Returns a new transaction change event stream for _all_ transactions in the pool.
133 fn all_transactions_event_listener(&self) -> AllTransactionsEvents<Self::Transaction>;
134
135 /// Returns a new Stream that yields transactions hashes for new __pending__ transactions
136 /// inserted into the pool that are allowed to be propagated.
137 ///
138 /// Note: This is intended for networking and will __only__ yield transactions that are allowed
139 /// to be propagated over the network, see also [TransactionListenerKind].
140 ///
141 /// Consumer: RPC/P2P
142 fn pending_transactions_listener(&self) -> Receiver<TxHash> {
143 self.pending_transactions_listener_for(TransactionListenerKind::PropagateOnly)
144 }
145
146 /// Returns a new [Receiver] that yields transactions hashes for new __pending__ transactions
147 /// inserted into the pending pool depending on the given [TransactionListenerKind] argument.
148 fn pending_transactions_listener_for(&self, kind: TransactionListenerKind) -> Receiver<TxHash>;
149
150 /// Returns a new stream that yields new valid transactions added to the pool.
151 fn new_transactions_listener(&self) -> Receiver<NewTransactionEvent<Self::Transaction>> {
152 self.new_transactions_listener_for(TransactionListenerKind::PropagateOnly)
153 }
154
155 /// Returns a new [Receiver] that yields blob "sidecars" (blobs w/ assoc. kzg
156 /// commitments/proofs) for eip-4844 transactions inserted into the pool
157 fn blob_transaction_sidecars_listener(&self) -> Receiver<NewBlobSidecar>;
158
159 /// Returns a new stream that yields new valid transactions added to the pool
160 /// depending on the given [TransactionListenerKind] argument.
161 fn new_transactions_listener_for(
162 &self,
163 kind: TransactionListenerKind,
164 ) -> Receiver<NewTransactionEvent<Self::Transaction>>;
165
166 /// Returns a new Stream that yields new transactions added to the pending sub-pool.
167 ///
168 /// This is a convenience wrapper around [Self::new_transactions_listener] that filters for
169 /// [SubPool::Pending](crate::SubPool).
170 fn new_pending_pool_transactions_listener(
171 &self,
172 ) -> NewSubpoolTransactionStream<Self::Transaction> {
173 NewSubpoolTransactionStream::new(
174 self.new_transactions_listener_for(TransactionListenerKind::PropagateOnly),
175 SubPool::Pending,
176 )
177 }
178
179 /// Returns a new Stream that yields new transactions added to the basefee sub-pool.
180 ///
181 /// This is a convenience wrapper around [Self::new_transactions_listener] that filters for
182 /// [SubPool::BaseFee](crate::SubPool).
183 fn new_basefee_pool_transactions_listener(
184 &self,
185 ) -> NewSubpoolTransactionStream<Self::Transaction> {
186 NewSubpoolTransactionStream::new(self.new_transactions_listener(), SubPool::BaseFee)
187 }
188
189 /// Returns a new Stream that yields new transactions added to the queued-pool.
190 ///
191 /// This is a convenience wrapper around [Self::new_transactions_listener] that filters for
192 /// [SubPool::Queued](crate::SubPool).
193 fn new_queued_transactions_listener(&self) -> NewSubpoolTransactionStream<Self::Transaction> {
194 NewSubpoolTransactionStream::new(self.new_transactions_listener(), SubPool::Queued)
195 }
196
197 /// Returns the _hashes_ of all transactions in the pool.
198 ///
199 /// Note: This returns a `Vec` but should guarantee that all hashes are unique.
200 ///
201 /// Consumer: P2P
202 fn pooled_transaction_hashes(&self) -> Vec<TxHash>;
203
204 /// Returns only the first `max` hashes of transactions in the pool.
205 ///
206 /// Consumer: P2P
207 fn pooled_transaction_hashes_max(&self, max: usize) -> Vec<TxHash>;
208
209 /// Returns the _full_ transaction objects all transactions in the pool.
210 ///
211 /// This is intended to be used by the network for the initial exchange of pooled transaction
212 /// _hashes_
213 ///
214 /// Note: This returns a `Vec` but should guarantee that all transactions are unique.
215 ///
216 /// Caution: In case of blob transactions, this does not include the sidecar.
217 ///
218 /// Consumer: P2P
219 fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
220
221 /// Returns only the first `max` transactions in the pool.
222 ///
223 /// Consumer: P2P
224 fn pooled_transactions_max(
225 &self,
226 max: usize,
227 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
228
229 /// Returns converted [PooledTransactionVariant] for the given transaction hashes.
230 ///
231 /// This adheres to the expected behavior of
232 /// [`GetPooledTransactions`](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getpooledtransactions-0x09):
233 ///
234 /// The transactions must be in same order as in the request, but it is OK to skip transactions
235 /// which are not available.
236 ///
237 /// If the transaction is a blob transaction, the sidecar will be included.
238 ///
239 /// Consumer: P2P
240 fn get_pooled_transaction_elements(
241 &self,
242 tx_hashes: Vec<TxHash>,
243 limit: GetPooledTransactionLimit,
244 ) -> Vec<<Self::Transaction as PoolTransaction>::Pooled>;
245
246 /// Returns the pooled transaction variant for the given transaction hash.
247 ///
248 /// This adheres to the expected behavior of
249 /// [`GetPooledTransactions`](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getpooledtransactions-0x09):
250 ///
251 /// If the transaction is a blob transaction, the sidecar will be included.
252 ///
253 /// It is expected that this variant represents the valid p2p format for full transactions.
254 /// E.g. for EIP-4844 transactions this is the consensus transaction format with the blob
255 /// sidecar.
256 ///
257 /// Consumer: P2P
258 fn get_pooled_transaction_element(
259 &self,
260 tx_hash: TxHash,
261 ) -> Option<Recovered<<Self::Transaction as PoolTransaction>::Pooled>>;
262
263 /// Returns an iterator that yields transactions that are ready for block production.
264 ///
265 /// Consumer: Block production
266 fn best_transactions(
267 &self,
268 ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>>;
269
270 /// Returns an iterator that yields transactions that are ready for block production with the
271 /// given base fee and optional blob fee attributes.
272 ///
273 /// Consumer: Block production
274 fn best_transactions_with_attributes(
275 &self,
276 best_transactions_attributes: BestTransactionsAttributes,
277 ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>>;
278
279 /// Returns all transactions that can be included in the next block.
280 ///
281 /// This is primarily used for the `txpool_` RPC namespace:
282 /// <https://geth.ethereum.org/docs/interacting-with-geth/rpc/ns-txpool> which distinguishes
283 /// between `pending` and `queued` transactions, where `pending` are transactions ready for
284 /// inclusion in the next block and `queued` are transactions that are ready for inclusion in
285 /// future blocks.
286 ///
287 /// Consumer: RPC
288 fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
289
290 /// Returns first `max` transactions that can be included in the next block.
291 /// See <https://github.com/paradigmxyz/reth/issues/12767#issuecomment-2493223579>
292 ///
293 /// Consumer: Block production
294 fn pending_transactions_max(
295 &self,
296 max: usize,
297 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
298
299 /// Returns all transactions that can be included in _future_ blocks.
300 ///
301 /// This and [Self::pending_transactions] are mutually exclusive.
302 ///
303 /// Consumer: RPC
304 fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
305
306 /// Returns all transactions that are currently in the pool grouped by whether they are ready
307 /// for inclusion in the next block or not.
308 ///
309 /// This is primarily used for the `txpool_` namespace: <https://geth.ethereum.org/docs/interacting-with-geth/rpc/ns-txpool>
310 ///
311 /// Consumer: RPC
312 fn all_transactions(&self) -> AllPoolTransactions<Self::Transaction>;
313
314 /// Removes all transactions corresponding to the given hashes.
315 ///
316 /// Note: This removes the transactions as if they got discarded (_not_ mined).
317 ///
318 /// Consumer: Utility
319 fn remove_transactions(
320 &self,
321 hashes: Vec<TxHash>,
322 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
323
324 /// Removes all transactions corresponding to the given hashes.
325 ///
326 /// Also removes all _dependent_ transactions.
327 ///
328 /// Consumer: Utility
329 fn remove_transactions_and_descendants(
330 &self,
331 hashes: Vec<TxHash>,
332 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
333
334 /// Removes all transactions from the given sender
335 ///
336 /// Consumer: Utility
337 fn remove_transactions_by_sender(
338 &self,
339 sender: Address,
340 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
341
342 /// Retains only those hashes that are unknown to the pool.
343 /// In other words, removes all transactions from the given set that are currently present in
344 /// the pool. Returns hashes already known to the pool.
345 ///
346 /// Consumer: P2P
347 fn retain_unknown<A>(&self, announcement: &mut A)
348 where
349 A: HandleMempoolData;
350
351 /// Returns if the transaction for the given hash is already included in this pool.
352 fn contains(&self, tx_hash: &TxHash) -> bool {
353 self.get(tx_hash).is_some()
354 }
355
356 /// Returns the transaction for the given hash.
357 fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;
358
359 /// Returns all transactions objects for the given hashes.
360 ///
361 /// Caution: This in case of blob transactions, this does not include the sidecar.
362 fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
363
364 /// Notify the pool about transactions that are propagated to peers.
365 ///
366 /// Consumer: P2P
367 fn on_propagated(&self, txs: PropagatedTransactions);
368
369 /// Returns all transactions sent by a given user
370 fn get_transactions_by_sender(
371 &self,
372 sender: Address,
373 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
374
375 /// Returns all pending transactions filtered by predicate
376 fn get_pending_transactions_with_predicate(
377 &self,
378 predicate: impl FnMut(&ValidPoolTransaction<Self::Transaction>) -> bool,
379 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
380
381 /// Returns all pending transactions sent by a given user
382 fn get_pending_transactions_by_sender(
383 &self,
384 sender: Address,
385 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
386
387 /// Returns all queued transactions sent by a given user
388 fn get_queued_transactions_by_sender(
389 &self,
390 sender: Address,
391 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
392
393 /// Returns the highest transaction sent by a given user
394 fn get_highest_transaction_by_sender(
395 &self,
396 sender: Address,
397 ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;
398
399 /// Returns the transaction with the highest nonce that is executable given the on chain nonce.
400 /// In other words the highest non nonce gapped transaction.
401 ///
402 /// Note: The next pending pooled transaction must have the on chain nonce.
403 ///
404 /// For example, for a given on chain nonce of `5`, the next transaction must have that nonce.
405 /// If the pool contains txs `[5,6,7]` this returns tx `7`.
406 /// If the pool contains txs `[6,7]` this returns `None` because the next valid nonce (5) is
407 /// missing, which means txs `[6,7]` are nonce gapped.
408 fn get_highest_consecutive_transaction_by_sender(
409 &self,
410 sender: Address,
411 on_chain_nonce: u64,
412 ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;
413
414 /// Returns a transaction sent by a given user and a nonce
415 fn get_transaction_by_sender_and_nonce(
416 &self,
417 sender: Address,
418 nonce: u64,
419 ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;
420
421 /// Returns all transactions that where submitted with the given [TransactionOrigin]
422 fn get_transactions_by_origin(
423 &self,
424 origin: TransactionOrigin,
425 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
426
427 /// Returns all pending transactions filtered by [`TransactionOrigin`]
428 fn get_pending_transactions_by_origin(
429 &self,
430 origin: TransactionOrigin,
431 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
432
433 /// Returns all transactions that where submitted as [TransactionOrigin::Local]
434 fn get_local_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
435 self.get_transactions_by_origin(TransactionOrigin::Local)
436 }
437
438 /// Returns all transactions that where submitted as [TransactionOrigin::Private]
439 fn get_private_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
440 self.get_transactions_by_origin(TransactionOrigin::Private)
441 }
442
443 /// Returns all transactions that where submitted as [TransactionOrigin::External]
444 fn get_external_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
445 self.get_transactions_by_origin(TransactionOrigin::External)
446 }
447
448 /// Returns all pending transactions that where submitted as [TransactionOrigin::Local]
449 fn get_local_pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
450 self.get_pending_transactions_by_origin(TransactionOrigin::Local)
451 }
452
453 /// Returns all pending transactions that where submitted as [TransactionOrigin::Private]
454 fn get_private_pending_transactions(
455 &self,
456 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
457 self.get_pending_transactions_by_origin(TransactionOrigin::Private)
458 }
459
460 /// Returns all pending transactions that where submitted as [TransactionOrigin::External]
461 fn get_external_pending_transactions(
462 &self,
463 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
464 self.get_pending_transactions_by_origin(TransactionOrigin::External)
465 }
466
467 /// Returns a set of all senders of transactions in the pool
468 fn unique_senders(&self) -> HashSet<Address>;
469
470 /// Returns the [BlobTransactionSidecarVariant] for the given transaction hash if it exists in
471 /// the blob store.
472 fn get_blob(
473 &self,
474 tx_hash: TxHash,
475 ) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError>;
476
477 /// Returns all [BlobTransactionSidecarVariant] for the given transaction hashes if they exists
478 /// in the blob store.
479 ///
480 /// This only returns the blobs that were found in the store.
481 /// If there's no blob it will not be returned.
482 fn get_all_blobs(
483 &self,
484 tx_hashes: Vec<TxHash>,
485 ) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError>;
486
487 /// Returns the exact [BlobTransactionSidecarVariant] for the given transaction hashes in the
488 /// order they were requested.
489 ///
490 /// Returns an error if any of the blobs are not found in the blob store.
491 fn get_all_blobs_exact(
492 &self,
493 tx_hashes: Vec<TxHash>,
494 ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError>;
495
496 /// Return the [`BlobAndProofV1`]s for a list of blob versioned hashes.
497 fn get_blobs_for_versioned_hashes_v1(
498 &self,
499 versioned_hashes: &[B256],
500 ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError>;
501
502 /// Return the [`BlobAndProofV2`]s for a list of blob versioned hashes.
503 /// Blobs and proofs are returned only if they are present for _all_ of the requested versioned
504 /// hashes.
505 fn get_blobs_for_versioned_hashes_v2(
506 &self,
507 versioned_hashes: &[B256],
508 ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError>;
509}
510
511/// Extension for [TransactionPool] trait that allows to set the current block info.
512#[auto_impl::auto_impl(&, Arc)]
513pub trait TransactionPoolExt: TransactionPool {
514 /// Sets the current block info for the pool.
515 fn set_block_info(&self, info: BlockInfo);
516
517 /// Event listener for when the pool needs to be updated.
518 ///
519 /// Implementers need to update the pool accordingly:
520 ///
521 /// ## Fee changes
522 ///
523 /// The [`CanonicalStateUpdate`] includes the base and blob fee of the pending block, which
524 /// affects the dynamic fee requirement of pending transactions in the pool.
525 ///
526 /// ## EIP-4844 Blob transactions
527 ///
528 /// Mined blob transactions need to be removed from the pool, but from the pool only. The blob
529 /// sidecar must not be removed from the blob store. Only after a blob transaction is
530 /// finalized, its sidecar is removed from the blob store. This ensures that in case of a reorg,
531 /// the sidecar is still available.
532 fn on_canonical_state_change<B>(&self, update: CanonicalStateUpdate<'_, B>)
533 where
534 B: Block;
535
536 /// Updates the accounts in the pool
537 fn update_accounts(&self, accounts: Vec<ChangedAccount>);
538
539 /// Deletes the blob sidecar for the given transaction from the blob store
540 fn delete_blob(&self, tx: B256);
541
542 /// Deletes multiple blob sidecars from the blob store
543 fn delete_blobs(&self, txs: Vec<B256>);
544
545 /// Maintenance function to cleanup blobs that are no longer needed.
546 fn cleanup_blobs(&self);
547}
548
549/// A Helper type that bundles all transactions in the pool.
550#[derive(Debug, Clone)]
551pub struct AllPoolTransactions<T: PoolTransaction> {
552 /// Transactions that are ready for inclusion in the next block.
553 pub pending: Vec<Arc<ValidPoolTransaction<T>>>,
554 /// Transactions that are ready for inclusion in _future_ blocks, but are currently parked,
555 /// because they depend on other transactions that are not yet included in the pool (nonce gap)
556 /// or otherwise blocked.
557 pub queued: Vec<Arc<ValidPoolTransaction<T>>>,
558}
559
560// === impl AllPoolTransactions ===
561
562impl<T: PoolTransaction> AllPoolTransactions<T> {
563 /// Returns an iterator over all pending [`Recovered`] transactions.
564 pub fn pending_recovered(&self) -> impl Iterator<Item = Recovered<T::Consensus>> + '_ {
565 self.pending.iter().map(|tx| tx.transaction.clone().into_consensus())
566 }
567
568 /// Returns an iterator over all queued [`Recovered`] transactions.
569 pub fn queued_recovered(&self) -> impl Iterator<Item = Recovered<T::Consensus>> + '_ {
570 self.queued.iter().map(|tx| tx.transaction.clone().into_consensus())
571 }
572
573 /// Returns an iterator over all transactions, both pending and queued.
574 pub fn all(&self) -> impl Iterator<Item = Recovered<T::Consensus>> + '_ {
575 self.pending
576 .iter()
577 .chain(self.queued.iter())
578 .map(|tx| tx.transaction.clone().into_consensus())
579 }
580}
581
582impl<T: PoolTransaction> Default for AllPoolTransactions<T> {
583 fn default() -> Self {
584 Self { pending: Default::default(), queued: Default::default() }
585 }
586}
587
588/// Represents transactions that were propagated over the network.
589#[derive(Debug, Clone, Eq, PartialEq, Default)]
590pub struct PropagatedTransactions(pub HashMap<TxHash, Vec<PropagateKind>>);
591
592/// Represents how a transaction was propagated over the network.
593#[derive(Debug, Copy, Clone, Eq, PartialEq)]
594#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
595pub enum PropagateKind {
596 /// The full transaction object was sent to the peer.
597 ///
598 /// This is equivalent to the `Transaction` message
599 Full(PeerId),
600 /// Only the Hash was propagated to the peer.
601 Hash(PeerId),
602}
603
604// === impl PropagateKind ===
605
606impl PropagateKind {
607 /// Returns the peer the transaction was sent to
608 pub const fn peer(&self) -> &PeerId {
609 match self {
610 Self::Full(peer) | Self::Hash(peer) => peer,
611 }
612 }
613
614 /// Returns true if the transaction was sent as a full transaction
615 pub const fn is_full(&self) -> bool {
616 matches!(self, Self::Full(_))
617 }
618
619 /// Returns true if the transaction was sent as a hash
620 pub const fn is_hash(&self) -> bool {
621 matches!(self, Self::Hash(_))
622 }
623}
624
625impl From<PropagateKind> for PeerId {
626 fn from(value: PropagateKind) -> Self {
627 match value {
628 PropagateKind::Full(peer) | PropagateKind::Hash(peer) => peer,
629 }
630 }
631}
632
633/// This type represents a new blob sidecar that has been stored in the transaction pool's
634/// blobstore; it includes the `TransactionHash` of the blob transaction along with the assoc.
635/// sidecar (blobs, commitments, proofs)
636#[derive(Debug, Clone)]
637pub struct NewBlobSidecar {
638 /// hash of the EIP-4844 transaction.
639 pub tx_hash: TxHash,
640 /// the blob transaction sidecar.
641 pub sidecar: Arc<BlobTransactionSidecarVariant>,
642}
643
644/// Where the transaction originates from.
645///
646/// Depending on where the transaction was picked up, it affects how the transaction is handled
647/// internally, e.g. limits for simultaneous transaction of one sender.
648#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)]
649pub enum TransactionOrigin {
650 /// Transaction is coming from a local source.
651 #[default]
652 Local,
653 /// Transaction has been received externally.
654 ///
655 /// This is usually considered an "untrusted" source, for example received from another in the
656 /// network.
657 External,
658 /// Transaction is originated locally and is intended to remain private.
659 ///
660 /// This type of transaction should not be propagated to the network. It's meant for
661 /// private usage within the local node only.
662 Private,
663}
664
665// === impl TransactionOrigin ===
666
667impl TransactionOrigin {
668 /// Whether the transaction originates from a local source.
669 pub const fn is_local(&self) -> bool {
670 matches!(self, Self::Local)
671 }
672
673 /// Whether the transaction originates from an external source.
674 pub const fn is_external(&self) -> bool {
675 matches!(self, Self::External)
676 }
677 /// Whether the transaction originates from a private source.
678 pub const fn is_private(&self) -> bool {
679 matches!(self, Self::Private)
680 }
681}
682
683/// Represents the kind of update to the canonical state.
684#[derive(Debug, Clone, Copy, PartialEq, Eq)]
685pub enum PoolUpdateKind {
686 /// The update was due to a block commit.
687 Commit,
688 /// The update was due to a reorganization.
689 Reorg,
690}
691
692/// Represents changes after a new canonical block or range of canonical blocks was added to the
693/// chain.
694///
695/// It is expected that this is only used if the added blocks are canonical to the pool's last known
696/// block hash. In other words, the first added block of the range must be the child of the last
697/// known block hash.
698///
699/// This is used to update the pool state accordingly.
700#[derive(Clone, Debug)]
701pub struct CanonicalStateUpdate<'a, B: Block> {
702 /// Hash of the tip block.
703 pub new_tip: &'a SealedBlock<B>,
704 /// EIP-1559 Base fee of the _next_ (pending) block
705 ///
706 /// The base fee of a block depends on the utilization of the last block and its base fee.
707 pub pending_block_base_fee: u64,
708 /// EIP-4844 blob fee of the _next_ (pending) block
709 ///
710 /// Only after Cancun
711 pub pending_block_blob_fee: Option<u128>,
712 /// A set of changed accounts across a range of blocks.
713 pub changed_accounts: Vec<ChangedAccount>,
714 /// All mined transactions in the block range.
715 pub mined_transactions: Vec<B256>,
716 /// The kind of update to the canonical state.
717 pub update_kind: PoolUpdateKind,
718}
719
720impl<B> CanonicalStateUpdate<'_, B>
721where
722 B: Block,
723{
724 /// Returns the number of the tip block.
725 pub fn number(&self) -> u64 {
726 self.new_tip.number()
727 }
728
729 /// Returns the hash of the tip block.
730 pub fn hash(&self) -> B256 {
731 self.new_tip.hash()
732 }
733
734 /// Timestamp of the latest chain update
735 pub fn timestamp(&self) -> u64 {
736 self.new_tip.timestamp()
737 }
738
739 /// Returns the block info for the tip block.
740 pub fn block_info(&self) -> BlockInfo {
741 BlockInfo {
742 block_gas_limit: self.new_tip.gas_limit(),
743 last_seen_block_hash: self.hash(),
744 last_seen_block_number: self.number(),
745 pending_basefee: self.pending_block_base_fee,
746 pending_blob_fee: self.pending_block_blob_fee,
747 }
748 }
749}
750
751impl<B> fmt::Display for CanonicalStateUpdate<'_, B>
752where
753 B: Block,
754{
755 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
756 f.debug_struct("CanonicalStateUpdate")
757 .field("hash", &self.hash())
758 .field("number", &self.number())
759 .field("pending_block_base_fee", &self.pending_block_base_fee)
760 .field("pending_block_blob_fee", &self.pending_block_blob_fee)
761 .field("changed_accounts", &self.changed_accounts.len())
762 .field("mined_transactions", &self.mined_transactions.len())
763 .finish()
764 }
765}
766
767/// Alias to restrict the [`BestTransactions`] items to the pool's transaction type.
768pub type BestTransactionsFor<Pool> = Box<
769 dyn BestTransactions<Item = Arc<ValidPoolTransaction<<Pool as TransactionPool>::Transaction>>>,
770>;
771
772/// An `Iterator` that only returns transactions that are ready to be executed.
773///
774/// This makes no assumptions about the order of the transactions, but expects that _all_
775/// transactions are valid (no nonce gaps.) for the tracked state of the pool.
776///
777/// Note: this iterator will always return the best transaction that it currently knows.
778/// There is no guarantee transactions will be returned sequentially in decreasing
779/// priority order.
780pub trait BestTransactions: Iterator + Send {
781 /// Mark the transaction as invalid.
782 ///
783 /// Implementers must ensure all subsequent transaction _don't_ depend on this transaction.
784 /// In other words, this must remove the given transaction _and_ drain all transaction that
785 /// depend on it.
786 fn mark_invalid(&mut self, transaction: &Self::Item, kind: InvalidPoolTransactionError);
787
788 /// An iterator may be able to receive additional pending transactions that weren't present it
789 /// the pool when it was created.
790 ///
791 /// This ensures that iterator will return the best transaction that it currently knows and not
792 /// listen to pool updates.
793 fn no_updates(&mut self);
794
795 /// Convenience function for [`Self::no_updates`] that returns the iterator again.
796 fn without_updates(mut self) -> Self
797 where
798 Self: Sized,
799 {
800 self.no_updates();
801 self
802 }
803
804 /// Skip all blob transactions.
805 ///
806 /// There's only limited blob space available in a block, once exhausted, EIP-4844 transactions
807 /// can no longer be included.
808 ///
809 /// If called then the iterator will no longer yield blob transactions.
810 ///
811 /// Note: this will also exclude any transactions that depend on blob transactions.
812 fn skip_blobs(&mut self) {
813 self.set_skip_blobs(true);
814 }
815
816 /// Controls whether the iterator skips blob transactions or not.
817 ///
818 /// If set to true, no blob transactions will be returned.
819 fn set_skip_blobs(&mut self, skip_blobs: bool);
820
821 /// Convenience function for [`Self::skip_blobs`] that returns the iterator again.
822 fn without_blobs(mut self) -> Self
823 where
824 Self: Sized,
825 {
826 self.skip_blobs();
827 self
828 }
829
830 /// Creates an iterator which uses a closure to determine whether a transaction should be
831 /// returned by the iterator.
832 ///
833 /// All items the closure returns false for are marked as invalid via [`Self::mark_invalid`] and
834 /// descendant transactions will be skipped.
835 fn filter_transactions<P>(self, predicate: P) -> BestTransactionFilter<Self, P>
836 where
837 P: FnMut(&Self::Item) -> bool,
838 Self: Sized,
839 {
840 BestTransactionFilter::new(self, predicate)
841 }
842}
843
844impl<T> BestTransactions for Box<T>
845where
846 T: BestTransactions + ?Sized,
847{
848 fn mark_invalid(&mut self, transaction: &Self::Item, kind: InvalidPoolTransactionError) {
849 (**self).mark_invalid(transaction, kind)
850 }
851
852 fn no_updates(&mut self) {
853 (**self).no_updates();
854 }
855
856 fn skip_blobs(&mut self) {
857 (**self).skip_blobs();
858 }
859
860 fn set_skip_blobs(&mut self, skip_blobs: bool) {
861 (**self).set_skip_blobs(skip_blobs);
862 }
863}
864
865/// A no-op implementation that yields no transactions.
866impl<T> BestTransactions for std::iter::Empty<T> {
867 fn mark_invalid(&mut self, _tx: &T, _kind: InvalidPoolTransactionError) {}
868
869 fn no_updates(&mut self) {}
870
871 fn skip_blobs(&mut self) {}
872
873 fn set_skip_blobs(&mut self, _skip_blobs: bool) {}
874}
875
876/// A filter that allows to check if a transaction satisfies a set of conditions
877pub trait TransactionFilter {
878 /// The type of the transaction to check.
879 type Transaction;
880
881 /// Returns true if the transaction satisfies the conditions.
882 fn is_valid(&self, transaction: &Self::Transaction) -> bool;
883}
884
885/// A no-op implementation of [`TransactionFilter`] which
886/// marks all transactions as valid.
887#[derive(Debug, Clone)]
888pub struct NoopTransactionFilter<T>(std::marker::PhantomData<T>);
889
890// We can't derive Default because this forces T to be
891// Default as well, which isn't necessary.
892impl<T> Default for NoopTransactionFilter<T> {
893 fn default() -> Self {
894 Self(std::marker::PhantomData)
895 }
896}
897
898impl<T> TransactionFilter for NoopTransactionFilter<T> {
899 type Transaction = T;
900
901 fn is_valid(&self, _transaction: &Self::Transaction) -> bool {
902 true
903 }
904}
905
906/// A Helper type that bundles the best transactions attributes together.
907#[derive(Debug, Copy, Clone, PartialEq, Eq)]
908pub struct BestTransactionsAttributes {
909 /// The base fee attribute for best transactions.
910 pub basefee: u64,
911 /// The blob fee attribute for best transactions.
912 pub blob_fee: Option<u64>,
913}
914
915// === impl BestTransactionsAttributes ===
916
917impl BestTransactionsAttributes {
918 /// Creates a new `BestTransactionsAttributes` with the given basefee and blob fee.
919 pub const fn new(basefee: u64, blob_fee: Option<u64>) -> Self {
920 Self { basefee, blob_fee }
921 }
922
923 /// Creates a new `BestTransactionsAttributes` with the given basefee.
924 pub const fn base_fee(basefee: u64) -> Self {
925 Self::new(basefee, None)
926 }
927
928 /// Sets the given blob fee.
929 pub const fn with_blob_fee(mut self, blob_fee: u64) -> Self {
930 self.blob_fee = Some(blob_fee);
931 self
932 }
933}
934
935/// Trait for transaction types used inside the pool.
936///
937/// This supports two transaction formats
938/// - Consensus format: the form the transaction takes when it is included in a block.
939/// - Pooled format: the form the transaction takes when it is gossiping around the network.
940///
941/// This distinction is necessary for the EIP-4844 blob transactions, which require an additional
942/// sidecar when they are gossiped around the network. It is expected that the `Consensus` format is
943/// a subset of the `Pooled` format.
944///
945/// The assumption is that fallible conversion from `Consensus` to `Pooled` will encapsulate
946/// handling of all valid `Consensus` transactions that can't be pooled (e.g Deposit transactions or
947/// blob-less EIP-4844 transactions).
948pub trait PoolTransaction:
949 alloy_consensus::Transaction + InMemorySize + Debug + Send + Sync + Clone
950{
951 /// Associated error type for the `try_from_consensus` method.
952 type TryFromConsensusError: fmt::Display;
953
954 /// Associated type representing the raw consensus variant of the transaction.
955 type Consensus: SignedTransaction + From<Self::Pooled>;
956
957 /// Associated type representing the recovered pooled variant of the transaction.
958 type Pooled: TryFrom<Self::Consensus, Error = Self::TryFromConsensusError> + SignedTransaction;
959
960 /// Define a method to convert from the `Consensus` type to `Self`
961 ///
962 /// Note: this _must_ fail on any transactions that cannot be pooled (e.g OP Deposit
963 /// transactions).
964 fn try_from_consensus(
965 tx: Recovered<Self::Consensus>,
966 ) -> Result<Self, Self::TryFromConsensusError> {
967 let (tx, signer) = tx.into_parts();
968 Ok(Self::from_pooled(Recovered::new_unchecked(tx.try_into()?, signer)))
969 }
970
971 /// Clone the transaction into a consensus variant.
972 ///
973 /// This method is preferred when the [`PoolTransaction`] already wraps the consensus variant.
974 fn clone_into_consensus(&self) -> Recovered<Self::Consensus> {
975 self.clone().into_consensus()
976 }
977
978 /// Define a method to convert from the `Self` type to `Consensus`
979 fn into_consensus(self) -> Recovered<Self::Consensus>;
980
981 /// Converts the transaction into consensus format while preserving the EIP-2718 encoded bytes.
982 /// This is used to optimize transaction execution by reusing cached encoded bytes instead of
983 /// re-encoding the transaction. The cached bytes are particularly useful in payload building
984 /// where the same transaction may be executed multiple times.
985 fn into_consensus_with2718(self) -> WithEncoded<Recovered<Self::Consensus>> {
986 self.into_consensus().into_encoded()
987 }
988
989 /// Define a method to convert from the `Pooled` type to `Self`
990 fn from_pooled(pooled: Recovered<Self::Pooled>) -> Self;
991
992 /// Tries to convert the `Consensus` type into the `Pooled` type.
993 fn try_into_pooled(self) -> Result<Recovered<Self::Pooled>, Self::TryFromConsensusError> {
994 let consensus = self.into_consensus();
995 let (tx, signer) = consensus.into_parts();
996 Ok(Recovered::new_unchecked(tx.try_into()?, signer))
997 }
998
999 /// Converts the `Pooled` type into the `Consensus` type.
1000 fn pooled_into_consensus(tx: Self::Pooled) -> Self::Consensus {
1001 tx.into()
1002 }
1003
1004 /// Hash of the transaction.
1005 fn hash(&self) -> &TxHash;
1006
1007 /// The Sender of the transaction.
1008 fn sender(&self) -> Address;
1009
1010 /// Reference to the Sender of the transaction.
1011 fn sender_ref(&self) -> &Address;
1012
1013 /// Returns the cost that this transaction is allowed to consume:
1014 ///
1015 /// For EIP-1559 transactions: `max_fee_per_gas * gas_limit + tx_value`.
1016 /// For legacy transactions: `gas_price * gas_limit + tx_value`.
1017 /// For EIP-4844 blob transactions: `max_fee_per_gas * gas_limit + tx_value +
1018 /// max_blob_fee_per_gas * blob_gas_used`.
1019 fn cost(&self) -> &U256;
1020
1021 /// Returns the length of the rlp encoded transaction object
1022 ///
1023 /// Note: Implementations should cache this value.
1024 fn encoded_length(&self) -> usize;
1025
1026 /// Ensures that the transaction's code size does not exceed the provided `max_init_code_size`.
1027 ///
1028 /// This is specifically relevant for contract creation transactions ([`TxKind::Create`]),
1029 /// where the input data contains the initialization code. If the input code size exceeds
1030 /// the configured limit, an [`InvalidPoolTransactionError::ExceedsMaxInitCodeSize`] error is
1031 /// returned.
1032 fn ensure_max_init_code_size(
1033 &self,
1034 max_init_code_size: usize,
1035 ) -> Result<(), InvalidPoolTransactionError> {
1036 let input_len = self.input().len();
1037 if self.is_create() && input_len > max_init_code_size {
1038 Err(InvalidPoolTransactionError::ExceedsMaxInitCodeSize(input_len, max_init_code_size))
1039 } else {
1040 Ok(())
1041 }
1042 }
1043}
1044
1045/// Super trait for transactions that can be converted to and from Eth transactions intended for the
1046/// ethereum style pool.
1047///
1048/// This extends the [`PoolTransaction`] trait with additional methods that are specific to the
1049/// Ethereum pool.
1050pub trait EthPoolTransaction: PoolTransaction {
1051 /// Extracts the blob sidecar from the transaction.
1052 fn take_blob(&mut self) -> EthBlobTransactionSidecar;
1053
1054 /// A specialization for the EIP-4844 transaction type.
1055 /// Tries to reattach the blob sidecar to the transaction.
1056 ///
1057 /// This returns an option, but callers should ensure that the transaction is an EIP-4844
1058 /// transaction: [`Typed2718::is_eip4844`].
1059 fn try_into_pooled_eip4844(
1060 self,
1061 sidecar: Arc<BlobTransactionSidecarVariant>,
1062 ) -> Option<Recovered<Self::Pooled>>;
1063
1064 /// Tries to convert the `Consensus` type with a blob sidecar into the `Pooled` type.
1065 ///
1066 /// Returns `None` if passed transaction is not a blob transaction.
1067 fn try_from_eip4844(
1068 tx: Recovered<Self::Consensus>,
1069 sidecar: BlobTransactionSidecarVariant,
1070 ) -> Option<Self>;
1071
1072 /// Validates the blob sidecar of the transaction with the given settings.
1073 fn validate_blob(
1074 &self,
1075 blob: &BlobTransactionSidecarVariant,
1076 settings: &KzgSettings,
1077 ) -> Result<(), BlobTransactionValidationError>;
1078}
1079
1080/// The default [`PoolTransaction`] for the [Pool](crate::Pool) for Ethereum.
1081///
1082/// This type is essentially a wrapper around [`Recovered`] with additional
1083/// fields derived from the transaction that are frequently used by the pools for ordering.
1084#[derive(Debug, Clone, PartialEq, Eq)]
1085pub struct EthPooledTransaction<T = TransactionSigned> {
1086 /// `EcRecovered` transaction, the consensus format.
1087 pub transaction: Recovered<T>,
1088
1089 /// For EIP-1559 transactions: `max_fee_per_gas * gas_limit + tx_value`.
1090 /// For legacy transactions: `gas_price * gas_limit + tx_value`.
1091 /// For EIP-4844 blob transactions: `max_fee_per_gas * gas_limit + tx_value +
1092 /// max_blob_fee_per_gas * blob_gas_used`.
1093 pub cost: U256,
1094
1095 /// This is the RLP length of the transaction, computed when the transaction is added to the
1096 /// pool.
1097 pub encoded_length: usize,
1098
1099 /// The blob side car for this transaction
1100 pub blob_sidecar: EthBlobTransactionSidecar,
1101}
1102
1103impl<T: SignedTransaction> EthPooledTransaction<T> {
1104 /// Create new instance of [Self].
1105 ///
1106 /// Caution: In case of blob transactions, this does marks the blob sidecar as
1107 /// [`EthBlobTransactionSidecar::Missing`]
1108 pub fn new(transaction: Recovered<T>, encoded_length: usize) -> Self {
1109 let mut blob_sidecar = EthBlobTransactionSidecar::None;
1110
1111 let gas_cost = U256::from(transaction.max_fee_per_gas())
1112 .saturating_mul(U256::from(transaction.gas_limit()));
1113
1114 let mut cost = gas_cost.saturating_add(transaction.value());
1115
1116 if let (Some(blob_gas_used), Some(max_fee_per_blob_gas)) =
1117 (transaction.blob_gas_used(), transaction.max_fee_per_blob_gas())
1118 {
1119 // Add max blob cost using saturating math to avoid overflow
1120 cost = cost.saturating_add(U256::from(
1121 max_fee_per_blob_gas.saturating_mul(blob_gas_used as u128),
1122 ));
1123
1124 // because the blob sidecar is not included in this transaction variant, mark it as
1125 // missing
1126 blob_sidecar = EthBlobTransactionSidecar::Missing;
1127 }
1128
1129 Self { transaction, cost, encoded_length, blob_sidecar }
1130 }
1131
1132 /// Return the reference to the underlying transaction.
1133 pub const fn transaction(&self) -> &Recovered<T> {
1134 &self.transaction
1135 }
1136}
1137
1138impl PoolTransaction for EthPooledTransaction {
1139 type TryFromConsensusError = ValueError<TransactionSigned>;
1140
1141 type Consensus = TransactionSigned;
1142
1143 type Pooled = PooledTransactionVariant;
1144
1145 fn clone_into_consensus(&self) -> Recovered<Self::Consensus> {
1146 self.transaction().clone()
1147 }
1148
1149 fn into_consensus(self) -> Recovered<Self::Consensus> {
1150 self.transaction
1151 }
1152
1153 fn from_pooled(tx: Recovered<Self::Pooled>) -> Self {
1154 let encoded_length = tx.encode_2718_len();
1155 let (tx, signer) = tx.into_parts();
1156 match tx {
1157 PooledTransactionVariant::Eip4844(tx) => {
1158 // include the blob sidecar
1159 let (tx, sig, hash) = tx.into_parts();
1160 let (tx, blob) = tx.into_parts();
1161 let tx = Signed::new_unchecked(tx, sig, hash);
1162 let tx = TransactionSigned::from(tx);
1163 let tx = Recovered::new_unchecked(tx, signer);
1164 let mut pooled = Self::new(tx, encoded_length);
1165 pooled.blob_sidecar = EthBlobTransactionSidecar::Present(blob);
1166 pooled
1167 }
1168 tx => {
1169 // no blob sidecar
1170 let tx = Recovered::new_unchecked(tx.into(), signer);
1171 Self::new(tx, encoded_length)
1172 }
1173 }
1174 }
1175
1176 /// Returns hash of the transaction.
1177 fn hash(&self) -> &TxHash {
1178 self.transaction.tx_hash()
1179 }
1180
1181 /// Returns the Sender of the transaction.
1182 fn sender(&self) -> Address {
1183 self.transaction.signer()
1184 }
1185
1186 /// Returns a reference to the Sender of the transaction.
1187 fn sender_ref(&self) -> &Address {
1188 self.transaction.signer_ref()
1189 }
1190
1191 /// Returns the cost that this transaction is allowed to consume:
1192 ///
1193 /// For EIP-1559 transactions: `max_fee_per_gas * gas_limit + tx_value`.
1194 /// For legacy transactions: `gas_price * gas_limit + tx_value`.
1195 /// For EIP-4844 blob transactions: `max_fee_per_gas * gas_limit + tx_value +
1196 /// max_blob_fee_per_gas * blob_gas_used`.
1197 fn cost(&self) -> &U256 {
1198 &self.cost
1199 }
1200
1201 /// Returns the length of the rlp encoded object
1202 fn encoded_length(&self) -> usize {
1203 self.encoded_length
1204 }
1205}
1206
1207impl<T: Typed2718> Typed2718 for EthPooledTransaction<T> {
1208 fn ty(&self) -> u8 {
1209 self.transaction.ty()
1210 }
1211}
1212
1213impl<T: InMemorySize> InMemorySize for EthPooledTransaction<T> {
1214 fn size(&self) -> usize {
1215 self.transaction.size()
1216 }
1217}
1218
1219impl<T: alloy_consensus::Transaction> alloy_consensus::Transaction for EthPooledTransaction<T> {
1220 fn chain_id(&self) -> Option<alloy_primitives::ChainId> {
1221 self.transaction.chain_id()
1222 }
1223
1224 fn nonce(&self) -> u64 {
1225 self.transaction.nonce()
1226 }
1227
1228 fn gas_limit(&self) -> u64 {
1229 self.transaction.gas_limit()
1230 }
1231
1232 fn gas_price(&self) -> Option<u128> {
1233 self.transaction.gas_price()
1234 }
1235
1236 fn max_fee_per_gas(&self) -> u128 {
1237 self.transaction.max_fee_per_gas()
1238 }
1239
1240 fn max_priority_fee_per_gas(&self) -> Option<u128> {
1241 self.transaction.max_priority_fee_per_gas()
1242 }
1243
1244 fn max_fee_per_blob_gas(&self) -> Option<u128> {
1245 self.transaction.max_fee_per_blob_gas()
1246 }
1247
1248 fn priority_fee_or_price(&self) -> u128 {
1249 self.transaction.priority_fee_or_price()
1250 }
1251
1252 fn effective_gas_price(&self, base_fee: Option<u64>) -> u128 {
1253 self.transaction.effective_gas_price(base_fee)
1254 }
1255
1256 fn is_dynamic_fee(&self) -> bool {
1257 self.transaction.is_dynamic_fee()
1258 }
1259
1260 fn kind(&self) -> TxKind {
1261 self.transaction.kind()
1262 }
1263
1264 fn is_create(&self) -> bool {
1265 self.transaction.is_create()
1266 }
1267
1268 fn value(&self) -> U256 {
1269 self.transaction.value()
1270 }
1271
1272 fn input(&self) -> &Bytes {
1273 self.transaction.input()
1274 }
1275
1276 fn access_list(&self) -> Option<&AccessList> {
1277 self.transaction.access_list()
1278 }
1279
1280 fn blob_versioned_hashes(&self) -> Option<&[B256]> {
1281 self.transaction.blob_versioned_hashes()
1282 }
1283
1284 fn authorization_list(&self) -> Option<&[SignedAuthorization]> {
1285 self.transaction.authorization_list()
1286 }
1287}
1288
1289impl EthPoolTransaction for EthPooledTransaction {
1290 fn take_blob(&mut self) -> EthBlobTransactionSidecar {
1291 if self.is_eip4844() {
1292 std::mem::replace(&mut self.blob_sidecar, EthBlobTransactionSidecar::Missing)
1293 } else {
1294 EthBlobTransactionSidecar::None
1295 }
1296 }
1297
1298 fn try_into_pooled_eip4844(
1299 self,
1300 sidecar: Arc<BlobTransactionSidecarVariant>,
1301 ) -> Option<Recovered<Self::Pooled>> {
1302 let (signed_transaction, signer) = self.into_consensus().into_parts();
1303 let pooled_transaction =
1304 signed_transaction.try_into_pooled_eip4844(Arc::unwrap_or_clone(sidecar)).ok()?;
1305
1306 Some(Recovered::new_unchecked(pooled_transaction, signer))
1307 }
1308
1309 fn try_from_eip4844(
1310 tx: Recovered<Self::Consensus>,
1311 sidecar: BlobTransactionSidecarVariant,
1312 ) -> Option<Self> {
1313 let (tx, signer) = tx.into_parts();
1314 tx.try_into_pooled_eip4844(sidecar)
1315 .ok()
1316 .map(|tx| tx.with_signer(signer))
1317 .map(Self::from_pooled)
1318 }
1319
1320 fn validate_blob(
1321 &self,
1322 sidecar: &BlobTransactionSidecarVariant,
1323 settings: &KzgSettings,
1324 ) -> Result<(), BlobTransactionValidationError> {
1325 match self.transaction.inner().as_eip4844() {
1326 Some(tx) => tx.tx().validate_blob(sidecar, settings),
1327 _ => Err(BlobTransactionValidationError::NotBlobTransaction(self.ty())),
1328 }
1329 }
1330}
1331
1332/// Represents the blob sidecar of the [`EthPooledTransaction`].
1333#[derive(Debug, Clone, PartialEq, Eq)]
1334pub enum EthBlobTransactionSidecar {
1335 /// This transaction does not have a blob sidecar
1336 None,
1337 /// This transaction has a blob sidecar (EIP-4844) but it is missing
1338 ///
1339 /// It was either extracted after being inserted into the pool or re-injected after reorg
1340 /// without the blob sidecar
1341 Missing,
1342 /// The eip-4844 transaction was pulled from the network and still has its blob sidecar
1343 Present(BlobTransactionSidecarVariant),
1344}
1345
1346impl EthBlobTransactionSidecar {
1347 /// Returns the blob sidecar if it is present
1348 pub const fn maybe_sidecar(&self) -> Option<&BlobTransactionSidecarVariant> {
1349 match self {
1350 Self::Present(sidecar) => Some(sidecar),
1351 _ => None,
1352 }
1353 }
1354}
1355
1356/// Represents the current status of the pool.
1357#[derive(Debug, Clone, Copy, Default)]
1358pub struct PoolSize {
1359 /// Number of transactions in the _pending_ sub-pool.
1360 pub pending: usize,
1361 /// Reported size of transactions in the _pending_ sub-pool.
1362 pub pending_size: usize,
1363 /// Number of transactions in the _blob_ pool.
1364 pub blob: usize,
1365 /// Reported size of transactions in the _blob_ pool.
1366 pub blob_size: usize,
1367 /// Number of transactions in the _basefee_ pool.
1368 pub basefee: usize,
1369 /// Reported size of transactions in the _basefee_ sub-pool.
1370 pub basefee_size: usize,
1371 /// Number of transactions in the _queued_ sub-pool.
1372 pub queued: usize,
1373 /// Reported size of transactions in the _queued_ sub-pool.
1374 pub queued_size: usize,
1375 /// Number of all transactions of all sub-pools
1376 ///
1377 /// Note: this is the sum of ```pending + basefee + queued```
1378 pub total: usize,
1379}
1380
1381// === impl PoolSize ===
1382
1383impl PoolSize {
1384 /// Asserts that the invariants of the pool size are met.
1385 #[cfg(test)]
1386 pub(crate) fn assert_invariants(&self) {
1387 assert_eq!(self.total, self.pending + self.basefee + self.queued + self.blob);
1388 }
1389}
1390
1391/// Represents the current status of the pool.
1392#[derive(Default, Debug, Clone, Copy, Eq, PartialEq)]
1393pub struct BlockInfo {
1394 /// Hash for the currently tracked block.
1395 pub last_seen_block_hash: B256,
1396 /// Currently tracked block.
1397 pub last_seen_block_number: u64,
1398 /// Current block gas limit for the latest block.
1399 pub block_gas_limit: u64,
1400 /// Currently enforced base fee: the threshold for the basefee sub-pool.
1401 ///
1402 /// Note: this is the derived base fee of the _next_ block that builds on the block the pool is
1403 /// currently tracking.
1404 pub pending_basefee: u64,
1405 /// Currently enforced blob fee: the threshold for eip-4844 blob transactions.
1406 ///
1407 /// Note: this is the derived blob fee of the _next_ block that builds on the block the pool is
1408 /// currently tracking
1409 pub pending_blob_fee: Option<u128>,
1410}
1411
1412/// The limit to enforce for [`TransactionPool::get_pooled_transaction_elements`].
1413#[derive(Debug, Clone, Copy, Eq, PartialEq)]
1414pub enum GetPooledTransactionLimit {
1415 /// No limit, return all transactions.
1416 None,
1417 /// Enforce a size limit on the returned transactions, for example 2MB
1418 ResponseSizeSoftLimit(usize),
1419}
1420
1421impl GetPooledTransactionLimit {
1422 /// Returns true if the given size exceeds the limit.
1423 #[inline]
1424 pub const fn exceeds(&self, size: usize) -> bool {
1425 match self {
1426 Self::None => false,
1427 Self::ResponseSizeSoftLimit(limit) => size > *limit,
1428 }
1429 }
1430}
1431
1432/// A Stream that yields full transactions the subpool
1433#[must_use = "streams do nothing unless polled"]
1434#[derive(Debug)]
1435pub struct NewSubpoolTransactionStream<Tx: PoolTransaction> {
1436 st: Receiver<NewTransactionEvent<Tx>>,
1437 subpool: SubPool,
1438}
1439
1440// === impl NewSubpoolTransactionStream ===
1441
1442impl<Tx: PoolTransaction> NewSubpoolTransactionStream<Tx> {
1443 /// Create a new stream that yields full transactions from the subpool
1444 pub const fn new(st: Receiver<NewTransactionEvent<Tx>>, subpool: SubPool) -> Self {
1445 Self { st, subpool }
1446 }
1447
1448 /// Tries to receive the next value for this stream.
1449 pub fn try_recv(
1450 &mut self,
1451 ) -> Result<NewTransactionEvent<Tx>, tokio::sync::mpsc::error::TryRecvError> {
1452 loop {
1453 match self.st.try_recv() {
1454 Ok(event) => {
1455 if event.subpool == self.subpool {
1456 return Ok(event)
1457 }
1458 }
1459 Err(e) => return Err(e),
1460 }
1461 }
1462 }
1463}
1464
1465impl<Tx: PoolTransaction> Stream for NewSubpoolTransactionStream<Tx> {
1466 type Item = NewTransactionEvent<Tx>;
1467
1468 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1469 loop {
1470 match ready!(self.st.poll_recv(cx)) {
1471 Some(event) => {
1472 if event.subpool == self.subpool {
1473 return Poll::Ready(Some(event))
1474 }
1475 }
1476 None => return Poll::Ready(None),
1477 }
1478 }
1479 }
1480}
1481
1482#[cfg(test)]
1483mod tests {
1484 use super::*;
1485 use alloy_consensus::{
1486 EthereumTxEnvelope, SignableTransaction, TxEip1559, TxEip2930, TxEip4844, TxEip7702,
1487 TxEnvelope, TxLegacy,
1488 };
1489 use alloy_eips::eip4844::DATA_GAS_PER_BLOB;
1490 use alloy_primitives::Signature;
1491
1492 #[test]
1493 fn test_pool_size_invariants() {
1494 let pool_size = PoolSize {
1495 pending: 10,
1496 pending_size: 1000,
1497 blob: 5,
1498 blob_size: 500,
1499 basefee: 8,
1500 basefee_size: 800,
1501 queued: 7,
1502 queued_size: 700,
1503 total: 10 + 5 + 8 + 7, // Correct total
1504 };
1505
1506 // Call the assert_invariants method to check if the invariants are correct
1507 pool_size.assert_invariants();
1508 }
1509
1510 #[test]
1511 #[should_panic]
1512 fn test_pool_size_invariants_fail() {
1513 let pool_size = PoolSize {
1514 pending: 10,
1515 pending_size: 1000,
1516 blob: 5,
1517 blob_size: 500,
1518 basefee: 8,
1519 basefee_size: 800,
1520 queued: 7,
1521 queued_size: 700,
1522 total: 10 + 5 + 8, // Incorrect total
1523 };
1524
1525 // Call the assert_invariants method, which should panic
1526 pool_size.assert_invariants();
1527 }
1528
1529 #[test]
1530 fn test_eth_pooled_transaction_new_legacy() {
1531 // Create a legacy transaction with specific parameters
1532 let tx = TxEnvelope::Legacy(
1533 TxLegacy {
1534 gas_price: 10,
1535 gas_limit: 1000,
1536 value: U256::from(100),
1537 ..Default::default()
1538 }
1539 .into_signed(Signature::test_signature()),
1540 );
1541 let transaction = Recovered::new_unchecked(tx, Default::default());
1542 let pooled_tx = EthPooledTransaction::new(transaction.clone(), 200);
1543
1544 // Check that the pooled transaction is created correctly
1545 assert_eq!(pooled_tx.transaction, transaction);
1546 assert_eq!(pooled_tx.encoded_length, 200);
1547 assert_eq!(pooled_tx.blob_sidecar, EthBlobTransactionSidecar::None);
1548 assert_eq!(pooled_tx.cost, U256::from(100) + U256::from(10 * 1000));
1549 }
1550
1551 #[test]
1552 fn test_eth_pooled_transaction_new_eip2930() {
1553 // Create an EIP-2930 transaction with specific parameters
1554 let tx = TxEnvelope::Eip2930(
1555 TxEip2930 {
1556 gas_price: 10,
1557 gas_limit: 1000,
1558 value: U256::from(100),
1559 ..Default::default()
1560 }
1561 .into_signed(Signature::test_signature()),
1562 );
1563 let transaction = Recovered::new_unchecked(tx, Default::default());
1564 let pooled_tx = EthPooledTransaction::new(transaction.clone(), 200);
1565 let expected_cost = U256::from(100) + (U256::from(10 * 1000));
1566
1567 assert_eq!(pooled_tx.transaction, transaction);
1568 assert_eq!(pooled_tx.encoded_length, 200);
1569 assert_eq!(pooled_tx.blob_sidecar, EthBlobTransactionSidecar::None);
1570 assert_eq!(pooled_tx.cost, expected_cost);
1571 }
1572
1573 #[test]
1574 fn test_eth_pooled_transaction_new_eip1559() {
1575 // Create an EIP-1559 transaction with specific parameters
1576 let tx = TxEnvelope::Eip1559(
1577 TxEip1559 {
1578 max_fee_per_gas: 10,
1579 gas_limit: 1000,
1580 value: U256::from(100),
1581 ..Default::default()
1582 }
1583 .into_signed(Signature::test_signature()),
1584 );
1585 let transaction = Recovered::new_unchecked(tx, Default::default());
1586 let pooled_tx = EthPooledTransaction::new(transaction.clone(), 200);
1587
1588 // Check that the pooled transaction is created correctly
1589 assert_eq!(pooled_tx.transaction, transaction);
1590 assert_eq!(pooled_tx.encoded_length, 200);
1591 assert_eq!(pooled_tx.blob_sidecar, EthBlobTransactionSidecar::None);
1592 assert_eq!(pooled_tx.cost, U256::from(100) + U256::from(10 * 1000));
1593 }
1594
1595 #[test]
1596 fn test_eth_pooled_transaction_new_eip4844() {
1597 // Create an EIP-4844 transaction with specific parameters
1598 let tx = EthereumTxEnvelope::Eip4844(
1599 TxEip4844 {
1600 max_fee_per_gas: 10,
1601 gas_limit: 1000,
1602 value: U256::from(100),
1603 max_fee_per_blob_gas: 5,
1604 blob_versioned_hashes: vec![B256::default()],
1605 ..Default::default()
1606 }
1607 .into_signed(Signature::test_signature()),
1608 );
1609 let transaction = Recovered::new_unchecked(tx, Default::default());
1610 let pooled_tx = EthPooledTransaction::new(transaction.clone(), 300);
1611
1612 // Check that the pooled transaction is created correctly
1613 assert_eq!(pooled_tx.transaction, transaction);
1614 assert_eq!(pooled_tx.encoded_length, 300);
1615 assert_eq!(pooled_tx.blob_sidecar, EthBlobTransactionSidecar::Missing);
1616 let expected_cost =
1617 U256::from(100) + U256::from(10 * 1000) + U256::from(5 * DATA_GAS_PER_BLOB);
1618 assert_eq!(pooled_tx.cost, expected_cost);
1619 }
1620
1621 #[test]
1622 fn test_eth_pooled_transaction_new_eip7702() {
1623 // Init an EIP-7702 transaction with specific parameters
1624 let tx = EthereumTxEnvelope::<TxEip4844>::Eip7702(
1625 TxEip7702 {
1626 max_fee_per_gas: 10,
1627 gas_limit: 1000,
1628 value: U256::from(100),
1629 ..Default::default()
1630 }
1631 .into_signed(Signature::test_signature()),
1632 );
1633 let transaction = Recovered::new_unchecked(tx, Default::default());
1634 let pooled_tx = EthPooledTransaction::new(transaction.clone(), 200);
1635
1636 // Check that the pooled transaction is created correctly
1637 assert_eq!(pooled_tx.transaction, transaction);
1638 assert_eq!(pooled_tx.encoded_length, 200);
1639 assert_eq!(pooled_tx.blob_sidecar, EthBlobTransactionSidecar::None);
1640 assert_eq!(pooled_tx.cost, U256::from(100) + U256::from(10 * 1000));
1641 }
1642
1643 #[test]
1644 fn test_pooled_transaction_limit() {
1645 // No limit should never exceed
1646 let limit_none = GetPooledTransactionLimit::None;
1647 // Any size should return false
1648 assert!(!limit_none.exceeds(1000));
1649
1650 // Size limit of 2MB (2 * 1024 * 1024 bytes)
1651 let size_limit_2mb = GetPooledTransactionLimit::ResponseSizeSoftLimit(2 * 1024 * 1024);
1652
1653 // Test with size below the limit
1654 // 1MB is below 2MB, should return false
1655 assert!(!size_limit_2mb.exceeds(1024 * 1024));
1656
1657 // Test with size exactly at the limit
1658 // 2MB equals the limit, should return false
1659 assert!(!size_limit_2mb.exceeds(2 * 1024 * 1024));
1660
1661 // Test with size exceeding the limit
1662 // 3MB is above the 2MB limit, should return true
1663 assert!(size_limit_2mb.exceeds(3 * 1024 * 1024));
1664 }
1665}