// reth_transaction_pool/pool/listener.rs
use crate::{
4 pool::events::{FullTransactionEvent, TransactionEvent},
5 traits::PropagateKind,
6 PoolTransaction, ValidPoolTransaction,
7};
8use alloy_primitives::{TxHash, B256};
9use futures_util::Stream;
10use std::{
11 collections::{hash_map::Entry, HashMap},
12 pin::Pin,
13 sync::Arc,
14 task::{Context, Poll},
15};
16use tokio::sync::mpsc::{
17 error::TrySendError, Receiver, Sender, UnboundedReceiver, UnboundedSender,
18};
19
/// Capacity of the bounded channel that `PoolEventBroadcast::subscribe_all` creates for each
/// subscriber of all-transactions events.
const TX_POOL_EVENT_CHANNEL_SIZE: usize = 1024;
22
23#[derive(Debug)]
25#[must_use = "streams do nothing unless polled"]
26pub struct TransactionEvents {
27 hash: TxHash,
28 events: UnboundedReceiver<TransactionEvent>,
29}
30
31impl TransactionEvents {
32 pub const fn hash(&self) -> TxHash {
34 self.hash
35 }
36}
37
38impl Stream for TransactionEvents {
39 type Item = TransactionEvent;
40
41 fn poll_next(
42 self: std::pin::Pin<&mut Self>,
43 cx: &mut std::task::Context<'_>,
44 ) -> std::task::Poll<Option<Self::Item>> {
45 self.get_mut().events.poll_recv(cx)
46 }
47}
48
49#[derive(Debug)]
51#[must_use = "streams do nothing unless polled"]
52pub struct AllTransactionsEvents<T: PoolTransaction> {
53 pub(crate) events: Receiver<FullTransactionEvent<T>>,
54}
55
56impl<T: PoolTransaction> AllTransactionsEvents<T> {
57 pub const fn new(events: Receiver<FullTransactionEvent<T>>) -> Self {
59 Self { events }
60 }
61}
62
63impl<T: PoolTransaction> Stream for AllTransactionsEvents<T> {
64 type Item = FullTransactionEvent<T>;
65
66 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67 self.get_mut().events.poll_recv(cx)
68 }
69}
70
71#[derive(Debug)]
76pub(crate) struct PoolEventBroadcast<T: PoolTransaction> {
77 all_events_broadcaster: AllPoolEventsBroadcaster<T>,
79 broadcasters_by_hash: HashMap<TxHash, PoolEventBroadcaster>,
81}
82
83impl<T: PoolTransaction> Default for PoolEventBroadcast<T> {
84 fn default() -> Self {
85 Self {
86 all_events_broadcaster: AllPoolEventsBroadcaster::default(),
87 broadcasters_by_hash: HashMap::default(),
88 }
89 }
90}
91
92impl<T: PoolTransaction> PoolEventBroadcast<T> {
93 fn broadcast_event(
95 &mut self,
96 hash: &TxHash,
97 event: TransactionEvent,
98 pool_event: FullTransactionEvent<T>,
99 ) {
100 if let Entry::Occupied(mut sink) = self.broadcasters_by_hash.entry(*hash) {
102 sink.get_mut().broadcast(event.clone());
103
104 if sink.get().is_empty() || event.is_final() {
105 sink.remove();
106 }
107 }
108
109 self.all_events_broadcaster.broadcast(pool_event);
111 }
112
113 pub(crate) fn subscribe(&mut self, tx_hash: TxHash) -> TransactionEvents {
115 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
116
117 match self.broadcasters_by_hash.entry(tx_hash) {
118 Entry::Occupied(mut entry) => {
119 entry.get_mut().senders.push(tx);
120 }
121 Entry::Vacant(entry) => {
122 entry.insert(PoolEventBroadcaster { senders: vec![tx] });
123 }
124 };
125 TransactionEvents { hash: tx_hash, events: rx }
126 }
127
128 pub(crate) fn subscribe_all(&mut self) -> AllTransactionsEvents<T> {
130 let (tx, rx) = tokio::sync::mpsc::channel(TX_POOL_EVENT_CHANNEL_SIZE);
131 self.all_events_broadcaster.senders.push(tx);
132 AllTransactionsEvents::new(rx)
133 }
134
135 pub(crate) fn pending(&mut self, tx: &TxHash, replaced: Option<Arc<ValidPoolTransaction<T>>>) {
137 self.broadcast_event(tx, TransactionEvent::Pending, FullTransactionEvent::Pending(*tx));
138
139 if let Some(replaced) = replaced {
140 self.replaced(replaced, *tx);
142 }
143 }
144
145 pub(crate) fn replaced(&mut self, tx: Arc<ValidPoolTransaction<T>>, replaced_by: TxHash) {
147 let transaction = Arc::clone(&tx);
148 self.broadcast_event(
149 tx.hash(),
150 TransactionEvent::Replaced(replaced_by),
151 FullTransactionEvent::Replaced { transaction, replaced_by },
152 );
153 }
154
155 pub(crate) fn queued(&mut self, tx: &TxHash) {
157 self.broadcast_event(tx, TransactionEvent::Queued, FullTransactionEvent::Queued(*tx));
158 }
159
160 pub(crate) fn propagated(&mut self, tx: &TxHash, peers: Vec<PropagateKind>) {
162 let peers = Arc::new(peers);
163 self.broadcast_event(
164 tx,
165 TransactionEvent::Propagated(Arc::clone(&peers)),
166 FullTransactionEvent::Propagated(peers),
167 );
168 }
169
170 pub(crate) fn discarded(&mut self, tx: &TxHash) {
172 self.broadcast_event(tx, TransactionEvent::Discarded, FullTransactionEvent::Discarded(*tx));
173 }
174
175 pub(crate) fn mined(&mut self, tx: &TxHash, block_hash: B256) {
177 self.broadcast_event(
178 tx,
179 TransactionEvent::Mined(block_hash),
180 FullTransactionEvent::Mined { tx_hash: *tx, block_hash },
181 );
182 }
183}
184
185#[derive(Debug)]
189struct AllPoolEventsBroadcaster<T: PoolTransaction> {
190 senders: Vec<Sender<FullTransactionEvent<T>>>,
192}
193
194impl<T: PoolTransaction> Default for AllPoolEventsBroadcaster<T> {
195 fn default() -> Self {
196 Self { senders: Vec::new() }
197 }
198}
199
200impl<T: PoolTransaction> AllPoolEventsBroadcaster<T> {
201 fn broadcast(&mut self, event: FullTransactionEvent<T>) {
203 self.senders.retain(|sender| match sender.try_send(event.clone()) {
204 Ok(_) | Err(TrySendError::Full(_)) => true,
205 Err(TrySendError::Closed(_)) => false,
206 })
207 }
208}
209
210#[derive(Default, Debug)]
214struct PoolEventBroadcaster {
215 senders: Vec<UnboundedSender<TransactionEvent>>,
217}
218
219impl PoolEventBroadcaster {
220 fn is_empty(&self) -> bool {
222 self.senders.is_empty()
223 }
224
225 fn broadcast(&mut self, event: TransactionEvent) {
227 self.senders.retain(|sender| sender.send(event.clone()).is_ok())
228 }
229}