1use crate::{
69 blobstore::BlobStore,
70 error::{PoolError, PoolErrorKind, PoolResult},
71 identifier::{SenderId, SenderIdentifiers, TransactionId},
72 metrics::BlobStoreMetrics,
73 pool::{
74 listener::{
75 BlobTransactionSidecarListener, PendingTransactionHashListener, PoolEventBroadcast,
76 TransactionListener,
77 },
78 state::SubPool,
79 txpool::{SenderInfo, TxPool},
80 update::UpdateOutcome,
81 },
82 traits::{
83 AllPoolTransactions, BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit,
84 NewBlobSidecar, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin,
85 },
86 validate::{TransactionValidationOutcome, ValidPoolTransaction, ValidTransaction},
87 CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
88 TransactionValidator,
89};
90
91use alloy_primitives::{Address, TxHash, B256};
92use best::BestTransactions;
93use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
94use reth_eth_wire_types::HandleMempoolData;
95use reth_execution_types::ChangedAccount;
96
97use alloy_eips::{eip7594::BlobTransactionSidecarVariant, Typed2718};
98use reth_primitives_traits::Recovered;
99use rustc_hash::FxHashMap;
100use std::{collections::HashSet, fmt, sync::Arc, time::Instant};
101use tokio::sync::mpsc;
102use tracing::{debug, trace, warn};
103mod events;
104pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
105pub use blob::{blob_tx_priority, fee_delta, BlobOrd, BlobTransactions};
106pub use events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent};
107pub use listener::{AllTransactionsEvents, TransactionEvents, TransactionListenerKind};
108pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
109pub use pending::PendingPool;
110use reth_primitives_traits::Block;
111
112mod best;
113mod blob;
114mod listener;
115mod parked;
116pub(crate) mod pending;
117pub(crate) mod size;
118pub(crate) mod state;
119pub mod txpool;
120mod update;
121
/// Buffer size for pending-transaction listener channels.
///
/// NOTE(review): not referenced by the code in this file; presumably the default for
/// `PoolConfig::pending_tx_listener_buffer_size` — confirm.
pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
/// Buffer size for new-transaction listener channels.
///
/// NOTE(review): not referenced by the code in this file; presumably the default for
/// `PoolConfig::new_tx_listener_buffer_size` — confirm.
pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;

