1use crate::{
69 error::{PoolError, PoolErrorKind, PoolResult},
70 identifier::{SenderId, SenderIdentifiers, TransactionId},
71 pool::{
72 listener::PoolEventBroadcast,
73 state::SubPool,
74 txpool::{SenderInfo, TxPool},
75 },
76 traits::{
77 AllPoolTransactions, BestTransactionsAttributes, BlockInfo, NewTransactionEvent, PoolSize,
78 PoolTransaction, PropagatedTransactions, TransactionOrigin,
79 },
80 validate::{TransactionValidationOutcome, ValidPoolTransaction},
81 CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
82 TransactionValidator,
83};
84use alloy_primitives::{Address, TxHash, B256};
85use best::BestTransactions;
86use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
87use reth_eth_wire_types::HandleMempoolData;
88use reth_execution_types::ChangedAccount;
89
90use alloy_eips::eip4844::BlobTransactionSidecar;
91use reth_primitives::RecoveredTx;
92use rustc_hash::FxHashMap;
93use std::{collections::HashSet, fmt, sync::Arc, time::Instant};
94use tokio::sync::mpsc;
95use tracing::{debug, trace, warn};
96mod events;
97use crate::{
98 blobstore::BlobStore,
99 metrics::BlobStoreMetrics,
100 pool::txpool::UpdateOutcome,
101 traits::{GetPooledTransactionLimit, NewBlobSidecar, TransactionListenerKind},
102 validate::ValidTransaction,
103};
104pub use best::{
105 BestPayloadTransactions, BestTransactionFilter, BestTransactionsWithPrioritizedSenders,
106};
107pub use blob::{blob_tx_priority, fee_delta};
108pub use events::{FullTransactionEvent, TransactionEvent};
109pub use listener::{AllTransactionsEvents, TransactionEvents};
110pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
111pub use pending::PendingPool;
112
113mod best;
114mod blob;
115mod listener;
116mod parked;
117pub(crate) mod pending;
118pub(crate) mod size;
119pub(crate) mod state;
120pub mod txpool;
121mod update;
122
/// Buffer size constant for pending-transaction listeners.
///
/// NOTE(review): not referenced in this part of the file; the listener channels created here are
/// sized from `PoolConfig::pending_tx_listener_buffer_size`, so this is presumably the default
/// for that setting — confirm against `PoolConfig`.
pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
/// Buffer size constant for new-transaction listeners.
///
/// NOTE(review): not referenced in this part of the file; presumably the default for
/// `PoolConfig::new_tx_listener_buffer_size` — confirm against `PoolConfig`.
pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;

/// Capacity of the channel created for every blob sidecar listener.
const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
129
/// Transaction pool internals.
///
/// Bundles the lock-protected [`TxPool`] with the validator, the blob store and the registries
/// of listeners that are notified when the pool changes.
pub struct PoolInner<V, T, S>
where
    T: TransactionOrdering,
{
    /// Maps sender addresses to internal [`SenderId`]s.
    identifiers: RwLock<SenderIdentifiers>,
    /// Transaction validator; it is told about every new head block.
    validator: V,
    /// Storage for the sidecars of blob (EIP-4844) transactions.
    blob_store: S,
    /// The pool that holds the transactions.
    pool: RwLock<TxPool<T>>,
    /// Pool settings, including the listener channel buffer sizes.
    config: PoolConfig,
    /// Broadcasts per-transaction events (pending, queued, mined, discarded, ...) to subscribers.
    event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
    /// Listeners that receive the hashes of transactions that became pending.
    pending_transaction_listener: Mutex<Vec<PendingTransactionHashListener>>,
    /// Listeners that receive a [`NewTransactionEvent`] for added or promoted transactions.
    transaction_listener: Mutex<Vec<TransactionListener<T::Transaction>>>,
    /// Listeners that receive the sidecars of newly added blob transactions.
    blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
    /// Metrics describing the blob store (entries, byte size, failed inserts/deletes).
    blob_store_metrics: BlobStoreMetrics,
}
156
157impl<V, T, S> PoolInner<V, T, S>
160where
161 V: TransactionValidator,
162 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
163 S: BlobStore,
164{
165 pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
167 Self {
168 identifiers: Default::default(),
169 validator,
170 event_listener: Default::default(),
171 pool: RwLock::new(TxPool::new(ordering, config.clone())),
172 pending_transaction_listener: Default::default(),
173 transaction_listener: Default::default(),
174 blob_transaction_sidecar_listener: Default::default(),
175 config,
176 blob_store,
177 blob_store_metrics: Default::default(),
178 }
179 }
180
    /// Returns a reference to the configured blob store.
    pub const fn blob_store(&self) -> &S {
        &self.blob_store
    }
185
    /// Returns the size stats reported by the inner pool (takes the pool read lock).
    pub fn size(&self) -> PoolSize {
        self.get_pool_data().size()
    }
