1use crate::cl::ConsensusLayerHealthEvent;
4use alloy_consensus::constants::GWEI_TO_WEI;
5use alloy_primitives::{BlockNumber, B256};
6use alloy_rpc_types_engine::ForkchoiceState;
7use futures::Stream;
8use reth_beacon_consensus::{BeaconConsensusEngineEvent, ConsensusEngineLiveSyncProgress};
9use reth_engine_primitives::ForkchoiceStatus;
10use reth_network_api::PeersInfo;
11use reth_primitives_traits::{format_gas, format_gas_throughput};
12use reth_prune_types::PrunerEvent;
13use reth_stages::{EntitiesCheckpoint, ExecOutput, PipelineEvent, StageCheckpoint, StageId};
14use reth_static_file_types::StaticFileProducerEvent;
15use std::{
16 fmt::{Display, Formatter},
17 future::Future,
18 pin::Pin,
19 task::{Context, Poll},
20 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
21};
22use tokio::time::Interval;
23use tracing::{debug, info, warn};
24
25const INFO_MESSAGE_INTERVAL: Duration = Duration::from_secs(25);
27
28struct NodeState {
33 peers_info: Option<Box<dyn PeersInfo>>,
35 current_stage: Option<CurrentStage>,
37 latest_block: Option<BlockNumber>,
39 latest_block_time: Option<u64>,
41 head_block_hash: Option<B256>,
43 safe_block_hash: Option<B256>,
45 finalized_block_hash: Option<B256>,
47}
48
49impl NodeState {
50 const fn new(
51 peers_info: Option<Box<dyn PeersInfo>>,
52 latest_block: Option<BlockNumber>,
53 ) -> Self {
54 Self {
55 peers_info,
56 current_stage: None,
57 latest_block,
58 latest_block_time: None,
59 head_block_hash: None,
60 safe_block_hash: None,
61 finalized_block_hash: None,
62 }
63 }
64
65 fn num_connected_peers(&self) -> usize {
66 self.peers_info.as_ref().map(|info| info.num_connected_peers()).unwrap_or_default()
67 }
68
69 fn build_current_stage(
70 &self,
71 stage_id: StageId,
72 checkpoint: StageCheckpoint,
73 target: Option<BlockNumber>,
74 ) -> CurrentStage {
75 let (eta, entities_checkpoint) = self
76 .current_stage
77 .as_ref()
78 .filter(|current_stage| current_stage.stage_id == stage_id)
79 .map_or_else(
80 || (Eta::default(), None),
81 |current_stage| (current_stage.eta, current_stage.entities_checkpoint),
82 );
83
84 CurrentStage { stage_id, eta, checkpoint, entities_checkpoint, target }
85 }
86
87 fn handle_pipeline_event(&mut self, event: PipelineEvent) {
89 match event {
90 PipelineEvent::Prepare { pipeline_stages_progress, stage_id, checkpoint, target } => {
91 let checkpoint = checkpoint.unwrap_or_default();
92 let current_stage = self.build_current_stage(stage_id, checkpoint, target);
93
94 info!(
95 pipeline_stages = %pipeline_stages_progress,
96 stage = %stage_id,
97 checkpoint = %checkpoint.block_number,
98 target = %OptionalField(target),
99 "Preparing stage",
100 );
101
102 self.current_stage = Some(current_stage);
103 }
104 PipelineEvent::Run { pipeline_stages_progress, stage_id, checkpoint, target } => {
105 let checkpoint = checkpoint.unwrap_or_default();
106 let current_stage = self.build_current_stage(stage_id, checkpoint, target);
107
108 if let Some(stage_eta) = current_stage.eta.fmt_for_stage(stage_id) {
109 info!(
110 pipeline_stages = %pipeline_stages_progress,
111 stage = %stage_id,
112 checkpoint = %checkpoint.block_number,
113 target = %OptionalField(target),
114 %stage_eta,
115 "Executing stage",
116 );
117 } else {
118 info!(
119 pipeline_stages = %pipeline_stages_progress,
120 stage = %stage_id,
121 checkpoint = %checkpoint.block_number,
122 target = %OptionalField(target),
123 "Executing stage",
124 );
125 }
126
127 self.current_stage = Some(current_stage);
128 }
129 PipelineEvent::Ran {
130 pipeline_stages_progress,
131 stage_id,
132 result: ExecOutput { checkpoint, done },
133 } => {
134 if stage_id.is_finish() {
135 self.latest_block = Some(checkpoint.block_number);
136 }
137
138 if let Some(current_stage) = self.current_stage.as_mut() {
139 current_stage.checkpoint = checkpoint;
140 current_stage.entities_checkpoint = checkpoint.entities();
141 current_stage.eta.update(stage_id, checkpoint);
142
143 let target = OptionalField(current_stage.target);
144 let stage_progress = current_stage
145 .entities_checkpoint
146 .and_then(|entities| entities.fmt_percentage());
147 let stage_eta = current_stage.eta.fmt_for_stage(stage_id);
148
149 let message = if done { "Finished stage" } else { "Committed stage progress" };
150
151 match (stage_progress, stage_eta) {
152 (Some(stage_progress), Some(stage_eta)) => {
153 info!(
154 pipeline_stages = %pipeline_stages_progress,
155 stage = %stage_id,
156 checkpoint = %checkpoint.block_number,
157 %target,
158 %stage_progress,
159 %stage_eta,
160 "{message}",
161 )
162 }
163 (Some(stage_progress), None) => {
164 info!(
165 pipeline_stages = %pipeline_stages_progress,
166 stage = %stage_id,
167 checkpoint = %checkpoint.block_number,
168 %target,
169 %stage_progress,
170 "{message}",
171 )
172 }
173 (None, Some(stage_eta)) => {
174 info!(
175 pipeline_stages = %pipeline_stages_progress,
176 stage = %stage_id,
177 checkpoint = %checkpoint.block_number,
178 %target,
179 %stage_eta,
180 "{message}",
181 )
182 }
183 (None, None) => {
184 info!(
185 pipeline_stages = %pipeline_stages_progress,
186 stage = %stage_id,
187 checkpoint = %checkpoint.block_number,
188 %target,
189 "{message}",
190 )
191 }
192 }
193 }
194
195 if done {
196 self.current_stage = None;
197 }
198 }
199 PipelineEvent::Unwind { stage_id, input } => {
200 let current_stage = CurrentStage {
201 stage_id,
202 eta: Eta::default(),
203 checkpoint: input.checkpoint,
204 target: Some(input.unwind_to),
205 entities_checkpoint: input.checkpoint.entities(),
206 };
207
208 self.current_stage = Some(current_stage);
209 }
210 _ => (),
211 }
212 }
213
214 fn handle_consensus_engine_event(&mut self, event: BeaconConsensusEngineEvent) {
215 match event {
216 BeaconConsensusEngineEvent::ForkchoiceUpdated(state, status) => {
217 let ForkchoiceState { head_block_hash, safe_block_hash, finalized_block_hash } =
218 state;
219 if self.safe_block_hash != Some(safe_block_hash) &&
220 self.finalized_block_hash != Some(finalized_block_hash)
221 {
222 let msg = match status {
223 ForkchoiceStatus::Valid => "Forkchoice updated",
224 ForkchoiceStatus::Invalid => "Received invalid forkchoice updated message",
225 ForkchoiceStatus::Syncing => {
226 "Received forkchoice updated message when syncing"
227 }
228 };
229 info!(?head_block_hash, ?safe_block_hash, ?finalized_block_hash, "{}", msg);
230 }
231 self.head_block_hash = Some(head_block_hash);
232 self.safe_block_hash = Some(safe_block_hash);
233 self.finalized_block_hash = Some(finalized_block_hash);
234 }
235 BeaconConsensusEngineEvent::LiveSyncProgress(live_sync_progress) => {
236 match live_sync_progress {
237 ConsensusEngineLiveSyncProgress::DownloadingBlocks {
238 remaining_blocks,
239 target,
240 } => {
241 info!(
242 remaining_blocks,
243 target_block_hash=?target,
244 "Live sync in progress, downloading blocks"
245 );
246 }
247 }
248 }
249 BeaconConsensusEngineEvent::CanonicalBlockAdded(block, elapsed) => {
250 info!(
251 number=block.number,
252 hash=?block.hash(),
253 peers=self.num_connected_peers(),
254 txs=block.body.transactions.len(),
255 gas=%format_gas(block.header.gas_used),
256 gas_throughput=%format_gas_throughput(block.header.gas_used, elapsed),
257 full=%format!("{:.1}%", block.header.gas_used as f64 * 100.0 / block.header.gas_limit as f64),
258 base_fee=%format!("{:.2}gwei", block.header.base_fee_per_gas.unwrap_or(0) as f64 / GWEI_TO_WEI as f64),
259 blobs=block.header.blob_gas_used.unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB,
260 excess_blobs=block.header.excess_blob_gas.unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB,
261 ?elapsed,
262 "Block added to canonical chain"
263 );
264 }
265 BeaconConsensusEngineEvent::CanonicalChainCommitted(head, elapsed) => {
266 self.latest_block = Some(head.number);
267 self.latest_block_time = Some(head.timestamp);
268
269 info!(number=head.number, hash=?head.hash(), ?elapsed, "Canonical chain committed");
270 }
271 BeaconConsensusEngineEvent::ForkBlockAdded(block, elapsed) => {
272 info!(number=block.number, hash=?block.hash(), ?elapsed, "Block added to fork chain");
273 }
274 }
275 }
276
277 fn handle_consensus_layer_health_event(&self, event: ConsensusLayerHealthEvent) {
278 if self.current_stage.is_none() {
281 match event {
282 ConsensusLayerHealthEvent::NeverSeen => {
283 warn!("Post-merge network, but never seen beacon client. Please launch one to follow the chain!")
284 }
285 ConsensusLayerHealthEvent::HasNotBeenSeenForAWhile(period) => {
286 warn!(?period, "Post-merge network, but no beacon client seen for a while. Please launch one to follow the chain!")
287 }
288 ConsensusLayerHealthEvent::NeverReceivedUpdates => {
289 warn!("Beacon client online, but never received consensus updates. Please ensure your beacon client is operational to follow the chain!")
290 }
291 ConsensusLayerHealthEvent::HaveNotReceivedUpdatesForAWhile(period) => {
292 warn!(?period, "Beacon client online, but no consensus updates received for a while. This may be because of a reth error, or an error in the beacon client! Please investigate reth and beacon client logs!")
293 }
294 }
295 }
296 }
297
298 fn handle_pruner_event(&self, event: PrunerEvent) {
299 match event {
300 PrunerEvent::Started { tip_block_number } => {
301 debug!(tip_block_number, "Pruner started");
302 }
303 PrunerEvent::Finished { tip_block_number, elapsed, stats } => {
304 let stats = format!(
305 "[{}]",
306 stats.iter().map(|item| item.to_string()).collect::<Vec<_>>().join(", ")
307 );
308 debug!(tip_block_number, ?elapsed, pruned_segments = %stats, "Pruner finished");
309 }
310 }
311 }
312
313 fn handle_static_file_producer_event(&self, event: StaticFileProducerEvent) {
314 match event {
315 StaticFileProducerEvent::Started { targets } => {
316 debug!(?targets, "Static File Producer started");
317 }
318 StaticFileProducerEvent::Finished { targets, elapsed } => {
319 debug!(?targets, ?elapsed, "Static File Producer finished");
320 }
321 }
322 }
323}
324
325struct OptionalField<T: Display>(Option<T>);
329
330impl<T: Display> Display for OptionalField<T> {
331 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
332 if let Some(field) = &self.0 {
333 write!(f, "{field}")
334 } else {
335 write!(f, "None")
336 }
337 }
338}
339
340struct CurrentStage {
342 stage_id: StageId,
343 eta: Eta,
344 checkpoint: StageCheckpoint,
345 entities_checkpoint: Option<EntitiesCheckpoint>,
349 target: Option<BlockNumber>,
350}
351
352#[derive(Debug)]
354pub enum NodeEvent {
355 Pipeline(PipelineEvent),
357 ConsensusEngine(BeaconConsensusEngineEvent),
359 ConsensusLayerHealth(ConsensusLayerHealthEvent),
361 Pruner(PrunerEvent),
363 StaticFileProducer(StaticFileProducerEvent),
365 Other(String),
368}
369
370impl From<PipelineEvent> for NodeEvent {
371 fn from(event: PipelineEvent) -> Self {
372 Self::Pipeline(event)
373 }
374}
375
376impl From<BeaconConsensusEngineEvent> for NodeEvent {
377 fn from(event: BeaconConsensusEngineEvent) -> Self {
378 Self::ConsensusEngine(event)
379 }
380}
381
382impl From<ConsensusLayerHealthEvent> for NodeEvent {
383 fn from(event: ConsensusLayerHealthEvent) -> Self {
384 Self::ConsensusLayerHealth(event)
385 }
386}
387
388impl From<PrunerEvent> for NodeEvent {
389 fn from(event: PrunerEvent) -> Self {
390 Self::Pruner(event)
391 }
392}
393
394impl From<StaticFileProducerEvent> for NodeEvent {
395 fn from(event: StaticFileProducerEvent) -> Self {
396 Self::StaticFileProducer(event)
397 }
398}
399
400pub async fn handle_events<E>(
403 peers_info: Option<Box<dyn PeersInfo>>,
404 latest_block_number: Option<BlockNumber>,
405 events: E,
406) where
407 E: Stream<Item = NodeEvent> + Unpin,
408{
409 let state = NodeState::new(peers_info, latest_block_number);
410
411 let start = tokio::time::Instant::now() + Duration::from_secs(3);
412 let mut info_interval = tokio::time::interval_at(start, INFO_MESSAGE_INTERVAL);
413 info_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
414
415 let handler = EventHandler { state, events, info_interval };
416 handler.await
417}
418
419#[pin_project::pin_project]
421struct EventHandler<E> {
422 state: NodeState,
423 #[pin]
424 events: E,
425 #[pin]
426 info_interval: Interval,
427}
428
429impl<E> Future for EventHandler<E>
430where
431 E: Stream<Item = NodeEvent> + Unpin,
432{
433 type Output = ();
434
435 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
436 let mut this = self.project();
437
438 while this.info_interval.poll_tick(cx).is_ready() {
439 if let Some(CurrentStage { stage_id, eta, checkpoint, entities_checkpoint, target }) =
440 &this.state.current_stage
441 {
442 let stage_progress =
443 entities_checkpoint.and_then(|entities| entities.fmt_percentage());
444 let stage_eta = eta.fmt_for_stage(*stage_id);
445
446 match (stage_progress, stage_eta) {
447 (Some(stage_progress), Some(stage_eta)) => {
448 info!(
449 target: "reth::cli",
450 connected_peers = this.state.num_connected_peers(),
451 stage = %stage_id,
452 checkpoint = checkpoint.block_number,
453 target = %OptionalField(*target),
454 %stage_progress,
455 %stage_eta,
456 "Status"
457 )
458 }
459 (Some(stage_progress), None) => {
460 info!(
461 target: "reth::cli",
462 connected_peers = this.state.num_connected_peers(),
463 stage = %stage_id,
464 checkpoint = checkpoint.block_number,
465 target = %OptionalField(*target),
466 %stage_progress,
467 "Status"
468 )
469 }
470 (None, Some(stage_eta)) => {
471 info!(
472 target: "reth::cli",
473 connected_peers = this.state.num_connected_peers(),
474 stage = %stage_id,
475 checkpoint = checkpoint.block_number,
476 target = %OptionalField(*target),
477 %stage_eta,
478 "Status"
479 )
480 }
481 (None, None) => {
482 info!(
483 target: "reth::cli",
484 connected_peers = this.state.num_connected_peers(),
485 stage = %stage_id,
486 checkpoint = checkpoint.block_number,
487 target = %OptionalField(*target),
488 "Status"
489 )
490 }
491 }
492 } else if let Some(latest_block) = this.state.latest_block {
493 let now =
494 SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
495 if now.saturating_sub(this.state.latest_block_time.unwrap_or(0)) > 60 {
496 info!(
499 target: "reth::cli",
500 connected_peers = this.state.num_connected_peers(),
501 %latest_block,
502 "Status"
503 );
504 }
505 } else {
506 info!(
507 target: "reth::cli",
508 connected_peers = this.state.num_connected_peers(),
509 "Status"
510 );
511 }
512 }
513
514 while let Poll::Ready(Some(event)) = this.events.as_mut().poll_next(cx) {
515 match event {
516 NodeEvent::Pipeline(event) => {
517 this.state.handle_pipeline_event(event);
518 }
519 NodeEvent::ConsensusEngine(event) => {
520 this.state.handle_consensus_engine_event(event);
521 }
522 NodeEvent::ConsensusLayerHealth(event) => {
523 this.state.handle_consensus_layer_health_event(event)
524 }
525 NodeEvent::Pruner(event) => {
526 this.state.handle_pruner_event(event);
527 }
528 NodeEvent::StaticFileProducer(event) => {
529 this.state.handle_static_file_producer_event(event);
530 }
531 NodeEvent::Other(event_description) => {
532 warn!("{event_description}");
533 }
534 }
535 }
536
537 Poll::Pending
538 }
539}
540
541#[derive(Default, Copy, Clone)]
546struct Eta {
547 last_checkpoint: EntitiesCheckpoint,
549 last_checkpoint_time: Option<Instant>,
551 eta: Option<Duration>,
553}
554
555impl Eta {
556 fn update(&mut self, stage: StageId, checkpoint: StageCheckpoint) {
558 let Some(current) = checkpoint.entities() else { return };
559
560 if let Some(last_checkpoint_time) = &self.last_checkpoint_time {
561 let Some(processed_since_last) =
562 current.processed.checked_sub(self.last_checkpoint.processed)
563 else {
564 self.eta = None;
565 debug!(target: "reth::cli", %stage, ?current, ?self.last_checkpoint, "Failed to calculate the ETA: processed entities is less than the last checkpoint");
566 return
567 };
568 let elapsed = last_checkpoint_time.elapsed();
569 let per_second = processed_since_last as f64 / elapsed.as_secs_f64();
570
571 let Some(remaining) = current.total.checked_sub(current.processed) else {
572 self.eta = None;
573 debug!(target: "reth::cli", %stage, ?current, "Failed to calculate the ETA: total entities is less than processed entities");
574 return
575 };
576
577 self.eta = Duration::try_from_secs_f64(remaining as f64 / per_second).ok();
578 }
579
580 self.last_checkpoint = current;
581 self.last_checkpoint_time = Some(Instant::now());
582 }
583
584 fn is_available(&self) -> bool {
586 self.eta.zip(self.last_checkpoint_time).is_some()
587 }
588
589 fn fmt_for_stage(&self, stage: StageId) -> Option<String> {
595 if !self.is_available() ||
596 matches!(stage, StageId::Headers | StageId::Bodies | StageId::Execution)
597 {
598 None
599 } else {
600 Some(self.to_string())
601 }
602 }
603}
604
605impl Display for Eta {
606 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
607 if let Some((eta, last_checkpoint_time)) = self.eta.zip(self.last_checkpoint_time) {
608 let remaining = eta.checked_sub(last_checkpoint_time.elapsed());
609
610 if let Some(remaining) = remaining {
611 return write!(
612 f,
613 "{}",
614 humantime::format_duration(Duration::from_secs(remaining.as_secs()))
615 )
616 }
617 }
618
619 write!(f, "unknown")
620 }
621}
622
623#[cfg(test)]
624mod tests {
625 use super::*;
626
627 #[test]
628 fn eta_display_no_milliseconds() {
629 let eta = Eta {
630 last_checkpoint_time: Some(Instant::now()),
631 eta: Some(Duration::from_millis(
632 13 * 60 * 1000 + 37 * 1000 + 999, )),
636 ..Default::default()
637 }
638 .to_string();
639
640 assert_eq!(eta, "13m 37s");
641 }
642}