/// Capacity of every channel created by `PoolInner::add_blob_sidecar_listener`.
const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
128
129pub struct PoolInner<V, T, S>
131where
132 T: TransactionOrdering,
133{
134 identifiers: RwLock<SenderIdentifiers>,
136 validator: V,
138 blob_store: S,
140 pool: RwLock<TxPool<T>>,
142 config: PoolConfig,
144 event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
146 pending_transaction_listener: Mutex<Vec<PendingTransactionHashListener>>,
148 transaction_listener: Mutex<Vec<TransactionListener<T::Transaction>>>,
150 blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
152 blob_store_metrics: BlobStoreMetrics,
154}
155
156impl<V, T, S> PoolInner<V, T, S>
159where
160 V: TransactionValidator,
161 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
162 S: BlobStore,
163{
164 pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
166 Self {
167 identifiers: Default::default(),
168 validator,
169 event_listener: Default::default(),
170 pool: RwLock::new(TxPool::new(ordering, config.clone())),
171 pending_transaction_listener: Default::default(),
172 transaction_listener: Default::default(),
173 blob_transaction_sidecar_listener: Default::default(),
174 config,
175 blob_store,
176 blob_store_metrics: Default::default(),
177 }
178 }
179
180 pub const fn blob_store(&self) -> &S {
182 &self.blob_store
183 }
184
185 pub fn size(&self) -> PoolSize {
187 self.get_pool_data().size()
188 }
189
190 pub fn block_info(&self) -> BlockInfo {
192 self.get_pool_data().block_info()
193 }
194 pub fn set_block_info(&self, info: BlockInfo) {
196 self.pool.write().set_block_info(info)
197 }
198
199 pub fn get_sender_id(&self, addr: Address) -> SenderId {
201 self.identifiers.write().sender_id_or_create(addr)
202 }
203
204 pub fn get_sender_ids(&self, addrs: impl IntoIterator<Item = Address>) -> Vec<SenderId> {
206 self.identifiers.write().sender_ids_or_create(addrs)
207 }
208
209 pub fn unique_senders(&self) -> HashSet<Address> {
211 self.get_pool_data().unique_senders()
212 }
213
214 fn changed_senders(
217 &self,
218 accs: impl Iterator<Item = ChangedAccount>,
219 ) -> FxHashMap<SenderId, SenderInfo> {
220 let mut identifiers = self.identifiers.write();
221 accs.into_iter()
222 .map(|acc| {
223 let ChangedAccount { address, nonce, balance } = acc;
224 let sender_id = identifiers.sender_id_or_create(address);
225 (sender_id, SenderInfo { state_nonce: nonce, balance })
226 })
227 .collect()
228 }
229
230 pub const fn config(&self) -> &PoolConfig {
232 &self.config
233 }
234
235 pub const fn validator(&self) -> &V {
237 &self.validator
238 }
239
240 pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
243 let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
244 let listener = PendingTransactionHashListener { sender, kind };
245 self.pending_transaction_listener.lock().push(listener);
246 rx
247 }
248
249 pub fn add_new_transaction_listener(
251 &self,
252 kind: TransactionListenerKind,
253 ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
254 let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
255 let listener = TransactionListener { sender, kind };
256 self.transaction_listener.lock().push(listener);
257 rx
258 }
259 pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
262 let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
263 let listener = BlobTransactionSidecarListener { sender };
264 self.blob_transaction_sidecar_listener.lock().push(listener);
265 rx
266 }
267
268 pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
271 self.get_pool_data()
272 .contains(&tx_hash)
273 .then(|| self.event_listener.write().subscribe(tx_hash))
274 }
275
276 pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
278 self.event_listener.write().subscribe_all()
279 }
280
281 pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
283 self.pool.read()
284 }
285
286 pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
288 self.get_pool_data()
289 .all()
290 .transactions_iter()
291 .filter(|tx| tx.propagate)
292 .map(|tx| *tx.hash())
293 .collect()
294 }
295
296 pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
298 self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned().collect()
299 }
300
301 pub fn pooled_transactions_max(
303 &self,
304 max: usize,
305 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
306 self.get_pool_data()
307 .all()
308 .transactions_iter()
309 .filter(|tx| tx.propagate)
310 .take(max)
311 .cloned()
312 .collect()
313 }
314
315 fn to_pooled_transaction(
320 &self,
321 transaction: Arc<ValidPoolTransaction<T::Transaction>>,
322 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
323 where
324 <V as TransactionValidator>::Transaction: EthPoolTransaction,
325 {
326 if transaction.is_eip4844() {
327 let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
328 transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
329 } else {
330 transaction
331 .transaction
332 .clone()
333 .try_into_pooled()
334 .inspect_err(|err| {
335 debug!(
336 target: "txpool", %err,
337 "failed to convert transaction to pooled element; skipping",
338 );
339 })
340 .ok()
341 }
342 }
343
344 pub fn get_pooled_transaction_elements(
346 &self,
347 tx_hashes: Vec<TxHash>,
348 limit: GetPooledTransactionLimit,
349 ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
350 where
351 <V as TransactionValidator>::Transaction: EthPoolTransaction,
352 {
353 let transactions = self.get_all(tx_hashes);
354 let mut elements = Vec::with_capacity(transactions.len());
355 let mut size = 0;
356 for transaction in transactions {
357 let encoded_len = transaction.encoded_length();
358 let Some(pooled) = self.to_pooled_transaction(transaction) else {
359 continue;
360 };
361
362 size += encoded_len;
363 elements.push(pooled.into_inner());
364
365 if limit.exceeds(size) {
366 break
367 }
368 }
369
370 elements
371 }
372
373 pub fn get_pooled_transaction_element(
375 &self,
376 tx_hash: TxHash,
377 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
378 where
379 <V as TransactionValidator>::Transaction: EthPoolTransaction,
380 {
381 self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
382 }
383
384 pub fn on_canonical_state_change<B>(&self, update: CanonicalStateUpdate<'_, B>)
386 where
387 B: Block,
388 {
389 trace!(target: "txpool", ?update, "updating pool on canonical state change");
390
391 let block_info = update.block_info();
392 let CanonicalStateUpdate {
393 new_tip, changed_accounts, mined_transactions, update_kind, ..
394 } = update;
395 self.validator.on_new_head_block(new_tip);
396
397 let changed_senders = self.changed_senders(changed_accounts.into_iter());
398
399 let outcome = self.pool.write().on_canonical_state_change(
401 block_info,
402 mined_transactions,
403 changed_senders,
404 update_kind,
405 );
406
407 self.delete_discarded_blobs(outcome.discarded.iter());
409
410 self.notify_on_new_state(outcome);
412 }
413
414 pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
418 let changed_senders = self.changed_senders(accounts.into_iter());
419 let UpdateOutcome { promoted, discarded } =
420 self.pool.write().update_accounts(changed_senders);
421 let mut listener = self.event_listener.write();
422
423 promoted.iter().for_each(|tx| listener.pending(tx.hash(), None));
424 discarded.iter().for_each(|tx| listener.discarded(tx.hash()));
425
426 self.delete_discarded_blobs(discarded.iter());
429 }
430
431 fn add_transaction(
436 &self,
437 pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
438 origin: TransactionOrigin,
439 tx: TransactionValidationOutcome<T::Transaction>,
440 ) -> PoolResult<TxHash> {
441 match tx {
442 TransactionValidationOutcome::Valid {
443 balance,
444 state_nonce,
445 transaction,
446 propagate,
447 bytecode_hash,
448 authorities,
449 } => {
450 let sender_id = self.get_sender_id(transaction.sender());
451 let transaction_id = TransactionId::new(sender_id, transaction.nonce());
452
453 let (transaction, maybe_sidecar) = match transaction {
455 ValidTransaction::Valid(tx) => (tx, None),
456 ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
457 debug_assert!(
458 transaction.is_eip4844(),
459 "validator returned sidecar for non EIP-4844 transaction"
460 );
461 (transaction, Some(sidecar))
462 }
463 };
464
465 let tx = ValidPoolTransaction {
466 transaction,
467 transaction_id,
468 propagate,
469 timestamp: Instant::now(),
470 origin,
471 authority_ids: authorities.map(|auths| self.get_sender_ids(auths)),
472 };
473
474 let added = pool.add_transaction(tx, balance, state_nonce, bytecode_hash)?;
475 let hash = *added.hash();
476
477 if let Some(sidecar) = maybe_sidecar {
479 self.on_new_blob_sidecar(&hash, &sidecar);
481 self.insert_blob(hash, sidecar);
483 }
484
485 if let Some(replaced) = added.replaced_blob_transaction() {
486 debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
487 self.delete_blob(replaced);
489 }
490
491 if let Some(pending) = added.as_pending() {
493 self.on_new_pending_transaction(pending);
494 }
495
496 self.notify_event_listeners(&added);
498
499 if let Some(discarded) = added.discarded_transactions() {
500 self.delete_discarded_blobs(discarded.iter());
501 }
502
503 self.on_new_transaction(added.into_new_transaction_event());
505
506 Ok(hash)
507 }
508 TransactionValidationOutcome::Invalid(tx, err) => {
509 let mut listener = self.event_listener.write();
510 listener.invalid(tx.hash());
511 Err(PoolError::new(*tx.hash(), err))
512 }
513 TransactionValidationOutcome::Error(tx_hash, err) => {
514 let mut listener = self.event_listener.write();
515 listener.discarded(&tx_hash);
516 Err(PoolError::other(tx_hash, err))
517 }
518 }
519 }
520
521 pub fn add_transaction_and_subscribe(
523 &self,
524 origin: TransactionOrigin,
525 tx: TransactionValidationOutcome<T::Transaction>,
526 ) -> PoolResult<TransactionEvents> {
527 let listener = {
528 let mut listener = self.event_listener.write();
529 listener.subscribe(tx.tx_hash())
530 };
531 let mut results = self.add_transactions(origin, std::iter::once(tx));
532 results.pop().expect("result length is the same as the input")?;
533 Ok(listener)
534 }
535
536 pub fn add_transactions(
542 &self,
543 origin: TransactionOrigin,
544 transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
545 ) -> Vec<PoolResult<TxHash>> {
546 let (mut added, discarded) = {
548 let mut pool = self.pool.write();
549 let added = transactions
550 .into_iter()
551 .map(|tx| self.add_transaction(&mut pool, origin, tx))
552 .collect::<Vec<_>>();
553
554 let discarded = if added.iter().any(Result::is_ok) {
556 pool.discard_worst()
557 } else {
558 Default::default()
559 };
560
561 (added, discarded)
562 };
563
564 if !discarded.is_empty() {
565 self.delete_discarded_blobs(discarded.iter());
567
568 let discarded_hashes =
569 discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
570
571 {
572 let mut listener = self.event_listener.write();
573 discarded_hashes.iter().for_each(|hash| listener.discarded(hash));
574 }
575
576 for res in &mut added {
579 if let Ok(hash) = res {
580 if discarded_hashes.contains(hash) {
581 *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
582 }
583 }
584 }
585 }
586
587 added
588 }
589
590 fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
592 let propagate_allowed = pending.is_propagate_allowed();
593
594 let mut transaction_listeners = self.pending_transaction_listener.lock();
595 transaction_listeners.retain_mut(|listener| {
596 if listener.kind.is_propagate_only() && !propagate_allowed {
597 return !listener.sender.is_closed()
600 }
601
602 listener.send_all(pending.pending_transactions(listener.kind))
604 });
605 }
606
607 fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
609 let mut transaction_listeners = self.transaction_listener.lock();
610 transaction_listeners.retain_mut(|listener| {
611 if listener.kind.is_propagate_only() && !event.transaction.propagate {
612 return !listener.sender.is_closed()
615 }
616
617 listener.send(event.clone())
618 });
619 }
620
621 fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecarVariant) {
623 let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
624 if sidecar_listeners.is_empty() {
625 return
626 }
627 let sidecar = Arc::new(sidecar.clone());
628 sidecar_listeners.retain_mut(|listener| {
629 let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
630 match listener.sender.try_send(new_blob_event) {
631 Ok(()) => true,
632 Err(err) => {
633 if matches!(err, mpsc::error::TrySendError::Full(_)) {
634 debug!(
635 target: "txpool",
636 "[{:?}] failed to send blob sidecar; channel full",
637 sidecar,
638 );
639 true
640 } else {
641 false
642 }
643 }
644 }
645 })
646 }
647
648 fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
650 trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
651
652 self.pending_transaction_listener
655 .lock()
656 .retain_mut(|listener| listener.send_all(outcome.pending_transactions(listener.kind)));
657
658 self.transaction_listener.lock().retain_mut(|listener| {
660 listener.send_all(outcome.full_pending_transactions(listener.kind))
661 });
662
663 let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
664
665 let mut listener = self.event_listener.write();
667
668 mined.iter().for_each(|tx| listener.mined(tx, block_hash));
669 promoted.iter().for_each(|tx| listener.pending(tx.hash(), None));
670 discarded.iter().for_each(|tx| listener.discarded(tx.hash()));
671 }
672
673 fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
675 let mut listener = self.event_listener.write();
676
677 match tx {
678 AddedTransaction::Pending(tx) => {
679 let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
680
681 listener.pending(transaction.hash(), replaced.clone());
682 promoted.iter().for_each(|tx| listener.pending(tx.hash(), None));
683 discarded.iter().for_each(|tx| listener.discarded(tx.hash()));
684 }
685 AddedTransaction::Parked { transaction, replaced, .. } => {
686 listener.queued(transaction.hash());
687 if let Some(replaced) = replaced {
688 listener.replaced(replaced.clone(), *transaction.hash());
689 }
690 }
691 }
692 }
693
694 pub fn best_transactions(&self) -> BestTransactions<T> {
696 self.get_pool_data().best_transactions()
697 }
698
699 pub fn best_transactions_with_attributes(
702 &self,
703 best_transactions_attributes: BestTransactionsAttributes,
704 ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
705 {
706 self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
707 }
708
709 pub fn pending_transactions_max(
711 &self,
712 max: usize,
713 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
714 self.get_pool_data().pending_transactions_iter().take(max).collect()
715 }
716
717 pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
719 self.get_pool_data().pending_transactions()
720 }
721
722 pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
724 self.get_pool_data().queued_transactions()
725 }
726
727 pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
729 let pool = self.get_pool_data();
730 AllPoolTransactions {
731 pending: pool.pending_transactions(),
732 queued: pool.queued_transactions(),
733 }
734 }
735
736 pub fn remove_transactions(
741 &self,
742 hashes: Vec<TxHash>,
743 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
744 if hashes.is_empty() {
745 return Vec::new()
746 }
747 let removed = self.pool.write().remove_transactions(hashes);
748
749 let mut listener = self.event_listener.write();
750
751 removed.iter().for_each(|tx| listener.discarded(tx.hash()));
752
753 removed
754 }
755
756 pub fn remove_transactions_and_descendants(
759 &self,
760 hashes: Vec<TxHash>,
761 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
762 if hashes.is_empty() {
763 return Vec::new()
764 }
765 let removed = self.pool.write().remove_transactions_and_descendants(hashes);
766
767 let mut listener = self.event_listener.write();
768
769 removed.iter().for_each(|tx| listener.discarded(tx.hash()));
770
771 removed
772 }
773
774 pub fn remove_transactions_by_sender(
776 &self,
777 sender: Address,
778 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
779 let sender_id = self.get_sender_id(sender);
780 let removed = self.pool.write().remove_transactions_by_sender(sender_id);
781
782 let mut listener = self.event_listener.write();
783
784 removed.iter().for_each(|tx| listener.discarded(tx.hash()));
785
786 removed
787 }
788
789 pub fn retain_unknown<A>(&self, announcement: &mut A)
791 where
792 A: HandleMempoolData,
793 {
794 if announcement.is_empty() {
795 return
796 }
797 let pool = self.get_pool_data();
798 announcement.retain_by_hash(|tx| !pool.contains(tx))
799 }
800
801 pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
803 self.get_pool_data().get(tx_hash)
804 }
805
806 pub fn get_transactions_by_sender(
808 &self,
809 sender: Address,
810 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
811 let sender_id = self.get_sender_id(sender);
812 self.get_pool_data().get_transactions_by_sender(sender_id)
813 }
814
815 pub fn get_queued_transactions_by_sender(
817 &self,
818 sender: Address,
819 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
820 let sender_id = self.get_sender_id(sender);
821 self.get_pool_data().queued_txs_by_sender(sender_id)
822 }
823
824 pub fn pending_transactions_with_predicate(
826 &self,
827 predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
828 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
829 self.get_pool_data().pending_transactions_with_predicate(predicate)
830 }
831
832 pub fn get_pending_transactions_by_sender(
834 &self,
835 sender: Address,
836 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
837 let sender_id = self.get_sender_id(sender);
838 self.get_pool_data().pending_txs_by_sender(sender_id)
839 }
840
841 pub fn get_highest_transaction_by_sender(
843 &self,
844 sender: Address,
845 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
846 let sender_id = self.get_sender_id(sender);
847 self.get_pool_data().get_highest_transaction_by_sender(sender_id)
848 }
849
850 pub fn get_highest_consecutive_transaction_by_sender(
852 &self,
853 sender: Address,
854 on_chain_nonce: u64,
855 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
856 let sender_id = self.get_sender_id(sender);
857 self.get_pool_data().get_highest_consecutive_transaction_by_sender(
858 sender_id.into_transaction_id(on_chain_nonce),
859 )
860 }
861
862 pub fn get_transaction_by_transaction_id(
864 &self,
865 transaction_id: &TransactionId,
866 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
867 self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
868 }
869
870 pub fn get_transactions_by_origin(
872 &self,
873 origin: TransactionOrigin,
874 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
875 self.get_pool_data()
876 .all()
877 .transactions_iter()
878 .filter(|tx| tx.origin == origin)
879 .cloned()
880 .collect()
881 }
882
883 pub fn get_pending_transactions_by_origin(
885 &self,
886 origin: TransactionOrigin,
887 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
888 self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
889 }
890
891 pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
895 if txs.is_empty() {
896 return Vec::new()
897 }
898 self.get_pool_data().get_all(txs).collect()
899 }
900
901 pub fn on_propagated(&self, txs: PropagatedTransactions) {
903 if txs.0.is_empty() {
904 return
905 }
906 let mut listener = self.event_listener.write();
907
908 txs.0.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers))
909 }
910
911 pub fn len(&self) -> usize {
913 self.get_pool_data().len()
914 }
915
916 pub fn is_empty(&self) -> bool {
918 self.get_pool_data().is_empty()
919 }
920
921 pub fn is_exceeded(&self) -> bool {
923 self.pool.read().is_exceeded()
924 }
925
926 fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecarVariant) {
928 debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
929 if let Err(err) = self.blob_store.insert(hash, blob) {
930 warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
931 self.blob_store_metrics.blobstore_failed_inserts.increment(1);
932 }
933 self.update_blob_store_metrics();
934 }
935
936 pub fn delete_blob(&self, blob: TxHash) {
938 let _ = self.blob_store.delete(blob);
939 }
940
941 pub fn delete_blobs(&self, txs: Vec<TxHash>) {
943 let _ = self.blob_store.delete_all(txs);
944 }
945
946 pub fn cleanup_blobs(&self) {
948 let stat = self.blob_store.cleanup();
949 self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
950 self.update_blob_store_metrics();
951 }
952
953 fn update_blob_store_metrics(&self) {
954 if let Some(data_size) = self.blob_store.data_size_hint() {
955 self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
956 }
957 self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
958 }
959
960 fn delete_discarded_blobs<'a>(
962 &'a self,
963 transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
964 ) {
965 let blob_txs = transactions
966 .into_iter()
967 .filter(|tx| tx.transaction.is_eip4844())
968 .map(|tx| *tx.hash())
969 .collect();
970 self.delete_blobs(blob_txs);
971 }
972}
973
974impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
975 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
976 f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
977 }
978}
979
980#[derive(Debug, Clone)]
982pub struct AddedPendingTransaction<T: PoolTransaction> {
983 transaction: Arc<ValidPoolTransaction<T>>,
985 replaced: Option<Arc<ValidPoolTransaction<T>>>,
987 promoted: Vec<Arc<ValidPoolTransaction<T>>>,
989 discarded: Vec<Arc<ValidPoolTransaction<T>>>,
991}
992
993impl<T: PoolTransaction> AddedPendingTransaction<T> {
994 pub(crate) fn pending_transactions(
1000 &self,
1001 kind: TransactionListenerKind,
1002 ) -> impl Iterator<Item = B256> + '_ {
1003 let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
1004 PendingTransactionIter { kind, iter }
1005 }
1006
1007 pub(crate) fn is_propagate_allowed(&self) -> bool {
1009 self.transaction.propagate
1010 }
1011}
1012
1013pub(crate) struct PendingTransactionIter<Iter> {
1014 kind: TransactionListenerKind,
1015 iter: Iter,
1016}
1017
1018impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1019where
1020 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1021 T: PoolTransaction + 'a,
1022{
1023 type Item = B256;
1024
1025 fn next(&mut self) -> Option<Self::Item> {
1026 loop {
1027 let next = self.iter.next()?;
1028 if self.kind.is_propagate_only() && !next.propagate {
1029 continue
1030 }
1031 return Some(*next.hash())
1032 }
1033 }
1034}
1035
1036pub(crate) struct FullPendingTransactionIter<Iter> {
1038 kind: TransactionListenerKind,
1039 iter: Iter,
1040}
1041
1042impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1043where
1044 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1045 T: PoolTransaction + 'a,
1046{
1047 type Item = NewTransactionEvent<T>;
1048
1049 fn next(&mut self) -> Option<Self::Item> {
1050 loop {
1051 let next = self.iter.next()?;
1052 if self.kind.is_propagate_only() && !next.propagate {
1053 continue
1054 }
1055 return Some(NewTransactionEvent {
1056 subpool: SubPool::Pending,
1057 transaction: next.clone(),
1058 })
1059 }
1060 }
1061}
1062
1063#[derive(Debug, Clone)]
1065pub enum AddedTransaction<T: PoolTransaction> {
1066 Pending(AddedPendingTransaction<T>),
1068 Parked {
1071 transaction: Arc<ValidPoolTransaction<T>>,
1073 replaced: Option<Arc<ValidPoolTransaction<T>>>,
1075 subpool: SubPool,
1077 },
1078}
1079
1080impl<T: PoolTransaction> AddedTransaction<T> {
1081 pub(crate) const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1083 match self {
1084 Self::Pending(tx) => Some(tx),
1085 _ => None,
1086 }
1087 }
1088
1089 pub(crate) const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1091 match self {
1092 Self::Pending(tx) => tx.replaced.as_ref(),
1093 Self::Parked { replaced, .. } => replaced.as_ref(),
1094 }
1095 }
1096
1097 pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1099 match self {
1100 Self::Pending(tx) => Some(&tx.discarded),
1101 Self::Parked { .. } => None,
1102 }
1103 }
1104
1105 pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1107 self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1108 }
1109
1110 pub(crate) fn hash(&self) -> &TxHash {
1112 match self {
1113 Self::Pending(tx) => tx.transaction.hash(),
1114 Self::Parked { transaction, .. } => transaction.hash(),
1115 }
1116 }
1117
1118 pub(crate) fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1120 match self {
1121 Self::Pending(tx) => {
1122 NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1123 }
1124 Self::Parked { transaction, subpool, .. } => {
1125 NewTransactionEvent { transaction, subpool }
1126 }
1127 }
1128 }
1129
1130 #[cfg(test)]
1132 pub(crate) const fn subpool(&self) -> SubPool {
1133 match self {
1134 Self::Pending(_) => SubPool::Pending,
1135 Self::Parked { subpool, .. } => *subpool,
1136 }
1137 }
1138
1139 #[cfg(test)]
1141 pub(crate) fn id(&self) -> &TransactionId {
1142 match self {
1143 Self::Pending(added) => added.transaction.id(),
1144 Self::Parked { transaction, .. } => transaction.id(),
1145 }
1146 }
1147}
1148
1149#[derive(Debug)]
1151pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
1152 pub(crate) block_hash: B256,
1154 pub(crate) mined: Vec<TxHash>,
1156 pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1158 pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1160}
1161
1162impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1163 pub(crate) fn pending_transactions(
1169 &self,
1170 kind: TransactionListenerKind,
1171 ) -> impl Iterator<Item = B256> + '_ {
1172 let iter = self.promoted.iter();
1173 PendingTransactionIter { kind, iter }
1174 }
1175
1176 pub(crate) fn full_pending_transactions(
1182 &self,
1183 kind: TransactionListenerKind,
1184 ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1185 let iter = self.promoted.iter();
1186 FullPendingTransactionIter { kind, iter }
1187 }
1188}
1189
#[cfg(test)]
mod tests {
    use crate::{
        blobstore::{BlobStore, InMemoryBlobStore},
        test_utils::{MockTransaction, TestPoolBuilder},
        validate::ValidTransaction,
        BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
    };
    use alloy_eips::{eip4844::BlobTransactionSidecar, eip7594::BlobTransactionSidecarVariant};
    use std::{fs, path::PathBuf};