190
    /// Returns the [`BlockInfo`] the inner pool currently tracks (takes the pool read lock).
    pub fn block_info(&self) -> BlockInfo {
        self.get_pool_data().block_info()
    }
    /// Forwards the given [`BlockInfo`] to the inner pool (takes the pool write lock).
    pub fn set_block_info(&self, info: BlockInfo) {
        self.pool.write().set_block_info(info)
    }
199
    /// Returns the internal [`SenderId`] for this address, creating one if needed.
    ///
    /// Takes the write lock on the sender identifiers, even for addresses that are already known.
    pub fn get_sender_id(&self, addr: Address) -> SenderId {
        self.identifiers.write().sender_id_or_create(addr)
    }
204
    /// Returns the set of sender addresses reported by the inner pool (takes the pool read lock).
    pub fn unique_senders(&self) -> HashSet<Address> {
        self.get_pool_data().unique_senders()
    }
209
210 fn changed_senders(
213 &self,
214 accs: impl Iterator<Item = ChangedAccount>,
215 ) -> FxHashMap<SenderId, SenderInfo> {
216 let mut identifiers = self.identifiers.write();
217 accs.into_iter()
218 .map(|acc| {
219 let ChangedAccount { address, nonce, balance } = acc;
220 let sender_id = identifiers.sender_id_or_create(address);
221 (sender_id, SenderInfo { state_nonce: nonce, balance })
222 })
223 .collect()
224 }
225
    /// Returns the pool configuration.
    pub const fn config(&self) -> &PoolConfig {
        &self.config
    }
230
    /// Returns the transaction validator.
    pub const fn validator(&self) -> &V {
        &self.validator
    }
235
236 pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
239 let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
240 let listener = PendingTransactionHashListener { sender, kind };
241 self.pending_transaction_listener.lock().push(listener);
242 rx
243 }
244
245 pub fn add_new_transaction_listener(
247 &self,
248 kind: TransactionListenerKind,
249 ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
250 let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
251 let listener = TransactionListener { sender, kind };
252 self.transaction_listener.lock().push(listener);
253 rx
254 }
255 pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
258 let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
259 let listener = BlobTransactionSidecarListener { sender };
260 self.blob_transaction_sidecar_listener.lock().push(listener);
261 rx
262 }
263
264 pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
267 self.get_pool_data()
268 .contains(&tx_hash)
269 .then(|| self.event_listener.write().subscribe(tx_hash))
270 }
271
    /// Subscribes to the events of all transactions (takes the event listener write lock).
    pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
        self.event_listener.write().subscribe_all()
    }
276
    /// Acquires the pool read lock and returns the guard.
    ///
    /// The lock is held for as long as the returned guard is alive.
    pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
        self.pool.read()
    }
