reth_transaction_pool/pool/
listener.rs

1//! Listeners for the transaction-pool
2
3use crate::{
4    pool::events::{FullTransactionEvent, TransactionEvent},
5    traits::PropagateKind,
6    PoolTransaction, ValidPoolTransaction,
7};
8use alloy_primitives::{TxHash, B256};
9use futures_util::Stream;
10use std::{
11    collections::{hash_map::Entry, HashMap},
12    pin::Pin,
13    sync::Arc,
14    task::{Context, Poll},
15};
16use tokio::sync::mpsc::{
17    error::TrySendError, Receiver, Sender, UnboundedReceiver, UnboundedSender,
18};
19
/// Capacity of the bounded channel created for each subscriber to the events of all
/// transactions.
const TX_POOL_EVENT_CHANNEL_SIZE: usize = 1024;
22
23/// A Stream that receives [`TransactionEvent`] only for the transaction with the given hash.
24#[derive(Debug)]
25#[must_use = "streams do nothing unless polled"]
26pub struct TransactionEvents {
27    hash: TxHash,
28    events: UnboundedReceiver<TransactionEvent>,
29}
30
31impl TransactionEvents {
32    /// The hash for this transaction
33    pub const fn hash(&self) -> TxHash {
34        self.hash
35    }
36}
37
38impl Stream for TransactionEvents {
39    type Item = TransactionEvent;
40
41    fn poll_next(
42        self: std::pin::Pin<&mut Self>,
43        cx: &mut std::task::Context<'_>,
44    ) -> std::task::Poll<Option<Self::Item>> {
45        self.get_mut().events.poll_recv(cx)
46    }
47}
48
49/// A Stream that receives [`FullTransactionEvent`] for _all_ transaction.
50#[derive(Debug)]
51#[must_use = "streams do nothing unless polled"]
52pub struct AllTransactionsEvents<T: PoolTransaction> {
53    pub(crate) events: Receiver<FullTransactionEvent<T>>,
54}
55
56impl<T: PoolTransaction> AllTransactionsEvents<T> {
57    /// Create a new instance of this stream.
58    pub const fn new(events: Receiver<FullTransactionEvent<T>>) -> Self {
59        Self { events }
60    }
61}
62
63impl<T: PoolTransaction> Stream for AllTransactionsEvents<T> {
64    type Item = FullTransactionEvent<T>;
65
66    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67        self.get_mut().events.poll_recv(cx)
68    }
69}
70
71/// A type that broadcasts [`TransactionEvent`] to installed listeners.
72///
73/// This is essentially a multi-producer, multi-consumer channel where each event is broadcast to
74/// all active receivers.
75#[derive(Debug)]
76pub(crate) struct PoolEventBroadcast<T: PoolTransaction> {
77    /// All listeners for all transaction events.
78    all_events_broadcaster: AllPoolEventsBroadcaster<T>,
79    /// All listeners for events for a certain transaction hash.
80    broadcasters_by_hash: HashMap<TxHash, PoolEventBroadcaster>,
81}
82
83impl<T: PoolTransaction> Default for PoolEventBroadcast<T> {
84    fn default() -> Self {
85        Self {
86            all_events_broadcaster: AllPoolEventsBroadcaster::default(),
87            broadcasters_by_hash: HashMap::default(),
88        }
89    }
90}
91
92impl<T: PoolTransaction> PoolEventBroadcast<T> {
93    /// Calls the broadcast callback with the `PoolEventBroadcaster` that belongs to the hash.
94    fn broadcast_event(
95        &mut self,
96        hash: &TxHash,
97        event: TransactionEvent,
98        pool_event: FullTransactionEvent<T>,
99    ) {
100        // Broadcast to all listeners for the transaction hash.
101        if let Entry::Occupied(mut sink) = self.broadcasters_by_hash.entry(*hash) {
102            sink.get_mut().broadcast(event.clone());
103
104            if sink.get().is_empty() || event.is_final() {
105                sink.remove();
106            }
107        }
108
109        // Broadcast to all listeners for all transactions.
110        self.all_events_broadcaster.broadcast(pool_event);
111    }
112
113    /// Create a new subscription for the given transaction hash.
114    pub(crate) fn subscribe(&mut self, tx_hash: TxHash) -> TransactionEvents {
115        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
116
117        match self.broadcasters_by_hash.entry(tx_hash) {
118            Entry::Occupied(mut entry) => {
119                entry.get_mut().senders.push(tx);
120            }
121            Entry::Vacant(entry) => {
122                entry.insert(PoolEventBroadcaster { senders: vec![tx] });
123            }
124        };
125        TransactionEvents { hash: tx_hash, events: rx }
126    }
127
128    /// Create a new subscription for all transactions.
129    pub(crate) fn subscribe_all(&mut self) -> AllTransactionsEvents<T> {
130        let (tx, rx) = tokio::sync::mpsc::channel(TX_POOL_EVENT_CHANNEL_SIZE);
131        self.all_events_broadcaster.senders.push(tx);
132        AllTransactionsEvents::new(rx)
133    }
134
135    /// Notify listeners about a transaction that was added to the pending queue.
136    pub(crate) fn pending(&mut self, tx: &TxHash, replaced: Option<Arc<ValidPoolTransaction<T>>>) {
137        self.broadcast_event(tx, TransactionEvent::Pending, FullTransactionEvent::Pending(*tx));
138
139        if let Some(replaced) = replaced {
140            // notify listeners that this transaction was replaced
141            self.replaced(replaced, *tx);
142        }
143    }
144
145    /// Notify listeners about a transaction that was replaced.
146    pub(crate) fn replaced(&mut self, tx: Arc<ValidPoolTransaction<T>>, replaced_by: TxHash) {
147        let transaction = Arc::clone(&tx);
148        self.broadcast_event(
149            tx.hash(),
150            TransactionEvent::Replaced(replaced_by),
151            FullTransactionEvent::Replaced { transaction, replaced_by },
152        );
153    }
154
155    /// Notify listeners about a transaction that was added to the queued pool.
156    pub(crate) fn queued(&mut self, tx: &TxHash) {
157        self.broadcast_event(tx, TransactionEvent::Queued, FullTransactionEvent::Queued(*tx));
158    }
159
160    /// Notify listeners about a transaction that was propagated.
161    pub(crate) fn propagated(&mut self, tx: &TxHash, peers: Vec<PropagateKind>) {
162        let peers = Arc::new(peers);
163        self.broadcast_event(
164            tx,
165            TransactionEvent::Propagated(Arc::clone(&peers)),
166            FullTransactionEvent::Propagated(peers),
167        );
168    }
169
170    /// Notify listeners about a transaction that was discarded.
171    pub(crate) fn discarded(&mut self, tx: &TxHash) {
172        self.broadcast_event(tx, TransactionEvent::Discarded, FullTransactionEvent::Discarded(*tx));
173    }
174
175    /// Notify listeners that the transaction was mined
176    pub(crate) fn mined(&mut self, tx: &TxHash, block_hash: B256) {
177        self.broadcast_event(
178            tx,
179            TransactionEvent::Mined(block_hash),
180            FullTransactionEvent::Mined { tx_hash: *tx, block_hash },
181        );
182    }
183}
184
185/// All Sender half(s) of the event channels for all transactions.
186///
187/// This mimics [`tokio::sync::broadcast`] but uses separate channels.
188#[derive(Debug)]
189struct AllPoolEventsBroadcaster<T: PoolTransaction> {
190    /// Corresponding sender half(s) for event listener channel
191    senders: Vec<Sender<FullTransactionEvent<T>>>,
192}
193
194impl<T: PoolTransaction> Default for AllPoolEventsBroadcaster<T> {
195    fn default() -> Self {
196        Self { senders: Vec::new() }
197    }
198}
199
200impl<T: PoolTransaction> AllPoolEventsBroadcaster<T> {
201    // Broadcast an event to all listeners. Dropped listeners are silently evicted.
202    fn broadcast(&mut self, event: FullTransactionEvent<T>) {
203        self.senders.retain(|sender| match sender.try_send(event.clone()) {
204            Ok(_) | Err(TrySendError::Full(_)) => true,
205            Err(TrySendError::Closed(_)) => false,
206        })
207    }
208}
209
210/// All Sender half(s) of the event channels for a specific transaction.
211///
212/// This mimics [`tokio::sync::broadcast`] but uses separate channels and is unbounded.
213#[derive(Default, Debug)]
214struct PoolEventBroadcaster {
215    /// Corresponding sender half(s) for event listener channel
216    senders: Vec<UnboundedSender<TransactionEvent>>,
217}
218
219impl PoolEventBroadcaster {
220    /// Returns `true` if there are no more listeners remaining.
221    fn is_empty(&self) -> bool {
222        self.senders.is_empty()
223    }
224
225    // Broadcast an event to all listeners. Dropped listeners are silently evicted.
226    fn broadcast(&mut self, event: TransactionEvent) {
227        self.senders.retain(|sender| sender.send(event.clone()).is_ok())
228    }
229}