1#![doc(
146 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
147 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
148 issue_tracker_base_url = "https://github.com/SeismicSystems/seismic-reth/issues/"
149)]
150#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
151#![cfg_attr(not(test), warn(unused_crate_dependencies))]
152
153use crate::{identifier::TransactionId, pool::PoolInner};
154use alloy_eips::eip4844::{BlobAndProofV1, BlobTransactionSidecar};
155use alloy_primitives::{Address, TxHash, B256, U256};
156use aquamarine as _;
157use reth_eth_wire_types::HandleMempoolData;
158use reth_execution_types::ChangedAccount;
159use reth_primitives::RecoveredTx;
160use reth_storage_api::StateProviderFactory;
161use std::{collections::HashSet, sync::Arc};
162use tokio::sync::mpsc::Receiver;
163use tracing::{instrument, trace};
164
165pub use crate::{
166 blobstore::{BlobStore, BlobStoreError},
167 config::{
168 LocalTransactionConfig, PoolConfig, PriceBumpConfig, SubPoolLimit, DEFAULT_PRICE_BUMP,
169 DEFAULT_TXPOOL_ADDITIONAL_VALIDATION_TASKS, MAX_NEW_PENDING_TXS_NOTIFICATIONS,
170 REPLACE_BLOB_PRICE_BUMP, TXPOOL_MAX_ACCOUNT_SLOTS_PER_SENDER,
171 TXPOOL_SUBPOOL_MAX_SIZE_MB_DEFAULT, TXPOOL_SUBPOOL_MAX_TXS_DEFAULT,
172 },
173 error::PoolResult,
174 ordering::{CoinbaseTipOrdering, Priority, TransactionOrdering},
175 pool::{
176 blob_tx_priority, fee_delta, state::SubPool, AllTransactionsEvents, FullTransactionEvent,
177 TransactionEvent, TransactionEvents,
178 },
179 traits::*,
180 validate::{
181 EthTransactionValidator, TransactionValidationOutcome, TransactionValidationTaskExecutor,
182 TransactionValidator, ValidPoolTransaction,
183 },
184};
185
186pub mod error;
187pub mod maintain;
188pub mod metrics;
189pub mod noop;
190pub mod pool;
191pub mod validate;
192
193pub mod blobstore;
194mod config;
195pub mod identifier;
196mod ordering;
197mod traits;
198
199#[cfg(any(test, feature = "test-utils"))]
200pub mod test_utils;
202
203pub type EthTransactionPool<Client, S> = Pool<
205 TransactionValidationTaskExecutor<EthTransactionValidator<Client, EthPooledTransaction>>,
206 CoinbaseTipOrdering<EthPooledTransaction>,
207 S,
208>;
209
210#[derive(Debug)]
212pub struct Pool<V, T: TransactionOrdering, S> {
213 pool: Arc<PoolInner<V, T, S>>,
215}
216
217impl<V, T, S> Pool<V, T, S>
220where
221 V: TransactionValidator,
222 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
223 S: BlobStore,
224{
225 pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
227 Self { pool: Arc::new(PoolInner::new(validator, ordering, blob_store, config)) }
228 }
229
230 pub(crate) fn inner(&self) -> &PoolInner<V, T, S> {
232 &self.pool
233 }
234
235 pub fn config(&self) -> &PoolConfig {
237 self.inner().config()
238 }
239
240 async fn validate_all(
244 &self,
245 origin: TransactionOrigin,
246 transactions: impl IntoIterator<Item = V::Transaction>,
247 ) -> Vec<(TxHash, TransactionValidationOutcome<V::Transaction>)> {
248 futures_util::future::join_all(transactions.into_iter().map(|tx| self.validate(origin, tx)))
249 .await
250 }
251
252 async fn validate(
254 &self,
255 origin: TransactionOrigin,
256 transaction: V::Transaction,
257 ) -> (TxHash, TransactionValidationOutcome<V::Transaction>) {
258 let hash = *transaction.hash();
259
260 let outcome = self.pool.validator().validate_transaction(origin, transaction).await;
261
262 (hash, outcome)
263 }
264
265 pub fn len(&self) -> usize {
267 self.pool.len()
268 }
269
270 pub fn is_empty(&self) -> bool {
272 self.pool.is_empty()
273 }
274
275 pub fn is_exceeded(&self) -> bool {
277 self.pool.is_exceeded()
278 }
279}
280
281impl<Client, S> EthTransactionPool<Client, S>
282where
283 Client: StateProviderFactory + Clone + 'static,
284 S: BlobStore,
285{
286 pub fn eth_pool(
313 validator: TransactionValidationTaskExecutor<
314 EthTransactionValidator<Client, EthPooledTransaction>,
315 >,
316 blob_store: S,
317 config: PoolConfig,
318 ) -> Self {
319 Self::new(validator, CoinbaseTipOrdering::default(), blob_store, config)
320 }
321}
322
323impl<V, T, S> TransactionPool for Pool<V, T, S>
325where
326 V: TransactionValidator,
327 <V as TransactionValidator>::Transaction: EthPoolTransaction,
328 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
329 S: BlobStore,
330{
331 type Transaction = T::Transaction;
332
333 fn pool_size(&self) -> PoolSize {
334 self.pool.size()
335 }
336
337 fn block_info(&self) -> BlockInfo {
338 self.pool.block_info()
339 }
340
341 async fn add_transaction_and_subscribe(
342 &self,
343 origin: TransactionOrigin,
344 transaction: Self::Transaction,
345 ) -> PoolResult<TransactionEvents> {
346 let (_, tx) = self.validate(origin, transaction).await;
347 self.pool.add_transaction_and_subscribe(origin, tx)
348 }
349
350 async fn add_transaction(
351 &self,
352 origin: TransactionOrigin,
353 transaction: Self::Transaction,
354 ) -> PoolResult<TxHash> {
355 let (_, tx) = self.validate(origin, transaction).await;
356 let mut results = self.pool.add_transactions(origin, std::iter::once(tx));
357 results.pop().expect("result length is the same as the input")
358 }
359
360 async fn add_transactions(
361 &self,
362 origin: TransactionOrigin,
363 transactions: Vec<Self::Transaction>,
364 ) -> Vec<PoolResult<TxHash>> {
365 if transactions.is_empty() {
366 return Vec::new()
367 }
368 let validated = self.validate_all(origin, transactions).await;
369
370 self.pool.add_transactions(origin, validated.into_iter().map(|(_, tx)| tx))
371 }
372
373 fn transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
374 self.pool.add_transaction_event_listener(tx_hash)
375 }
376
377 fn all_transactions_event_listener(&self) -> AllTransactionsEvents<Self::Transaction> {
378 self.pool.add_all_transactions_event_listener()
379 }
380
381 fn pending_transactions_listener_for(&self, kind: TransactionListenerKind) -> Receiver<TxHash> {
382 self.pool.add_pending_listener(kind)
383 }
384
385 fn blob_transaction_sidecars_listener(&self) -> Receiver<NewBlobSidecar> {
386 self.pool.add_blob_sidecar_listener()
387 }
388
389 fn new_transactions_listener_for(
390 &self,
391 kind: TransactionListenerKind,
392 ) -> Receiver<NewTransactionEvent<Self::Transaction>> {
393 self.pool.add_new_transaction_listener(kind)
394 }
395
396 fn pooled_transaction_hashes(&self) -> Vec<TxHash> {
397 self.pool.pooled_transactions_hashes()
398 }
399
400 fn pooled_transaction_hashes_max(&self, max: usize) -> Vec<TxHash> {
401 self.pooled_transaction_hashes().into_iter().take(max).collect()
402 }
403
404 fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
405 self.pool.pooled_transactions()
406 }
407
408 fn pooled_transactions_max(
409 &self,
410 max: usize,
411 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
412 self.pool.pooled_transactions_max(max)
413 }
414
415 fn get_pooled_transaction_elements(
416 &self,
417 tx_hashes: Vec<TxHash>,
418 limit: GetPooledTransactionLimit,
419 ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled> {
420 self.pool.get_pooled_transaction_elements(tx_hashes, limit)
421 }
422
423 fn get_pooled_transaction_element(
424 &self,
425 tx_hash: TxHash,
426 ) -> Option<RecoveredTx<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
427 {
428 self.pool.get_pooled_transaction_element(tx_hash)
429 }
430
431 fn best_transactions(
432 &self,
433 ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>> {
434 Box::new(self.pool.best_transactions())
435 }
436
437 fn best_transactions_with_attributes(
438 &self,
439 best_transactions_attributes: BestTransactionsAttributes,
440 ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>> {
441 self.pool.best_transactions_with_attributes(best_transactions_attributes)
442 }
443
444 fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
445 self.pool.pending_transactions()
446 }
447
448 fn pending_transactions_max(
449 &self,
450 max: usize,
451 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
452 self.pool.pending_transactions_max(max)
453 }
454
455 fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
456 self.pool.queued_transactions()
457 }
458
459 fn all_transactions(&self) -> AllPoolTransactions<Self::Transaction> {
460 self.pool.all_transactions()
461 }
462
463 fn remove_transactions(
464 &self,
465 hashes: Vec<TxHash>,
466 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
467 self.pool.remove_transactions(hashes)
468 }
469
470 fn remove_transactions_and_descendants(
471 &self,
472 hashes: Vec<TxHash>,
473 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
474 self.pool.remove_transactions_and_descendants(hashes)
475 }
476
477 fn remove_transactions_by_sender(
478 &self,
479 sender: Address,
480 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
481 self.pool.remove_transactions_by_sender(sender)
482 }
483
484 fn retain_unknown<A>(&self, announcement: &mut A)
485 where
486 A: HandleMempoolData,
487 {
488 self.pool.retain_unknown(announcement)
489 }
490
491 fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
492 self.inner().get(tx_hash)
493 }
494
495 fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
496 self.inner().get_all(txs)
497 }
498
499 fn on_propagated(&self, txs: PropagatedTransactions) {
500 self.inner().on_propagated(txs)
501 }
502
503 fn get_transactions_by_sender(
504 &self,
505 sender: Address,
506 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
507 self.pool.get_transactions_by_sender(sender)
508 }
509
510 fn get_pending_transactions_with_predicate(
511 &self,
512 predicate: impl FnMut(&ValidPoolTransaction<Self::Transaction>) -> bool,
513 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
514 self.pool.pending_transactions_with_predicate(predicate)
515 }
516
517 fn get_pending_transactions_by_sender(
518 &self,
519 sender: Address,
520 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
521 self.pool.get_pending_transactions_by_sender(sender)
522 }
523
524 fn get_queued_transactions_by_sender(
525 &self,
526 sender: Address,
527 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
528 self.pool.get_queued_transactions_by_sender(sender)
529 }
530
531 fn get_highest_transaction_by_sender(
532 &self,
533 sender: Address,
534 ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
535 self.pool.get_highest_transaction_by_sender(sender)
536 }
537
538 fn get_highest_consecutive_transaction_by_sender(
539 &self,
540 sender: Address,
541 on_chain_nonce: u64,
542 ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
543 self.pool.get_highest_consecutive_transaction_by_sender(sender, on_chain_nonce)
544 }
545
546 fn get_transaction_by_sender_and_nonce(
547 &self,
548 sender: Address,
549 nonce: u64,
550 ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
551 let transaction_id = TransactionId::new(self.pool.get_sender_id(sender), nonce);
552
553 self.inner().get_pool_data().all().get(&transaction_id).map(|tx| tx.transaction.clone())
554 }
555
556 fn get_transactions_by_origin(
557 &self,
558 origin: TransactionOrigin,
559 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
560 self.pool.get_transactions_by_origin(origin)
561 }
562
563 fn get_pending_transactions_by_origin(
565 &self,
566 origin: TransactionOrigin,
567 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
568 self.pool.get_pending_transactions_by_origin(origin)
569 }
570
571 fn unique_senders(&self) -> HashSet<Address> {
572 self.pool.unique_senders()
573 }
574
575 fn get_blob(
576 &self,
577 tx_hash: TxHash,
578 ) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
579 self.pool.blob_store().get(tx_hash)
580 }
581
582 fn get_all_blobs(
583 &self,
584 tx_hashes: Vec<TxHash>,
585 ) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
586 self.pool.blob_store().get_all(tx_hashes)
587 }
588
589 fn get_all_blobs_exact(
590 &self,
591 tx_hashes: Vec<TxHash>,
592 ) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
593 self.pool.blob_store().get_exact(tx_hashes)
594 }
595
596 fn get_blobs_for_versioned_hashes(
597 &self,
598 versioned_hashes: &[B256],
599 ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
600 self.pool.blob_store().get_by_versioned_hashes(versioned_hashes)
601 }
602}
603
604impl<V, T, S> TransactionPoolExt for Pool<V, T, S>
605where
606 V: TransactionValidator,
607 <V as TransactionValidator>::Transaction: EthPoolTransaction,
608 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
609 S: BlobStore,
610{
611 #[instrument(skip(self), target = "txpool")]
612 fn set_block_info(&self, info: BlockInfo) {
613 trace!(target: "txpool", "updating pool block info");
614 self.pool.set_block_info(info)
615 }
616
617 fn on_canonical_state_change(&self, update: CanonicalStateUpdate<'_>) {
618 self.pool.on_canonical_state_change(update);
619 }
620
621 fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
622 self.pool.update_accounts(accounts);
623 }
624
625 fn delete_blob(&self, tx: TxHash) {
626 self.pool.delete_blob(tx)
627 }
628
629 fn delete_blobs(&self, txs: Vec<TxHash>) {
630 self.pool.delete_blobs(txs)
631 }
632
633 fn cleanup_blobs(&self) {
634 self.pool.cleanup_blobs()
635 }
636}
637
638impl<V, T: TransactionOrdering, S> Clone for Pool<V, T, S> {
639 fn clone(&self) -> Self {
640 Self { pool: Arc::clone(&self.pool) }
641 }
642}