281
282 pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
284 self.get_pool_data()
285 .all()
286 .transactions_iter()
287 .filter(|tx| tx.propagate)
288 .map(|tx| *tx.hash())
289 .collect()
290 }
291
292 pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
294 self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned().collect()
295 }
296
297 pub fn pooled_transactions_max(
299 &self,
300 max: usize,
301 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
302 self.get_pool_data()
303 .all()
304 .transactions_iter()
305 .filter(|tx| tx.propagate)
306 .take(max)
307 .cloned()
308 .collect()
309 }
310
311 fn to_pooled_transaction(
316 &self,
317 transaction: Arc<ValidPoolTransaction<T::Transaction>>,
318 ) -> Option<RecoveredTx<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
319 where
320 <V as TransactionValidator>::Transaction: EthPoolTransaction,
321 {
322 if transaction.is_eip4844() {
323 let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
324 transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
325 } else {
326 transaction
327 .transaction
328 .clone()
329 .try_into_pooled()
330 .inspect_err(|err| {
331 debug!(
332 target: "txpool", %err,
333 "failed to convert transaction to pooled element; skipping",
334 );
335 })
336 .ok()
337 }
338 }
339
340 pub fn get_pooled_transaction_elements(
342 &self,
343 tx_hashes: Vec<TxHash>,
344 limit: GetPooledTransactionLimit,
345 ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
346 where
347 <V as TransactionValidator>::Transaction: EthPoolTransaction,
348 {
349 let transactions = self.get_all(tx_hashes);
350 let mut elements = Vec::with_capacity(transactions.len());
351 let mut size = 0;
352 for transaction in transactions {
353 let encoded_len = transaction.encoded_length();
354 let Some(pooled) = self.to_pooled_transaction(transaction) else {
355 continue;
356 };
357
358 size += encoded_len;
359 elements.push(pooled.into_signed());
360
361 if limit.exceeds(size) {
362 break
363 }
364 }
365
366 elements
367 }
368
369 pub fn get_pooled_transaction_element(
371 &self,
372 tx_hash: TxHash,
373 ) -> Option<RecoveredTx<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
374 where
375 <V as TransactionValidator>::Transaction: EthPoolTransaction,
376 {
377 self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
378 }
379
    /// Updates the pool after the canonical state changed.
    ///
    /// Tells the validator about the new head block, applies the update to the inner pool,
    /// deletes the blobs of discarded transactions and finally notifies all listeners.
    pub fn on_canonical_state_change(&self, update: CanonicalStateUpdate<'_>) {
        trace!(target: "txpool", ?update, "updating pool on canonical state change");

        // Must be read before `update` is destructured below.
        let block_info = update.block_info();
        let CanonicalStateUpdate {
            new_tip, changed_accounts, mined_transactions, update_kind, ..
        } = update;
        self.validator.on_new_head_block(new_tip);

        // Resolve addresses to sender ids before taking the pool lock.
        let changed_senders = self.changed_senders(changed_accounts.into_iter());

        // Update the pool; the write lock is released at the end of this statement.
        let outcome = self.pool.write().on_canonical_state_change(
            block_info,
            mined_transactions,
            changed_senders,
            update_kind,
        );

        // Remove the sidecars of discarded blob transactions from the blob store.
        self.delete_discarded_blobs(outcome.discarded.iter());

        // Notify listeners about the mined, promoted and discarded transactions.
        self.notify_on_new_state(outcome);
    }
406
    /// Applies changed account state to the pool.
    ///
    /// Event listeners are told about the promoted (now pending) and discarded transactions,
    /// and the blobs of discarded transactions are deleted from the blob store.
    pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
        let changed_senders = self.changed_senders(accounts.into_iter());
        let UpdateOutcome { promoted, discarded } =
            self.pool.write().update_accounts(changed_senders);
        // This guard lives until the end of the function, so it is still held while the blobs
        // are deleted below.
        let mut listener = self.event_listener.write();

        promoted.iter().for_each(|tx| listener.pending(tx.hash(), None));
        discarded.iter().for_each(|tx| listener.discarded(tx.hash()));

        // Remove the sidecars of discarded blob transactions from the blob store.
        self.delete_discarded_blobs(discarded.iter());
    }
423
    /// Adds a single validated transaction to the pool and notifies listeners.
    ///
    /// The caller passes in the already acquired pool write guard, so several transactions can
    /// be inserted under one lock.
    ///
    /// # Errors
    ///
    /// Returns the pool's error if the insert is rejected. For `Invalid` and `Error` validation
    /// outcomes a `discarded` event is emitted for the hash and the validation error is returned
    /// wrapped in a [`PoolError`].
    fn add_transaction(
        &self,
        pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
        origin: TransactionOrigin,
        tx: TransactionValidationOutcome<T::Transaction>,
    ) -> PoolResult<TxHash> {
        match tx {
            TransactionValidationOutcome::Valid {
                balance,
                state_nonce,
                transaction,
                propagate,
            } => {
                let sender_id = self.get_sender_id(transaction.sender());
                let transaction_id = TransactionId::new(sender_id, transaction.nonce());

                // Split off the sidecar, if the validator returned one.
                let (transaction, maybe_sidecar) = match transaction {
                    ValidTransaction::Valid(tx) => (tx, None),
                    ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
                        debug_assert!(
                            transaction.is_eip4844(),
                            "validator returned sidecar for non EIP-4844 transaction"
                        );
                        (transaction, Some(sidecar))
                    }
                };

                let tx = ValidPoolTransaction {
                    transaction,
                    transaction_id,
                    propagate,
                    timestamp: Instant::now(),
                    origin,
                };

                // Bails out here if the pool rejects the transaction; nothing below runs then.
                let added = pool.add_transaction(tx, balance, state_nonce)?;
                let hash = *added.hash();

                // The transaction was inserted successfully.
                if let Some(sidecar) = maybe_sidecar {
                    // Notify blob sidecar listeners first, ...
                    self.on_new_blob_sidecar(&hash, &sidecar);
                    // ... then move the sidecar into the blob store.
                    self.insert_blob(hash, sidecar);
                }

                if let Some(replaced) = added.replaced_blob_transaction() {
                    debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
                    // Delete the sidecar of the replaced blob transaction.
                    self.delete_blob(replaced);
                }

                // Notify pending-hash listeners if the transaction went into the pending pool.
                if let Some(pending) = added.as_pending() {
                    self.on_new_pending_transaction(pending);
                }

                // Notify per-transaction event listeners.
                self.notify_event_listeners(&added);

                // Delete the blobs of transactions discarded by this insert.
                if let Some(discarded) = added.discarded_transactions() {
                    self.delete_discarded_blobs(discarded.iter());
                }

                // Notify new-transaction listeners; this consumes `added`.
                self.on_new_transaction(added.into_new_transaction_event());

                Ok(hash)
            }
            TransactionValidationOutcome::Invalid(tx, err) => {
                let mut listener = self.event_listener.write();
                listener.discarded(tx.hash());
                Err(PoolError::new(*tx.hash(), err))
            }
            TransactionValidationOutcome::Error(tx_hash, err) => {
                let mut listener = self.event_listener.write();
                listener.discarded(&tx_hash);
                Err(PoolError::other(tx_hash, err))
            }
        }
    }
