1#![doc(
148 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
149 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
150 issue_tracker_base_url = "https://github.com/SeismicSystems/seismic-reth/issues/"
151)]
152#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
153#![cfg_attr(not(test), warn(unused_crate_dependencies))]
154
155pub use crate::{
156 blobstore::{BlobStore, BlobStoreError},
157 config::{
158 LocalTransactionConfig, PoolConfig, PriceBumpConfig, SubPoolLimit, DEFAULT_PRICE_BUMP,
159 DEFAULT_TXPOOL_ADDITIONAL_VALIDATION_TASKS, MAX_NEW_PENDING_TXS_NOTIFICATIONS,
160 REPLACE_BLOB_PRICE_BUMP, TXPOOL_MAX_ACCOUNT_SLOTS_PER_SENDER,
161 TXPOOL_SUBPOOL_MAX_SIZE_MB_DEFAULT, TXPOOL_SUBPOOL_MAX_TXS_DEFAULT,
162 },
163 error::PoolResult,
164 ordering::{CoinbaseTipOrdering, Priority, TransactionOrdering},
165 pool::{
166 blob_tx_priority, fee_delta, state::SubPool, AllTransactionsEvents, FullTransactionEvent,
167 NewTransactionEvent, TransactionEvent, TransactionEvents, TransactionListenerKind,
168 },
169 traits::*,
170 validate::{
171 EthTransactionValidator, TransactionValidationOutcome, TransactionValidationTaskExecutor,
172 TransactionValidator, ValidPoolTransaction,
173 },
174};
175use crate::{identifier::TransactionId, pool::PoolInner};
176use alloy_eips::{
177 eip4844::{BlobAndProofV1, BlobAndProofV2},
178 eip7594::BlobTransactionSidecarVariant,
179};
180use alloy_primitives::{Address, TxHash, B256, U256};
181use aquamarine as _;
182use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
183use reth_eth_wire_types::HandleMempoolData;
184use reth_execution_types::ChangedAccount;
185use reth_primitives_traits::{Block, Recovered};
186use reth_storage_api::StateProviderFactory;
187use std::{collections::HashSet, sync::Arc};
188use tokio::sync::mpsc::Receiver;
189use tracing::{instrument, trace};
190
191pub mod error;
192pub mod maintain;
193pub mod metrics;
194pub mod noop;
195pub mod pool;
196pub mod validate;
197
198pub mod blobstore;
199mod config;
200pub mod identifier;
201mod ordering;
202mod traits;
203
204#[cfg(any(test, feature = "test-utils"))]
205pub mod test_utils;
207
208pub type EthTransactionPool<Client, S> = Pool<
210 TransactionValidationTaskExecutor<EthTransactionValidator<Client, EthPooledTransaction>>,
211 CoinbaseTipOrdering<EthPooledTransaction>,
212 S,
213>;
214
215#[derive(Debug)]
217pub struct Pool<V, T: TransactionOrdering, S> {
218 pool: Arc<PoolInner<V, T, S>>,
220}
221
222impl<V, T, S> Pool<V, T, S>
225where
226 V: TransactionValidator,
227 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
228 S: BlobStore,
229{
230 pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
232 Self { pool: Arc::new(PoolInner::new(validator, ordering, blob_store, config)) }
233 }
234
235 pub(crate) fn inner(&self) -> &PoolInner<V, T, S> {
237 &self.pool
238 }
239
240 pub fn config(&self) -> &PoolConfig {
242 self.inner().config()
243 }
244
245 async fn validate_all(
249 &self,
250 origin: TransactionOrigin,
251 transactions: impl IntoIterator<Item = V::Transaction> + Send,
252 ) -> Vec<(TxHash, TransactionValidationOutcome<V::Transaction>)> {
253 self.pool
254 .validator()
255 .validate_transactions_with_origin(origin, transactions)
256 .await
257 .into_iter()
258 .map(|tx| (tx.tx_hash(), tx))
259 .collect()
260 }
261
262 async fn validate(
264 &self,
265 origin: TransactionOrigin,
266 transaction: V::Transaction,
267 ) -> (TxHash, TransactionValidationOutcome<V::Transaction>) {
268 let hash = *transaction.hash();
269
270 let outcome = self.pool.validator().validate_transaction(origin, transaction).await;
271
272 (hash, outcome)
273 }
274
275 pub fn len(&self) -> usize {
277 self.pool.len()
278 }
279
280 pub fn is_empty(&self) -> bool {
282 self.pool.is_empty()
283 }
284
285 pub fn is_exceeded(&self) -> bool {
287 self.pool.is_exceeded()
288 }
289
290 pub fn blob_store(&self) -> &S {
292 self.pool.blob_store()
293 }
294}
295
296impl<Client, S> EthTransactionPool<Client, S>
297where
298 Client:
299 ChainSpecProvider<ChainSpec: EthereumHardforks> + StateProviderFactory + Clone + 'static,
300 S: BlobStore,
301{
302 pub fn eth_pool(
330 validator: TransactionValidationTaskExecutor<
331 EthTransactionValidator<Client, EthPooledTransaction>,
332 >,
333 blob_store: S,
334 config: PoolConfig,
335 ) -> Self {
336 Self::new(validator, CoinbaseTipOrdering::default(), blob_store, config)
337 }
338}
339
340impl<V, T, S> TransactionPool for Pool<V, T, S>
342where
343 V: TransactionValidator,
344 <V as TransactionValidator>::Transaction: EthPoolTransaction,
345 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
346 S: BlobStore,
347{
348 type Transaction = T::Transaction;
349
350 fn pool_size(&self) -> PoolSize {
351 self.pool.size()
352 }
353
354 fn block_info(&self) -> BlockInfo {
355 self.pool.block_info()
356 }
357
358 async fn add_transaction_and_subscribe(
359 &self,
360 origin: TransactionOrigin,
361 transaction: Self::Transaction,
362 ) -> PoolResult<TransactionEvents> {
363 let (_, tx) = self.validate(origin, transaction).await;
364 self.pool.add_transaction_and_subscribe(origin, tx)
365 }
366
367 async fn add_transaction(
368 &self,
369 origin: TransactionOrigin,
370 transaction: Self::Transaction,
371 ) -> PoolResult<TxHash> {
372 let (_, tx) = self.validate(origin, transaction).await;
373 let mut results = self.pool.add_transactions(origin, std::iter::once(tx));
374 results.pop().expect("result length is the same as the input")
375 }
376
377 async fn add_transactions(
378 &self,
379 origin: TransactionOrigin,
380 transactions: Vec<Self::Transaction>,
381 ) -> Vec<PoolResult<TxHash>> {
382 if transactions.is_empty() {
383 return Vec::new()
384 }
385 let validated = self.validate_all(origin, transactions).await;
386
387 self.pool.add_transactions(origin, validated.into_iter().map(|(_, tx)| tx))
388 }
389
390 fn transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
391 self.pool.add_transaction_event_listener(tx_hash)
392 }
393
394 fn all_transactions_event_listener(&self) -> AllTransactionsEvents<Self::Transaction> {
395 self.pool.add_all_transactions_event_listener()
396 }
397
398 fn pending_transactions_listener_for(&self, kind: TransactionListenerKind) -> Receiver<TxHash> {
399 self.pool.add_pending_listener(kind)
400 }
401
402 fn blob_transaction_sidecars_listener(&self) -> Receiver<NewBlobSidecar> {
403 self.pool.add_blob_sidecar_listener()
404 }
405
406 fn new_transactions_listener_for(
407 &self,
408 kind: TransactionListenerKind,
409 ) -> Receiver<NewTransactionEvent<Self::Transaction>> {
410 self.pool.add_new_transaction_listener(kind)
411 }
412
413 fn pooled_transaction_hashes(&self) -> Vec<TxHash> {
414 self.pool.pooled_transactions_hashes()
415 }
416
417 fn pooled_transaction_hashes_max(&self, max: usize) -> Vec<TxHash> {
418 self.pooled_transaction_hashes().into_iter().take(max).collect()
419 }
420
421 fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
422 self.pool.pooled_transactions()
423 }
424
425 fn pooled_transactions_max(
426 &self,
427 max: usize,
428 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
429 self.pool.pooled_transactions_max(max)
430 }
431
432 fn get_pooled_transaction_elements(
433 &self,
434 tx_hashes: Vec<TxHash>,
435 limit: GetPooledTransactionLimit,
436 ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled> {
437 self.pool.get_pooled_transaction_elements(tx_hashes, limit)
438 }
439
440 fn get_pooled_transaction_element(
441 &self,
442 tx_hash: TxHash,
443 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
444 {
445 self.pool.get_pooled_transaction_element(tx_hash)
446 }
447
448 fn best_transactions(
449 &self,
450 ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>> {
451 Box::new(self.pool.best_transactions())
452 }
453
454 fn best_transactions_with_attributes(
455 &self,
456 best_transactions_attributes: BestTransactionsAttributes,
457 ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>> {
458 self.pool.best_transactions_with_attributes(best_transactions_attributes)
459 }
460
461 fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
462 self.pool.pending_transactions()
463 }
464
465 fn pending_transactions_max(
466 &self,
467 max: usize,
468 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
469 self.pool.pending_transactions_max(max)
470 }
471
472 fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
473 self.pool.queued_transactions()
474 }
475
476 fn all_transactions(&self) -> AllPoolTransactions<Self::Transaction> {
477 self.pool.all_transactions()
478 }
479
480 fn remove_transactions(
481 &self,
482 hashes: Vec<TxHash>,
483 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
484 self.pool.remove_transactions(hashes)
485 }
486
487 fn remove_transactions_and_descendants(
488 &self,
489 hashes: Vec<TxHash>,
490 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
491 self.pool.remove_transactions_and_descendants(hashes)
492 }
493
494 fn remove_transactions_by_sender(
495 &self,
496 sender: Address,
497 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
498 self.pool.remove_transactions_by_sender(sender)
499 }
500
501 fn retain_unknown<A>(&self, announcement: &mut A)
502 where
503 A: HandleMempoolData,
504 {
505 self.pool.retain_unknown(announcement)
506 }
507
508 fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
509 self.inner().get(tx_hash)
510 }
511
512 fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
513 self.inner().get_all(txs)
514 }
515
516 fn on_propagated(&self, txs: PropagatedTransactions) {
517 self.inner().on_propagated(txs)
518 }
519
520 fn get_transactions_by_sender(
521 &self,
522 sender: Address,
523 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
524 self.pool.get_transactions_by_sender(sender)
525 }
526
527 fn get_pending_transactions_with_predicate(
528 &self,
529 predicate: impl FnMut(&ValidPoolTransaction<Self::Transaction>) -> bool,
530 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
531 self.pool.pending_transactions_with_predicate(predicate)
532 }
533
534 fn get_pending_transactions_by_sender(
535 &self,
536 sender: Address,
537 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
538 self.pool.get_pending_transactions_by_sender(sender)
539 }
540
541 fn get_queued_transactions_by_sender(
542 &self,
543 sender: Address,
544 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
545 self.pool.get_queued_transactions_by_sender(sender)
546 }
547
548 fn get_highest_transaction_by_sender(
549 &self,
550 sender: Address,
551 ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
552 self.pool.get_highest_transaction_by_sender(sender)
553 }
554
555 fn get_highest_consecutive_transaction_by_sender(
556 &self,
557 sender: Address,
558 on_chain_nonce: u64,
559 ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
560 self.pool.get_highest_consecutive_transaction_by_sender(sender, on_chain_nonce)
561 }
562
563 fn get_transaction_by_sender_and_nonce(
564 &self,
565 sender: Address,
566 nonce: u64,
567 ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
568 let transaction_id = TransactionId::new(self.pool.get_sender_id(sender), nonce);
569
570 self.inner().get_pool_data().all().get(&transaction_id).map(|tx| tx.transaction.clone())
571 }
572
573 fn get_transactions_by_origin(
574 &self,
575 origin: TransactionOrigin,
576 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
577 self.pool.get_transactions_by_origin(origin)
578 }
579
580 fn get_pending_transactions_by_origin(
582 &self,
583 origin: TransactionOrigin,
584 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
585 self.pool.get_pending_transactions_by_origin(origin)
586 }
587
588 fn unique_senders(&self) -> HashSet<Address> {
589 self.pool.unique_senders()
590 }
591
592 fn get_blob(
593 &self,
594 tx_hash: TxHash,
595 ) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
596 self.pool.blob_store().get(tx_hash)
597 }
598
599 fn get_all_blobs(
600 &self,
601 tx_hashes: Vec<TxHash>,
602 ) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
603 self.pool.blob_store().get_all(tx_hashes)
604 }
605
606 fn get_all_blobs_exact(
607 &self,
608 tx_hashes: Vec<TxHash>,
609 ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
610 self.pool.blob_store().get_exact(tx_hashes)
611 }
612
613 fn get_blobs_for_versioned_hashes_v1(
614 &self,
615 versioned_hashes: &[B256],
616 ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
617 self.pool.blob_store().get_by_versioned_hashes_v1(versioned_hashes)
618 }
619
620 fn get_blobs_for_versioned_hashes_v2(
621 &self,
622 versioned_hashes: &[B256],
623 ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
624 self.pool.blob_store().get_by_versioned_hashes_v2(versioned_hashes)
625 }
626}
627
628impl<V, T, S> TransactionPoolExt for Pool<V, T, S>
629where
630 V: TransactionValidator,
631 <V as TransactionValidator>::Transaction: EthPoolTransaction,
632 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
633 S: BlobStore,
634{
635 #[instrument(skip(self), target = "txpool")]
636 fn set_block_info(&self, info: BlockInfo) {
637 trace!(target: "txpool", "updating pool block info");
638 self.pool.set_block_info(info)
639 }
640
641 fn on_canonical_state_change<B>(&self, update: CanonicalStateUpdate<'_, B>)
642 where
643 B: Block,
644 {
645 self.pool.on_canonical_state_change(update);
646 }
647
648 fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
649 self.pool.update_accounts(accounts);
650 }
651
652 fn delete_blob(&self, tx: TxHash) {
653 self.pool.delete_blob(tx)
654 }
655
656 fn delete_blobs(&self, txs: Vec<TxHash>) {
657 self.pool.delete_blobs(txs)
658 }
659
660 fn cleanup_blobs(&self) {
661 self.pool.cleanup_blobs()
662 }
663}
664
665impl<V, T: TransactionOrdering, S> Clone for Pool<V, T, S> {
666 fn clone(&self) -> Self {
667 Self { pool: Arc::clone(&self.pool) }
668 }
669}