reth_node_events/
cl.rs

1//! Events related to Consensus Layer health.
2
3use alloy_consensus::Header;
4use futures::Stream;
5use reth_storage_api::CanonChainTracker;
6use std::{
7    fmt,
8    pin::Pin,
9    task::{ready, Context, Poll},
10    time::Duration,
11};
12use tokio::time::{Instant, Interval};
13
/// How often the Consensus Layer client health is checked (every 5 minutes).
const CHECK_INTERVAL: Duration = Duration::from_secs(5 * 60);
/// Maximum age (2 minutes) of the last transition configuration exchange with the
/// Consensus Layer client before the client is reported as not seen for a while.
const NO_TRANSITION_CONFIG_EXCHANGED_PERIOD: Duration = Duration::from_secs(2 * 60);
/// Maximum age (2 minutes) of the last fork choice update received from the
/// Consensus Layer client before missing updates are reported.
const NO_FORKCHOICE_UPDATE_RECEIVED_PERIOD: Duration = Duration::from_secs(2 * 60);
22
/// A Stream of [`ConsensusLayerHealthEvent`].
///
/// The header type parameter `H` defaults to [`Header`]; it only determines which
/// [`CanonChainTracker`] trait object can be stored in the stream.
pub struct ConsensusLayerHealthEvents<H = Header> {
    /// Timer whose ticks pace the health checks.
    interval: Interval,
    /// Tracker queried for the timestamps of the latest Consensus Layer activity.
    canon_chain: Box<dyn CanonChainTracker<Header = H>>,
}
28
29impl fmt::Debug for ConsensusLayerHealthEvents {
30    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31        f.debug_struct("ConsensusLayerHealthEvents").field("interval", &self.interval).finish()
32    }
33}
34
35impl<H> ConsensusLayerHealthEvents<H> {
36    /// Creates a new [`ConsensusLayerHealthEvents`] with the given canonical chain tracker.
37    pub fn new(canon_chain: Box<dyn CanonChainTracker<Header = H>>) -> Self {
38        // Skip the first tick to prevent the false `ConsensusLayerHealthEvent::NeverSeen` event.
39        let interval = tokio::time::interval_at(Instant::now() + CHECK_INTERVAL, CHECK_INTERVAL);
40        Self { interval, canon_chain }
41    }
42}
43
44impl Stream for ConsensusLayerHealthEvents {
45    type Item = ConsensusLayerHealthEvent;
46
47    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
48        let this = self.get_mut();
49
50        loop {
51            ready!(this.interval.poll_tick(cx));
52
53            if let Some(fork_choice) = this.canon_chain.last_received_update_timestamp() {
54                if fork_choice.elapsed() <= NO_FORKCHOICE_UPDATE_RECEIVED_PERIOD {
55                    // We had an FCU, and it's recent. CL is healthy.
56                    continue
57                }
58                // We had an FCU, but it's too old.
59                return Poll::Ready(Some(
60                    ConsensusLayerHealthEvent::HaveNotReceivedUpdatesForAWhile(
61                        fork_choice.elapsed(),
62                    ),
63                ))
64            }
65
66            if let Some(transition_config) =
67                this.canon_chain.last_exchanged_transition_configuration_timestamp()
68            {
69                return if transition_config.elapsed() <= NO_TRANSITION_CONFIG_EXCHANGED_PERIOD {
70                    // We never had an FCU, but had a transition config exchange, and it's recent.
71                    Poll::Ready(Some(ConsensusLayerHealthEvent::NeverReceivedUpdates))
72                } else {
73                    // We never had an FCU, but had a transition config exchange, but it's too old.
74                    Poll::Ready(Some(ConsensusLayerHealthEvent::HasNotBeenSeenForAWhile(
75                        transition_config.elapsed(),
76                    )))
77                }
78            }
79
80            // We never had both FCU and transition config exchange.
81            return Poll::Ready(Some(ConsensusLayerHealthEvent::NeverSeen))
82        }
83    }
84}
85
/// Event that is triggered when Consensus Layer health is degraded from the
/// Execution Layer point of view.
///
/// Events are plain values: they can be copied and compared for equality, which
/// compares both the variant and, where present, the carried [`Duration`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConsensusLayerHealthEvent {
    /// Consensus Layer client was never seen.
    NeverSeen,
    /// Consensus Layer client has not been seen for a while.
    ///
    /// Carries the time elapsed since the client was last seen.
    HasNotBeenSeenForAWhile(Duration),
    /// Updates from the Consensus Layer client were never received.
    NeverReceivedUpdates,
    /// Updates from the Consensus Layer client have not been received for a while.
    ///
    /// Carries the time elapsed since the last update was received.
    HaveNotReceivedUpdatesForAWhile(Duration),
}