    /// Adds more blob transactions than the blob sub-pool may hold and checks that the
    /// pool's blob store ends up equal to a reference store that only received the first
    /// `max_txs` sidecars.
    #[test]
    fn test_discard_blobs_on_blob_tx_eviction() {
        // Load the hex-encoded blob from the test data file.
        let blob_file = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json");
        let json_content =
            fs::read_to_string(blob_file).expect("Failed to read the blob data file");
        let json_value: serde_json::Value =
            serde_json::from_str(&json_content).expect("Failed to deserialize JSON");
        let blob_hex = json_value
            .get("data")
            .unwrap()
            .as_str()
            .expect("Data is not a valid string")
            .to_string();

        // Every transaction in this test shares the same sidecar.
        let sidecar = BlobTransactionSidecarVariant::Eip4844(
            BlobTransactionSidecar::try_from_blobs_hex(vec![blob_hex]).unwrap(),
        );

        // Blob sub-pool limit: 1000 transactions, size limit of `usize::MAX`.
        let blob_limit = SubPoolLimit::new(1000, usize::MAX);

        let builder = TestPoolBuilder::default()
            .with_config(PoolConfig { blob_limit, ..Default::default() });
        let test_pool = &builder.pool;

        test_pool
            .set_block_info(BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() });

        // Reference store that mirrors what the pool's store is expected to contain.
        let expected_store = InMemoryBlobStore::default();

        let total_txs = blob_limit.max_txs + 10;
        for n in 0..total_txs {
            let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());
            tx.set_size(1844674407370951);

            // Only the first `max_txs` sidecars go into the reference store.
            if n < blob_limit.max_txs {
                expected_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
            }

            let outcome = TransactionValidationOutcome::Valid {
                balance: U256::from(1_000),
                state_nonce: 0,
                bytecode_hash: None,
                transaction: ValidTransaction::ValidWithSidecar {
                    transaction: tx,
                    sidecar: sidecar.clone(),
                },
                propagate: true,
                authorities: None,
            };
            test_pool.add_transactions(TransactionOrigin::External, [outcome]);
        }

        // The blob sub-pool holds exactly the configured maximum number of transactions.
        assert_eq!(test_pool.size().blob, blob_limit.max_txs);

        // 1000 transactions of 1844674407370951 bytes each.
        assert_eq!(test_pool.size().blob_size, 1844674407370951000);

        // The pool's blob store matches the reference store.
        assert_eq!(*test_pool.blob_store(), expected_store);
    }
}