510
511 pub fn add_transaction_and_subscribe(
513 &self,
514 origin: TransactionOrigin,
515 tx: TransactionValidationOutcome<T::Transaction>,
516 ) -> PoolResult<TransactionEvents> {
517 let listener = {
518 let mut listener = self.event_listener.write();
519 listener.subscribe(tx.tx_hash())
520 };
521 let mut results = self.add_transactions(origin, std::iter::once(tx));
522 results.pop().expect("result length is the same as the input")?;
523 Ok(listener)
524 }
525
526 pub fn add_transactions(
532 &self,
533 origin: TransactionOrigin,
534 transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
535 ) -> Vec<PoolResult<TxHash>> {
536 let (mut added, discarded) = {
538 let mut pool = self.pool.write();
539 let added = transactions
540 .into_iter()
541 .map(|tx| self.add_transaction(&mut pool, origin, tx))
542 .collect::<Vec<_>>();
543
544 let discarded = if added.iter().any(Result::is_ok) {
546 pool.discard_worst()
547 } else {
548 Default::default()
549 };
550
551 (added, discarded)
552 };
553
554 if !discarded.is_empty() {
555 self.delete_discarded_blobs(discarded.iter());
557
558 let discarded_hashes =
559 discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
560
561 {
562 let mut listener = self.event_listener.write();
563 discarded_hashes.iter().for_each(|hash| listener.discarded(hash));
564 }
565
566 for res in &mut added {
569 if let Ok(hash) = res {
570 if discarded_hashes.contains(hash) {
571 *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
572 }
573 }
574 }
575 }
576
577 added
578 }
579
580 fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
582 let propagate_allowed = pending.is_propagate_allowed();
583
584 let mut transaction_listeners = self.pending_transaction_listener.lock();
585 transaction_listeners.retain_mut(|listener| {
586 if listener.kind.is_propagate_only() && !propagate_allowed {
587 return !listener.sender.is_closed()
590 }
591
592 listener.send_all(pending.pending_transactions(listener.kind))
594 });
595 }
596
597 fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
599 let mut transaction_listeners = self.transaction_listener.lock();
600 transaction_listeners.retain_mut(|listener| {
601 if listener.kind.is_propagate_only() && !event.transaction.propagate {
602 return !listener.sender.is_closed()
605 }
606
607 listener.send(event.clone())
608 });
609 }
610
611 fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecar) {
613 let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
614 if sidecar_listeners.is_empty() {
615 return
616 }
617 let sidecar = Arc::new(sidecar.clone());
618 sidecar_listeners.retain_mut(|listener| {
619 let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
620 match listener.sender.try_send(new_blob_event) {
621 Ok(()) => true,
622 Err(err) => {
623 if matches!(err, mpsc::error::TrySendError::Full(_)) {
624 debug!(
625 target: "txpool",
626 "[{:?}] failed to send blob sidecar; channel full",
627 sidecar,
628 );
629 true
630 } else {
631 false
632 }
633 }
634 }
635 })
636 }
637
638 fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
640 trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
641
642 self.pending_transaction_listener
645 .lock()
646 .retain_mut(|listener| listener.send_all(outcome.pending_transactions(listener.kind)));
647
648 self.transaction_listener.lock().retain_mut(|listener| {
650 listener.send_all(outcome.full_pending_transactions(listener.kind))
651 });
652
653 let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
654
655 let mut listener = self.event_listener.write();
657
658 mined.iter().for_each(|tx| listener.mined(tx, block_hash));
659 promoted.iter().for_each(|tx| listener.pending(tx.hash(), None));
660 discarded.iter().for_each(|tx| listener.discarded(tx.hash()));
661 }
662
663 fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
665 let mut listener = self.event_listener.write();
666
667 match tx {
668 AddedTransaction::Pending(tx) => {
669 let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
670
671 listener.pending(transaction.hash(), replaced.clone());
672 promoted.iter().for_each(|tx| listener.pending(tx.hash(), None));
673 discarded.iter().for_each(|tx| listener.discarded(tx.hash()));
674 }
675 AddedTransaction::Parked { transaction, replaced, .. } => {
676 listener.queued(transaction.hash());
677 if let Some(replaced) = replaced {
678 listener.replaced(replaced.clone(), *transaction.hash());
679 }
680 }
681 }
682 }
683
    /// Returns the inner pool's [`BestTransactions`] iterator (takes the pool read lock).
    pub fn best_transactions(&self) -> BestTransactions<T> {
        self.get_pool_data().best_transactions()
    }
