1use alloy_eips::eip2718::Encodable2718;
4use derive_more::{Deref, DerefMut};
5use reth_execution_types::{BlockReceipts, Chain};
6use reth_primitives::{NodePrimitives, SealedBlockWithSenders, SealedHeader};
7use reth_storage_api::NodePrimitivesProvider;
8use std::{
9 pin::Pin,
10 sync::Arc,
11 task::{ready, Context, Poll},
12};
13use tokio::sync::{broadcast, watch};
14use tokio_stream::{
15 wrappers::{BroadcastStream, WatchStream},
16 Stream,
17};
18use tracing::debug;
19
/// Receiving half of the broadcast channel that carries [`CanonStateNotification`]s.
///
/// Defaults to the Ethereum primitives when no `N` is given.
pub type CanonStateNotifications<N = reth_primitives::EthPrimitives> =
    broadcast::Receiver<CanonStateNotification<N>>;
23
/// Sending half of the broadcast channel that carries [`CanonStateNotification`]s.
///
/// Defaults to the Ethereum primitives when no `N` is given.
pub type CanonStateNotificationSender<N = reth_primitives::EthPrimitives> =
    broadcast::Sender<CanonStateNotification<N>>;
27
28pub trait CanonStateSubscriptions: NodePrimitivesProvider + Send + Sync {
30 fn subscribe_to_canonical_state(&self) -> CanonStateNotifications<Self::Primitives>;
34
35 fn canonical_state_stream(&self) -> CanonStateNotificationStream<Self::Primitives> {
37 CanonStateNotificationStream {
38 st: BroadcastStream::new(self.subscribe_to_canonical_state()),
39 }
40 }
41}
42
43impl<T: CanonStateSubscriptions> CanonStateSubscriptions for &T {
44 fn subscribe_to_canonical_state(&self) -> CanonStateNotifications<Self::Primitives> {
45 (*self).subscribe_to_canonical_state()
46 }
47
48 fn canonical_state_stream(&self) -> CanonStateNotificationStream<Self::Primitives> {
49 (*self).canonical_state_stream()
50 }
51}
52
/// A [`Stream`] of [`CanonStateNotification`]s backed by a broadcast receiver.
#[derive(Debug)]
#[pin_project::pin_project]
pub struct CanonStateNotificationStream<N: NodePrimitives = reth_primitives::EthPrimitives> {
    /// The wrapped broadcast receiver; pinned because it is polled through the projection.
    #[pin]
    st: BroadcastStream<CanonStateNotification<N>>,
}
60
61impl<N: NodePrimitives> Stream for CanonStateNotificationStream<N> {
62 type Item = CanonStateNotification<N>;
63
64 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
65 loop {
66 return match ready!(self.as_mut().project().st.poll_next(cx)) {
67 Some(Ok(notification)) => Poll::Ready(Some(notification)),
68 Some(Err(err)) => {
69 debug!(%err, "canonical state notification stream lagging behind");
70 continue
71 }
72 None => Poll::Ready(None),
73 }
74 }
75 }
76}
77
/// A notification describing a change to the canonical chain.
///
/// The chains are held behind [`Arc`], so cloning a notification clones the pointers rather than
/// the chain data.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CanonStateNotification<N: NodePrimitives = reth_primitives::EthPrimitives> {
    /// A chain segment was committed without reverting anything.
    Commit {
        /// The committed chain segment; this is what [`Self::committed`] returns.
        new: Arc<Chain<N>>,
    },
    /// A chain segment was reverted and another one committed in its place.
    Reorg {
        /// The reverted chain segment; this is what [`Self::reverted`] returns.
        old: Arc<Chain<N>>,
        /// The chain segment committed by the reorg; this is what [`Self::committed`] returns.
        new: Arc<Chain<N>>,
    },
}
105
106impl<N: NodePrimitives> CanonStateNotification<N> {
107 pub fn reverted(&self) -> Option<Arc<Chain<N>>> {
109 match self {
110 Self::Commit { .. } => None,
111 Self::Reorg { old, .. } => Some(old.clone()),
112 }
113 }
114
115 pub fn committed(&self) -> Arc<Chain<N>> {
117 match self {
118 Self::Commit { new } | Self::Reorg { new, .. } => new.clone(),
119 }
120 }
121
122 pub fn tip(&self) -> &SealedBlockWithSenders<N::Block> {
127 match self {
128 Self::Commit { new } | Self::Reorg { new, .. } => new.tip(),
129 }
130 }
131
132 pub fn block_receipts(&self) -> Vec<(BlockReceipts<N::Receipt>, bool)>
138 where
139 N::SignedTx: Encodable2718,
140 {
141 let mut receipts = Vec::new();
142
143 if let Some(old) = self.reverted() {
145 receipts
146 .extend(old.receipts_with_attachment().into_iter().map(|receipt| (receipt, true)));
147 }
148 receipts.extend(
150 self.committed().receipts_with_attachment().into_iter().map(|receipt| (receipt, false)),
151 );
152 receipts
153 }
154}
155
/// Newtype around a [`watch::Receiver`] of an optional [`SealedHeader`].
///
/// Derives `Deref`/`DerefMut`, so the receiver's methods can be called directly on the wrapper.
#[derive(Debug, Deref, DerefMut)]
pub struct ForkChoiceNotifications<T = alloy_consensus::Header>(
    /// The wrapped watch receiver.
    pub watch::Receiver<Option<SealedHeader<T>>>,
);
161
162pub trait ForkChoiceSubscriptions: Send + Sync {
165 type Header: Clone + Send + Sync + 'static;
167
168 fn subscribe_safe_block(&self) -> ForkChoiceNotifications<Self::Header>;
170
171 fn subscribe_finalized_block(&self) -> ForkChoiceNotifications<Self::Header>;
173
174 fn safe_block_stream(&self) -> ForkChoiceStream<SealedHeader<Self::Header>> {
176 ForkChoiceStream::new(self.subscribe_safe_block().0)
177 }
178
179 fn finalized_block_stream(&self) -> ForkChoiceStream<SealedHeader<Self::Header>> {
181 ForkChoiceStream::new(self.subscribe_finalized_block().0)
182 }
183}
184
/// A [`Stream`] over the `Some` values published on a watch channel of `Option<T>`.
#[derive(Debug)]
#[pin_project::pin_project]
pub struct ForkChoiceStream<T> {
    /// The wrapped watch receiver; pinned because it is polled through the projection.
    #[pin]
    st: WatchStream<Option<T>>,
}
192
193impl<T: Clone + Sync + Send + 'static> ForkChoiceStream<T> {
194 pub fn new(rx: watch::Receiver<Option<T>>) -> Self {
196 Self { st: WatchStream::from_changes(rx) }
197 }
198}
199
200impl<T: Clone + Sync + Send + 'static> Stream for ForkChoiceStream<T> {
201 type Item = T;
202
203 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
204 loop {
205 match ready!(self.as_mut().project().st.poll_next(cx)) {
206 Some(Some(notification)) => return Poll::Ready(Some(notification)),
207 Some(None) => continue,
208 None => return Poll::Ready(None),
209 }
210 }
211 }
212}
213
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{b256, B256};
    use reth_execution_types::ExecutionOutcome;
    use reth_primitives::{Receipt, Receipts, TransactionSigned, TxType};

    /// Transaction hash the receipt tests expect to be attached to a default
    /// [`TransactionSigned`].
    fn default_tx_hash() -> B256 {
        b256!("20b5378c6fe992c118b557d2f8e8bbe0b7567f6fe5483a8f0f1c51e93a9d91ab")
    }

    /// Builds a default block with the given number and a hash made of `hash_byte` repeated.
    fn block_at(number: u64, hash_byte: u8) -> SealedBlockWithSenders {
        let mut block: SealedBlockWithSenders = Default::default();
        block.set_block_number(number);
        block.set_hash(B256::new([hash_byte; 32]));
        block
    }

    /// Same as [`block_at`], with a single default transaction pushed into the body.
    fn block_with_default_tx(number: u64, hash_byte: u8) -> SealedBlockWithSenders {
        let mut block = block_at(number, hash_byte);
        block.block.body.transactions.push(TransactionSigned::default());
        block
    }

    /// Builds an execution outcome holding exactly one block's worth of receipts: `receipt`.
    fn outcome_with(receipt: &Receipt) -> ExecutionOutcome {
        let receipts = Receipts { receipt_vec: vec![vec![Some(receipt.clone())]] };
        ExecutionOutcome { receipts, ..Default::default() }
    }

    #[test]
    fn test_commit_notification() {
        let first = block_at(1, 0x01);
        let second = block_at(2, 0x02);

        // A two-block chain committed without any revert.
        let chain: Arc<Chain> = Arc::new(Chain::new(
            vec![first, second.clone()],
            ExecutionOutcome::default(),
            None,
        ));

        let notification = CanonStateNotification::Commit { new: Arc::clone(&chain) };

        // The committed chain is the one passed in.
        assert_eq!(notification.committed(), chain);

        // A commit never reports a reverted chain.
        assert!(notification.reverted().is_none());

        // The tip is the last block of the committed chain.
        assert_eq!(*notification.tip(), second);
    }

    #[test]
    fn test_reorg_notification() {
        let reverted_block = block_at(1, 0x01);
        let second = block_at(2, 0x02);
        let third = block_at(3, 0x03);

        let old_chain: Arc<Chain> =
            Arc::new(Chain::new(vec![reverted_block], ExecutionOutcome::default(), None));
        let new_chain = Arc::new(Chain::new(
            vec![second, third.clone()],
            ExecutionOutcome::default(),
            None,
        ));

        let notification = CanonStateNotification::Reorg {
            old: Arc::clone(&old_chain),
            new: Arc::clone(&new_chain),
        };

        // The reverted chain is the old one.
        assert_eq!(notification.reverted(), Some(old_chain));

        // The committed chain is the new one.
        assert_eq!(notification.committed(), new_chain);

        // The tip is the last block of the new chain.
        assert_eq!(*notification.tip(), third);
    }

    #[test]
    fn test_block_receipts_commit() {
        // Only the first block carries a transaction (and therefore a receipt).
        let block1 = block_with_default_tx(1, 0x01);
        let block2 = block_at(2, 0x02);
        let block1_id = block1.num_hash();

        #[allow(clippy::needless_update)]
        let receipt1 = Receipt {
            tx_type: TxType::Legacy,
            cumulative_gas_used: 12345,
            logs: vec![],
            success: true,
            ..Default::default()
        };

        let new_chain: Arc<Chain> =
            Arc::new(Chain::new(vec![block1, block2], outcome_with(&receipt1), None));

        let notification = CanonStateNotification::Commit { new: new_chain };

        let block_receipts = notification.block_receipts();

        // Exactly one entry: the receipts of the first block.
        assert_eq!(block_receipts.len(), 1);

        assert_eq!(
            block_receipts[0].0,
            BlockReceipts { block: block1_id, tx_receipts: vec![(default_tx_hash(), receipt1)] }
        );

        // Committed receipts are not flagged as reverted.
        assert!(!block_receipts[0].1);
    }

    #[test]
    fn test_block_receipts_reorg() {
        // The old chain: one block with one transaction and a failed receipt.
        let old_block1 = block_with_default_tx(1, 0x01);
        let old_block_id = old_block1.num_hash();

        #[allow(clippy::needless_update)]
        let old_receipt = Receipt {
            tx_type: TxType::Legacy,
            cumulative_gas_used: 54321,
            logs: vec![],
            success: false,
            ..Default::default()
        };

        let old_chain: Arc<Chain> =
            Arc::new(Chain::new(vec![old_block1], outcome_with(&old_receipt), None));

        // The new chain: one block with one transaction and a successful receipt.
        let new_block1 = block_with_default_tx(2, 0x02);
        let new_block_id = new_block1.num_hash();

        #[allow(clippy::needless_update)]
        let new_receipt = Receipt {
            tx_type: TxType::Legacy,
            cumulative_gas_used: 12345,
            logs: vec![],
            success: true,
            ..Default::default()
        };

        let new_chain = Arc::new(Chain::new(vec![new_block1], outcome_with(&new_receipt), None));

        let notification = CanonStateNotification::Reorg { old: old_chain, new: new_chain };

        let block_receipts = notification.block_receipts();

        // One entry from the reverted chain followed by one from the committed chain.
        assert_eq!(block_receipts.len(), 2);

        // The reverted entry comes first and is flagged `true`.
        assert_eq!(
            block_receipts[0].0,
            BlockReceipts {
                block: old_block_id,
                tx_receipts: vec![(default_tx_hash(), old_receipt)]
            }
        );
        assert!(block_receipts[0].1);

        // The committed entry comes second and is flagged `false`.
        assert_eq!(
            block_receipts[1].0,
            BlockReceipts {
                block: new_block_id,
                tx_receipts: vec![(default_tx_hash(), new_receipt)]
            }
        );
        assert!(!block_receipts[1].1);
    }
}