reth_chain_state/
notifications.rs

1//! Canonical chain state notification trait and types.
2
3use alloy_eips::eip2718::Encodable2718;
4use derive_more::{Deref, DerefMut};
5use reth_execution_types::{BlockReceipts, Chain};
6use reth_primitives::{NodePrimitives, SealedBlockWithSenders, SealedHeader};
7use reth_storage_api::NodePrimitivesProvider;
8use std::{
9    pin::Pin,
10    sync::Arc,
11    task::{ready, Context, Poll},
12};
13use tokio::sync::{broadcast, watch};
14use tokio_stream::{
15    wrappers::{BroadcastStream, WatchStream},
16    Stream,
17};
18use tracing::debug;
19
20/// Type alias for a receiver that receives [`CanonStateNotification`]
21pub type CanonStateNotifications<N = reth_primitives::EthPrimitives> =
22    broadcast::Receiver<CanonStateNotification<N>>;
23
24/// Type alias for a sender that sends [`CanonStateNotification`]
25pub type CanonStateNotificationSender<N = reth_primitives::EthPrimitives> =
26    broadcast::Sender<CanonStateNotification<N>>;
27
28/// A type that allows to register chain related event subscriptions.
29pub trait CanonStateSubscriptions: NodePrimitivesProvider + Send + Sync {
30    /// Get notified when a new canonical chain was imported.
31    ///
32    /// A canonical chain be one or more blocks, a reorg or a revert.
33    fn subscribe_to_canonical_state(&self) -> CanonStateNotifications<Self::Primitives>;
34
35    /// Convenience method to get a stream of [`CanonStateNotification`].
36    fn canonical_state_stream(&self) -> CanonStateNotificationStream<Self::Primitives> {
37        CanonStateNotificationStream {
38            st: BroadcastStream::new(self.subscribe_to_canonical_state()),
39        }
40    }
41}
42
43impl<T: CanonStateSubscriptions> CanonStateSubscriptions for &T {
44    fn subscribe_to_canonical_state(&self) -> CanonStateNotifications<Self::Primitives> {
45        (*self).subscribe_to_canonical_state()
46    }
47
48    fn canonical_state_stream(&self) -> CanonStateNotificationStream<Self::Primitives> {
49        (*self).canonical_state_stream()
50    }
51}
52
53/// A Stream of [`CanonStateNotification`].
54#[derive(Debug)]
55#[pin_project::pin_project]
56pub struct CanonStateNotificationStream<N: NodePrimitives = reth_primitives::EthPrimitives> {
57    #[pin]
58    st: BroadcastStream<CanonStateNotification<N>>,
59}
60
61impl<N: NodePrimitives> Stream for CanonStateNotificationStream<N> {
62    type Item = CanonStateNotification<N>;
63
64    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
65        loop {
66            return match ready!(self.as_mut().project().st.poll_next(cx)) {
67                Some(Ok(notification)) => Poll::Ready(Some(notification)),
68                Some(Err(err)) => {
69                    debug!(%err, "canonical state notification stream lagging behind");
70                    continue
71                }
72                None => Poll::Ready(None),
73            }
74        }
75    }
76}
77
78/// A notification that is sent when a new block is imported, or an old block is reverted.
79///
80/// The notification contains at least one [`Chain`] with the imported segment. If some blocks were
81/// reverted (e.g. during a reorg), the old chain is also returned.
82#[derive(Clone, Debug, PartialEq, Eq)]
83pub enum CanonStateNotification<N: NodePrimitives = reth_primitives::EthPrimitives> {
84    /// The canonical chain was extended.
85    Commit {
86        /// The newly added chain segment.
87        new: Arc<Chain<N>>,
88    },
89    /// A chain segment was reverted or reorged.
90    ///
91    /// - In the case of a reorg, the reverted blocks are present in `old`, and the new blocks are
92    ///   present in `new`.
93    /// - In the case of a revert, the reverted blocks are present in `old`, and `new` is an empty
94    ///   chain segment.
95    Reorg {
96        /// The chain segment that was reverted.
97        old: Arc<Chain<N>>,
98        /// The chain segment that was added on top of the canonical chain, minus the reverted
99        /// blocks.
100        ///
101        /// In the case of a revert, not a reorg, this chain segment is empty.
102        new: Arc<Chain<N>>,
103    },
104}
105
106impl<N: NodePrimitives> CanonStateNotification<N> {
107    /// Get the chain segment that was reverted, if any.
108    pub fn reverted(&self) -> Option<Arc<Chain<N>>> {
109        match self {
110            Self::Commit { .. } => None,
111            Self::Reorg { old, .. } => Some(old.clone()),
112        }
113    }
114
115    /// Get the newly imported chain segment, if any.
116    pub fn committed(&self) -> Arc<Chain<N>> {
117        match self {
118            Self::Commit { new } | Self::Reorg { new, .. } => new.clone(),
119        }
120    }
121
122    /// Get the new tip of the chain.
123    ///
124    /// Returns the new tip for [`Self::Reorg`] and [`Self::Commit`] variants which commit at least
125    /// 1 new block.
126    pub fn tip(&self) -> &SealedBlockWithSenders<N::Block> {
127        match self {
128            Self::Commit { new } | Self::Reorg { new, .. } => new.tip(),
129        }
130    }
131
132    /// Get receipts in the reverted and newly imported chain segments with their corresponding
133    /// block numbers and transaction hashes.
134    ///
135    /// The boolean in the tuple (2nd element) denotes whether the receipt was from the reverted
136    /// chain segment.
137    pub fn block_receipts(&self) -> Vec<(BlockReceipts<N::Receipt>, bool)>
138    where
139        N::SignedTx: Encodable2718,
140    {
141        let mut receipts = Vec::new();
142
143        // get old receipts
144        if let Some(old) = self.reverted() {
145            receipts
146                .extend(old.receipts_with_attachment().into_iter().map(|receipt| (receipt, true)));
147        }
148        // get new receipts
149        receipts.extend(
150            self.committed().receipts_with_attachment().into_iter().map(|receipt| (receipt, false)),
151        );
152        receipts
153    }
154}
155
156/// Wrapper around a broadcast receiver that receives fork choice notifications.
157#[derive(Debug, Deref, DerefMut)]
158pub struct ForkChoiceNotifications<T = alloy_consensus::Header>(
159    pub watch::Receiver<Option<SealedHeader<T>>>,
160);
161
162/// A trait that allows to register to fork choice related events
163/// and get notified when a new fork choice is available.
164pub trait ForkChoiceSubscriptions: Send + Sync {
165    /// Block Header type.
166    type Header: Clone + Send + Sync + 'static;
167
168    /// Get notified when a new safe block of the chain is selected.
169    fn subscribe_safe_block(&self) -> ForkChoiceNotifications<Self::Header>;
170
171    /// Get notified when a new finalized block of the chain is selected.
172    fn subscribe_finalized_block(&self) -> ForkChoiceNotifications<Self::Header>;
173
174    /// Convenience method to get a stream of the new safe blocks of the chain.
175    fn safe_block_stream(&self) -> ForkChoiceStream<SealedHeader<Self::Header>> {
176        ForkChoiceStream::new(self.subscribe_safe_block().0)
177    }
178
179    /// Convenience method to get a stream of the new finalized blocks of the chain.
180    fn finalized_block_stream(&self) -> ForkChoiceStream<SealedHeader<Self::Header>> {
181        ForkChoiceStream::new(self.subscribe_finalized_block().0)
182    }
183}
184
185/// A stream for fork choice watch channels (pending, safe or finalized watchers)
186#[derive(Debug)]
187#[pin_project::pin_project]
188pub struct ForkChoiceStream<T> {
189    #[pin]
190    st: WatchStream<Option<T>>,
191}
192
193impl<T: Clone + Sync + Send + 'static> ForkChoiceStream<T> {
194    /// Creates a new `ForkChoiceStream`
195    pub fn new(rx: watch::Receiver<Option<T>>) -> Self {
196        Self { st: WatchStream::from_changes(rx) }
197    }
198}
199
200impl<T: Clone + Sync + Send + 'static> Stream for ForkChoiceStream<T> {
201    type Item = T;
202
203    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
204        loop {
205            match ready!(self.as_mut().project().st.poll_next(cx)) {
206                Some(Some(notification)) => return Poll::Ready(Some(notification)),
207                Some(None) => continue,
208                None => return Poll::Ready(None),
209            }
210        }
211    }
212}
213
214#[cfg(test)]
215mod tests {
216    use super::*;
217    use alloy_primitives::{b256, B256};
218    use reth_execution_types::ExecutionOutcome;
219    use reth_primitives::{Receipt, Receipts, TransactionSigned, TxType};
220
221    #[test]
222    fn test_commit_notification() {
223        let block: SealedBlockWithSenders = Default::default();
224        let block1_hash = B256::new([0x01; 32]);
225        let block2_hash = B256::new([0x02; 32]);
226
227        let mut block1 = block.clone();
228        block1.set_block_number(1);
229        block1.set_hash(block1_hash);
230
231        let mut block2 = block;
232        block2.set_block_number(2);
233        block2.set_hash(block2_hash);
234
235        let chain: Arc<Chain> = Arc::new(Chain::new(
236            vec![block1.clone(), block2.clone()],
237            ExecutionOutcome::default(),
238            None,
239        ));
240
241        // Create a commit notification
242        let notification = CanonStateNotification::Commit { new: chain.clone() };
243
244        // Test that `committed` returns the correct chain
245        assert_eq!(notification.committed(), chain);
246
247        // Test that `reverted` returns None for `Commit`
248        assert!(notification.reverted().is_none());
249
250        // Test that `tip` returns the correct block
251        assert_eq!(*notification.tip(), block2);
252    }
253
254    #[test]
255    fn test_reorg_notification() {
256        let block: SealedBlockWithSenders = Default::default();
257        let block1_hash = B256::new([0x01; 32]);
258        let block2_hash = B256::new([0x02; 32]);
259        let block3_hash = B256::new([0x03; 32]);
260
261        let mut block1 = block.clone();
262        block1.set_block_number(1);
263        block1.set_hash(block1_hash);
264
265        let mut block2 = block.clone();
266        block2.set_block_number(2);
267        block2.set_hash(block2_hash);
268
269        let mut block3 = block;
270        block3.set_block_number(3);
271        block3.set_hash(block3_hash);
272
273        let old_chain: Arc<Chain> =
274            Arc::new(Chain::new(vec![block1.clone()], ExecutionOutcome::default(), None));
275        let new_chain = Arc::new(Chain::new(
276            vec![block2.clone(), block3.clone()],
277            ExecutionOutcome::default(),
278            None,
279        ));
280
281        // Create a reorg notification
282        let notification =
283            CanonStateNotification::Reorg { old: old_chain.clone(), new: new_chain.clone() };
284
285        // Test that `reverted` returns the old chain
286        assert_eq!(notification.reverted(), Some(old_chain));
287
288        // Test that `committed` returns the new chain
289        assert_eq!(notification.committed(), new_chain);
290
291        // Test that `tip` returns the tip of the new chain (last block in the new chain)
292        assert_eq!(*notification.tip(), block3);
293    }
294
295    #[test]
296    fn test_block_receipts_commit() {
297        // Create a default block instance for use in block definitions.
298        let block: SealedBlockWithSenders = Default::default();
299
300        // Define unique hashes for two blocks to differentiate them in the chain.
301        let block1_hash = B256::new([0x01; 32]);
302        let block2_hash = B256::new([0x02; 32]);
303
304        // Create a default transaction to include in block1's transactions.
305        let tx = TransactionSigned::default();
306
307        // Create a clone of the default block and customize it to act as block1.
308        let mut block1 = block.clone();
309        block1.set_block_number(1);
310        block1.set_hash(block1_hash);
311        // Add the transaction to block1's transactions.
312        block1.block.body.transactions.push(tx);
313
314        // Clone the default block and customize it to act as block2.
315        let mut block2 = block;
316        block2.set_block_number(2);
317        block2.set_hash(block2_hash);
318
319        // Create a receipt for the transaction in block1.
320        #[allow(clippy::needless_update)]
321        let receipt1 = Receipt {
322            tx_type: TxType::Legacy,
323            cumulative_gas_used: 12345,
324            logs: vec![],
325            success: true,
326            ..Default::default()
327        };
328
329        // Wrap the receipt in a `Receipts` structure, as expected in the `ExecutionOutcome`.
330        let receipts = Receipts { receipt_vec: vec![vec![Some(receipt1.clone())]] };
331
332        // Define an `ExecutionOutcome` with the created receipts.
333        let execution_outcome = ExecutionOutcome { receipts, ..Default::default() };
334
335        // Create a new chain segment with `block1` and `block2` and the execution outcome.
336        let new_chain: Arc<Chain> =
337            Arc::new(Chain::new(vec![block1.clone(), block2.clone()], execution_outcome, None));
338
339        // Create a commit notification containing the new chain segment.
340        let notification = CanonStateNotification::Commit { new: new_chain };
341
342        // Call `block_receipts` on the commit notification to retrieve block receipts.
343        let block_receipts = notification.block_receipts();
344
345        // Assert that only one receipt entry exists in the `block_receipts` list.
346        assert_eq!(block_receipts.len(), 1);
347
348        // Verify that the first entry matches block1's hash and transaction receipt.
349        assert_eq!(
350            block_receipts[0].0,
351            BlockReceipts {
352                block: block1.num_hash(),
353                tx_receipts: vec![(
354                    // Transaction hash of a Transaction::default()
355                    b256!("20b5378c6fe992c118b557d2f8e8bbe0b7567f6fe5483a8f0f1c51e93a9d91ab"),
356                    receipt1
357                )]
358            }
359        );
360
361        // Assert that the receipt is from the committed segment (not reverted).
362        assert!(!block_receipts[0].1);
363    }
364
365    #[test]
366    fn test_block_receipts_reorg() {
367        // Define block1 for the old chain segment, which will be reverted.
368        let mut old_block1: SealedBlockWithSenders = Default::default();
369        old_block1.set_block_number(1);
370        old_block1.set_hash(B256::new([0x01; 32]));
371        old_block1.block.body.transactions.push(TransactionSigned::default());
372
373        // Create a receipt for a transaction in the reverted block.
374        #[allow(clippy::needless_update)]
375        let old_receipt = Receipt {
376            tx_type: TxType::Legacy,
377            cumulative_gas_used: 54321,
378            logs: vec![],
379            success: false,
380            ..Default::default()
381        };
382        let old_receipts = Receipts { receipt_vec: vec![vec![Some(old_receipt.clone())]] };
383
384        let old_execution_outcome =
385            ExecutionOutcome { receipts: old_receipts, ..Default::default() };
386
387        // Create an old chain segment to be reverted, containing `old_block1`.
388        let old_chain: Arc<Chain> =
389            Arc::new(Chain::new(vec![old_block1.clone()], old_execution_outcome, None));
390
391        // Define block2 for the new chain segment, which will be committed.
392        let mut new_block1: SealedBlockWithSenders = Default::default();
393        new_block1.set_block_number(2);
394        new_block1.set_hash(B256::new([0x02; 32]));
395        new_block1.block.body.transactions.push(TransactionSigned::default());
396
397        // Create a receipt for a transaction in the new committed block.
398        #[allow(clippy::needless_update)]
399        let new_receipt = Receipt {
400            tx_type: TxType::Legacy,
401            cumulative_gas_used: 12345,
402            logs: vec![],
403            success: true,
404            ..Default::default()
405        };
406        let new_receipts = Receipts { receipt_vec: vec![vec![Some(new_receipt.clone())]] };
407
408        let new_execution_outcome =
409            ExecutionOutcome { receipts: new_receipts, ..Default::default() };
410
411        // Create a new chain segment to be committed, containing `new_block1`.
412        let new_chain = Arc::new(Chain::new(vec![new_block1.clone()], new_execution_outcome, None));
413
414        // Create a reorg notification with both reverted (old) and committed (new) chain segments.
415        let notification = CanonStateNotification::Reorg { old: old_chain, new: new_chain };
416
417        // Retrieve receipts from both old (reverted) and new (committed) segments.
418        let block_receipts = notification.block_receipts();
419
420        // Assert there are two receipt entries, one from each chain segment.
421        assert_eq!(block_receipts.len(), 2);
422
423        // Verify that the first entry matches old_block1 and its receipt from the reverted segment.
424        assert_eq!(
425            block_receipts[0].0,
426            BlockReceipts {
427                block: old_block1.num_hash(),
428                tx_receipts: vec![(
429                    // Transaction hash of a Transaction::default()
430                    b256!("20b5378c6fe992c118b557d2f8e8bbe0b7567f6fe5483a8f0f1c51e93a9d91ab"),
431                    old_receipt
432                )]
433            }
434        );
435        // Confirm this is from the reverted segment.
436        assert!(block_receipts[0].1);
437
438        // Verify that the second entry matches new_block1 and its receipt from the committed
439        // segment.
440        assert_eq!(
441            block_receipts[1].0,
442            BlockReceipts {
443                block: new_block1.num_hash(),
444                tx_receipts: vec![(
445                    // Transaction hash of a Transaction::default()
446                    b256!("20b5378c6fe992c118b557d2f8e8bbe0b7567f6fe5483a8f0f1c51e93a9d91ab"),
447                    new_receipt
448                )]
449            }
450        );
451        // Confirm this is from the committed segment.
452        assert!(!block_receipts[1].1);
453    }
454}