688
    /// Returns the inner pool's best-transactions iterator for the given attributes, as a boxed
    /// trait object (takes the pool read lock).
    pub fn best_transactions_with_attributes(
        &self,
        best_transactions_attributes: BestTransactionsAttributes,
    ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
    {
        self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
    }
698
    /// Returns at most `max` transactions from the inner pool's pending-transactions iterator.
    pub fn pending_transactions_max(
        &self,
        max: usize,
    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
        self.get_pool_data().pending_transactions_iter().take(max).collect()
    }
706
    /// Returns all pending transactions of the inner pool.
    pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
        self.get_pool_data().pending_transactions()
    }
711
    /// Returns all queued transactions of the inner pool.
    pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
        self.get_pool_data().queued_transactions()
    }
716
717 pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
719 let pool = self.get_pool_data();
720 AllPoolTransactions {
721 pending: pool.pending_transactions(),
722 queued: pool.queued_transactions(),
723 }
724 }
725
726 pub fn remove_transactions(
728 &self,
729 hashes: Vec<TxHash>,
730 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
731 if hashes.is_empty() {
732 return Vec::new()
733 }
734 let removed = self.pool.write().remove_transactions(hashes);
735
736 let mut listener = self.event_listener.write();
737
738 removed.iter().for_each(|tx| listener.discarded(tx.hash()));
739
740 removed
741 }
742
743 pub fn remove_transactions_and_descendants(
746 &self,
747 hashes: Vec<TxHash>,
748 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
749 if hashes.is_empty() {
750 return Vec::new()
751 }
752 let removed = self.pool.write().remove_transactions_and_descendants(hashes);
753
754 let mut listener = self.event_listener.write();
755
756 removed.iter().for_each(|tx| listener.discarded(tx.hash()));
757
758 removed
759 }
760
761 pub fn remove_transactions_by_sender(
763 &self,
764 sender: Address,
765 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
766 let sender_id = self.get_sender_id(sender);
767 let removed = self.pool.write().remove_transactions_by_sender(sender_id);
768
769 let mut listener = self.event_listener.write();
770
771 removed.iter().for_each(|tx| listener.discarded(tx.hash()));
772
773 removed
774 }
775
776 pub fn retain_unknown<A>(&self, announcement: &mut A)
778 where
779 A: HandleMempoolData,
780 {
781 if announcement.is_empty() {
782 return
783 }
784 let pool = self.get_pool_data();
785 announcement.retain_by_hash(|tx| !pool.contains(tx))
786 }
787
    /// Returns the transaction with the given hash, if the pool contains it.
    pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
        self.get_pool_data().get(tx_hash)
    }
792
    /// Returns all transactions in the pool that were sent by the given address.
    ///
    /// Note: the sender id lookup creates an id for addresses that are not known yet.
    pub fn get_transactions_by_sender(
        &self,
        sender: Address,
    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
        let sender_id = self.get_sender_id(sender);
        self.get_pool_data().get_transactions_by_sender(sender_id)
    }
801
802 pub fn get_queued_transactions_by_sender(
804 &self,
805 sender: Address,
806 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
807 let sender_id = self.get_sender_id(sender);
808 self.get_pool_data().pending_txs_by_sender(sender_id)
809 }
810
    /// Returns the pending transactions selected by `predicate`; the filtering itself is done
    /// by the inner pool.
    pub fn pending_transactions_with_predicate(
        &self,
        predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
        self.get_pool_data().pending_transactions_with_predicate(predicate)
    }
818
819 pub fn get_pending_transactions_by_sender(
821 &self,
822 sender: Address,
823 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
824 let sender_id = self.get_sender_id(sender);
825 self.get_pool_data().queued_txs_by_sender(sender_id)
826 }
827
    /// Returns the highest transaction of the given sender, as determined by the inner pool.
    ///
    /// Note: the sender id lookup creates an id for addresses that are not known yet.
    pub fn get_highest_transaction_by_sender(
        &self,
        sender: Address,
    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
        let sender_id = self.get_sender_id(sender);
        self.get_pool_data().get_highest_transaction_by_sender(sender_id)
    }
836
837 pub fn get_highest_consecutive_transaction_by_sender(
839 &self,
840 sender: Address,
841 on_chain_nonce: u64,
842 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
843 let sender_id = self.get_sender_id(sender);
844 self.get_pool_data().get_highest_consecutive_transaction_by_sender(
845 sender_id.into_transaction_id(on_chain_nonce),
846 )
847 }
848
849 pub fn get_transaction_by_transaction_id(
851 &self,
852 transaction_id: &TransactionId,
853 ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
854 self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
855 }
856
857 pub fn get_transactions_by_origin(
859 &self,
860 origin: TransactionOrigin,
861 ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
862 self.get_pool_data()
863 .all()
864 .transactions_iter()
865 .filter(|tx| tx.origin == origin)
866 .cloned()
867 .collect()
868 }
869
    /// Returns all pending transactions that have the given origin.
    pub fn get_pending_transactions_by_origin(
        &self,
        origin: TransactionOrigin,
    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
        self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
    }
