reth_beacon_consensus/engine/
mod.rs

1use alloy_consensus::{BlockHeader, Header};
2use alloy_eips::{merge::EPOCH_SLOTS, BlockNumHash};
3use alloy_primitives::{BlockNumber, B256};
4use alloy_rpc_types_engine::{
5    ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState, PayloadStatus, PayloadStatusEnum,
6    PayloadValidationError,
7};
8use futures::{stream::BoxStream, Future, StreamExt};
9use itertools::Either;
10use reth_blockchain_tree_api::{
11    error::{BlockchainTreeError, CanonicalError, InsertBlockError, InsertBlockErrorKind},
12    BlockStatus, BlockValidationKind, BlockchainTreeEngine, CanonicalOutcome, InsertPayloadOk,
13};
14use reth_engine_primitives::{
15    BeaconEngineMessage, BeaconOnNewPayloadError, EngineApiMessageVersion, EngineTypes,
16    ForkchoiceStateHash, ForkchoiceStateTracker, ForkchoiceStatus, OnForkChoiceUpdated,
17    PayloadTypes,
18};
19use reth_errors::{BlockValidationError, ProviderResult, RethError, RethResult};
20use reth_network_p2p::{
21    sync::{NetworkSyncUpdater, SyncState},
22    EthBlockClient,
23};
24use reth_node_types::{Block, BlockTy, HeaderTy, NodeTypesWithEngine};
25use reth_payload_builder::PayloadBuilderHandle;
26use reth_payload_builder_primitives::PayloadBuilder;
27use reth_payload_primitives::{PayloadAttributes, PayloadBuilderAttributes};
28use reth_payload_validator::ExecutionPayloadValidator;
29use reth_primitives::{Head, SealedBlock, SealedHeader};
30use reth_provider::{
31    providers::{ProviderNodeTypes, TreeNodeTypes},
32    BlockIdReader, BlockReader, BlockSource, CanonChainTracker, ChainSpecProvider, ProviderError,
33    StageCheckpointReader,
34};
35use reth_stages_api::{ControlFlow, Pipeline, PipelineTarget, StageId};
36use reth_tasks::TaskSpawner;
37use reth_tokio_util::EventSender;
38use std::{
39    pin::Pin,
40    sync::Arc,
41    task::{Context, Poll},
42    time::{Duration, Instant},
43};
44use tokio::sync::{
45    mpsc::{self, UnboundedSender},
46    oneshot,
47};
48use tokio_stream::wrappers::UnboundedReceiverStream;
49use tracing::*;
50
51mod error;
52pub use error::{BeaconConsensusEngineError, BeaconEngineResult, BeaconForkChoiceUpdateError};
53
54mod invalid_headers;
55pub use invalid_headers::InvalidHeaderCache;
56
57mod event;
58pub use event::{BeaconConsensusEngineEvent, ConsensusEngineLiveSyncProgress};
59
60mod handle;
61pub use handle::BeaconConsensusEngineHandle;
62
63mod metrics;
64use metrics::EngineMetrics;
65
66pub mod sync;
67use sync::{EngineSyncController, EngineSyncEvent};
68
69/// Hooks for running during the main loop of
70/// [consensus engine][`crate::engine::BeaconConsensusEngine`].
71pub mod hooks;
72use hooks::{EngineHookContext, EngineHookEvent, EngineHooks, EngineHooksController, PolledHook};
73
74#[cfg(test)]
75pub mod test_utils;
76
77/// The maximum number of invalid headers that can be tracked by the engine.
78const MAX_INVALID_HEADERS: u32 = 512u32;
79
80/// The largest gap for which the tree will be used for sync. See docs for `pipeline_run_threshold`
81/// for more information.
82///
83/// This is the default threshold, the distance to the head that the tree will be used for sync.
84/// If the distance exceeds this threshold, the pipeline will be used for sync.
85pub const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
86
87/// Helper trait expressing requirements for node types to be used in engine.
88pub trait EngineNodeTypes: ProviderNodeTypes + NodeTypesWithEngine {}
89
90impl<T> EngineNodeTypes for T where T: ProviderNodeTypes + NodeTypesWithEngine {}
91
92/// Represents a pending forkchoice update.
93///
94/// This type encapsulates the necessary components for a pending forkchoice update
95/// in the context of a beacon consensus engine.
96///
97/// It consists of:
98/// - The current fork choice state.
99/// - Optional payload attributes specific to the engine type.
100/// - Sender for the result of an oneshot channel, conveying the outcome of the fork choice update.
101type PendingForkchoiceUpdate<PayloadAttributes> =
102    (ForkchoiceState, Option<PayloadAttributes>, oneshot::Sender<RethResult<OnForkChoiceUpdated>>);
103
104/// The beacon consensus engine is the driver that switches between historical and live sync.
105///
106/// The beacon consensus engine is itself driven by messages from the Consensus Layer, which are
107/// received by Engine API (JSON-RPC).
108///
109/// The consensus engine is idle until it receives the first
110/// [`BeaconEngineMessage::ForkchoiceUpdated`] message from the CL which would initiate the sync. At
111/// first, the consensus engine would run the [Pipeline] until the latest known block hash.
112/// Afterward, it would attempt to create/restore the [`BlockchainTreeEngine`] from the blocks
113/// that are currently available. In case the restoration is successful, the consensus engine would
114/// run in a live sync mode, populating the [`BlockchainTreeEngine`] with new blocks as they arrive
115/// via engine API and downloading any missing blocks from the network to fill potential gaps.
116///
117/// The consensus engine has two data input sources:
118///
119/// ## New Payload (`engine_newPayloadV{}`)
120///
121/// The engine receives new payloads from the CL. If the payload is connected to the canonical
122/// chain, it will be fully validated added to a chain in the [`BlockchainTreeEngine`]: `VALID`
123///
124/// If the payload's chain is disconnected (at least 1 block is missing) then it will be buffered:
125/// `SYNCING` ([`BlockStatus::Disconnected`]).
126///
127/// ## Forkchoice Update (FCU) (`engine_forkchoiceUpdatedV{}`)
128///
129/// This contains the latest forkchoice state and the payload attributes. The engine will attempt to
130/// make a new canonical chain based on the `head_hash` of the update and trigger payload building
131/// if the `payload_attrs` are present and the FCU is `VALID`.
132///
133/// The `head_hash` forms a chain by walking backwards from the `head_hash` towards the canonical
134/// blocks of the chain.
135///
136/// Making a new canonical chain can result in the following relevant outcomes:
137///
138/// ### The chain is connected
139///
140/// All blocks of the `head_hash`'s chain are present in the [`BlockchainTreeEngine`] and are
141/// committed to the canonical chain. This also includes reorgs.
142///
143/// ### The chain is disconnected
144///
145/// In this case the [`BlockchainTreeEngine`] doesn't know how the new chain connects to the
146/// existing canonical chain. It could be a simple commit (new blocks extend the current head) or a
147/// re-org that requires unwinding the canonical chain.
148///
149/// This further distinguishes between two variants:
150///
151/// #### `head_hash`'s block exists
152///
153/// The `head_hash`'s block was already received/downloaded, but at least one block is missing to
154/// form a _connected_ chain. The engine will attempt to download the missing blocks from the
155/// network by walking backwards (`parent_hash`), and then try to make the block canonical as soon
156/// as the chain becomes connected.
157///
158/// However, it still can be the case that the chain and the FCU is `INVALID`.
159///
160/// #### `head_hash` block is missing
161///
162/// This is similar to the previous case, but the `head_hash`'s block is missing. At which point the
163/// engine doesn't know where the new head will point to: new chain could be a re-org or a simple
164/// commit. The engine will download the missing head first and then proceed as in the previous
165/// case.
166///
167/// # Panics
168///
169/// If the future is polled more than once. Leads to undefined state.
170#[must_use = "Future does nothing unless polled"]
171#[allow(missing_debug_implementations)]
172pub struct BeaconConsensusEngine<N, BT, Client>
173where
174    N: EngineNodeTypes,
175    Client: EthBlockClient,
176    BT: BlockchainTreeEngine
177        + BlockReader
178        + BlockIdReader
179        + CanonChainTracker
180        + StageCheckpointReader,
181{
182    /// Controls syncing triggered by engine updates.
183    sync: EngineSyncController<N, Client>,
184    /// The type we can use to query both the database and the blockchain tree.
185    blockchain: BT,
186    /// Used for emitting updates about whether the engine is syncing or not.
187    sync_state_updater: Box<dyn NetworkSyncUpdater>,
188    /// The Engine API message receiver.
189    engine_message_stream: BoxStream<'static, BeaconEngineMessage<N::Engine>>,
190    /// A clone of the handle
191    handle: BeaconConsensusEngineHandle<N::Engine>,
192    /// Tracks the received forkchoice state updates received by the CL.
193    forkchoice_state_tracker: ForkchoiceStateTracker,
194    /// The payload store.
195    payload_builder: PayloadBuilderHandle<N::Engine>,
196    /// Validator for execution payloads
197    payload_validator: ExecutionPayloadValidator<N::ChainSpec>,
198    /// Current blockchain tree action.
199    blockchain_tree_action: Option<BlockchainTreeAction<N::Engine>>,
200    /// Pending forkchoice update.
201    /// It is recorded if we cannot process the forkchoice update because
202    /// a hook with database read-write access is active.
203    /// This is a temporary solution to always process missed FCUs.
204    pending_forkchoice_update:
205        Option<PendingForkchoiceUpdate<<N::Engine as PayloadTypes>::PayloadAttributes>>,
206    /// Tracks the header of invalid payloads that were rejected by the engine because they're
207    /// invalid.
208    invalid_headers: InvalidHeaderCache,
209    /// After downloading a block corresponding to a recent forkchoice update, the engine will
210    /// check whether or not we can connect the block to the current canonical chain. If we can't,
211    /// we need to download and execute the missing parents of that block.
212    ///
213    /// When the block can't be connected, its block number will be compared to the canonical head,
214    /// resulting in a heuristic for the number of missing blocks, or the size of the gap between
215    /// the new block and the canonical head.
216    ///
217    /// If the gap is larger than this threshold, the engine will download and execute the missing
218    /// blocks using the pipeline. Otherwise, the engine, sync controller, and blockchain tree will
219    /// be used to download and execute the missing blocks.
220    pipeline_run_threshold: u64,
221    hooks: EngineHooksController,
222    /// Sender for engine events.
223    event_sender: EventSender<BeaconConsensusEngineEvent>,
224    /// Consensus engine metrics.
225    metrics: EngineMetrics,
226}
227
228impl<N, BT, Client> BeaconConsensusEngine<N, BT, Client>
229where
230    N: TreeNodeTypes,
231    BT: BlockchainTreeEngine
232        + BlockReader<Block = BlockTy<N>, Header = HeaderTy<N>>
233        + BlockIdReader
234        + CanonChainTracker<Header = HeaderTy<N>>
235        + StageCheckpointReader
236        + ChainSpecProvider<ChainSpec = N::ChainSpec>
237        + 'static,
238    Client: EthBlockClient + 'static,
239{
240    /// Create a new instance of the [`BeaconConsensusEngine`].
241    #[allow(clippy::too_many_arguments)]
242    pub fn new(
243        client: Client,
244        pipeline: Pipeline<N>,
245        blockchain: BT,
246        task_spawner: Box<dyn TaskSpawner>,
247        sync_state_updater: Box<dyn NetworkSyncUpdater>,
248        max_block: Option<BlockNumber>,
249        payload_builder: PayloadBuilderHandle<N::Engine>,
250        target: Option<B256>,
251        pipeline_run_threshold: u64,
252        hooks: EngineHooks,
253    ) -> RethResult<(Self, BeaconConsensusEngineHandle<N::Engine>)> {
254        let (to_engine, rx) = mpsc::unbounded_channel();
255        Self::with_channel(
256            client,
257            pipeline,
258            blockchain,
259            task_spawner,
260            sync_state_updater,
261            max_block,
262            payload_builder,
263            target,
264            pipeline_run_threshold,
265            to_engine,
266            Box::pin(UnboundedReceiverStream::from(rx)),
267            hooks,
268        )
269    }
270
271    /// Create a new instance of the [`BeaconConsensusEngine`] using the given channel to configure
272    /// the [`BeaconEngineMessage`] communication channel.
273    ///
274    /// By default the engine is started with idle pipeline.
275    /// The pipeline can be launched immediately in one of the following ways descending in
276    /// priority:
277    /// - Explicit [`Option::Some`] target block hash provided via a constructor argument.
278    /// - The process was previously interrupted amidst the pipeline run. This is checked by
279    ///   comparing the checkpoints of the first ([`StageId::Headers`]) and last
280    ///   ([`StageId::Finish`]) stages. In this case, the latest available header in the database is
281    ///   used as the target.
282    ///
283    /// Propagates any database related error.
284    #[allow(clippy::too_many_arguments)]
285    pub fn with_channel(
286        client: Client,
287        pipeline: Pipeline<N>,
288        blockchain: BT,
289        task_spawner: Box<dyn TaskSpawner>,
290        sync_state_updater: Box<dyn NetworkSyncUpdater>,
291        max_block: Option<BlockNumber>,
292        payload_builder: PayloadBuilderHandle<N::Engine>,
293        target: Option<B256>,
294        pipeline_run_threshold: u64,
295        to_engine: UnboundedSender<BeaconEngineMessage<N::Engine>>,
296        engine_message_stream: BoxStream<'static, BeaconEngineMessage<N::Engine>>,
297        hooks: EngineHooks,
298    ) -> RethResult<(Self, BeaconConsensusEngineHandle<N::Engine>)> {
299        let event_sender = EventSender::default();
300        let handle = BeaconConsensusEngineHandle::new(to_engine, event_sender.clone());
301        let sync = EngineSyncController::new(
302            pipeline,
303            client,
304            task_spawner.clone(),
305            max_block,
306            blockchain.chain_spec(),
307            event_sender.clone(),
308        );
309        let mut this = Self {
310            sync,
311            payload_validator: ExecutionPayloadValidator::new(blockchain.chain_spec()),
312            blockchain,
313            sync_state_updater,
314            engine_message_stream,
315            handle: handle.clone(),
316            forkchoice_state_tracker: Default::default(),
317            payload_builder,
318            invalid_headers: InvalidHeaderCache::new(MAX_INVALID_HEADERS),
319            blockchain_tree_action: None,
320            pending_forkchoice_update: None,
321            pipeline_run_threshold,
322            hooks: EngineHooksController::new(hooks),
323            event_sender,
324            metrics: EngineMetrics::default(),
325        };
326
327        let maybe_pipeline_target = match target {
328            // Provided target always takes precedence.
329            target @ Some(_) => target,
330            None => this.check_pipeline_consistency()?,
331        };
332
333        if let Some(target) = maybe_pipeline_target {
334            this.sync.set_pipeline_sync_target(target.into());
335        }
336
337        Ok((this, handle))
338    }
339
340    /// Returns current [`EngineHookContext`] that's used for polling engine hooks.
341    fn current_engine_hook_context(&self) -> RethResult<EngineHookContext> {
342        Ok(EngineHookContext {
343            tip_block_number: self.blockchain.canonical_tip().number,
344            finalized_block_number: self
345                .blockchain
346                .finalized_block_number()
347                .map_err(RethError::Provider)?,
348        })
349    }
350
351    /// Set the next blockchain tree action.
352    fn set_blockchain_tree_action(&mut self, action: BlockchainTreeAction<N::Engine>) {
353        let previous_action = self.blockchain_tree_action.replace(action);
354        debug_assert!(previous_action.is_none(), "Pre-existing action found");
355    }
356
357    /// Pre-validate forkchoice update and check whether it can be processed.
358    ///
359    /// This method returns the update outcome if validation fails or
360    /// the node is syncing and the update cannot be processed at the moment.
361    fn pre_validate_forkchoice_update(
362        &mut self,
363        state: ForkchoiceState,
364    ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
365        if state.head_block_hash.is_zero() {
366            return Ok(Some(OnForkChoiceUpdated::invalid_state()))
367        }
368
369        // check if the new head hash is connected to any ancestor that we previously marked as
370        // invalid
371        let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
372        if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
373            return Ok(Some(OnForkChoiceUpdated::with_invalid(status)))
374        }
375
376        if self.sync.is_pipeline_active() {
377            // We can only process new forkchoice updates if the pipeline is idle, since it requires
378            // exclusive access to the database
379            trace!(target: "consensus::engine", "Pipeline is syncing, skipping forkchoice update");
380            return Ok(Some(OnForkChoiceUpdated::syncing()))
381        }
382
383        Ok(None)
384    }
385
386    /// Process the result of attempting to make forkchoice state head hash canonical.
387    ///
388    /// # Returns
389    ///
390    /// A forkchoice state update outcome or fatal error.
391    fn on_forkchoice_updated_make_canonical_result(
392        &mut self,
393        state: ForkchoiceState,
394        mut attrs: Option<<N::Engine as PayloadTypes>::PayloadAttributes>,
395        make_canonical_result: Result<CanonicalOutcome, CanonicalError>,
396        elapsed: Duration,
397    ) -> Result<OnForkChoiceUpdated, CanonicalError> {
398        match make_canonical_result {
399            Ok(outcome) => {
400                let should_update_head = match &outcome {
401                    CanonicalOutcome::AlreadyCanonical { head, header } => {
402                        self.on_head_already_canonical(head, header, &mut attrs)
403                    }
404                    CanonicalOutcome::Committed { head } => {
405                        // new VALID update that moved the canonical chain forward
406                        debug!(target: "consensus::engine", hash=?state.head_block_hash, number=head.number, "Canonicalized new head");
407                        true
408                    }
409                };
410
411                if should_update_head {
412                    let head = outcome.header();
413                    let _ = self.update_head(head.clone());
414                    self.event_sender.notify(BeaconConsensusEngineEvent::CanonicalChainCommitted(
415                        Box::new(head.clone()),
416                        elapsed,
417                    ));
418                }
419
420                // Validate that the forkchoice state is consistent.
421                let on_updated = if let Some(invalid_fcu_response) =
422                    self.ensure_consistent_forkchoice_state(state)?
423                {
424                    trace!(target: "consensus::engine", ?state, "Forkchoice state is inconsistent");
425                    invalid_fcu_response
426                } else if let Some(attrs) = attrs {
427                    // the CL requested to build a new payload on top of this new VALID head
428                    let head = outcome.into_header().unseal();
429                    self.process_payload_attributes(
430                        attrs,
431                        head,
432                        state,
433                        EngineApiMessageVersion::default(),
434                    )
435                } else {
436                    OnForkChoiceUpdated::valid(PayloadStatus::new(
437                        PayloadStatusEnum::Valid,
438                        Some(state.head_block_hash),
439                    ))
440                };
441                Ok(on_updated)
442            }
443            Err(err) => {
444                if err.is_fatal() {
445                    error!(target: "consensus::engine", %err, "Encountered fatal error");
446                    Err(err)
447                } else {
448                    Ok(OnForkChoiceUpdated::valid(
449                        self.on_failed_canonical_forkchoice_update(&state, err)?,
450                    ))
451                }
452            }
453        }
454    }
455
456    /// Invoked when head hash references a `VALID` block that is already canonical.
457    ///
458    /// Returns `true` if the head needs to be updated.
459    fn on_head_already_canonical(
460        &self,
461        head: &BlockNumHash,
462        header: &SealedHeader,
463        attrs: &mut Option<<N::Engine as PayloadTypes>::PayloadAttributes>,
464    ) -> bool {
465        // On Optimism, the proposers are allowed to reorg their own chain at will.
466        #[cfg(feature = "optimism")]
467        if reth_chainspec::EthChainSpec::is_optimism(&self.blockchain.chain_spec()) {
468            debug!(
469                target: "consensus::engine",
470                fcu_head_num=?header.number,
471                current_head_num=?head.number,
472                "[Optimism] Allowing beacon reorg to old head"
473            );
474            return true
475        }
476
477        // 2. Client software MAY skip an update of the forkchoice state and MUST NOT begin a
478        //    payload build process if `forkchoiceState.headBlockHash` references a `VALID` ancestor
479        //    of the head of canonical chain, i.e. the ancestor passed payload validation process
480        //    and deemed `VALID`. In the case of such an event, client software MUST return
481        //    `{payloadStatus: {status: VALID, latestValidHash: forkchoiceState.headBlockHash,
482        //    validationError: null}, payloadId: null}`
483        if head != &header.num_hash() {
484            attrs.take();
485        }
486
487        debug!(
488            target: "consensus::engine",
489            fcu_head_num=?header.number,
490            current_head_num=?head.number,
491            "Ignoring beacon update to old head"
492        );
493        false
494    }
495
496    /// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
497    /// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
498    /// chain.
499    ///
500    /// These responses should adhere to the [Engine API Spec for
501    /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
502    ///
503    /// Returns an error if an internal error occurred like a database error.
504    fn on_forkchoice_updated(
505        &mut self,
506        state: ForkchoiceState,
507        attrs: Option<<N::Engine as PayloadTypes>::PayloadAttributes>,
508        tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
509    ) {
510        self.metrics.forkchoice_updated_messages.increment(1);
511        self.blockchain.on_forkchoice_update_received(&state);
512        trace!(target: "consensus::engine", ?state, "Received new forkchoice state update");
513
514        match self.pre_validate_forkchoice_update(state) {
515            Ok(on_updated_result) => {
516                if let Some(on_updated) = on_updated_result {
517                    // Pre-validate forkchoice state update and return if it's invalid
518                    // or cannot be processed at the moment.
519                    self.on_forkchoice_updated_status(state, on_updated, tx);
520                } else if let Some(hook) = self.hooks.active_db_write_hook() {
521                    // We can only process new forkchoice updates if no hook with db write is
522                    // running, since it requires exclusive access to the
523                    // database
524                    let replaced_pending =
525                        self.pending_forkchoice_update.replace((state, attrs, tx));
526                    warn!(
527                        target: "consensus::engine",
528                        hook = %hook.name(),
529                        head_block_hash = ?state.head_block_hash,
530                        safe_block_hash = ?state.safe_block_hash,
531                        finalized_block_hash = ?state.finalized_block_hash,
532                        replaced_pending = ?replaced_pending.map(|(state, _, _)| state),
533                        "Hook is in progress, delaying forkchoice update. \
534                        This may affect the performance of your node as a validator."
535                    );
536                } else {
537                    self.set_blockchain_tree_action(
538                        BlockchainTreeAction::MakeForkchoiceHeadCanonical { state, attrs, tx },
539                    );
540                }
541            }
542            Err(error) => {
543                let _ = tx.send(Err(error.into()));
544            }
545        }
546    }
547
548    /// Called after the forkchoice update status has been resolved.
549    /// Depending on the outcome, the method updates the sync state and notifies the listeners
550    /// about new processed FCU.
551    fn on_forkchoice_updated_status(
552        &mut self,
553        state: ForkchoiceState,
554        on_updated: OnForkChoiceUpdated,
555        tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
556    ) {
557        // send the response to the CL ASAP
558        let status = on_updated.forkchoice_status();
559        let _ = tx.send(Ok(on_updated));
560
561        // update the forkchoice state tracker
562        self.forkchoice_state_tracker.set_latest(state, status);
563
564        match status {
565            ForkchoiceStatus::Invalid => {}
566            ForkchoiceStatus::Valid => {
567                // FCU head is valid, we're no longer syncing
568                self.sync_state_updater.update_sync_state(SyncState::Idle);
569                // node's fully synced, clear active download requests
570                self.sync.clear_block_download_requests();
571            }
572            ForkchoiceStatus::Syncing => {
573                // we're syncing
574                self.sync_state_updater.update_sync_state(SyncState::Syncing);
575            }
576        }
577
578        // notify listeners about new processed FCU
579        self.event_sender.notify(BeaconConsensusEngineEvent::ForkchoiceUpdated(state, status));
580    }
581
582    /// Check if the pipeline is consistent (all stages have the checkpoint block numbers no less
583    /// than the checkpoint of the first stage).
584    ///
585    /// This will return the pipeline target if:
586    ///  * the pipeline was interrupted during its previous run
587    ///  * a new stage was added
588    ///  * stage data was dropped manually through `reth stage drop ...`
589    ///
590    /// # Returns
591    ///
592    /// A target block hash if the pipeline is inconsistent, otherwise `None`.
593    fn check_pipeline_consistency(&self) -> RethResult<Option<B256>> {
594        // If no target was provided, check if the stages are congruent - check if the
595        // checkpoint of the last stage matches the checkpoint of the first.
596        let first_stage_checkpoint = self
597            .blockchain
598            .get_stage_checkpoint(*StageId::ALL.first().unwrap())?
599            .unwrap_or_default()
600            .block_number;
601
602        // Skip the first stage as we've already retrieved it and comparing all other checkpoints
603        // against it.
604        for stage_id in StageId::ALL.iter().skip(1) {
605            let stage_checkpoint =
606                self.blockchain.get_stage_checkpoint(*stage_id)?.unwrap_or_default().block_number;
607
608            // If the checkpoint of any stage is less than the checkpoint of the first stage,
609            // retrieve and return the block hash of the latest header and use it as the target.
610            if stage_checkpoint < first_stage_checkpoint {
611                debug!(
612                    target: "consensus::engine",
613                    first_stage_checkpoint,
614                    inconsistent_stage_id = %stage_id,
615                    inconsistent_stage_checkpoint = stage_checkpoint,
616                    "Pipeline sync progress is inconsistent"
617                );
618                return Ok(self.blockchain.block_hash(first_stage_checkpoint)?)
619            }
620        }
621
622        Ok(None)
623    }
624
625    /// Returns a new [`BeaconConsensusEngineHandle`] that can be cloned and shared.
626    ///
627    /// The [`BeaconConsensusEngineHandle`] can be used to interact with this
628    /// [`BeaconConsensusEngine`]
629    pub fn handle(&self) -> BeaconConsensusEngineHandle<N::Engine> {
630        self.handle.clone()
631    }
632
633    /// Returns true if the distance from the local tip to the block is greater than the configured
634    /// threshold.
635    ///
636    /// If the `local_tip` is greater than the `block`, then this will return false.
637    #[inline]
638    const fn exceeds_pipeline_run_threshold(&self, local_tip: u64, block: u64) -> bool {
639        block > local_tip && block - local_tip > self.pipeline_run_threshold
640    }
641
642    /// Returns the finalized hash to sync to if the distance from the local tip to the block is
643    /// greater than the configured threshold and we're not synced to the finalized block yet
644    /// yet (if we've seen that block already).
645    ///
646    /// If this is invoked after a new block has been downloaded, the downloaded block could be the
647    /// (missing) finalized block.
648    fn can_pipeline_sync_to_finalized(
649        &self,
650        canonical_tip_num: u64,
651        target_block_number: u64,
652        downloaded_block: Option<BlockNumHash>,
653    ) -> Option<B256> {
654        let sync_target_state = self.forkchoice_state_tracker.sync_target_state();
655
656        // check if the distance exceeds the threshold for pipeline sync
657        let mut exceeds_pipeline_run_threshold =
658            self.exceeds_pipeline_run_threshold(canonical_tip_num, target_block_number);
659
660        // check if the downloaded block is the tracked finalized block
661        if let Some(ref buffered_finalized) = sync_target_state
662            .as_ref()
663            .and_then(|state| self.blockchain.buffered_header_by_hash(state.finalized_block_hash))
664        {
665            // if we have buffered the finalized block, we should check how far
666            // we're off
667            exceeds_pipeline_run_threshold =
668                self.exceeds_pipeline_run_threshold(canonical_tip_num, buffered_finalized.number);
669        }
670
671        // If this is invoked after we downloaded a block we can check if this block is the
672        // finalized block
673        if let (Some(downloaded_block), Some(ref state)) = (downloaded_block, sync_target_state) {
674            if downloaded_block.hash == state.finalized_block_hash {
675                // we downloaded the finalized block
676                exceeds_pipeline_run_threshold =
677                    self.exceeds_pipeline_run_threshold(canonical_tip_num, downloaded_block.number);
678            }
679        }
680
681        // if the number of missing blocks is greater than the max, run the
682        // pipeline
683        if exceeds_pipeline_run_threshold {
684            if let Some(state) = sync_target_state {
685                // if we have already canonicalized the finalized block, we should
686                // skip the pipeline run
687                match self.blockchain.header_by_hash_or_number(state.finalized_block_hash.into()) {
688                    Err(err) => {
689                        warn!(target: "consensus::engine", %err, "Failed to get finalized block header");
690                    }
691                    Ok(None) => {
692                        // ensure the finalized block is known (not the zero hash)
693                        if !state.finalized_block_hash.is_zero() {
694                            // we don't have the block yet and the distance exceeds the allowed
695                            // threshold
696                            return Some(state.finalized_block_hash)
697                        }
698
699                        // OPTIMISTIC SYNCING
700                        //
701                        // It can happen when the node is doing an
702                        // optimistic sync, where the CL has no knowledge of the finalized hash,
703                        // but is expecting the EL to sync as high
704                        // as possible before finalizing.
705                        //
706                        // This usually doesn't happen on ETH mainnet since CLs use the more
707                        // secure checkpoint syncing.
708                        //
709                        // However, optimism chains will do this. The risk of a reorg is however
710                        // low.
711                        debug!(target: "consensus::engine", hash=?state.head_block_hash, "Setting head hash as an optimistic pipeline target.");
712                        return Some(state.head_block_hash)
713                    }
714                    Ok(Some(_)) => {
715                        // we're fully synced to the finalized block
716                        // but we want to continue downloading the missing parent
717                    }
718                }
719            }
720        }
721
722        None
723    }
724
725    /// Returns how far the local tip is from the given block. If the local tip is at the same
726    /// height or its block number is greater than the given block, this returns None.
727    #[inline]
728    const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
729        if block > local_tip {
730            Some(block - local_tip)
731        } else {
732            None
733        }
734    }
735
736    /// If validation fails, the response MUST contain the latest valid hash:
737    ///
738    ///   - The block hash of the ancestor of the invalid payload satisfying the following two
739    ///     conditions:
740    ///     - It is fully validated and deemed VALID
741    ///     - Any other ancestor of the invalid payload with a higher blockNumber is INVALID
742    ///   - 0x0000000000000000000000000000000000000000000000000000000000000000 if the above
743    ///     conditions are satisfied by a `PoW` block.
744    ///   - null if client software cannot determine the ancestor of the invalid payload satisfying
745    ///     the above conditions.
746    fn latest_valid_hash_for_invalid_payload(
747        &mut self,
748        parent_hash: B256,
749    ) -> ProviderResult<Option<B256>> {
750        // Check if parent exists in side chain or in canonical chain.
751        if self.blockchain.find_block_by_hash(parent_hash, BlockSource::Any)?.is_some() {
752            return Ok(Some(parent_hash))
753        }
754
755        // iterate over ancestors in the invalid cache
756        // until we encounter the first valid ancestor
757        let mut current_hash = parent_hash;
758        let mut current_block = self.invalid_headers.get(&current_hash);
759        while let Some(block) = current_block {
760            current_hash = block.parent;
761            current_block = self.invalid_headers.get(&current_hash);
762
763            // If current_header is None, then the current_hash does not have an invalid
764            // ancestor in the cache, check its presence in blockchain tree
765            if current_block.is_none() &&
766                self.blockchain.find_block_by_hash(current_hash, BlockSource::Any)?.is_some()
767            {
768                return Ok(Some(current_hash))
769            }
770        }
771        Ok(None)
772    }
773
774    /// Prepares the invalid payload response for the given hash, checking the
775    /// database for the parent hash and populating the payload status with the latest valid hash
776    /// according to the engine api spec.
777    fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
778        // Edge case: the `latestValid` field is the zero hash if the parent block is the terminal
779        // PoW block, which we need to identify by looking at the parent's block difficulty
780        if let Ok(Some(parent)) = self.blockchain.header_by_hash_or_number(parent_hash.into()) {
781            if !parent.is_zero_difficulty() {
782                parent_hash = B256::ZERO;
783            }
784        }
785
786        let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
787        Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
788            validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
789        })
790        .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
791    }
792
793    /// Checks if the given `check` hash points to an invalid header, inserting the given `head`
794    /// block into the invalid header cache if the `check` hash has a known invalid ancestor.
795    ///
796    /// Returns a payload status response according to the engine API spec if the block is known to
797    /// be invalid.
798    fn check_invalid_ancestor_with_head(
799        &mut self,
800        check: B256,
801        head: B256,
802    ) -> ProviderResult<Option<PayloadStatus>> {
803        // check if the check hash was previously marked as invalid
804        let Some(block) = self.invalid_headers.get(&check) else { return Ok(None) };
805
806        // populate the latest valid hash field
807        let status = self.prepare_invalid_response(block.parent)?;
808
809        // insert the head block into the invalid header cache
810        self.invalid_headers.insert_with_invalid_ancestor(head, block);
811
812        Ok(Some(status))
813    }
814
815    /// Checks if the given `head` points to an invalid header, which requires a specific response
816    /// to a forkchoice update.
817    fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
818        // check if the head was previously marked as invalid
819        let Some(block) = self.invalid_headers.get(&head) else { return Ok(None) };
820
821        // populate the latest valid hash field
822        Ok(Some(self.prepare_invalid_response(block.parent)?))
823    }
824
825    /// Record latency metrics for one call to make a block canonical
826    /// Takes start time of the call and result of the make canonical call
827    ///
828    /// Handles cases for error, already canonical and committed blocks
829    fn record_make_canonical_latency(
830        &self,
831        start: Instant,
832        outcome: &Result<CanonicalOutcome, CanonicalError>,
833    ) -> Duration {
834        let elapsed = start.elapsed();
835        self.metrics.make_canonical_latency.record(elapsed);
836        match outcome {
837            Ok(CanonicalOutcome::AlreadyCanonical { .. }) => {
838                self.metrics.make_canonical_already_canonical_latency.record(elapsed)
839            }
840            Ok(CanonicalOutcome::Committed { .. }) => {
841                self.metrics.make_canonical_committed_latency.record(elapsed)
842            }
843            Err(_) => self.metrics.make_canonical_error_latency.record(elapsed),
844        }
845        elapsed
846    }
847
848    /// Ensures that the given forkchoice state is consistent, assuming the head block has been
849    /// made canonical.
850    ///
851    /// If the forkchoice state is consistent, this will return Ok(None). Otherwise, this will
852    /// return an instance of [`OnForkChoiceUpdated`] that is INVALID.
853    ///
854    /// This also updates the safe and finalized blocks in the [`CanonChainTracker`], if they are
855    /// consistent with the head block.
856    fn ensure_consistent_forkchoice_state(
857        &self,
858        state: ForkchoiceState,
859    ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
860        // Ensure that the finalized block, if not zero, is known and in the canonical chain
861        // after the head block is canonicalized.
862        //
863        // This ensures that the finalized block is consistent with the head block, i.e. the
864        // finalized block is an ancestor of the head block.
865        if !state.finalized_block_hash.is_zero() &&
866            !self.blockchain.is_canonical(state.finalized_block_hash)?
867        {
868            return Ok(Some(OnForkChoiceUpdated::invalid_state()))
869        }
870
871        // Finalized block is consistent, so update it in the canon chain tracker.
872        self.update_finalized_block(state.finalized_block_hash)?;
873
874        // Also ensure that the safe block, if not zero, is known and in the canonical chain
875        // after the head block is canonicalized.
876        //
877        // This ensures that the safe block is consistent with the head block, i.e. the safe
878        // block is an ancestor of the head block.
879        if !state.safe_block_hash.is_zero() &&
880            !self.blockchain.is_canonical(state.safe_block_hash)?
881        {
882            return Ok(Some(OnForkChoiceUpdated::invalid_state()))
883        }
884
885        // Safe block is consistent, so update it in the canon chain tracker.
886        self.update_safe_block(state.safe_block_hash)?;
887
888        Ok(None)
889    }
890
891    /// Sets the state of the canon chain tracker based to the given head.
892    ///
893    /// This expects the given head to be the new canonical head.
894    ///
895    /// Additionally, updates the head used for p2p handshakes.
896    ///
897    /// This also updates the tracked safe and finalized blocks, and should be called before
898    /// returning a VALID forkchoice update response
899    fn update_canon_chain(&self, head: SealedHeader, update: &ForkchoiceState) -> RethResult<()> {
900        self.update_head(head)?;
901        self.update_finalized_block(update.finalized_block_hash)?;
902        self.update_safe_block(update.safe_block_hash)?;
903        Ok(())
904    }
905
906    /// Updates the state of the canon chain tracker based on the given head.
907    ///
908    /// This expects the given head to be the new canonical head.
909    /// Additionally, updates the head used for p2p handshakes.
910    ///
911    /// This should be called before returning a VALID forkchoice update response
912    #[inline]
913    fn update_head(&self, head: SealedHeader) -> RethResult<()> {
914        let mut head_block = Head {
915            number: head.number,
916            hash: head.hash(),
917            difficulty: head.difficulty,
918            timestamp: head.timestamp,
919            // NOTE: this will be set later
920            total_difficulty: Default::default(),
921        };
922
923        // we update the tracked header first
924        self.blockchain.set_canonical_head(head);
925
926        head_block.total_difficulty =
927            self.blockchain.header_td_by_number(head_block.number)?.ok_or_else(|| {
928                RethError::Provider(ProviderError::TotalDifficultyNotFound(head_block.number))
929            })?;
930        self.sync_state_updater.update_status(head_block);
931
932        Ok(())
933    }
934
935    /// Updates the tracked safe block if we have it
936    ///
937    /// Returns an error if the block is not found.
938    #[inline]
939    fn update_safe_block(&self, safe_block_hash: B256) -> ProviderResult<()> {
940        if !safe_block_hash.is_zero() {
941            if self.blockchain.safe_block_hash()? == Some(safe_block_hash) {
942                // nothing to update
943                return Ok(())
944            }
945
946            let safe = self
947                .blockchain
948                .find_block_by_hash(safe_block_hash, BlockSource::Any)?
949                .ok_or(ProviderError::UnknownBlockHash(safe_block_hash))?;
950            self.blockchain.set_safe(SealedHeader::new(safe.split().0, safe_block_hash));
951        }
952        Ok(())
953    }
954
955    /// Updates the tracked finalized block if we have it
956    ///
957    /// Returns an error if the block is not found.
958    #[inline]
959    fn update_finalized_block(&self, finalized_block_hash: B256) -> ProviderResult<()> {
960        if !finalized_block_hash.is_zero() {
961            if self.blockchain.finalized_block_hash()? == Some(finalized_block_hash) {
962                // nothing to update
963                return Ok(())
964            }
965
966            let finalized = self
967                .blockchain
968                .find_block_by_hash(finalized_block_hash, BlockSource::Any)?
969                .ok_or(ProviderError::UnknownBlockHash(finalized_block_hash))?;
970            self.blockchain.finalize_block(finalized.header().number())?;
971            self.blockchain
972                .set_finalized(SealedHeader::new(finalized.split().0, finalized_block_hash));
973        }
974        Ok(())
975    }
976
977    /// Handler for a failed a forkchoice update due to a canonicalization error.
978    ///
979    /// This will determine if the state's head is invalid, and if so, return immediately.
980    ///
981    /// If the newest head is not invalid, then this will trigger a new pipeline run to sync the gap
982    ///
983    /// See [`Self::on_forkchoice_updated`] and [`BlockchainTreeEngine::make_canonical`].
984    fn on_failed_canonical_forkchoice_update(
985        &mut self,
986        state: &ForkchoiceState,
987        error: CanonicalError,
988    ) -> ProviderResult<PayloadStatus> {
989        debug_assert!(self.sync.is_pipeline_idle(), "pipeline must be idle");
990
991        // check if the new head was previously invalidated, if so then we deem this FCU
992        // as invalid
993        if let Some(invalid_ancestor) = self.check_invalid_ancestor(state.head_block_hash)? {
994            warn!(target: "consensus::engine", %error, ?state, ?invalid_ancestor, head=?state.head_block_hash, "Failed to canonicalize the head hash, head is also considered invalid");
995            debug!(target: "consensus::engine", head=?state.head_block_hash, current_error=%error, "Head was previously marked as invalid");
996            return Ok(invalid_ancestor)
997        }
998
999        match &error {
1000            CanonicalError::Validation(BlockValidationError::BlockPreMerge { .. }) => {
1001                warn!(target: "consensus::engine", %error, ?state, "Failed to canonicalize the head hash");
1002                return Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
1003                    validation_error: error.to_string(),
1004                })
1005                .with_latest_valid_hash(B256::ZERO))
1006            }
1007            CanonicalError::BlockchainTree(BlockchainTreeError::BlockHashNotFoundInChain {
1008                ..
1009            }) => {
1010                // This just means we couldn't find the block when attempting to make it canonical,
1011                // so we should not warn the user, since this will result in us attempting to sync
1012                // to a new target and is considered normal operation during sync
1013            }
1014            CanonicalError::OptimisticTargetRevert(block_number) => {
1015                self.sync.set_pipeline_sync_target(PipelineTarget::Unwind(*block_number));
1016                return Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
1017            }
1018            _ => {
1019                warn!(target: "consensus::engine", %error, ?state, "Failed to canonicalize the head hash");
1020                // TODO(mattsse) better error handling before attempting to sync (FCU could be
1021                // invalid): only trigger sync if we can't determine whether the FCU is invalid
1022            }
1023        }
1024
1025        // we assume the FCU is valid and at least the head is missing,
1026        // so we need to start syncing to it
1027        //
1028        // find the appropriate target to sync to, if we don't have the safe block hash then we
1029        // start syncing to the safe block via pipeline first
1030        let target = if self.forkchoice_state_tracker.is_empty() &&
1031            // check that safe block is valid and missing
1032            !state.safe_block_hash.is_zero() &&
1033            self.blockchain.block_number(state.safe_block_hash).ok().flatten().is_none()
1034        {
1035            state.safe_block_hash
1036        } else {
1037            state.head_block_hash
1038        };
1039
1040        // we need to first check the buffer for the target and its ancestors
1041        let target = self.lowest_buffered_ancestor_or(target);
1042
1043        // if the threshold is zero, we should not download the block first, and just use the
1044        // pipeline. Otherwise we use the tree to insert the block first
1045        if self.pipeline_run_threshold == 0 {
1046            // use the pipeline to sync to the target
1047            trace!(target: "consensus::engine", %target, "Triggering pipeline run to sync missing ancestors of the new head");
1048            self.sync.set_pipeline_sync_target(target.into());
1049        } else {
1050            // trigger a full block download for missing hash, or the parent of its lowest buffered
1051            // ancestor
1052            trace!(target: "consensus::engine", request=%target, "Triggering full block download for missing ancestors of the new head");
1053            self.sync.download_full_block(target);
1054        }
1055
1056        debug!(target: "consensus::engine", %target, "Syncing to new target");
1057        Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
1058    }
1059
1060    /// Return the parent hash of the lowest buffered ancestor for the requested block, if there
1061    /// are any buffered ancestors. If there are no buffered ancestors, and the block itself does
1062    /// not exist in the buffer, this returns the hash that is passed in.
1063    ///
1064    /// Returns the parent hash of the block itself if the block is buffered and has no other
1065    /// buffered ancestors.
1066    fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
1067        self.blockchain
1068            .lowest_buffered_ancestor(hash)
1069            .map(|block| block.parent_hash)
1070            .unwrap_or_else(|| hash)
1071    }
1072
1073    /// When the Consensus layer receives a new block via the consensus gossip protocol,
1074    /// the transactions in the block are sent to the execution layer in the form of a
1075    /// [`ExecutionPayload`]. The Execution layer executes the transactions and validates the
1076    /// state in the block header, then passes validation data back to Consensus layer, that
1077    /// adds the block to the head of its own blockchain and attests to it. The block is then
1078    /// broadcast over the consensus p2p network in the form of a "Beacon block".
1079    ///
1080    /// These responses should adhere to the [Engine API Spec for
1081    /// `engine_newPayload`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification).
1082    ///
1083    /// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
1084    /// returns an error if an internal error occurred.
1085    #[instrument(level = "trace", skip(self, payload, sidecar), fields(block_hash = ?payload.block_hash(), block_number = %payload.block_number(), is_pipeline_idle = %self.sync.is_pipeline_idle()), target = "consensus::engine")]
1086    fn on_new_payload(
1087        &mut self,
1088        payload: ExecutionPayload,
1089        sidecar: ExecutionPayloadSidecar,
1090    ) -> Result<Either<PayloadStatus, SealedBlock>, BeaconOnNewPayloadError> {
1091        self.metrics.new_payload_messages.increment(1);
1092
1093        // Ensures that the given payload does not violate any consensus rules that concern the
1094        // block's layout, like:
1095        //    - missing or invalid base fee
1096        //    - invalid extra data
1097        //    - invalid transactions
1098        //    - incorrect hash
1099        //    - the versioned hashes passed with the payload do not exactly match transaction
1100        //      versioned hashes
1101        //    - the block does not contain blob transactions if it is pre-cancun
1102        //
1103        // This validates the following engine API rule:
1104        //
1105        // 3. Given the expected array of blob versioned hashes client software **MUST** run its
1106        //    validation by taking the following steps:
1107        //
1108        //   1. Obtain the actual array by concatenating blob versioned hashes lists
1109        //      (`tx.blob_versioned_hashes`) of each [blob
1110        //      transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included
1111        //      in the payload, respecting the order of inclusion. If the payload has no blob
1112        //      transactions the expected array **MUST** be `[]`.
1113        //
1114        //   2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage |
1115        //      null}` if the expected and the actual arrays don't match.
1116        //
1117        // This validation **MUST** be instantly run in all cases even during active sync process.
1118        let parent_hash = payload.parent_hash();
1119        let block = match self.payload_validator.ensure_well_formed_payload(payload, sidecar) {
1120            Ok(block) => block,
1121            Err(error) => {
1122                error!(target: "consensus::engine", %error, "Invalid payload");
1123                // we need to convert the error to a payload status (response to the CL)
1124
1125                let latest_valid_hash =
1126                    if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
1127                        // Engine-API rules:
1128                        // > `latestValidHash: null` if the blockHash validation has failed (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/shanghai.md?plain=1#L113>)
1129                        // > `latestValidHash: null` if the expected and the actual arrays don't match (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md?plain=1#L103>)
1130                        None
1131                    } else {
1132                        self.latest_valid_hash_for_invalid_payload(parent_hash)
1133                            .map_err(BeaconOnNewPayloadError::internal)?
1134                    };
1135
1136                let status = PayloadStatusEnum::from(error);
1137                return Ok(Either::Left(PayloadStatus::new(status, latest_valid_hash)))
1138            }
1139        };
1140
1141        let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block.hash());
1142        if lowest_buffered_ancestor == block.hash() {
1143            lowest_buffered_ancestor = block.parent_hash;
1144        }
1145
1146        // now check the block itself
1147        if let Some(status) = self
1148            .check_invalid_ancestor_with_head(lowest_buffered_ancestor, block.hash())
1149            .map_err(BeaconOnNewPayloadError::internal)?
1150        {
1151            Ok(Either::Left(status))
1152        } else {
1153            Ok(Either::Right(block))
1154        }
1155    }
1156
1157    /// Validates the payload attributes with respect to the header and fork choice state.
1158    ///
1159    /// Note: At this point, the fork choice update is considered to be VALID, however, we can still
1160    /// return an error if the payload attributes are invalid.
1161    fn process_payload_attributes(
1162        &self,
1163        attrs: <N::Engine as PayloadTypes>::PayloadAttributes,
1164        head: Header,
1165        state: ForkchoiceState,
1166        version: EngineApiMessageVersion,
1167    ) -> OnForkChoiceUpdated {
1168        // 7. Client software MUST ensure that payloadAttributes.timestamp is greater than timestamp
1169        //    of a block referenced by forkchoiceState.headBlockHash. If this condition isn't held
1170        //    client software MUST respond with -38003: `Invalid payload attributes` and MUST NOT
1171        //    begin a payload build process. In such an event, the forkchoiceState update MUST NOT
1172        //    be rolled back.
1173        if attrs.timestamp() <= head.timestamp {
1174            return OnForkChoiceUpdated::invalid_payload_attributes()
1175        }
1176
1177        // 8. Client software MUST begin a payload build process building on top of
1178        //    forkchoiceState.headBlockHash and identified via buildProcessId value if
1179        //    payloadAttributes is not null and the forkchoice state has been updated successfully.
1180        //    The build process is specified in the Payload building section.
1181        match <<N::Engine as PayloadTypes>::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
1182            state.head_block_hash,
1183            attrs,
1184            version as u8
1185        ) {
1186            Ok(attributes) => {
1187                // send the payload attributes to the builder and return the receiver for the
1188                // pending payload id; initiating the payload job is handled asynchronously
1189                let pending_payload_id = self.payload_builder.send_new_payload(attributes);
1190
1191                // Client software MUST respond to this method call in the following way:
1192                // {
1193                //      payloadStatus: {
1194                //          status: VALID,
1195                //          latestValidHash: forkchoiceState.headBlockHash,
1196                //          validationError: null
1197                //      },
1198                //      payloadId: buildProcessId
1199                // }
1200                //
1201                // if the payload is deemed VALID and the build process has begun.
1202                OnForkChoiceUpdated::updated_with_pending_payload_id(
1203                    PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
1204                    pending_payload_id,
1205                )
1206            }
1207            Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
1208        }
1209    }
1210
1211    /// When the pipeline is active, the tree is unable to commit any additional blocks since the
1212    /// pipeline holds exclusive access to the database.
1213    ///
1214    /// In this scenario we buffer the payload in the tree if it is valid. Once the pipeline has
1215    /// finished, the tree can then also use the buffered payloads to commit to a (newer)
1216    /// canonical chain.
1217    ///
1218    /// This will return `SYNCING` if the block was buffered successfully, and an error if an error
1219    /// occurred while buffering the block.
1220    #[instrument(level = "trace", skip_all, target = "consensus::engine", ret)]
1221    fn try_buffer_payload(
1222        &mut self,
1223        block: SealedBlock,
1224    ) -> Result<PayloadStatus, InsertBlockError> {
1225        self.blockchain.buffer_block_without_senders(block)?;
1226        Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
1227    }
1228
1229    /// Attempts to insert a new payload into the tree.
1230    ///
1231    /// Caution: This expects that the pipeline is idle.
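    ///
    /// # Status mapping sketch
    ///
    /// A simplified, standalone sketch of how the tree insertion outcome is translated into a
    /// payload status below (a hypothetical local enum stands in for the tree types):
    ///
    /// ```ignore
    /// enum Outcome { Valid, Disconnected }
    ///
    /// fn to_status(outcome: Outcome) -> &'static str {
    ///     match outcome {
    ///         // connected and executed: latestValidHash is the inserted block's hash
    ///         Outcome::Valid => "VALID",
    ///         // parent unknown (and not known to be invalid): keep syncing
    ///         Outcome::Disconnected => "SYNCING",
    ///     }
    /// }
    ///
    /// assert_eq!(to_status(Outcome::Disconnected), "SYNCING");
    /// ```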
1232    #[instrument(level = "trace", skip_all, target = "consensus::engine", ret)]
1233    fn try_insert_new_payload(
1234        &mut self,
1235        block: SealedBlock,
1236    ) -> Result<PayloadStatus, InsertBlockError> {
1237        debug_assert!(self.sync.is_pipeline_idle(), "pipeline must be idle");
1238
1239        let block_hash = block.hash();
1240        let start = Instant::now();
1241        let status = self
1242            .blockchain
1243            .insert_block_without_senders(block.clone(), BlockValidationKind::Exhaustive)?;
1244
1245        let elapsed = start.elapsed();
1246        let mut latest_valid_hash = None;
1247        let status = match status {
1248            InsertPayloadOk::Inserted(BlockStatus::Valid(attachment)) => {
1249                latest_valid_hash = Some(block_hash);
1250                let block = Arc::new(block);
1251                let event = if attachment.is_canonical() {
1252                    BeaconConsensusEngineEvent::CanonicalBlockAdded(block, elapsed)
1253                } else {
1254                    BeaconConsensusEngineEvent::ForkBlockAdded(block, elapsed)
1255                };
1256                self.event_sender.notify(event);
1257                PayloadStatusEnum::Valid
1258            }
1259            InsertPayloadOk::AlreadySeen(BlockStatus::Valid(_)) => {
1260                latest_valid_hash = Some(block_hash);
1261                PayloadStatusEnum::Valid
1262            }
1263            InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
1264            InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
1265                // check if the block's parent is already marked as invalid
1266                if let Some(status) =
1267                    self.check_invalid_ancestor_with_head(block.parent_hash, block.hash()).map_err(
1268                        |error| InsertBlockError::new(block, InsertBlockErrorKind::Provider(error)),
1269                    )?
1270                {
1271                    return Ok(status)
1272                }
1273
1274                // not known to be invalid, but we don't know anything else
1275                PayloadStatusEnum::Syncing
1276            }
1277        };
1278        Ok(PayloadStatus::new(status, latest_valid_hash))
1279    }
1280
1281    /// This handles downloaded blocks that are shown to be disconnected from the canonical chain.
1282    ///
1283    /// This mainly compares the missing parent of the downloaded block with the current canonical
1284    /// tip, and decides whether or not the pipeline should be run.
1285    ///
1286    /// The canonical tip is compared to the missing parent using `exceeds_pipeline_run_threshold`,
1287    /// which returns true if the missing parent is sufficiently ahead of the canonical tip. If so,
1288    /// the pipeline is run. Otherwise, we need to insert blocks using the blockchain tree, and
1289    /// must download blocks outside of the pipeline. In this case, the distance is used to
1290    /// determine how many blocks we should download at once.
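    ///
    /// # Distance check sketch
    ///
    /// A minimal, standalone illustration of the distance rule referenced above (the real check
    /// also considers the downloaded block and the current sync state; this is simplified):
    ///
    /// ```ignore
    /// // default threshold: `MIN_BLOCKS_FOR_PIPELINE_RUN` (one epoch, i.e. 32 slots)
    /// const THRESHOLD: u64 = 32;
    ///
    /// fn should_run_pipeline(canonical_tip: u64, missing_parent: u64) -> bool {
    ///     missing_parent > canonical_tip && missing_parent - canonical_tip > THRESHOLD
    /// }
    ///
    /// assert!(should_run_pipeline(100, 200));  // big gap: let the pipeline sync
    /// assert!(!should_run_pipeline(100, 110)); // small gap: download blocks into the tree
    /// ```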
1291    fn on_disconnected_block(
1292        &mut self,
1293        downloaded_block: BlockNumHash,
1294        missing_parent: BlockNumHash,
1295        head: BlockNumHash,
1296    ) {
1297        // compare the missing parent with the canonical tip
1298        if let Some(target) = self.can_pipeline_sync_to_finalized(
1299            head.number,
1300            missing_parent.number,
1301            Some(downloaded_block),
1302        ) {
1303            // we don't have the block yet and the distance exceeds the allowed
1304            // threshold
1305            self.sync.set_pipeline_sync_target(target.into());
1306            // we can exit early here because the pipeline will take care of syncing
1307            return
1308        }
1309
1310        // continue downloading the missing parent
1311        //
1312        // this happens if either:
1313        //  * the missing parent block num < canonical tip num
1314        //    * this case represents a missing block on a fork that is shorter than the canonical
1315        //      chain
1316        //  * the missing parent block num >= canonical tip num, but the number of missing blocks is
1317        //    less than the pipeline threshold
1318        //    * this case represents a potentially long range of blocks to download and execute
1319        if let Some(distance) = self.distance_from_local_tip(head.number, missing_parent.number) {
1320            self.sync.download_block_range(missing_parent.hash, distance)
1321        } else {
1322            // This happens when the missing parent is on an outdated
1323            // sidechain
1324            self.sync.download_full_block(missing_parent.hash);
1325        }
1326    }
1327
1328    /// Attempt to form a new canonical chain based on the current sync target.
1329    ///
1330    /// This is invoked when we successfully __downloaded__ a new block from the network which
1331    /// resulted in [`BlockStatus::Valid`].
1332    ///
1333    /// Note: This will not succeed if the sync target has changed since the block download request
1334    /// was issued and the new target is still disconnected, in which case additional missing
1335    /// blocks are downloaded.
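    ///
    /// # Fallback sketch
    ///
    /// A simplified outline of the fallback used below when the FCU head cannot be made canonical
    /// yet (plain integers stand in for block hashes; illustrative only):
    ///
    /// ```ignore
    /// fn fallback(inserted: u64, safe: u64, finalized: u64) -> Option<u64> {
    ///     // only the safe/finalized targets are retried; making the head canonical already failed
    ///     (inserted == safe || inserted == finalized).then_some(inserted)
    /// }
    ///
    /// assert_eq!(fallback(7, 7, 3), Some(7)); // inserted block is the `safe` target
    /// assert_eq!(fallback(9, 7, 3), None);    // not part of the forkchoice state
    /// ```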
1336    fn try_make_sync_target_canonical(
1337        &mut self,
1338        inserted: BlockNumHash,
1339    ) -> Result<(), (B256, CanonicalError)> {
1340        let Some(target) = self.forkchoice_state_tracker.sync_target_state() else { return Ok(()) };
1341
1342        // optimistically try to make the head of the current FCU target canonical, the sync
1343        // target might have changed since the block download request was issued
1344        // (new FCU received)
1345        let start = Instant::now();
1346        let make_canonical_result = self.blockchain.make_canonical(target.head_block_hash);
1347        let elapsed = self.record_make_canonical_latency(start, &make_canonical_result);
1348        match make_canonical_result {
1349            Ok(outcome) => {
1350                if let CanonicalOutcome::Committed { head } = &outcome {
1351                    self.event_sender.notify(BeaconConsensusEngineEvent::CanonicalChainCommitted(
1352                        Box::new(head.clone()),
1353                        elapsed,
1354                    ));
1355                }
1356
1357                let new_head = outcome.into_header();
1358                debug!(target: "consensus::engine", hash=?new_head.hash(), number=new_head.number, "Canonicalized new head");
1359
1360                // we can update the FCU blocks
1361                if let Err(err) = self.update_canon_chain(new_head, &target) {
1362                    debug!(target: "consensus::engine", ?err, ?target, "Failed to update the canonical chain tracker");
1363                }
1364
1365                // we're no longer syncing
1366                self.sync_state_updater.update_sync_state(SyncState::Idle);
1367
1368                // clear any active block requests
1369                self.sync.clear_block_download_requests();
1370                Ok(())
1371            }
1372            Err(err) => {
1373                // if we failed to make the FCU's head canonical, because we don't have that
1374                // block yet, then we can try to make the inserted block canonical if we know
1375                // it's part of the canonical chain: if it's the safe or the finalized block
1376                if err.is_block_hash_not_found() {
1377                    // if the inserted block is the currently targeted `finalized` or `safe`
1378                    // block, we will attempt to make them canonical,
1379                    // because they are also part of the canonical chain and
1380                    // their missing block range might already be downloaded (buffered).
1381                    if let Some(target_hash) =
1382                        ForkchoiceStateHash::find(&target, inserted.hash).filter(|h| !h.is_head())
1383                    {
1384                        // TODO: do not ignore this
1385                        let _ = self.blockchain.make_canonical(*target_hash.as_ref());
1386                    }
1387                } else if let Some(block_number) = err.optimistic_revert_block_number() {
1388                    self.sync.set_pipeline_sync_target(PipelineTarget::Unwind(block_number));
1389                }
1390
1391                Err((target.head_block_hash, err))
1392            }
1393        }
1394    }
1395
1396    /// Event handler for events emitted by the [`EngineSyncController`].
1397    ///
1398    /// This returns a result to indicate whether the engine future should resolve (fatal error).
1399    fn on_sync_event(
1400        &mut self,
1401        event: EngineSyncEvent,
1402    ) -> Result<EngineEventOutcome, BeaconConsensusEngineError> {
1403        let outcome = match event {
1404            EngineSyncEvent::FetchedFullBlock(block) => {
1405                trace!(target: "consensus::engine", hash=?block.hash(), number=%block.number, "Downloaded full block");
1406                // Insert block only if the block's parent is not marked as invalid
1407                if self
1408                    .check_invalid_ancestor_with_head(block.parent_hash, block.hash())
1409                    .map_err(|error| BeaconConsensusEngineError::Common(error.into()))?
1410                    .is_none()
1411                {
1412                    self.set_blockchain_tree_action(
1413                        BlockchainTreeAction::InsertDownloadedPayload { block },
1414                    );
1415                }
1416                EngineEventOutcome::Processed
1417            }
1418            EngineSyncEvent::PipelineStarted(target) => {
1419                trace!(target: "consensus::engine", ?target, continuous = target.is_none(), "Started the pipeline");
1420                self.metrics.pipeline_runs.increment(1);
1421                self.sync_state_updater.update_sync_state(SyncState::Syncing);
1422                EngineEventOutcome::Processed
1423            }
1424            EngineSyncEvent::PipelineFinished { result, reached_max_block } => {
1425                trace!(target: "consensus::engine", ?result, ?reached_max_block, "Pipeline finished");
1426                // Any pipeline error at this point is fatal.
1427                let ctrl = result?;
1428                if reached_max_block {
1429                    // Terminate the sync early if it's reached the maximum user-configured block.
1430                    EngineEventOutcome::ReachedMaxBlock
1431                } else {
1432                    self.on_pipeline_outcome(ctrl)?;
1433                    EngineEventOutcome::Processed
1434                }
1435            }
1436            EngineSyncEvent::PipelineTaskDropped => {
1437                error!(target: "consensus::engine", "Failed to receive spawned pipeline");
1438                return Err(BeaconConsensusEngineError::PipelineChannelClosed)
1439            }
1440        };
1441
1442        Ok(outcome)
1443    }
1444
1445    /// Invoked when the pipeline has successfully finished.
1446    ///
1447    /// Updates the internal sync state depending on the pipeline configuration,
1448    /// the outcome of the pipeline run and the last observed forkchoice state.
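    ///
    /// # Decision sketch
    ///
    /// A simplified, standalone sketch of the final decision made below (plain values instead of
    /// provider lookups; illustrative only):
    ///
    /// ```ignore
    /// fn next_step(run_pipeline_again: bool, finalized_in_db: Option<u64>) -> &'static str {
    ///     if run_pipeline_again {
    ///         // the gap to the finalized block is still large enough
    ///         "run the pipeline again"
    ///     } else if finalized_in_db.is_some() {
    ///         "connect buffered blocks to canonical hashes and finalize"
    ///     } else {
    ///         "run the pipeline towards the finalized block hash"
    ///     }
    /// }
    ///
    /// assert_eq!(next_step(false, Some(100)), "connect buffered blocks to canonical hashes and finalize");
    /// ```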
1449    fn on_pipeline_outcome(&mut self, ctrl: ControlFlow) -> RethResult<()> {
1450        // Pipeline unwound, memorize the invalid block and wait for CL for next sync target.
1451        if let ControlFlow::Unwind { bad_block, .. } = ctrl {
1452            warn!(target: "consensus::engine", invalid_num_hash=?bad_block.block, "Bad block detected in unwind");
1453            // update the `invalid_headers` cache with the new invalid header
1454            self.invalid_headers.insert(*bad_block);
1455            return Ok(())
1456        }
1457
1458        let sync_target_state = match self.forkchoice_state_tracker.sync_target_state() {
1459            Some(current_state) => current_state,
1460            None => {
1461                // This is only possible if the node was run with `debug.tip`
1462                // argument and without CL.
1463                warn!(target: "consensus::engine", "No fork choice state available");
1464                return Ok(())
1465            }
1466        };
1467
1468        if sync_target_state.finalized_block_hash.is_zero() {
1469            self.set_canonical_head(ctrl.block_number().unwrap_or_default())?;
1470            self.blockchain.update_block_hashes_and_clear_buffered()?;
1471            self.blockchain.connect_buffered_blocks_to_canonical_hashes()?;
1472            // We are in an optimistic syncing process; better to wait for the next FCU to handle it
1473            return Ok(())
1474        }
1475
1476        // Next, we check if we need to schedule another pipeline run or transition
1477        // to live sync via tree.
1478        // This can arise if we buffer the forkchoice head, and if the head is an
1479        // ancestor of an invalid block.
1480        //
1481        //  * The forkchoice head could be buffered if it were first sent as a `newPayload` request.
1482        //
1483        // In this case, we won't have the head hash in the database, so we would
1484        // set the pipeline sync target to a known-invalid head.
1485        //
1486        // This is why we check the invalid header cache here.
1487        let lowest_buffered_ancestor =
1488            self.lowest_buffered_ancestor_or(sync_target_state.head_block_hash);
1489
1490        // this inserts the head into invalid headers cache
1491        // if the lowest buffered ancestor is invalid
1492        if self
1493            .check_invalid_ancestor_with_head(
1494                lowest_buffered_ancestor,
1495                sync_target_state.head_block_hash,
1496            )?
1497            .is_some()
1498        {
1499            warn!(
1500                target: "consensus::engine",
1501                invalid_ancestor = %lowest_buffered_ancestor,
1502                head = %sync_target_state.head_block_hash,
1503                "Current head has an invalid ancestor"
1504            );
1505            return Ok(())
1506        }
1507
1508        // get the block number of the finalized block, if we have it
1509        let newest_finalized = self
1510            .blockchain
1511            .buffered_header_by_hash(sync_target_state.finalized_block_hash)
1512            .map(|header| header.number);
1513
1514        // The block number that the pipeline finished at - if the progress or newest
1515        // finalized is None then we can't check the distance anyway.
1516        //
1517        // If both are Some, we perform another distance check and return the desired
1518        // pipeline target
1519        let pipeline_target =
1520            ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
1521                // Determines whether or not we should run the pipeline again, in case
1522                // the new gap is large enough to warrant
1523                // running the pipeline.
1524                self.can_pipeline_sync_to_finalized(progress, finalized_number, None)
1525            });
1526
1527        // If the distance is large enough, we should run the pipeline again to prevent
1528        // the tree update from executing too many blocks and blocking.
1529        if let Some(target) = pipeline_target {
1530            // run the pipeline to the target since the distance is sufficient
1531            self.sync.set_pipeline_sync_target(target.into());
1532        } else if let Some(number) =
1533            self.blockchain.block_number(sync_target_state.finalized_block_hash)?
1534        {
1535            // Finalized block is in the database, attempt to restore the tree with
1536            // the most recent canonical hashes.
1537            self.blockchain.connect_buffered_blocks_to_canonical_hashes_and_finalize(number).inspect_err(|error| {
1538                error!(target: "consensus::engine", %error, "Error restoring blockchain tree state");
1539            })?;
1540        } else {
1541            // We don't have the finalized block in the database, so we need to
1542            // trigger another pipeline run.
1543            self.sync.set_pipeline_sync_target(sync_target_state.finalized_block_hash.into());
1544        }
1545
1546        Ok(())
1547    }
1548
1549    fn set_canonical_head(&self, max_block: BlockNumber) -> RethResult<()> {
1550        let max_header = self.blockchain.sealed_header(max_block)
1551            .inspect_err(|error| {
1552                error!(target: "consensus::engine", %error, "Error getting canonical header for continuous sync");
1553            })?
1554            .ok_or_else(|| ProviderError::HeaderNotFound(max_block.into()))?;
1555        self.blockchain.set_canonical_head(max_header);
1556
1557        Ok(())
1558    }
1559
1560    fn on_hook_result(&self, polled_hook: PolledHook) -> Result<(), BeaconConsensusEngineError> {
1561        if let EngineHookEvent::Finished(Err(error)) = &polled_hook.event {
1562            error!(
1563                target: "consensus::engine",
1564                name = %polled_hook.name,
1565                ?error,
1566                "Hook finished with error"
1567            )
1568        }
1569
1570        if polled_hook.db_access_level.is_read_write() {
1571            match polled_hook.event {
1572                EngineHookEvent::NotReady => {}
1573                EngineHookEvent::Started => {
1574                    // If the hook has read-write access to the database, it means that the engine
1575                    // can't process any FCU messages from CL. To prevent CL from sending us
1576                    // unneeded updates, we need to respond `true` on `eth_syncing` request.
1577                    self.sync_state_updater.update_sync_state(SyncState::Syncing)
1578                }
1579                EngineHookEvent::Finished(_) => {
1580                    // Hook with read-write access to the database has finished running, so engine
1581                    // can process new FCU messages from CL again. It's safe to
1582                    // return `false` on `eth_syncing` request.
1583                    self.sync_state_updater.update_sync_state(SyncState::Idle);
1584                    // If the hook had read-write access to the database, it means that the engine
1585                    // may have accumulated some buffered blocks.
1586                    if let Err(error) =
1587                        self.blockchain.connect_buffered_blocks_to_canonical_hashes()
1588                    {
1589                        error!(target: "consensus::engine", %error, "Error connecting buffered blocks to canonical hashes on hook result");
1590                        return Err(RethError::Canonical(error).into())
1591                    }
1592                }
1593            }
1594        }
1595
1596        Ok(())
1597    }
1598
1599    /// Processes the blockchain tree action that was previously set.
1600    /// The handler might set the next blockchain tree action to perform,
1601    /// so the state change should be handled accordingly.
1602    fn on_blockchain_tree_action(
1603        &mut self,
1604        action: BlockchainTreeAction<N::Engine>,
1605    ) -> RethResult<EngineEventOutcome> {
1606        match action {
1607            BlockchainTreeAction::MakeForkchoiceHeadCanonical { state, attrs, tx } => {
1608                let start = Instant::now();
1609                let result = self.blockchain.make_canonical(state.head_block_hash);
1610                let elapsed = self.record_make_canonical_latency(start, &result);
1611                match self
1612                    .on_forkchoice_updated_make_canonical_result(state, attrs, result, elapsed)
1613                {
1614                    Ok(on_updated) => {
1615                        trace!(target: "consensus::engine", status = ?on_updated, ?state, "Returning forkchoice status");
1616                        let fcu_status = on_updated.forkchoice_status();
1617                        self.on_forkchoice_updated_status(state, on_updated, tx);
1618
1619                        if fcu_status.is_valid() {
1620                            let tip_number = self.blockchain.canonical_tip().number;
1621                            if self.sync.has_reached_max_block(tip_number) {
1622                                // Terminate the sync early if it's reached
1623                                // the maximum user configured block.
1624                                return Ok(EngineEventOutcome::ReachedMaxBlock)
1625                            }
1626                        }
1627                    }
1628                    Err(error) => {
1629                        let _ = tx.send(Err(RethError::Canonical(error.clone())));
1630                        if error.is_fatal() {
1631                            return Err(RethError::Canonical(error))
1632                        }
1633                    }
1634                };
1635            }
1636            BlockchainTreeAction::InsertNewPayload { block, tx } => {
1637                let block_hash = block.hash();
1638                let block_num_hash = block.num_hash();
1639                let result = if self.sync.is_pipeline_idle() {
1640                    // we can only insert new payloads if the pipeline is _not_ running, because it
1641                    // holds exclusive access to the database
1642                    self.try_insert_new_payload(block)
1643                } else {
1644                    self.try_buffer_payload(block)
1645                };
1646
1647                let status = match result {
1648                    Ok(status) => status,
1649                    Err(error) => {
1650                        warn!(target: "consensus::engine", %error, "Error while processing payload");
1651
1652                        let (block, error) = error.split();
1653                        if !error.is_invalid_block() {
1654                            // TODO: revise if any error should be considered fatal at this point.
1655                            let _ =
1656                                tx.send(Err(BeaconOnNewPayloadError::Internal(Box::new(error))));
1657                            return Ok(EngineEventOutcome::Processed)
1658                        }
1659
1660                        // If the error was due to an invalid payload, the payload is added to the
1661                        // invalid headers cache and `Ok` with [PayloadStatusEnum::Invalid] is
1662                        // returned.
1663                        warn!(target: "consensus::engine", invalid_hash=?block.hash(), invalid_number=?block.number, %error, "Invalid block error on new payload");
1664                        let latest_valid_hash = if error.is_block_pre_merge() {
1665                            // zero hash must be returned if block is pre-merge
1666                            Some(B256::ZERO)
1667                        } else {
1668                            self.latest_valid_hash_for_invalid_payload(block.parent_hash)?
1669                        };
1670                        // keep track of the invalid header
1671                        self.invalid_headers.insert(block.header.block_with_parent());
1672                        PayloadStatus::new(
1673                            PayloadStatusEnum::Invalid { validation_error: error.to_string() },
1674                            latest_valid_hash,
1675                        )
1676                    }
1677                };
1678
1679                if status.is_valid() {
1680                    if let Some(target) = self.forkchoice_state_tracker.sync_target_state() {
1681                        // if we're currently syncing and the inserted block is the targeted
1682                        // FCU head block, we can try to make it canonical.
1683                        if block_hash == target.head_block_hash {
1684                            self.set_blockchain_tree_action(
1685                                BlockchainTreeAction::MakeNewPayloadCanonical {
1686                                    payload_num_hash: block_num_hash,
1687                                    status,
1688                                    tx,
1689                                },
1690                            );
1691                            return Ok(EngineEventOutcome::Processed)
1692                        }
1693                    }
1694                    // block was successfully inserted, so we can cancel the full block
1695                    // request, if any exists
1696                    self.sync.cancel_full_block_request(block_hash);
1697                }
1698
1699                trace!(target: "consensus::engine", ?status, "Returning payload status");
1700                let _ = tx.send(Ok(status));
1701            }
1702            BlockchainTreeAction::MakeNewPayloadCanonical { payload_num_hash, status, tx } => {
1703                let status = match self.try_make_sync_target_canonical(payload_num_hash) {
1704                    Ok(()) => status,
1705                    Err((_hash, error)) => {
1706                        if error.is_fatal() {
1707                            let response =
1708                                Err(BeaconOnNewPayloadError::Internal(Box::new(error.clone())));
1709                            let _ = tx.send(response);
1710                            return Err(RethError::Canonical(error))
1711                        } else if error.optimistic_revert_block_number().is_some() {
1712                            // engine already set the pipeline unwind target on
1713                            // `try_make_sync_target_canonical`
1714                            PayloadStatus::from_status(PayloadStatusEnum::Syncing)
1715                        } else {
1716                            // If we could not make the sync target block canonical,
1717                            // we should return the error as an invalid payload status.
1718                            PayloadStatus::new(
1719                                PayloadStatusEnum::Invalid { validation_error: error.to_string() },
1720                                // TODO: return a proper latest valid hash
1721                                // See: <https://github.com/paradigmxyz/reth/issues/7146>
1722                                self.forkchoice_state_tracker.last_valid_head(),
1723                            )
1724                        }
1725                    }
1726                };
1727
1728                trace!(target: "consensus::engine", ?status, "Returning payload status");
1729                let _ = tx.send(Ok(status));
1730            }
1731
1732            BlockchainTreeAction::InsertDownloadedPayload { block } => {
1733                let downloaded_num_hash = block.num_hash();
1734                match self.blockchain.insert_block_without_senders(
1735                    block,
1736                    BlockValidationKind::SkipStateRootValidation,
1737                ) {
1738                    Ok(status) => {
1739                        match status {
1740                            InsertPayloadOk::Inserted(BlockStatus::Valid(_)) => {
1741                                // block is connected to the canonical chain and is valid.
1742                                // if it's not connected to current canonical head, the state root
1743                                // has not been validated.
1744                                if let Err((hash, error)) =
1745                                    self.try_make_sync_target_canonical(downloaded_num_hash)
1746                                {
1747                                    if error.is_fatal() {
1748                                        error!(target: "consensus::engine", %error, "Encountered fatal error while making sync target canonical: {:?}, {:?}", error, hash);
1749                                    } else if !error.is_block_hash_not_found() {
1750                                        debug!(
1751                                            target: "consensus::engine",
1752                                            "Unexpected error while making sync target canonical: {:?}, {:?}",
1753                                            error,
1754                                            hash
1755                                        )
1756                                    }
1757                                }
1758                            }
1759                            InsertPayloadOk::Inserted(BlockStatus::Disconnected {
1760                                head,
1761                                missing_ancestor: missing_parent,
1762                            }) => {
1763                                // block is not connected to the canonical head, we need to download
1764                                // its missing branch first
1765                                self.on_disconnected_block(
1766                                    downloaded_num_hash,
1767                                    missing_parent,
1768                                    head,
1769                                );
1770                            }
1771                            _ => (),
1772                        }
1773                    }
1774                    Err(err) => {
1775                        warn!(target: "consensus::engine", %err, "Failed to insert downloaded block");
1776                        if err.kind().is_invalid_block() {
1777                            let (block, err) = err.split();
1778                            warn!(target: "consensus::engine", invalid_number=?block.number, invalid_hash=?block.hash(), %err, "Marking block as invalid");
1779
1780                            self.invalid_headers.insert(block.header.block_with_parent());
1781                        }
1782                    }
1783                }
1784            }
1785        };
1786        Ok(EngineEventOutcome::Processed)
1787    }
1788}
1789
1790/// On initialization, the consensus engine will poll the message receiver and return
1791/// [`Poll::Pending`] until the first forkchoice update message is received.
1792///
1793/// As soon as the consensus engine receives the first forkchoice updated message and updates the
1794/// local forkchoice state, it will launch the pipeline to sync to the head hash.
1795/// While the pipeline is syncing, the consensus engine will keep processing messages from the
1796/// receiver and forwarding them to the blockchain tree.
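///
/// # Poll loop shape
///
/// A rough sketch of the control flow implemented in `poll` below (pseudocode in comments, not
/// runnable engine code):
///
/// ```ignore
/// loop {
///     // 1. drain: active db-write hook -> queued tree action -> pending FCU -> one CL message
///     // 2. poll the sync controller; if it produced an event, restart the loop
///     // 3. if nothing is queued and the last FCU was not INVALID, poll the next hook
///     // 4. everything is drained -> return Poll::Pending
/// }
/// ```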
1797impl<N, BT, Client> Future for BeaconConsensusEngine<N, BT, Client>
1798where
1799    N: TreeNodeTypes,
1800    Client: EthBlockClient + 'static,
1801    BT: BlockchainTreeEngine
1802        + BlockReader<Block = BlockTy<N>, Header = HeaderTy<N>>
1803        + BlockIdReader
1804        + CanonChainTracker<Header = HeaderTy<N>>
1805        + StageCheckpointReader
1806        + ChainSpecProvider<ChainSpec = N::ChainSpec>
1807        + Unpin
1808        + 'static,
1809{
1810    type Output = Result<(), BeaconConsensusEngineError>;
1811
1812    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1813        let this = self.get_mut();
1814
1815        // Control loop that advances the state
1816        'main: loop {
1817            // Poll a running hook with db write access (if any) and CL messages first, draining
1818            // both and then proceeding to polling other parts such as SyncController and hooks.
1819            loop {
1820                // Poll a running hook with db write access first, as we will not be able to process
1821                // any engine messages until it's finished.
1822                if let Poll::Ready(result) =
1823                    this.hooks.poll_active_db_write_hook(cx, this.current_engine_hook_context()?)?
1824                {
1825                    this.on_hook_result(result)?;
1826                    continue
1827                }
1828
1829                // Process any blockchain tree action result as set forth during engine message
1830                // processing.
1831                if let Some(action) = this.blockchain_tree_action.take() {
1832                    match this.on_blockchain_tree_action(action) {
1833                        Ok(EngineEventOutcome::Processed) => {}
1834                        Ok(EngineEventOutcome::ReachedMaxBlock) => return Poll::Ready(Ok(())),
1835                        Err(error) => {
1836                            error!(target: "consensus::engine", %error, "Encountered fatal error");
1837                            return Poll::Ready(Err(error.into()))
1838                        }
1839                    };
1840
1841                    // Blockchain tree action handler might set next action to take.
1842                    continue
1843                }
1844
1845                // If the db write hook is no longer active and we have a pending forkchoice update,
1846                // process it first.
1847                if this.hooks.active_db_write_hook().is_none() {
1848                    if let Some((state, attrs, tx)) = this.pending_forkchoice_update.take() {
1849                        this.set_blockchain_tree_action(
1850                            BlockchainTreeAction::MakeForkchoiceHeadCanonical { state, attrs, tx },
1851                        );
1852                        continue
1853                    }
1854                }
1855
1856                // Process one incoming message from the CL. We don't drain the messages right away,
1857                // because we want to sneak in a poll of the running hook between them.
1858                //
1859                // These messages can affect the state of the SyncController and they're also time
1860                // sensitive, hence they are polled first.
1861                if let Poll::Ready(Some(msg)) = this.engine_message_stream.poll_next_unpin(cx) {
1862                    match msg {
1863                        BeaconEngineMessage::ForkchoiceUpdated {
1864                            state,
1865                            payload_attrs,
1866                            tx,
1867                            version: _version,
1868                        } => {
1869                            this.on_forkchoice_updated(state, payload_attrs, tx);
1870                        }
1871                        BeaconEngineMessage::NewPayload { payload, sidecar, tx } => {
1872                            match this.on_new_payload(payload, sidecar) {
1873                                Ok(Either::Right(block)) => {
1874                                    this.set_blockchain_tree_action(
1875                                        BlockchainTreeAction::InsertNewPayload { block, tx },
1876                                    );
1877                                }
1878                                Ok(Either::Left(status)) => {
1879                                    let _ = tx.send(Ok(status));
1880                                }
1881                                Err(error) => {
1882                                    let _ = tx.send(Err(error));
1883                                }
1884                            }
1885                        }
1886                        BeaconEngineMessage::TransitionConfigurationExchanged => {
1887                            this.blockchain.on_transition_configuration_exchanged();
1888                        }
1889                    }
1890                    continue
1891                }
1892
1893                // Both running hook with db write access and engine messages are pending,
1894                // proceed to other polls
1895                break
1896            }
1897
1898            // process sync events if any
1899            if let Poll::Ready(sync_event) = this.sync.poll(cx) {
1900                match this.on_sync_event(sync_event)? {
1901                    // Sync event was successfully processed
1902                    EngineEventOutcome::Processed => (),
1903                    // Max block has been reached, exit the engine loop
1904                    EngineEventOutcome::ReachedMaxBlock => return Poll::Ready(Ok(())),
1905                }
1906
1907                // this could have taken a while, so we start the next cycle to handle any new
1908                // engine messages
1909                continue 'main
1910            }
1911
1912            // at this point, all engine messages and sync events are fully drained
1913
1914            // Poll next hook if all conditions are met:
1915            // 1. Engine and sync messages are fully drained (both pending)
1916            // 2. Latest FCU status is not INVALID
1917            if !this.forkchoice_state_tracker.is_latest_invalid() {
1918                if let Poll::Ready(result) = this.hooks.poll_next_hook(
1919                    cx,
1920                    this.current_engine_hook_context()?,
1921                    this.sync.is_pipeline_active(),
1922                )? {
1923                    this.on_hook_result(result)?;
1924
1925                    // ensure we're polling until pending while also checking for new engine
1926                    // messages before polling the next hook
1927                    continue 'main
1928                }
1929            }
1930
1931            // incoming engine messages and sync events are drained, so we can yield back
1932            // control
1933            return Poll::Pending
1934        }
1935    }
1936}
1937
1938enum BlockchainTreeAction<EngineT: EngineTypes> {
1939    MakeForkchoiceHeadCanonical {
1940        state: ForkchoiceState,
1941        attrs: Option<EngineT::PayloadAttributes>,
1942        tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
1943    },
1944    InsertNewPayload {
1945        block: SealedBlock,
1946        tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
1947    },
1948    MakeNewPayloadCanonical {
1949        payload_num_hash: BlockNumHash,
1950        status: PayloadStatus,
1951        tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
1952    },
1953    /// Action to insert a new block that we successfully downloaded from the network.
1954    /// There are several outcomes for inserting a downloaded block into the tree:
1955    ///
1956    /// ## [`BlockStatus::Valid`]
1957    ///
1958    /// The block is connected to the current canonical chain and is valid.
1959    /// If the block is an ancestor of the current forkchoice head, then we can try again to
1960    /// make the chain canonical.
1961    ///
1962    /// ## [`BlockStatus::Disconnected`]
1963    ///
1964    /// The block is not connected to the canonical chain, and we need to download the
1965    /// missing parent first.
1966    ///
1967    /// ## Insert Error
1968    ///
1969    /// If the insertion into the tree failed, then the block was well-formed (valid hash),
1970    /// but its chain is invalid, which means the FCU that triggered the
1971    /// download is invalid. Here we can stop because there's nothing more to do
1972    /// and the engine needs to wait for another FCU.
1973    InsertDownloadedPayload { block: SealedBlock },
1974}
1975
1976/// Represents outcomes of processing an engine event
1977#[derive(Debug)]
1978enum EngineEventOutcome {
1979    /// Engine event was processed successfully, engine should continue.
1980    Processed,
1981    /// Engine event was processed successfully and reached max block.
1982    ReachedMaxBlock,
1983}
1984
1985#[cfg(test)]
1986mod tests {
1987    use super::*;
1988    use crate::{
1989        test_utils::{spawn_consensus_engine, TestConsensusEngineBuilder},
1990        BeaconForkChoiceUpdateError,
1991    };
1992    use alloy_rpc_types_engine::{ForkchoiceState, ForkchoiceUpdated, PayloadStatus};
1993    use assert_matches::assert_matches;
1994    use reth_chainspec::{ChainSpecBuilder, MAINNET};
1995    use reth_node_types::FullNodePrimitives;
1996    use reth_primitives::BlockExt;
1997    use reth_provider::{BlockWriter, ProviderFactory, StorageLocation};
1998    use reth_rpc_types_compat::engine::payload::block_to_payload_v1;
1999    use reth_stages::{ExecOutput, PipelineError, StageError};
2000    use reth_stages_api::StageCheckpoint;
2001    use reth_testing_utils::generators::{self, Rng};
2002    use std::{collections::VecDeque, sync::Arc};
2003    use tokio::sync::oneshot::error::TryRecvError;
2004
2005    // Pipeline error is propagated.
2006    #[tokio::test]
2007    async fn pipeline_error_is_propagated() {
2008        let mut rng = generators::rng();
2009        let chain_spec = Arc::new(
2010            ChainSpecBuilder::default()
2011                .chain(MAINNET.chain)
2012                .genesis(MAINNET.genesis.clone())
2013                .paris_activated()
2014                .build(),
2015        );
2016
2017        let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
2018            .with_pipeline_exec_outputs(VecDeque::from([Err(StageError::ChannelClosed)]))
2019            .disable_blockchain_tree_sync()
2020            .with_max_block(1)
2021            .build();
2022
2023        let res = spawn_consensus_engine(consensus_engine);
2024
2025        let _ = env
2026            .send_forkchoice_updated(ForkchoiceState {
2027                head_block_hash: rng.gen(),
2028                ..Default::default()
2029            })
2030            .await;
2031        assert_matches!(
2032            res.await,
2033            Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(), PipelineError::Stage(StageError::ChannelClosed))
2034        );
2035    }
2036
2037    // Test that the consensus engine is idle until first forkchoice updated is received.
2038    #[tokio::test]
2039    async fn is_idle_until_forkchoice_is_set() {
2040        let mut rng = generators::rng();
2041        let chain_spec = Arc::new(
2042            ChainSpecBuilder::default()
2043                .chain(MAINNET.chain)
2044                .genesis(MAINNET.genesis.clone())
2045                .paris_activated()
2046                .build(),
2047        );
2048
2049        let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
2050            .with_pipeline_exec_outputs(VecDeque::from([Err(StageError::ChannelClosed)]))
2051            .disable_blockchain_tree_sync()
2052            .with_max_block(1)
2053            .build();
2054
2055        let mut rx = spawn_consensus_engine(consensus_engine);
2056
2057        // consensus engine is idle
2058        tokio::time::sleep(Duration::from_millis(100)).await;
2059        assert_matches!(rx.try_recv(), Err(TryRecvError::Empty));
2060
2061        // consensus engine is still idle because no FCUs were received
2062        let _ = env
2063            .send_new_payload(
2064                block_to_payload_v1(SealedBlock::default()),
2065                ExecutionPayloadSidecar::none(),
2066            )
2067            .await;
2068
2069        assert_matches!(rx.try_recv(), Err(TryRecvError::Empty));
2070
2071        // consensus engine is still idle because pruning is running
2072        let _ = env
2073            .send_forkchoice_updated(ForkchoiceState {
2074                head_block_hash: rng.gen(),
2075                ..Default::default()
2076            })
2077            .await;
2078        assert_matches!(rx.try_recv(), Err(TryRecvError::Empty));
2079
2080        // consensus engine receives a forkchoice state and triggers the pipeline when pruning is
2081        // finished
2082        loop {
2083            match rx.try_recv() {
2084                Ok(result) => {
2085                    assert_matches!(
2086                        result,
2087                        Err(BeaconConsensusEngineError::Pipeline(n)) if matches!(*n.as_ref(), PipelineError::Stage(StageError::ChannelClosed))
2088                    );
2089                    break
2090                }
2091                Err(TryRecvError::Empty) => {
2092                    let _ = env
2093                        .send_forkchoice_updated(ForkchoiceState {
2094                            head_block_hash: rng.gen(),
2095                            ..Default::default()
2096                        })
2097                        .await;
2098                }
2099                Err(err) => panic!("receive error: {err}"),
2100            }
2101        }
2102    }
2103
2104    // Test that the consensus engine runs the pipeline again if the tree cannot be restored.
2105    // The consensus engine will propagate the second result (error) only if it runs the pipeline
2106    // for the second time.
2107    #[tokio::test]
2108    async fn runs_pipeline_again_if_tree_not_restored() {
2109        let mut rng = generators::rng();
2110        let chain_spec = Arc::new(
2111            ChainSpecBuilder::default()
2112                .chain(MAINNET.chain)
2113                .genesis(MAINNET.genesis.clone())
2114                .paris_activated()
2115                .build(),
2116        );
2117
2118        let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
2119            .with_pipeline_exec_outputs(VecDeque::from([
2120                Ok(ExecOutput { checkpoint: StageCheckpoint::new(1), done: true }),
2121                Err(StageError::ChannelClosed),
2122            ]))
2123            .disable_blockchain_tree_sync()
2124            .with_max_block(2)
2125            .build();
2126
2127        let rx = spawn_consensus_engine(consensus_engine);
2128
2129        let _ = env
2130            .send_forkchoice_updated(ForkchoiceState {
2131                head_block_hash: rng.gen(),
2132                finalized_block_hash: rng.gen(),
2133                ..Default::default()
2134            })
2135            .await;
2136
2137        assert_matches!(
2138            rx.await,
2139            Ok(Err(BeaconConsensusEngineError::Pipeline(n)))  if matches!(*n.as_ref(), PipelineError::Stage(StageError::ChannelClosed))
2140        );
2141    }
2142
2143    #[tokio::test]
2144    async fn terminates_upon_reaching_max_block() {
2145        let mut rng = generators::rng();
2146        let max_block = 1000;
2147        let chain_spec = Arc::new(
2148            ChainSpecBuilder::default()
2149                .chain(MAINNET.chain)
2150                .genesis(MAINNET.genesis.clone())
2151                .paris_activated()
2152                .build(),
2153        );
2154
2155        let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
2156            .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
2157                checkpoint: StageCheckpoint::new(max_block),
2158                done: true,
2159            })]))
2160            .with_max_block(max_block)
2161            .disable_blockchain_tree_sync()
2162            .build();
2163
2164        let rx = spawn_consensus_engine(consensus_engine);
2165
2166        let _ = env
2167            .send_forkchoice_updated(ForkchoiceState {
2168                head_block_hash: rng.gen(),
2169                ..Default::default()
2170            })
2171            .await;
2172        assert_matches!(rx.await, Ok(Ok(())));
2173    }
2174
2175    fn insert_blocks<
2176        'a,
2177        N: ProviderNodeTypes<
2178            Primitives: FullNodePrimitives<
2179                BlockBody = reth_primitives::BlockBody,
2180                BlockHeader = reth_primitives::Header,
2181            >,
2182        >,
2183    >(
2184        provider_factory: ProviderFactory<N>,
2185        mut blocks: impl Iterator<Item = &'a SealedBlock>,
2186    ) {
2187        let provider = provider_factory.provider_rw().unwrap();
2188        blocks
2189            .try_for_each(|b| {
2190                provider
2191                    .insert_block(
2192                        b.clone().try_seal_with_senders().expect("invalid tx signature in block"),
2193                        StorageLocation::Database,
2194                    )
2195                    .map(drop)
2196            })
2197            .expect("failed to insert");
2198        provider.commit().unwrap();
2199    }
2200
2201    mod fork_choice_updated {
2202        use super::*;
2203        use alloy_primitives::U256;
2204        use alloy_rpc_types_engine::ForkchoiceUpdateError;
2205        use generators::BlockParams;
2206        use reth_db::{tables, test_utils::create_test_static_files_dir, Database};
2207        use reth_db_api::transaction::DbTxMut;
2208        use reth_provider::{providers::StaticFileProvider, test_utils::MockNodeTypesWithDB};
2209        use reth_testing_utils::generators::random_block;
2210
2211        #[tokio::test]
2212        async fn empty_head() {
2213            let chain_spec = Arc::new(
2214                ChainSpecBuilder::default()
2215                    .chain(MAINNET.chain)
2216                    .genesis(MAINNET.genesis.clone())
2217                    .paris_activated()
2218                    .build(),
2219            );
2220
2221            let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
2222                .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
2223                    checkpoint: StageCheckpoint::new(0),
2224                    done: true,
2225                })]))
2226                .build();
2227
2228            let mut engine_rx = spawn_consensus_engine(consensus_engine);
2229
2230            let res = env.send_forkchoice_updated(ForkchoiceState::default()).await;
2231            assert_matches!(
2232                res,
2233                Err(BeaconForkChoiceUpdateError::ForkchoiceUpdateError(
2234                    ForkchoiceUpdateError::InvalidState
2235                ))
2236            );
2237
2238            assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
2239        }
2240
2241        #[tokio::test]
2242        async fn valid_forkchoice() {
2243            let mut rng = generators::rng();
2244            let chain_spec = Arc::new(
2245                ChainSpecBuilder::default()
2246                    .chain(MAINNET.chain)
2247                    .genesis(MAINNET.genesis.clone())
2248                    .paris_activated()
2249                    .build(),
2250            );
2251
2252            let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
2253                .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
2254                    checkpoint: StageCheckpoint::new(0),
2255                    done: true,
2256                })]))
2257                .build();
2258
2259            let genesis = random_block(
2260                &mut rng,
2261                0,
2262                BlockParams { ommers_count: Some(0), ..Default::default() },
2263            );
2264            let block1 = random_block(
2265                &mut rng,
2266                1,
2267                BlockParams {
2268                    parent: Some(genesis.hash()),
2269                    ommers_count: Some(0),
2270                    ..Default::default()
2271                },
2272            );
2273            let (_static_dir, static_dir_path) = create_test_static_files_dir();
2274
2275            insert_blocks(
2276                ProviderFactory::<MockNodeTypesWithDB>::new(
2277                    env.db.clone(),
2278                    chain_spec.clone(),
2279                    StaticFileProvider::read_write(static_dir_path).unwrap(),
2280                ),
2281                [&genesis, &block1].into_iter(),
2282            );
2283            env.db
2284                .update(|tx| {
2285                    tx.put::<tables::StageCheckpoints>(
2286                        StageId::Finish.to_string(),
2287                        StageCheckpoint::new(block1.number),
2288                    )
2289                })
2290                .unwrap()
2291                .unwrap();
2292
2293            let mut engine_rx = spawn_consensus_engine(consensus_engine);
2294
2295            let forkchoice = ForkchoiceState {
2296                head_block_hash: block1.hash(),
2297                finalized_block_hash: block1.hash(),
2298                ..Default::default()
2299            };
2300
2301            let result = env.send_forkchoice_updated(forkchoice).await.unwrap();
2302            let expected_result = ForkchoiceUpdated::new(PayloadStatus::new(
2303                PayloadStatusEnum::Valid,
2304                Some(block1.hash()),
2305            ));
2306            assert_eq!(result, expected_result);
2307            assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
2308        }
2309
        #[tokio::test]
        async fn unknown_head_hash() {
            let mut rng = generators::rng();

            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(MAINNET.genesis.clone())
                    .paris_activated()
                    .build(),
            );

            let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
                .with_pipeline_exec_outputs(VecDeque::from([
                    Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
                    Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
                ]))
                .disable_blockchain_tree_sync()
                .build();

            let genesis = random_block(
                &mut rng,
                0,
                BlockParams { ommers_count: Some(0), ..Default::default() },
            );
            let block1 = random_block(
                &mut rng,
                1,
                BlockParams { parent: Some(genesis.hash()), ..Default::default() },
            );

            let (_static_dir, static_dir_path) = create_test_static_files_dir();

            insert_blocks(
                ProviderFactory::<MockNodeTypesWithDB>::new(
                    env.db.clone(),
                    chain_spec.clone(),
                    StaticFileProvider::read_write(static_dir_path).unwrap(),
                ),
                [&genesis, &block1].into_iter(),
            );

            let mut engine_rx = spawn_consensus_engine(consensus_engine);
            let next_head = random_block(
                &mut rng,
                2,
                BlockParams {
                    parent: Some(block1.hash()),
                    ommers_count: Some(0),
                    ..Default::default()
                },
            );
            let next_forkchoice_state = ForkchoiceState {
                head_block_hash: next_head.hash(),
                finalized_block_hash: block1.hash(),
                ..Default::default()
            };

            // if we `await`ed the response inside the assert, the forkchoice update would only be
            // polled after we've inserted the block, and it would return VALID instead of SYNCING
            let invalid_rx = env.send_forkchoice_updated(next_forkchoice_state).await;
            let (_static_dir, static_dir_path) = create_test_static_files_dir();

            // Insert next head immediately after sending forkchoice update
            insert_blocks(
                ProviderFactory::<MockNodeTypesWithDB>::new(
                    env.db.clone(),
                    chain_spec.clone(),
                    StaticFileProvider::read_write(static_dir_path).unwrap(),
                ),
                std::iter::once(&next_head),
            );

            let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing);
            assert_matches!(invalid_rx, Ok(result) => assert_eq!(result, expected_result));

            let result = env.send_forkchoice_retry_on_syncing(next_forkchoice_state).await.unwrap();
            let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Valid)
                .with_latest_valid_hash(next_head.hash());
            assert_eq!(result, expected_result);

            assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
        }

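        // Tests that a forkchoice update with a random, unknown head hash but a finalized hash
        // that is already in the database is answered with SYNCING.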
        #[tokio::test]
        async fn unknown_finalized_hash() {
            let mut rng = generators::rng();
            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(MAINNET.genesis.clone())
                    .paris_activated()
                    .build(),
            );

            let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
                .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
                    checkpoint: StageCheckpoint::new(0),
                    done: true,
                })]))
                .disable_blockchain_tree_sync()
                .build();

            let genesis = random_block(
                &mut rng,
                0,
                BlockParams { ommers_count: Some(0), ..Default::default() },
            );
            let block1 = random_block(
                &mut rng,
                1,
                BlockParams {
                    parent: Some(genesis.hash()),
                    ommers_count: Some(0),
                    ..Default::default()
                },
            );

            let (_static_dir, static_dir_path) = create_test_static_files_dir();

            insert_blocks(
                ProviderFactory::<MockNodeTypesWithDB>::new(
                    env.db.clone(),
                    chain_spec.clone(),
                    StaticFileProvider::read_write(static_dir_path).unwrap(),
                ),
                [&genesis, &block1].into_iter(),
            );

            let engine = spawn_consensus_engine(consensus_engine);

            let res = env
                .send_forkchoice_updated(ForkchoiceState {
                    head_block_hash: rng.gen(),
                    finalized_block_hash: block1.hash(),
                    ..Default::default()
                })
                .await;
            let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing);
            assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));
            drop(engine);
        }

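        // Tests that a forkchoice update targeting a pre-merge (pre-Paris) block is rejected as
        // INVALID with a zero latest valid hash.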
        #[tokio::test]
        async fn forkchoice_updated_pre_merge() {
            let mut rng = generators::rng();
            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(MAINNET.genesis.clone())
                    .london_activated()
                    .paris_at_ttd(U256::from(3))
                    .build(),
            );

            let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
                .with_pipeline_exec_outputs(VecDeque::from([
                    Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
                    Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
                ]))
                .build();

            let genesis = random_block(
                &mut rng,
                0,
                BlockParams { ommers_count: Some(0), ..Default::default() },
            );
            let mut block1 = random_block(
                &mut rng,
                1,
                BlockParams {
                    parent: Some(genesis.hash()),
                    ommers_count: Some(0),
                    ..Default::default()
                },
            );
            block1.header.set_difficulty(U256::from(1));

            // a second pre-merge block
            let mut block2 = random_block(
                &mut rng,
                1,
                BlockParams {
                    parent: Some(genesis.hash()),
                    ommers_count: Some(0),
                    ..Default::default()
                },
            );
            block2.header.set_difficulty(U256::from(1));

            // a transition block
            let mut block3 = random_block(
                &mut rng,
                1,
                BlockParams {
                    parent: Some(genesis.hash()),
                    ommers_count: Some(0),
                    ..Default::default()
                },
            );
            block3.header.set_difficulty(U256::from(1));

            let (_static_dir, static_dir_path) = create_test_static_files_dir();
            insert_blocks(
                ProviderFactory::<MockNodeTypesWithDB>::new(
                    env.db.clone(),
                    chain_spec.clone(),
                    StaticFileProvider::read_write(static_dir_path).unwrap(),
                ),
                [&genesis, &block1, &block2, &block3].into_iter(),
            );

            let _engine = spawn_consensus_engine(consensus_engine);

            let res = env
                .send_forkchoice_updated(ForkchoiceState {
                    head_block_hash: block1.hash(),
                    finalized_block_hash: block1.hash(),
                    ..Default::default()
                })
                .await;

            assert_matches!(res, Ok(result) => {
                let ForkchoiceUpdated { payload_status, .. } = result;
                assert_matches!(payload_status.status, PayloadStatusEnum::Invalid { .. });
                assert_eq!(payload_status.latest_valid_hash, Some(B256::ZERO));
            });
        }

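        // Tests that a forkchoice update pointing to a proof-of-work block on a chain spec without
        // the merge activated is rejected as INVALID (BlockPreMerge) with a zero latest valid hash.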
        #[tokio::test]
        async fn forkchoice_updated_invalid_pow() {
            let mut rng = generators::rng();
            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(MAINNET.genesis.clone())
                    .london_activated()
                    .build(),
            );

            let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
                .with_pipeline_exec_outputs(VecDeque::from([
                    Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
                    Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
                ]))
                .build();

            let genesis = random_block(
                &mut rng,
                0,
                BlockParams { ommers_count: Some(0), ..Default::default() },
            );
            let block1 = random_block(
                &mut rng,
                1,
                BlockParams {
                    parent: Some(genesis.hash()),
                    ommers_count: Some(0),
                    ..Default::default()
                },
            );

            let (_temp_dir, temp_dir_path) = create_test_static_files_dir();

            insert_blocks(
                ProviderFactory::<MockNodeTypesWithDB>::new(
                    env.db.clone(),
                    chain_spec.clone(),
                    StaticFileProvider::read_write(temp_dir_path).unwrap(),
                ),
                [&genesis, &block1].into_iter(),
            );

            let _engine = spawn_consensus_engine(consensus_engine);

            let res = env
                .send_forkchoice_updated(ForkchoiceState {
                    head_block_hash: block1.hash(),
                    finalized_block_hash: block1.hash(),
                    ..Default::default()
                })
                .await;
            let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Invalid {
                validation_error: BlockValidationError::BlockPreMerge { hash: block1.hash() }
                    .to_string(),
            })
            .with_latest_valid_hash(B256::ZERO);
            assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));
        }
    }

    mod new_payload {
        use super::*;
        use alloy_genesis::Genesis;
        use alloy_primitives::U256;
        use generators::BlockParams;
        use reth_db::test_utils::create_test_static_files_dir;
        use reth_primitives::EthereumHardfork;
        use reth_provider::{
            providers::StaticFileProvider,
            test_utils::{blocks::BlockchainTestData, MockNodeTypesWithDB},
        };
        use reth_testing_utils::{generators::random_block, GenesisAllocator};

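        // Tests that new payloads received before any forkchoice update are handled: a payload for
        // a genesis (number 0) block is INVALID, and a payload with an unknown parent is SYNCING.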
        #[tokio::test]
        async fn new_payload_before_forkchoice() {
            let mut rng = generators::rng();
            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(MAINNET.genesis.clone())
                    .paris_activated()
                    .build(),
            );

            let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
                .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
                    checkpoint: StageCheckpoint::new(0),
                    done: true,
                })]))
                .build();

            let mut engine_rx = spawn_consensus_engine(consensus_engine);

            // Send new payload
            let res = env
                .send_new_payload(
                    block_to_payload_v1(random_block(
                        &mut rng,
                        0,
                        BlockParams { ommers_count: Some(0), ..Default::default() },
                    )),
                    ExecutionPayloadSidecar::none(),
                )
                .await;

            // Invalid, because this is a genesis block
            assert_matches!(res, Ok(result) => assert_matches!(result.status, PayloadStatusEnum::Invalid { .. }));

            // Send new payload
            let res = env
                .send_new_payload(
                    block_to_payload_v1(random_block(
                        &mut rng,
                        1,
                        BlockParams { ommers_count: Some(0), ..Default::default() },
                    )),
                    ExecutionPayloadSidecar::none(),
                )
                .await;

            let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Syncing);
            assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));

            assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
        }

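        // Tests that a new payload for a block that is already stored in the database is answered
        // with VALID and the block's own hash as the latest valid hash.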
        #[tokio::test]
        async fn payload_known() {
            let mut rng = generators::rng();
            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(MAINNET.genesis.clone())
                    .paris_activated()
                    .build(),
            );

            let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
                .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
                    checkpoint: StageCheckpoint::new(0),
                    done: true,
                })]))
                .build();

            let genesis = random_block(
                &mut rng,
                0,
                BlockParams { ommers_count: Some(0), ..Default::default() },
            );
            let block1 = random_block(
                &mut rng,
                1,
                BlockParams {
                    parent: Some(genesis.hash()),
                    ommers_count: Some(0),
                    ..Default::default()
                },
            );
            let block2 = random_block(
                &mut rng,
                2,
                BlockParams {
                    parent: Some(block1.hash()),
                    ommers_count: Some(0),
                    ..Default::default()
                },
            );

            let (_static_dir, static_dir_path) = create_test_static_files_dir();
            insert_blocks(
                ProviderFactory::<MockNodeTypesWithDB>::new(
                    env.db.clone(),
                    chain_spec.clone(),
                    StaticFileProvider::read_write(static_dir_path).unwrap(),
                ),
                [&genesis, &block1, &block2].into_iter(),
            );

            let mut engine_rx = spawn_consensus_engine(consensus_engine);

            // Send forkchoice
            let res = env
                .send_forkchoice_updated(ForkchoiceState {
                    head_block_hash: block1.hash(),
                    finalized_block_hash: block1.hash(),
                    ..Default::default()
                })
                .await;
            let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Valid)
                .with_latest_valid_hash(block1.hash());
            assert_matches!(res, Ok(ForkchoiceUpdated { payload_status, .. }) => assert_eq!(payload_status, expected_result));

            // Send new payload
            let result = env
                .send_new_payload_retry_on_syncing(
                    block_to_payload_v1(block2.clone()),
                    ExecutionPayloadSidecar::none(),
                )
                .await
                .unwrap();

            let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Valid)
                .with_latest_valid_hash(block2.hash());
            assert_eq!(result, expected_result);
            assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
        }

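        // Tests forkchoice handling against a real pipeline, executor, and consensus
        // implementation, using a genesis with pre-funded accounts and a single child block.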
        #[tokio::test]
        async fn simple_validate_block() {
            let mut rng = generators::rng();
            let amount = U256::from(1000000000000000000u64);
            let mut allocator = GenesisAllocator::default().with_rng(&mut rng);
            // add 16 new accounts
            for _ in 0..16 {
                allocator.new_funded_account(amount);
            }

            let alloc = allocator.build();

            let genesis = Genesis::default().extend_accounts(alloc);

            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(genesis)
                    .shanghai_activated()
                    .build(),
            );

            let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
                .with_real_pipeline()
                .with_real_executor()
                .with_real_consensus()
                .build();

            let genesis =
                SealedBlock { header: chain_spec.sealed_genesis_header(), ..Default::default() };
            let block1 = random_block(
                &mut rng,
                1,
                BlockParams {
                    parent: Some(chain_spec.genesis_hash()),
                    ommers_count: Some(0),
                    ..Default::default()
                },
            );

            // TODO: add transactions that transfer from the alloc accounts, generating the new
            // block tx and state root

            let (_static_dir, static_dir_path) = create_test_static_files_dir();

            insert_blocks(
                ProviderFactory::<MockNodeTypesWithDB>::new(
                    env.db.clone(),
                    chain_spec.clone(),
                    StaticFileProvider::read_write(static_dir_path).unwrap(),
                ),
                [&genesis, &block1].into_iter(),
            );

            let mut engine_rx = spawn_consensus_engine(consensus_engine);

            // Send forkchoice
            let res = env
                .send_forkchoice_updated(ForkchoiceState {
                    head_block_hash: block1.hash(),
                    finalized_block_hash: block1.hash(),
                    ..Default::default()
                })
                .await;
            let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Valid)
                .with_latest_valid_hash(block1.hash());
            assert_matches!(res, Ok(ForkchoiceUpdated { payload_status, .. }) => assert_eq!(payload_status, expected_result));
            assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
        }

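        // Tests that a new payload whose parent hash is unknown to the engine is answered with
        // SYNCING rather than an error.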
        #[tokio::test]
        async fn payload_parent_unknown() {
            let mut rng = generators::rng();
            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(MAINNET.genesis.clone())
                    .paris_activated()
                    .build(),
            );

            let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
                .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
                    checkpoint: StageCheckpoint::new(0),
                    done: true,
                })]))
                .build();
            let genesis = random_block(
                &mut rng,
                0,
                BlockParams { ommers_count: Some(0), ..Default::default() },
            );

            let (_static_dir, static_dir_path) = create_test_static_files_dir();

            insert_blocks(
                ProviderFactory::<MockNodeTypesWithDB>::new(
                    env.db.clone(),
                    chain_spec.clone(),
                    StaticFileProvider::read_write(static_dir_path).unwrap(),
                ),
                std::iter::once(&genesis),
            );

            let mut engine_rx = spawn_consensus_engine(consensus_engine);

            // Send forkchoice
            let res = env
                .send_forkchoice_updated(ForkchoiceState {
                    head_block_hash: genesis.hash(),
                    finalized_block_hash: genesis.hash(),
                    ..Default::default()
                })
                .await;
            let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Valid)
                .with_latest_valid_hash(genesis.hash());
            assert_matches!(res, Ok(ForkchoiceUpdated { payload_status, .. }) => assert_eq!(payload_status, expected_result));

            // Send new payload
            let parent = rng.gen();
            let block = random_block(
                &mut rng,
                2,
                BlockParams { parent: Some(parent), ommers_count: Some(0), ..Default::default() },
            );
            let res = env
                .send_new_payload(block_to_payload_v1(block), ExecutionPayloadSidecar::none())
                .await;
            let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Syncing);
            assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));

            assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
        }

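        // Tests that both forkchoice updates and new payloads referencing pre-merge
        // (proof-of-work) blocks are rejected as INVALID with the BlockPreMerge error and a zero
        // latest valid hash.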
        #[tokio::test]
        async fn payload_pre_merge() {
            let data = BlockchainTestData::default();
            let mut block1 = data.blocks[0].0.block.clone();
            block1.header.set_difficulty(
                MAINNET.fork(EthereumHardfork::Paris).ttd().unwrap() - U256::from(1),
            );
            block1 = block1.unseal::<reth_primitives::Block>().seal_slow();
            let (block2, exec_result2) = data.blocks[1].clone();
            let mut block2 = block2.unseal().block;
            block2.body.withdrawals = None;
            block2.header.parent_hash = block1.hash();
            block2.header.base_fee_per_gas = Some(100);
            block2.header.difficulty = U256::ZERO;
            let block2 = block2.clone().seal_slow();

            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(MAINNET.genesis.clone())
                    .london_activated()
                    .build(),
            );

            let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
                .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
                    checkpoint: StageCheckpoint::new(0),
                    done: true,
                })]))
                .with_executor_results(Vec::from([exec_result2]))
                .build();

            let (_static_dir, static_dir_path) = create_test_static_files_dir();

            insert_blocks(
                ProviderFactory::<MockNodeTypesWithDB>::new(
                    env.db.clone(),
                    chain_spec.clone(),
                    StaticFileProvider::read_write(static_dir_path).unwrap(),
                ),
                [&data.genesis, &block1].into_iter(),
            );

            let mut engine_rx = spawn_consensus_engine(consensus_engine);

            // Send forkchoice
            let res = env
                .send_forkchoice_updated(ForkchoiceState {
                    head_block_hash: block1.hash(),
                    finalized_block_hash: block1.hash(),
                    ..Default::default()
                })
                .await;

            let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Invalid {
                validation_error: BlockValidationError::BlockPreMerge { hash: block1.hash() }
                    .to_string(),
            })
            .with_latest_valid_hash(B256::ZERO);
            assert_matches!(res, Ok(ForkchoiceUpdated { payload_status, .. }) => assert_eq!(payload_status, expected_result));

            // Send new payload
            let result = env
                .send_new_payload_retry_on_syncing(
                    block_to_payload_v1(block2.clone()),
                    ExecutionPayloadSidecar::none(),
                )
                .await
                .unwrap();

            let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Invalid {
                validation_error: BlockValidationError::BlockPreMerge { hash: block2.hash() }
                    .to_string(),
            })
            .with_latest_valid_hash(B256::ZERO);
            assert_eq!(result, expected_result);

            assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
        }
    }
}