reth_node_events/
node.rs

1//! Support for handling events emitted by node components.
2
3use crate::cl::ConsensusLayerHealthEvent;
4use alloy_consensus::constants::GWEI_TO_WEI;
5use alloy_primitives::{BlockNumber, B256};
6use alloy_rpc_types_engine::ForkchoiceState;
7use futures::Stream;
8use reth_beacon_consensus::{BeaconConsensusEngineEvent, ConsensusEngineLiveSyncProgress};
9use reth_engine_primitives::ForkchoiceStatus;
10use reth_network_api::PeersInfo;
11use reth_primitives_traits::{format_gas, format_gas_throughput};
12use reth_prune_types::PrunerEvent;
13use reth_stages::{EntitiesCheckpoint, ExecOutput, PipelineEvent, StageCheckpoint, StageId};
14use reth_static_file_types::StaticFileProducerEvent;
15use std::{
16    fmt::{Display, Formatter},
17    future::Future,
18    pin::Pin,
19    task::{Context, Poll},
20    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
21};
22use tokio::time::Interval;
23use tracing::{debug, info, warn};
24
25/// Interval of reporting node state.
26const INFO_MESSAGE_INTERVAL: Duration = Duration::from_secs(25);
27
28/// The current high-level state of the node, including the node's database environment, network
29/// connections, current processing stage, and the latest block information. It provides
30/// methods to handle different types of events that affect the node's state, such as pipeline
31/// events, network events, and consensus engine events.
32struct NodeState {
33    /// Information about connected peers.
34    peers_info: Option<Box<dyn PeersInfo>>,
35    /// The stage currently being executed.
36    current_stage: Option<CurrentStage>,
37    /// The latest block reached by either pipeline or consensus engine.
38    latest_block: Option<BlockNumber>,
39    /// The time of the latest block seen by the pipeline
40    latest_block_time: Option<u64>,
41    /// Hash of the head block last set by fork choice update
42    head_block_hash: Option<B256>,
43    /// Hash of the safe block last set by fork choice update
44    safe_block_hash: Option<B256>,
45    /// Hash of finalized block last set by fork choice update
46    finalized_block_hash: Option<B256>,
47}
48
49impl NodeState {
50    const fn new(
51        peers_info: Option<Box<dyn PeersInfo>>,
52        latest_block: Option<BlockNumber>,
53    ) -> Self {
54        Self {
55            peers_info,
56            current_stage: None,
57            latest_block,
58            latest_block_time: None,
59            head_block_hash: None,
60            safe_block_hash: None,
61            finalized_block_hash: None,
62        }
63    }
64
65    fn num_connected_peers(&self) -> usize {
66        self.peers_info.as_ref().map(|info| info.num_connected_peers()).unwrap_or_default()
67    }
68
69    fn build_current_stage(
70        &self,
71        stage_id: StageId,
72        checkpoint: StageCheckpoint,
73        target: Option<BlockNumber>,
74    ) -> CurrentStage {
75        let (eta, entities_checkpoint) = self
76            .current_stage
77            .as_ref()
78            .filter(|current_stage| current_stage.stage_id == stage_id)
79            .map_or_else(
80                || (Eta::default(), None),
81                |current_stage| (current_stage.eta, current_stage.entities_checkpoint),
82            );
83
84        CurrentStage { stage_id, eta, checkpoint, entities_checkpoint, target }
85    }
86
87    /// Processes an event emitted by the pipeline
88    fn handle_pipeline_event(&mut self, event: PipelineEvent) {
89        match event {
90            PipelineEvent::Prepare { pipeline_stages_progress, stage_id, checkpoint, target } => {
91                let checkpoint = checkpoint.unwrap_or_default();
92                let current_stage = self.build_current_stage(stage_id, checkpoint, target);
93
94                info!(
95                    pipeline_stages = %pipeline_stages_progress,
96                    stage = %stage_id,
97                    checkpoint = %checkpoint.block_number,
98                    target = %OptionalField(target),
99                    "Preparing stage",
100                );
101
102                self.current_stage = Some(current_stage);
103            }
104            PipelineEvent::Run { pipeline_stages_progress, stage_id, checkpoint, target } => {
105                let checkpoint = checkpoint.unwrap_or_default();
106                let current_stage = self.build_current_stage(stage_id, checkpoint, target);
107
108                if let Some(stage_eta) = current_stage.eta.fmt_for_stage(stage_id) {
109                    info!(
110                        pipeline_stages = %pipeline_stages_progress,
111                        stage = %stage_id,
112                        checkpoint = %checkpoint.block_number,
113                        target = %OptionalField(target),
114                        %stage_eta,
115                        "Executing stage",
116                    );
117                } else {
118                    info!(
119                        pipeline_stages = %pipeline_stages_progress,
120                        stage = %stage_id,
121                        checkpoint = %checkpoint.block_number,
122                        target = %OptionalField(target),
123                        "Executing stage",
124                    );
125                }
126
127                self.current_stage = Some(current_stage);
128            }
129            PipelineEvent::Ran {
130                pipeline_stages_progress,
131                stage_id,
132                result: ExecOutput { checkpoint, done },
133            } => {
134                if stage_id.is_finish() {
135                    self.latest_block = Some(checkpoint.block_number);
136                }
137
138                if let Some(current_stage) = self.current_stage.as_mut() {
139                    current_stage.checkpoint = checkpoint;
140                    current_stage.entities_checkpoint = checkpoint.entities();
141                    current_stage.eta.update(stage_id, checkpoint);
142
143                    let target = OptionalField(current_stage.target);
144                    let stage_progress = current_stage
145                        .entities_checkpoint
146                        .and_then(|entities| entities.fmt_percentage());
147                    let stage_eta = current_stage.eta.fmt_for_stage(stage_id);
148
149                    let message = if done { "Finished stage" } else { "Committed stage progress" };
150
151                    match (stage_progress, stage_eta) {
152                        (Some(stage_progress), Some(stage_eta)) => {
153                            info!(
154                                pipeline_stages = %pipeline_stages_progress,
155                                stage = %stage_id,
156                                checkpoint = %checkpoint.block_number,
157                                %target,
158                                %stage_progress,
159                                %stage_eta,
160                                "{message}",
161                            )
162                        }
163                        (Some(stage_progress), None) => {
164                            info!(
165                                pipeline_stages = %pipeline_stages_progress,
166                                stage = %stage_id,
167                                checkpoint = %checkpoint.block_number,
168                                %target,
169                                %stage_progress,
170                                "{message}",
171                            )
172                        }
173                        (None, Some(stage_eta)) => {
174                            info!(
175                                pipeline_stages = %pipeline_stages_progress,
176                                stage = %stage_id,
177                                checkpoint = %checkpoint.block_number,
178                                %target,
179                                %stage_eta,
180                                "{message}",
181                            )
182                        }
183                        (None, None) => {
184                            info!(
185                                pipeline_stages = %pipeline_stages_progress,
186                                stage = %stage_id,
187                                checkpoint = %checkpoint.block_number,
188                                %target,
189                                "{message}",
190                            )
191                        }
192                    }
193                }
194
195                if done {
196                    self.current_stage = None;
197                }
198            }
199            PipelineEvent::Unwind { stage_id, input } => {
200                let current_stage = CurrentStage {
201                    stage_id,
202                    eta: Eta::default(),
203                    checkpoint: input.checkpoint,
204                    target: Some(input.unwind_to),
205                    entities_checkpoint: input.checkpoint.entities(),
206                };
207
208                self.current_stage = Some(current_stage);
209            }
210            _ => (),
211        }
212    }
213
214    fn handle_consensus_engine_event(&mut self, event: BeaconConsensusEngineEvent) {
215        match event {
216            BeaconConsensusEngineEvent::ForkchoiceUpdated(state, status) => {
217                let ForkchoiceState { head_block_hash, safe_block_hash, finalized_block_hash } =
218                    state;
219                if self.safe_block_hash != Some(safe_block_hash) &&
220                    self.finalized_block_hash != Some(finalized_block_hash)
221                {
222                    let msg = match status {
223                        ForkchoiceStatus::Valid => "Forkchoice updated",
224                        ForkchoiceStatus::Invalid => "Received invalid forkchoice updated message",
225                        ForkchoiceStatus::Syncing => {
226                            "Received forkchoice updated message when syncing"
227                        }
228                    };
229                    info!(?head_block_hash, ?safe_block_hash, ?finalized_block_hash, "{}", msg);
230                }
231                self.head_block_hash = Some(head_block_hash);
232                self.safe_block_hash = Some(safe_block_hash);
233                self.finalized_block_hash = Some(finalized_block_hash);
234            }
235            BeaconConsensusEngineEvent::LiveSyncProgress(live_sync_progress) => {
236                match live_sync_progress {
237                    ConsensusEngineLiveSyncProgress::DownloadingBlocks {
238                        remaining_blocks,
239                        target,
240                    } => {
241                        info!(
242                            remaining_blocks,
243                            target_block_hash=?target,
244                            "Live sync in progress, downloading blocks"
245                        );
246                    }
247                }
248            }
249            BeaconConsensusEngineEvent::CanonicalBlockAdded(block, elapsed) => {
250                info!(
251                    number=block.number,
252                    hash=?block.hash(),
253                    peers=self.num_connected_peers(),
254                    txs=block.body.transactions.len(),
255                    gas=%format_gas(block.header.gas_used),
256                    gas_throughput=%format_gas_throughput(block.header.gas_used, elapsed),
257                    full=%format!("{:.1}%", block.header.gas_used as f64 * 100.0 / block.header.gas_limit as f64),
258                    base_fee=%format!("{:.2}gwei", block.header.base_fee_per_gas.unwrap_or(0) as f64 / GWEI_TO_WEI as f64),
259                    blobs=block.header.blob_gas_used.unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB,
260                    excess_blobs=block.header.excess_blob_gas.unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB,
261                    ?elapsed,
262                    "Block added to canonical chain"
263                );
264            }
265            BeaconConsensusEngineEvent::CanonicalChainCommitted(head, elapsed) => {
266                self.latest_block = Some(head.number);
267                self.latest_block_time = Some(head.timestamp);
268
269                info!(number=head.number, hash=?head.hash(), ?elapsed, "Canonical chain committed");
270            }
271            BeaconConsensusEngineEvent::ForkBlockAdded(block, elapsed) => {
272                info!(number=block.number, hash=?block.hash(), ?elapsed, "Block added to fork chain");
273            }
274        }
275    }
276
277    fn handle_consensus_layer_health_event(&self, event: ConsensusLayerHealthEvent) {
278        // If pipeline is running, it's fine to not receive any messages from the CL.
279        // So we need to report about CL health only when pipeline is idle.
280        if self.current_stage.is_none() {
281            match event {
282                ConsensusLayerHealthEvent::NeverSeen => {
283                    warn!("Post-merge network, but never seen beacon client. Please launch one to follow the chain!")
284                }
285                ConsensusLayerHealthEvent::HasNotBeenSeenForAWhile(period) => {
286                    warn!(?period, "Post-merge network, but no beacon client seen for a while. Please launch one to follow the chain!")
287                }
288                ConsensusLayerHealthEvent::NeverReceivedUpdates => {
289                    warn!("Beacon client online, but never received consensus updates. Please ensure your beacon client is operational to follow the chain!")
290                }
291                ConsensusLayerHealthEvent::HaveNotReceivedUpdatesForAWhile(period) => {
292                    warn!(?period, "Beacon client online, but no consensus updates received for a while. This may be because of a reth error, or an error in the beacon client! Please investigate reth and beacon client logs!")
293                }
294            }
295        }
296    }
297
298    fn handle_pruner_event(&self, event: PrunerEvent) {
299        match event {
300            PrunerEvent::Started { tip_block_number } => {
301                debug!(tip_block_number, "Pruner started");
302            }
303            PrunerEvent::Finished { tip_block_number, elapsed, stats } => {
304                let stats = format!(
305                    "[{}]",
306                    stats.iter().map(|item| item.to_string()).collect::<Vec<_>>().join(", ")
307                );
308                debug!(tip_block_number, ?elapsed, pruned_segments = %stats, "Pruner finished");
309            }
310        }
311    }
312
313    fn handle_static_file_producer_event(&self, event: StaticFileProducerEvent) {
314        match event {
315            StaticFileProducerEvent::Started { targets } => {
316                debug!(?targets, "Static File Producer started");
317            }
318            StaticFileProducerEvent::Finished { targets, elapsed } => {
319                debug!(?targets, ?elapsed, "Static File Producer finished");
320            }
321        }
322    }
323}
324
325/// Helper type for formatting of optional fields:
326/// - If [Some(x)], then `x` is written
327/// - If [None], then `None` is written
328struct OptionalField<T: Display>(Option<T>);
329
330impl<T: Display> Display for OptionalField<T> {
331    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
332        if let Some(field) = &self.0 {
333            write!(f, "{field}")
334        } else {
335            write!(f, "None")
336        }
337    }
338}
339
340/// The stage currently being executed.
341struct CurrentStage {
342    stage_id: StageId,
343    eta: Eta,
344    checkpoint: StageCheckpoint,
345    /// The entities checkpoint for reporting the progress. If `None`, then the progress is not
346    /// available, probably because the stage didn't finish running and didn't update its
347    /// checkpoint yet.
348    entities_checkpoint: Option<EntitiesCheckpoint>,
349    target: Option<BlockNumber>,
350}
351
352/// A node event.
353#[derive(Debug)]
354pub enum NodeEvent {
355    /// A sync pipeline event.
356    Pipeline(PipelineEvent),
357    /// A consensus engine event.
358    ConsensusEngine(BeaconConsensusEngineEvent),
359    /// A Consensus Layer health event.
360    ConsensusLayerHealth(ConsensusLayerHealthEvent),
361    /// A pruner event
362    Pruner(PrunerEvent),
363    /// A `static_file_producer` event
364    StaticFileProducer(StaticFileProducerEvent),
365    /// Used to encapsulate various conditions or situations that do not
366    /// naturally fit into the other more specific variants.
367    Other(String),
368}
369
370impl From<PipelineEvent> for NodeEvent {
371    fn from(event: PipelineEvent) -> Self {
372        Self::Pipeline(event)
373    }
374}
375
376impl From<BeaconConsensusEngineEvent> for NodeEvent {
377    fn from(event: BeaconConsensusEngineEvent) -> Self {
378        Self::ConsensusEngine(event)
379    }
380}
381
382impl From<ConsensusLayerHealthEvent> for NodeEvent {
383    fn from(event: ConsensusLayerHealthEvent) -> Self {
384        Self::ConsensusLayerHealth(event)
385    }
386}
387
388impl From<PrunerEvent> for NodeEvent {
389    fn from(event: PrunerEvent) -> Self {
390        Self::Pruner(event)
391    }
392}
393
394impl From<StaticFileProducerEvent> for NodeEvent {
395    fn from(event: StaticFileProducerEvent) -> Self {
396        Self::StaticFileProducer(event)
397    }
398}
399
400/// Displays relevant information to the user from components of the node, and periodically
401/// displays the high-level status of the node.
402pub async fn handle_events<E>(
403    peers_info: Option<Box<dyn PeersInfo>>,
404    latest_block_number: Option<BlockNumber>,
405    events: E,
406) where
407    E: Stream<Item = NodeEvent> + Unpin,
408{
409    let state = NodeState::new(peers_info, latest_block_number);
410
411    let start = tokio::time::Instant::now() + Duration::from_secs(3);
412    let mut info_interval = tokio::time::interval_at(start, INFO_MESSAGE_INTERVAL);
413    info_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
414
415    let handler = EventHandler { state, events, info_interval };
416    handler.await
417}
418
419/// Handles events emitted by the node and logs them accordingly.
420#[pin_project::pin_project]
421struct EventHandler<E> {
422    state: NodeState,
423    #[pin]
424    events: E,
425    #[pin]
426    info_interval: Interval,
427}
428
429impl<E> Future for EventHandler<E>
430where
431    E: Stream<Item = NodeEvent> + Unpin,
432{
433    type Output = ();
434
435    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
436        let mut this = self.project();
437
438        while this.info_interval.poll_tick(cx).is_ready() {
439            if let Some(CurrentStage { stage_id, eta, checkpoint, entities_checkpoint, target }) =
440                &this.state.current_stage
441            {
442                let stage_progress =
443                    entities_checkpoint.and_then(|entities| entities.fmt_percentage());
444                let stage_eta = eta.fmt_for_stage(*stage_id);
445
446                match (stage_progress, stage_eta) {
447                    (Some(stage_progress), Some(stage_eta)) => {
448                        info!(
449                            target: "reth::cli",
450                            connected_peers = this.state.num_connected_peers(),
451                            stage = %stage_id,
452                            checkpoint = checkpoint.block_number,
453                            target = %OptionalField(*target),
454                            %stage_progress,
455                            %stage_eta,
456                            "Status"
457                        )
458                    }
459                    (Some(stage_progress), None) => {
460                        info!(
461                            target: "reth::cli",
462                            connected_peers = this.state.num_connected_peers(),
463                            stage = %stage_id,
464                            checkpoint = checkpoint.block_number,
465                            target = %OptionalField(*target),
466                            %stage_progress,
467                            "Status"
468                        )
469                    }
470                    (None, Some(stage_eta)) => {
471                        info!(
472                            target: "reth::cli",
473                            connected_peers = this.state.num_connected_peers(),
474                            stage = %stage_id,
475                            checkpoint = checkpoint.block_number,
476                            target = %OptionalField(*target),
477                            %stage_eta,
478                            "Status"
479                        )
480                    }
481                    (None, None) => {
482                        info!(
483                            target: "reth::cli",
484                            connected_peers = this.state.num_connected_peers(),
485                            stage = %stage_id,
486                            checkpoint = checkpoint.block_number,
487                            target = %OptionalField(*target),
488                            "Status"
489                        )
490                    }
491                }
492            } else if let Some(latest_block) = this.state.latest_block {
493                let now =
494                    SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
495                if now.saturating_sub(this.state.latest_block_time.unwrap_or(0)) > 60 {
496                    // Once we start receiving consensus nodes, don't emit status unless stalled for
497                    // 1 minute
498                    info!(
499                        target: "reth::cli",
500                        connected_peers = this.state.num_connected_peers(),
501                        %latest_block,
502                        "Status"
503                    );
504                }
505            } else {
506                info!(
507                    target: "reth::cli",
508                    connected_peers = this.state.num_connected_peers(),
509                    "Status"
510                );
511            }
512        }
513
514        while let Poll::Ready(Some(event)) = this.events.as_mut().poll_next(cx) {
515            match event {
516                NodeEvent::Pipeline(event) => {
517                    this.state.handle_pipeline_event(event);
518                }
519                NodeEvent::ConsensusEngine(event) => {
520                    this.state.handle_consensus_engine_event(event);
521                }
522                NodeEvent::ConsensusLayerHealth(event) => {
523                    this.state.handle_consensus_layer_health_event(event)
524                }
525                NodeEvent::Pruner(event) => {
526                    this.state.handle_pruner_event(event);
527                }
528                NodeEvent::StaticFileProducer(event) => {
529                    this.state.handle_static_file_producer_event(event);
530                }
531                NodeEvent::Other(event_description) => {
532                    warn!("{event_description}");
533                }
534            }
535        }
536
537        Poll::Pending
538    }
539}
540
541/// A container calculating the estimated time that a stage will complete in, based on stage
542/// checkpoints reported by the pipeline.
543///
544/// One `Eta` is only valid for a single stage.
545#[derive(Default, Copy, Clone)]
546struct Eta {
547    /// The last stage checkpoint
548    last_checkpoint: EntitiesCheckpoint,
549    /// The last time the stage reported its checkpoint
550    last_checkpoint_time: Option<Instant>,
551    /// The current ETA
552    eta: Option<Duration>,
553}
554
555impl Eta {
556    /// Update the ETA given the checkpoint, if possible.
557    fn update(&mut self, stage: StageId, checkpoint: StageCheckpoint) {
558        let Some(current) = checkpoint.entities() else { return };
559
560        if let Some(last_checkpoint_time) = &self.last_checkpoint_time {
561            let Some(processed_since_last) =
562                current.processed.checked_sub(self.last_checkpoint.processed)
563            else {
564                self.eta = None;
565                debug!(target: "reth::cli", %stage, ?current, ?self.last_checkpoint, "Failed to calculate the ETA: processed entities is less than the last checkpoint");
566                return
567            };
568            let elapsed = last_checkpoint_time.elapsed();
569            let per_second = processed_since_last as f64 / elapsed.as_secs_f64();
570
571            let Some(remaining) = current.total.checked_sub(current.processed) else {
572                self.eta = None;
573                debug!(target: "reth::cli", %stage, ?current, "Failed to calculate the ETA: total entities is less than processed entities");
574                return
575            };
576
577            self.eta = Duration::try_from_secs_f64(remaining as f64 / per_second).ok();
578        }
579
580        self.last_checkpoint = current;
581        self.last_checkpoint_time = Some(Instant::now());
582    }
583
584    /// Returns `true` if the ETA is available, i.e. at least one checkpoint has been reported.
585    fn is_available(&self) -> bool {
586        self.eta.zip(self.last_checkpoint_time).is_some()
587    }
588
589    /// Format ETA for a given stage.
590    ///
591    /// NOTE: Currently ETA is enabled only for the stages that have predictable progress.
592    /// It's not the case for network-dependent ([`StageId::Headers`] and [`StageId::Bodies`]) and
593    /// [`StageId::Execution`] stages.
594    fn fmt_for_stage(&self, stage: StageId) -> Option<String> {
595        if !self.is_available() ||
596            matches!(stage, StageId::Headers | StageId::Bodies | StageId::Execution)
597        {
598            None
599        } else {
600            Some(self.to_string())
601        }
602    }
603}
604
605impl Display for Eta {
606    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
607        if let Some((eta, last_checkpoint_time)) = self.eta.zip(self.last_checkpoint_time) {
608            let remaining = eta.checked_sub(last_checkpoint_time.elapsed());
609
610            if let Some(remaining) = remaining {
611                return write!(
612                    f,
613                    "{}",
614                    humantime::format_duration(Duration::from_secs(remaining.as_secs()))
615                )
616            }
617        }
618
619        write!(f, "unknown")
620    }
621}
622
623#[cfg(test)]
624mod tests {
625    use super::*;
626
627    #[test]
628    fn eta_display_no_milliseconds() {
629        let eta = Eta {
630            last_checkpoint_time: Some(Instant::now()),
631            eta: Some(Duration::from_millis(
632                13 * 60 * 1000 + // Minutes
633                    37 * 1000 + // Seconds
634                    999, // Milliseconds
635            )),
636            ..Default::default()
637        }
638        .to_string();
639
640        assert_eq!(eta, "13m 37s");
641    }
642}