877
878 pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
882 if txs.is_empty() {
883 return Vec::new()
884 }
885 self.get_pool_data().get_all(txs).collect()
886 }
887
888 pub fn on_propagated(&self, txs: PropagatedTransactions) {
890 if txs.0.is_empty() {
891 return
892 }
893 let mut listener = self.event_listener.write();
894
895 txs.0.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers))
896 }
897
    /// Returns the length reported by the inner pool.
    pub fn len(&self) -> usize {
        self.get_pool_data().len()
    }
902
    /// Returns whether the inner pool reports itself as empty.
    pub fn is_empty(&self) -> bool {
        self.get_pool_data().is_empty()
    }
907
    /// Returns the inner pool's `is_exceeded` status.
    ///
    /// NOTE(review): presumably true when a configured pool limit is exceeded — confirm
    /// against `TxPool::is_exceeded`.
    pub fn is_exceeded(&self) -> bool {
        self.pool.read().is_exceeded()
    }
912
913 fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecar) {
915 debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
916 if let Err(err) = self.blob_store.insert(hash, blob) {
917 warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
918 self.blob_store_metrics.blobstore_failed_inserts.increment(1);
919 }
920 self.update_blob_store_metrics();
921 }
922
    /// Deletes the blob stored for the given transaction hash.
    ///
    /// Best effort: the blob store's result is deliberately ignored.
    pub fn delete_blob(&self, blob: TxHash) {
        let _ = self.blob_store.delete(blob);
    }
927
    /// Deletes the blobs stored for the given transaction hashes.
    ///
    /// Best effort: the blob store's result is deliberately ignored.
    pub fn delete_blobs(&self, txs: Vec<TxHash>) {
        let _ = self.blob_store.delete_all(txs);
    }
932
    /// Runs the blob store's cleanup and updates the blob store metrics.
    ///
    /// The number of failed deletes reported by the cleanup is added to the
    /// `blobstore_failed_deletes` counter.
    pub fn cleanup_blobs(&self) {
        let stat = self.blob_store.cleanup();
        self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
        self.update_blob_store_metrics();
    }
939
    /// Refreshes the blob store gauges.
    ///
    /// The byte size is only updated when the store provides a size hint; the entry count is
    /// always updated.
    fn update_blob_store_metrics(&self) {
        if let Some(data_size) = self.blob_store.data_size_hint() {
            self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
        }
        self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
    }
946
947 fn delete_discarded_blobs<'a>(
949 &'a self,
950 transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
951 ) {
952 let blob_txs = transactions
953 .into_iter()
954 .filter(|tx| tx.transaction.is_eip4844())
955 .map(|tx| *tx.hash())
956 .collect();
957 self.delete_blobs(blob_txs);
958 }
959}
960
961impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
962 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
963 f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
964 }
965}
966
/// A listener that receives the hashes of transactions that became pending.
#[derive(Debug)]
struct PendingTransactionHashListener {
    /// Sending half of the listener's channel.
    sender: mpsc::Sender<TxHash>,
    /// Whether the listener receives all transactions or only those allowed to be propagated.
    kind: TransactionListenerKind,
}
974
975impl PendingTransactionHashListener {
976 fn send_all(&self, hashes: impl IntoIterator<Item = TxHash>) -> bool {
980 for tx_hash in hashes {
981 match self.sender.try_send(tx_hash) {
982 Ok(()) => {}
983 Err(err) => {
984 return if matches!(err, mpsc::error::TrySendError::Full(_)) {
985 debug!(
986 target: "txpool",
987 "[{:?}] failed to send pending tx; channel full",
988 tx_hash,
989 );
990 true
991 } else {
992 false
993 }
994 }
995 }
996 }
997 true
998 }
999}
1000
/// A listener that receives [`NewTransactionEvent`]s.
#[derive(Debug)]
struct TransactionListener<T: PoolTransaction> {
    /// Sending half of the listener's channel.
    sender: mpsc::Sender<NewTransactionEvent<T>>,
    /// Whether the listener receives all transactions or only those allowed to be propagated.
    kind: TransactionListenerKind,
}
1008
1009impl<T: PoolTransaction> TransactionListener<T> {
1010 fn send(&self, event: NewTransactionEvent<T>) -> bool {
1014 self.send_all(std::iter::once(event))
1015 }
1016
1017 fn send_all(&self, events: impl IntoIterator<Item = NewTransactionEvent<T>>) -> bool {
1021 for event in events {
1022 match self.sender.try_send(event) {
1023 Ok(()) => {}
1024 Err(err) => {
1025 return if let mpsc::error::TrySendError::Full(event) = err {
1026 debug!(
1027 target: "txpool",
1028 "[{:?}] failed to send pending tx; channel full",
1029 event.transaction.hash(),
1030 );
1031 true
1032 } else {
1033 false
1034 }
1035 }
1036 }
1037 }
1038 true
1039 }
1040}
1041
/// A listener that receives the sidecars of newly added blob transactions.
#[derive(Debug)]
struct BlobTransactionSidecarListener {
    /// Sending half of the listener's channel.
    sender: mpsc::Sender<NewBlobSidecar>,
}
1047
/// Describes a transaction that was added to the pending pool, along with its side effects.
#[derive(Debug, Clone)]
pub struct AddedPendingTransaction<T: PoolTransaction> {
    /// The transaction that was added.
    transaction: Arc<ValidPoolTransaction<T>>,
    /// The transaction that was replaced by the added one, if any.
    replaced: Option<Arc<ValidPoolTransaction<T>>>,
    /// Transactions reported as pending together with the added one.
    promoted: Vec<Arc<ValidPoolTransaction<T>>>,
    /// Transactions reported as discarded; their blobs are removed from the blob store.
    discarded: Vec<Arc<ValidPoolTransaction<T>>>,
}
1060
1061impl<T: PoolTransaction> AddedPendingTransaction<T> {
1062 pub(crate) fn pending_transactions(
1068 &self,
1069 kind: TransactionListenerKind,
1070 ) -> impl Iterator<Item = B256> + '_ {
1071 let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
1072 PendingTransactionIter { kind, iter }
1073 }
1074
1075 pub(crate) fn is_propagate_allowed(&self) -> bool {
1077 self.transaction.propagate
1078 }
1079}
1080
/// Iterator adapter that yields transaction hashes, filtered by listener kind.
pub(crate) struct PendingTransactionIter<Iter> {
    /// The listener kind that decides whether non-propagatable transactions are skipped.
    kind: TransactionListenerKind,
    /// The underlying iterator over transactions.
    iter: Iter,
}
1085
1086impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1087where
1088 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1089 T: PoolTransaction + 'a,
1090{
1091 type Item = B256;
1092
1093 fn next(&mut self) -> Option<Self::Item> {
1094 loop {
1095 let next = self.iter.next()?;
1096 if self.kind.is_propagate_only() && !next.propagate {
1097 continue
1098 }
1099 return Some(*next.hash())
1100 }
1101 }
1102}
1103
/// Iterator adapter that yields [`NewTransactionEvent`]s, filtered by listener kind.
pub(crate) struct FullPendingTransactionIter<Iter> {
    /// The listener kind that decides whether non-propagatable transactions are skipped.
    kind: TransactionListenerKind,
    /// The underlying iterator over transactions.
    iter: Iter,
}
1109
1110impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1111where
1112 Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1113 T: PoolTransaction + 'a,
1114{
1115 type Item = NewTransactionEvent<T>;
1116
1117 fn next(&mut self) -> Option<Self::Item> {
1118 loop {
1119 let next = self.iter.next()?;
1120 if self.kind.is_propagate_only() && !next.propagate {
1121 continue
1122 }
1123 return Some(NewTransactionEvent {
1124 subpool: SubPool::Pending,
1125 transaction: next.clone(),
1126 })
1127 }
1128 }
1129}
1130
/// Represents a transaction that was added to the pool.
#[derive(Debug, Clone)]
pub enum AddedTransaction<T: PoolTransaction> {
    /// The transaction was added to the pending subpool.
    Pending(AddedPendingTransaction<T>),
    /// The transaction was added to a subpool other than pending.
    Parked {
        /// The transaction that was added.
        transaction: Arc<ValidPoolTransaction<T>>,
        /// The transaction that was replaced by the added one, if any.
        replaced: Option<Arc<ValidPoolTransaction<T>>>,
        /// The subpool the transaction was added to.
        subpool: SubPool,
    },
}
1147
1148impl<T: PoolTransaction> AddedTransaction<T> {
1149 pub(crate) const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1151 match self {
1152 Self::Pending(tx) => Some(tx),
1153 _ => None,
1154 }
1155 }
1156
1157 pub(crate) const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1159 match self {
1160 Self::Pending(tx) => tx.replaced.as_ref(),
1161 Self::Parked { replaced, .. } => replaced.as_ref(),
1162 }
1163 }
1164
1165 pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1167 match self {
1168 Self::Pending(tx) => Some(&tx.discarded),
1169 Self::Parked { .. } => None,
1170 }
1171 }
1172
1173 pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1175 self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1176 }
1177
1178 pub(crate) fn hash(&self) -> &TxHash {
1180 match self {
1181 Self::Pending(tx) => tx.transaction.hash(),
1182 Self::Parked { transaction, .. } => transaction.hash(),
1183 }
1184 }
1185
1186 pub(crate) fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1188 match self {
1189 Self::Pending(tx) => {
1190 NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1191 }
1192 Self::Parked { transaction, subpool, .. } => {
1193 NewTransactionEvent { transaction, subpool }
1194 }
1195 }
1196 }
1197
1198 #[cfg(test)]
1200 pub(crate) const fn subpool(&self) -> SubPool {
1201 match self {
1202 Self::Pending(_) => SubPool::Pending,
1203 Self::Parked { subpool, .. } => *subpool,
1204 }
1205 }
1206
1207 #[cfg(test)]
1209 pub(crate) fn id(&self) -> &TransactionId {
1210 match self {
1211 Self::Pending(added) => added.transaction.id(),
1212 Self::Parked { transaction, .. } => transaction.id(),
1213 }
1214 }
1215}
1216
/// The outcome of applying a canonical state update to the pool.
#[derive(Debug)]
pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
    /// Block hash passed along with every `mined` event.
    pub(crate) block_hash: B256,
    /// Hashes of the transactions reported as mined.
    pub(crate) mined: Vec<TxHash>,
    /// Transactions reported as pending after the update.
    pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
    /// Transactions reported as discarded; their blobs are removed from the blob store.
    pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
}
1229
1230impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1231 pub(crate) fn pending_transactions(
1237 &self,
1238 kind: TransactionListenerKind,
1239 ) -> impl Iterator<Item = B256> + '_ {
1240 let iter = self.promoted.iter();
1241 PendingTransactionIter { kind, iter }
1242 }
1243
1244 pub(crate) fn full_pending_transactions(
1250 &self,
1251 kind: TransactionListenerKind,
1252 ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1253 let iter = self.promoted.iter();
1254 FullPendingTransactionIter { kind, iter }
1255 }
1256}
1257
#[cfg(test)]
mod tests {
    use crate::{
        blobstore::{BlobStore, InMemoryBlobStore},
        test_utils::{MockTransaction, TestPoolBuilder},
        validate::ValidTransaction,
        BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
    };
    use alloy_eips::eip4844::BlobTransactionSidecar;
    use reth_primitives::kzg::Blob;
    use std::{fs, path::PathBuf};

    /// Adds more blob transactions than the blob subpool allows and checks that the pool's blob
    /// store only holds the sidecars of the transactions that were kept.
    #[test]
    fn test_discard_blobs_on_blob_tx_eviction() {
        // Blob subpool limit: at most 1000 transactions, no effective size limit.
        let blob_limit = SubPoolLimit::new(1000, usize::MAX);

        // Pool configured with that blob limit.
        let test_pool = &TestPoolBuilder::default()
            .with_config(PoolConfig { blob_limit, ..Default::default() })
            .pool;

        // Set the pending blob fee for the pool.
        test_pool
            .set_block_info(BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() });

        // Load the blob fixture from the crate's test data.
        let json_content = fs::read_to_string(
            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json"),
        )
        .expect("Failed to read the blob data file");

        let json_value: serde_json::Value =
            serde_json::from_str(&json_content).expect("Failed to deserialize JSON");

        // The fixture's "data" field holds the hex-encoded blob.
        let blobs: Vec<Blob> = vec![Blob::from_hex(
            json_value.get("data").unwrap().as_str().expect("Data is not a valid string"),
        )
        .unwrap()];

        // Build the sidecar shared by all test transactions.
        let sidecar = BlobTransactionSidecar::try_from_blobs(blobs).unwrap();

        // Reference store describing the expected final content of the pool's blob store.
        let blob_store = InMemoryBlobStore::default();

        // Add ten more blob transactions than the limit allows.
        for n in 0..blob_limit.max_txs + 10 {
            let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());

            // Fixed size; 1000 of these add up to the `blob_size` asserted below.
            tx.set_size(1844674407370951);

            // Only the first `max_txs` transactions are expected to keep their sidecar.
            if n < blob_limit.max_txs {
                blob_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
            }

            test_pool.add_transactions(
                TransactionOrigin::External,
                [TransactionValidationOutcome::Valid {
                    balance: U256::from(1_000),
                    state_nonce: 0,
                    transaction: ValidTransaction::ValidWithSidecar {
                        transaction: tx,
                        sidecar: sidecar.clone(),
                    },
                    propagate: true,
                }],
            );
        }

        // The blob subpool holds exactly the allowed number of transactions.
        assert_eq!(test_pool.size().blob, blob_limit.max_txs);

        // 1000 transactions of size 1844674407370951 each.
        assert_eq!(test_pool.size().blob_size, 1844674407370951000);

        // The pool's blob store matches the reference store.
        assert_eq!(*test_pool.blob_store(), blob_store);
    }
}