reth_engine_tree/tree/mod.rs

use crate::{
    backfill::{BackfillAction, BackfillSyncState},
    backup::{BackupAction, BackupHandle},
    chain::FromOrchestrator,
    engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
    persistence::PersistenceHandle,
    tree::{
        cached_state::CachedStateProvider, executor::WorkloadExecutor, metrics::EngineApiMetrics,
    },
};
use alloy_consensus::BlockHeader;
use alloy_eips::{merge::EPOCH_SLOTS, BlockNumHash, NumHash};
use alloy_primitives::B256;
use alloy_rpc_types_engine::{
    ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
};
use error::{InsertBlockError, InsertBlockErrorKind, InsertBlockFatalError};
use instrumented_state::InstrumentedStateProvider;
use payload_processor::sparse_trie::StateRootComputeOutcome;
use persistence_state::CurrentPersistenceAction;
use precompile_cache::PrecompileCacheMap;
use reth_chain_state::{
    CanonicalInMemoryState, ExecutedBlock, ExecutedBlockWithTrieUpdates,
    MemoryOverlayStateProvider, NewCanonicalChain,
};
use reth_consensus::{Consensus, FullConsensus};
pub use reth_engine_primitives::InvalidBlockHook;
use reth_engine_primitives::{
    BeaconConsensusEngineEvent, BeaconEngineMessage, BeaconOnNewPayloadError, EngineValidator,
    ExecutionPayload, ForkchoiceStateTracker, OnForkChoiceUpdated,
};
use reth_errors::{ConsensusError, ProviderResult};
use reth_evm::{ConfigureEvm, SpecFor};
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{EngineApiMessageVersion, PayloadBuilderAttributes, PayloadTypes};
use reth_primitives_traits::{
    Block, GotExpected, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
};
use reth_provider::{
    providers::ConsistentDbView, BlockNumReader, BlockReader, DBProvider, DatabaseProviderFactory,
    ExecutionOutcome, HashedPostStateProvider, ProviderError, StateCommitmentProvider,
    StateProvider, StateProviderBox, StateProviderFactory, StateReader, StateRootProvider,
    TransactionVariant,
};
use reth_revm::{database::StateProviderDatabase, State};
use reth_stages_api::ControlFlow;
use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
use reth_trie_db::{DatabaseHashedPostState, StateCommitment};
use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
use state::TreeState;
use std::{
    fmt::Debug,
    sync::{
        mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
        Arc,
    },
    time::Instant,
};
use tokio::sync::{
    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
    oneshot::{self, error::TryRecvError},
};
use tracing::*;

mod block_buffer;
mod cached_state;
pub mod error;
mod instrumented_state;
mod invalid_block_hook;
mod invalid_headers;
mod metrics;
mod payload_processor;
mod persistence_state;
#[allow(unused)]
pub mod precompile_cache;
// TODO(alexey): compare trie updates in `insert_block_inner`
#[expect(unused)]
mod trie_updates;

use crate::tree::error::AdvancePersistenceError;
pub use block_buffer::BlockBuffer;
pub use invalid_block_hook::{InvalidBlockHooks, NoopInvalidBlockHook};
pub use invalid_headers::InvalidHeaderCache;
pub use payload_processor::*;
pub use persistence_state::PersistenceState;
pub use reth_engine_primitives::TreeConfig;
use reth_evm::execute::BlockExecutionOutput;

pub mod state;

/// The largest gap for which the tree will be used to sync individual blocks by downloading them.
///
/// This is the default threshold, and represents the distance (gap) from the local head to a
/// new (canonical) block, e.g. the forkchoice head block. If the block distance from the local head
/// exceeds this threshold, the pipeline will be used to backfill the gap more efficiently.
///
/// E.g.: the local head is at `block.number` 100 and the forkchoice head is at `block.number` 133
/// (more than `EPOCH_SLOTS` blocks away); this exceeds the threshold, so the pipeline should be
/// used to backfill the gap.
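///
/// # Example (illustrative)
///
/// A minimal sketch of the threshold check, assuming `local_head` and `target` are block numbers:
///
/// ```ignore
/// let gap = target.saturating_sub(local_head);
/// // gaps larger than `MIN_BLOCKS_FOR_PIPELINE_RUN` are backfilled via the pipeline,
/// // smaller ones are closed by downloading and executing individual blocks
/// let use_pipeline = gap > MIN_BLOCKS_FOR_PIPELINE_RUN;
/// ```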
pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;

/// A builder for creating state providers that can be used across threads.
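///
/// # Example (illustrative)
///
/// A sketch of typical usage, assuming a `factory` implementing the required provider traits and
/// an in-memory `blocks` overlay are available:
///
/// ```ignore
/// let builder = StateProviderBuilder::new(factory, historical_hash, Some(blocks));
/// // the builder is `Clone`, so it can be handed to worker threads,
/// // each of which builds its own state provider on demand
/// let provider = builder.build()?;
/// ```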
#[derive(Clone, Debug)]
pub struct StateProviderBuilder<N: NodePrimitives, P> {
    /// The provider factory used to create providers.
    provider_factory: P,
    /// The historical block hash to fetch state from.
    historical: B256,
    /// The blocks that form the chain from historical to target and are in memory.
    overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
}

impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
    /// Creates a new state provider from the provider factory, historical block hash and optional
    /// overlaid blocks.
    pub const fn new(
        provider_factory: P,
        historical: B256,
        overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
    ) -> Self {
        Self { provider_factory, historical, overlay }
    }
}

impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
where
    P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone,
{
    /// Creates a new state provider from this builder.
    pub fn build(&self) -> ProviderResult<StateProviderBox> {
        let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
        if let Some(overlay) = self.overlay.clone() {
            provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
        }
        Ok(provider)
    }
}

/// Tracks the state of the engine api internals.
///
/// This type is not shareable.
#[derive(Debug)]
pub struct EngineApiTreeState<N: NodePrimitives> {
    /// Tracks the state of the blockchain tree.
    tree_state: TreeState<N>,
    /// Tracks the forkchoice state updates received by the CL.
    forkchoice_state_tracker: ForkchoiceStateTracker,
    /// Buffer of detached blocks.
    buffer: BlockBuffer<N::Block>,
    /// Tracks the headers of payloads that were rejected by the engine because they're invalid.
    invalid_headers: InvalidHeaderCache,
}

impl<N: NodePrimitives> EngineApiTreeState<N> {
    fn new(
        block_buffer_limit: u32,
        max_invalid_header_cache_length: u32,
        canonical_block: BlockNumHash,
        engine_kind: EngineApiKind,
    ) -> Self {
        Self {
            invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
            buffer: BlockBuffer::new(block_buffer_limit),
            tree_state: TreeState::new(canonical_block, engine_kind),
            forkchoice_state_tracker: ForkchoiceStateTracker::default(),
        }
    }
}

/// The outcome of a tree operation.
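///
/// # Example (illustrative)
///
/// A sketch of how an outcome is built and tagged with a follow-up event, assuming a `status`
/// value and a `block_hash` are at hand:
///
/// ```ignore
/// let outcome = TreeOutcome::new(status)
///     // ask the caller to make the block canonical once the outcome is processed
///     .with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical { sync_target_head: block_hash }));
/// ```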
#[derive(Debug)]
pub struct TreeOutcome<T> {
    /// The outcome of the operation.
    pub outcome: T,
    /// An optional event to tell the caller to do something.
    pub event: Option<TreeEvent>,
}

impl<T> TreeOutcome<T> {
    /// Create new tree outcome.
    pub const fn new(outcome: T) -> Self {
        Self { outcome, event: None }
    }

    /// Set event on the outcome.
    pub fn with_event(mut self, event: TreeEvent) -> Self {
        self.event = Some(event);
        self
    }
}

/// Events that are triggered by the tree chain.
#[derive(Debug)]
pub enum TreeEvent {
    /// Tree action is needed.
    TreeAction(TreeAction),
    /// Backfill action is needed.
    BackfillAction(BackfillAction),
    /// Block download is needed.
    Download(DownloadRequest),
}

impl TreeEvent {
    /// Returns true if the event is a backfill action.
    const fn is_backfill_action(&self) -> bool {
        matches!(self, Self::BackfillAction(_))
    }
}

/// The actions that can be performed on the tree.
#[derive(Debug)]
pub enum TreeAction {
    /// Make target canonical.
    MakeCanonical {
        /// The sync target head hash
        sync_target_head: B256,
    },
}

/// The engine API tree handler implementation.
///
/// This type is responsible for processing engine API requests, maintaining the canonical state and
/// emitting events.
pub struct EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    T: PayloadTypes,
    C: ConfigureEvm<Primitives = N> + 'static,
{
    provider: P,
    consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
    payload_validator: V,
    /// Keeps track of internals such as executed and buffered blocks.
    state: EngineApiTreeState<N>,
    /// The half for sending messages to the engine.
    ///
    /// This is kept so that we can queue messages to ourselves that we can process later, for
    /// example distributing a workload across multiple messages that would otherwise take too
    /// long to process. E.g. we might receive a range of downloaded blocks and want to process
    /// them one by one so that we can handle incoming engine API messages in between and don't
    /// become unresponsive. This can happen during the live sync transition where we're trying
    /// to close the gap (up to 3 epochs of blocks in the worst case).
    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Incoming engine API requests.
    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Outgoing events that are emitted to the handler.
    outgoing: UnboundedSender<EngineApiEvent<N>>,
    /// Channels to the persistence layer.
    persistence: PersistenceHandle<N>,
    /// Tracks the state changes of the persistence task.
    persistence_state: PersistenceState,
    /// Flag indicating the state of the node's backfill synchronization process.
    backfill_sync_state: BackfillSyncState,
    /// Keeps track of the state of the canonical chain that isn't persisted yet.
    /// This is intended to be accessed from external sources, such as rpc.
    canonical_in_memory_state: CanonicalInMemoryState<N>,
    /// Handle to the payload builder that will receive payload attributes for valid forkchoice
    /// updates
    payload_builder: PayloadBuilderHandle<T>,
    /// Configuration settings.
    config: TreeConfig,
    /// Metrics for the engine api.
    metrics: EngineApiMetrics,
    /// An invalid block hook.
    invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
    /// The engine API variant of this handler
    engine_kind: EngineApiKind,
    /// The type responsible for processing new payloads
    payload_processor: PayloadProcessor<N, C>,
    /// The EVM configuration.
    evm_config: C,
    /// Precompile cache map.
    #[allow(unused)]
    precompile_cache_map: PrecompileCacheMap<SpecFor<C>>,
    /// The backup handler
    backup: BackupHandle,
}

impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
    for EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    C: Debug + ConfigureEvm<Primitives = N>,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EngineApiTreeHandler")
            .field("provider", &self.provider)
            .field("consensus", &self.consensus)
            .field("payload_validator", &self.payload_validator)
            .field("state", &self.state)
            .field("incoming_tx", &self.incoming_tx)
            .field("persistence", &self.persistence)
            .field("persistence_state", &self.persistence_state)
            .field("backfill_sync_state", &self.backfill_sync_state)
            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
            .field("payload_builder", &self.payload_builder)
            .field("config", &self.config)
            .field("metrics", &self.metrics)
            .field("invalid_block_hook", &format!("{:p}", self.invalid_block_hook))
            .field("engine_kind", &self.engine_kind)
            .field("payload_processor", &self.payload_processor)
            .field("evm_config", &self.evm_config)
            .finish()
    }
}

impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
where
    N: NodePrimitives,
    P: DatabaseProviderFactory
        + BlockReader<Block = N::Block, Header = N::BlockHeader>
        + StateProviderFactory
        + StateReader<Receipt = N::Receipt>
        + StateCommitmentProvider
        + HashedPostStateProvider
        + Clone
        + 'static,
    <P as DatabaseProviderFactory>::Provider:
        BlockReader<Block = N::Block, Header = N::BlockHeader>,
    C: ConfigureEvm<Primitives = N> + 'static,
    T: PayloadTypes,
    V: EngineValidator<T, Block = N::Block>,
{
    /// Creates a new [`EngineApiTreeHandler`].
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        provider: P,
        consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
        payload_validator: V,
        outgoing: UnboundedSender<EngineApiEvent<N>>,
        state: EngineApiTreeState<N>,
        canonical_in_memory_state: CanonicalInMemoryState<N>,
        persistence: PersistenceHandle<N>,
        persistence_state: PersistenceState,
        payload_builder: PayloadBuilderHandle<T>,
        config: TreeConfig,
        engine_kind: EngineApiKind,
        evm_config: C,
        backup: BackupHandle,
    ) -> Self {
        let (incoming_tx, incoming) = std::sync::mpsc::channel();

        let precompile_cache_map = PrecompileCacheMap::default();

        let payload_processor = PayloadProcessor::new(
            WorkloadExecutor::default(),
            evm_config.clone(),
            &config,
            precompile_cache_map.clone(),
        );

        Self {
            provider,
            consensus,
            payload_validator,
            incoming,
            outgoing,
            persistence,
            persistence_state,
            backfill_sync_state: BackfillSyncState::Idle,
            state,
            canonical_in_memory_state,
            payload_builder,
            config,
            metrics: Default::default(),
            incoming_tx,
            invalid_block_hook: Box::new(NoopInvalidBlockHook),
            engine_kind,
            payload_processor,
            evm_config,
            precompile_cache_map,
            backup,
        }
    }

    /// Sets the invalid block hook.
    fn set_invalid_block_hook(&mut self, invalid_block_hook: Box<dyn InvalidBlockHook<N>>) {
        self.invalid_block_hook = invalid_block_hook;
    }

    /// Creates a new [`EngineApiTreeHandler`] instance and spawns it in its
    /// own thread.
    ///
    /// Returns the sender through which incoming requests can be sent to the task and the receiver
    /// end of a [`EngineApiEvent`] unbounded channel to receive events from the engine.
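    ///
    /// # Example (illustrative)
    ///
    /// A sketch of wiring the handler up, assuming concrete `provider`, `consensus`, `validator`,
    /// and the remaining arguments are already constructed:
    ///
    /// ```ignore
    /// let (to_tree, mut events) = EngineApiTreeHandler::spawn_new(
    ///     provider, consensus, validator, persistence, payload_builder,
    ///     canonical_in_memory_state, config, invalid_block_hook, kind, evm_config, backup,
    /// );
    /// // engine API requests go in through `to_tree`; engine events come out of `events`
    /// while let Some(event) = events.recv().await {
    ///     // handle `EngineApiEvent`s emitted by the tree task
    /// }
    /// ```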
    #[expect(clippy::complexity)]
    pub fn spawn_new(
        provider: P,
        consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
        payload_validator: V,
        persistence: PersistenceHandle<N>,
        payload_builder: PayloadBuilderHandle<T>,
        canonical_in_memory_state: CanonicalInMemoryState<N>,
        config: TreeConfig,
        invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
        kind: EngineApiKind,
        evm_config: C,
        backup: BackupHandle,
    ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
    {
        let best_block_number = provider.best_block_number().unwrap_or(0);
        let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();

        let persistence_state = PersistenceState {
            last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
            rx: None,
        };

        let (tx, outgoing) = unbounded_channel();
        let state = EngineApiTreeState::new(
            config.block_buffer_limit(),
            config.max_invalid_header_cache_length(),
            header.num_hash(),
            kind,
        );

        let mut task = Self::new(
            provider,
            consensus,
            payload_validator,
            tx,
            state,
            canonical_in_memory_state,
            persistence,
            persistence_state,
            payload_builder,
            config,
            kind,
            evm_config,
            backup,
        );
        task.set_invalid_block_hook(invalid_block_hook);
        let incoming = task.incoming_tx.clone();
        std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap();
        (incoming, outgoing)
    }

    /// Returns a new [`Sender`] to send messages to this type.
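    ///
    /// A sketch of its use (illustrative; `blocks` is a hypothetical `Vec<RecoveredBlock<_>>`):
    ///
    /// ```ignore
    /// let tx = handler.sender();
    /// // hand a batch of downloaded blocks to the tree task
    /// tx.send(FromEngine::DownloadedBlocks(blocks))?;
    /// ```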
    pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
        self.incoming_tx.clone()
    }

    /// Run the engine API handler.
    ///
    /// This will block the current thread and process incoming messages.
    pub fn run(mut self) {
        loop {
            match self.try_recv_engine_message() {
                Ok(Some(msg)) => {
                    debug!(target: "engine::tree", %msg, "received new engine message");
                    if let Err(fatal) = self.on_engine_message(msg) {
                        error!(target: "engine::tree", %fatal, "insert block fatal error");
                        return
                    }
                }
                Ok(None) => {
                    debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
                }
                Err(_err) => {
                    error!(target: "engine::tree", "Engine channel disconnected");
                    return
                }
            }

            if let Err(err) = self.advance_persistence() {
                error!(target: "engine::tree", %err, "Advancing persistence failed");
                return
            }
            if let Err(err) = self.advance_backup() {
                error!(target: "engine::tree", %err, "Advancing backup failed");
                return
            }
        }
    }

    /// Invoked when previously requested blocks were downloaded.
    ///
    /// If the block count exceeds the configured batch size we're allowed to execute at once, this
    /// will execute the first batch and send the remaining blocks back through the channel so that
    /// block request processing isn't blocked for a long time.
    fn on_downloaded(
        &mut self,
        mut blocks: Vec<RecoveredBlock<N::Block>>,
    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
        if blocks.is_empty() {
            // nothing to execute
            return Ok(None)
        }

        trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
        let batch = self.config.max_execute_block_batch_size().min(blocks.len());
        for block in blocks.drain(..batch) {
            if let Some(event) = self.on_downloaded_block(block)? {
                let needs_backfill = event.is_backfill_action();
                self.on_tree_event(event)?;
                if needs_backfill {
                    // can exit early if backfill is needed
                    return Ok(None)
                }
            }
        }

        // if we still have blocks to execute, send them as a followup request
        if !blocks.is_empty() {
            let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
        }

        Ok(None)
    }

    /// When the consensus layer receives a new block via the consensus gossip protocol,
    /// the transactions in the block are sent to the execution layer in the form of a
    /// [`PayloadTypes::ExecutionData`](reth_payload_primitives::PayloadTypes::ExecutionData). The
    /// execution layer executes the transactions and validates the state in the block header,
    /// then passes validation data back to the consensus layer, which adds the block to the head
    /// of its own blockchain and attests to it. The block is then broadcast over the consensus
    /// p2p network in the form of a "Beacon block".
    ///
    /// These responses should adhere to the [Engine API Spec for
    /// `engine_newPayload`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification).
    ///
    /// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
    /// returns an error if an internal error occurred.
    #[instrument(level = "trace", skip_all, fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()), target = "engine::tree")]
    fn on_new_payload(
        &mut self,
        payload: T::ExecutionData,
    ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
        trace!(target: "engine::tree", "invoked new payload");
        self.metrics.engine.new_payload_messages.increment(1);

        // Ensures that the given payload does not violate any consensus rules that concern the
        // block's layout, like:
        //    - missing or invalid base fee
        //    - invalid extra data
        //    - invalid transactions
        //    - incorrect hash
        //    - the versioned hashes passed with the payload do not exactly match transaction
        //      versioned hashes
        //    - the block does not contain blob transactions if it is pre-cancun
        //
        // This validates the following engine API rule:
        //
        // 3. Given the expected array of blob versioned hashes client software **MUST** run its
        //    validation by taking the following steps:
        //
        //   1. Obtain the actual array by concatenating blob versioned hashes lists
        //      (`tx.blob_versioned_hashes`) of each [blob
        //      transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included
        //      in the payload, respecting the order of inclusion. If the payload has no blob
        //      transactions the expected array **MUST** be `[]`.
        //
        //   2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage |
        //      null}` if the expected and the actual arrays don't match.
        //
        // This validation **MUST** be instantly run in all cases even during active sync process.
        let parent_hash = payload.parent_hash();
        debug!("on_new_payload: payload: {:?}", payload);
        let block = match self.payload_validator.ensure_well_formed_payload(payload) {
            Ok(block) => block,
            Err(error) => {
                error!(target: "engine::tree", %error, "Invalid payload");
                // we need to convert the error to a payload status (response to the CL)

                let latest_valid_hash =
                    if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
                        // Engine-API rules:
                        // > `latestValidHash: null` if the blockHash validation has failed (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/shanghai.md?plain=1#L113>)
                        // > `latestValidHash: null` if the expected and the actual arrays don't match (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md?plain=1#L103>)
                        None
                    } else {
                        self.latest_valid_hash_for_invalid_payload(parent_hash)?
                    };

                let status = PayloadStatusEnum::from(error);
                return Ok(TreeOutcome::new(PayloadStatus::new(status, latest_valid_hash)))
            }
        };

        let block_hash = block.hash();
        let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
        if lowest_buffered_ancestor == block_hash {
            lowest_buffered_ancestor = block.parent_hash();
        }

        // now check the block itself
        if let Some(status) =
            self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, &block)?
        {
            return Ok(TreeOutcome::new(status))
        }

        let status = if self.backfill_sync_state.is_idle() {
            let mut latest_valid_hash = None;
            let num_hash = block.num_hash();
            match self.insert_block(block) {
                Ok(status) => {
                    let status = match status {
                        InsertPayloadOk::Inserted(BlockStatus::Valid) => {
                            latest_valid_hash = Some(block_hash);
                            self.try_connect_buffered_blocks(num_hash)?;
                            PayloadStatusEnum::Valid
                        }
                        InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
                            latest_valid_hash = Some(block_hash);
                            PayloadStatusEnum::Valid
                        }
                        InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
                        InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
                            // not known to be invalid, but we don't know anything else
                            PayloadStatusEnum::Syncing
                        }
                    };

                    PayloadStatus::new(status, latest_valid_hash)
                }
                Err(error) => self.on_insert_block_error(error)?,
            }
        } else if let Err(error) = self.buffer_block(block) {
            self.on_insert_block_error(error)?
        } else {
            PayloadStatus::from_status(PayloadStatusEnum::Syncing)
        };

        let mut outcome = TreeOutcome::new(status);
        // if the block is valid and it is the current sync target head, make it canonical
        if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
            // but only if it isn't already the canonical head
            if self.state.tree_state.canonical_block_hash() != block_hash {
                outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
                    sync_target_head: block_hash,
                }));
            }
        }

        Ok(outcome)
    }

    /// Returns the new chain for the given head.
    ///
    /// This also handles reorgs.
    ///
    /// Note: This does not update the tracked state and instead returns the new chain based on the
    /// given head.
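    ///
    /// # Sketch (illustrative)
    ///
    /// The reorg handling below is a two-pointer walk-back: both chain tips are first brought to
    /// the same height, then stepped back in lockstep until they meet at the fork block. A minimal
    /// stand-alone version over a hypothetical `parents` map of child hash to parent hash:
    ///
    /// ```ignore
    /// fn fork_point(parents: &HashMap<B256, B256>, mut a: B256, mut b: B256) -> Option<B256> {
    ///     // assumes `a` and `b` have already been walked back to the same height
    ///     while a != b {
    ///         a = *parents.get(&a)?;
    ///         b = *parents.get(&b)?;
    ///     }
    ///     Some(a)
    /// }
    /// ```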
    fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
        // get the executed new head block
        let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
            return Ok(None)
        };

        let new_head_number = new_head_block.recovered_block().number();
        let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;

        let mut new_chain = vec![new_head_block.clone()];
        let mut current_hash = new_head_block.recovered_block().parent_hash();
        let mut current_number = new_head_number - 1;

        // Walk back the new chain until we reach a block we know about
        //
        // This is only done for in-memory blocks, because we should not have persisted any blocks
        // that are _above_ the current canonical head.
        while current_number > current_canonical_number {
            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
            {
                current_hash = block.recovered_block().parent_hash();
                current_number -= 1;
                new_chain.push(block);
            } else {
                warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
                // This should never happen as we're walking back a chain that should connect to
                // the canonical chain
                return Ok(None);
            }
        }

        // If we have reached the current canonical head by walking back from the target, then we
        // know this represents an extension of the canonical chain.
        if current_hash == self.state.tree_state.current_canonical_head.hash {
            new_chain.reverse();

            // Simple extension of the current chain
            return Ok(Some(NewCanonicalChain::Commit { new: new_chain }));
        }

        // We have a reorg. Walk back both chains to find the fork point.
        let mut old_chain = Vec::new();
        let mut old_hash = self.state.tree_state.current_canonical_head.hash;

        // If the canonical chain is ahead of the new chain, walk back the old (canonical) chain
        // until it reaches the new head's number.
        while current_canonical_number > current_number {
            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
                old_chain.push(block.clone());
                old_hash = block.recovered_block().parent_hash();
                current_canonical_number -= 1;
            } else {
                // This shouldn't happen as we're walking back the canonical chain
                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
                return Ok(None);
            }
        }

        // Both new and old chain pointers are now at the same height.
        debug_assert_eq!(current_number, current_canonical_number);

        // Walk both chains from specified hashes at same height until
        // a common ancestor (fork block) is reached.
        while old_hash != current_hash {
            if let Some(block) = self.canonical_block_by_hash(old_hash)? {
                old_hash = block.recovered_block().parent_hash();
                old_chain.push(block);
            } else {
                // This shouldn't happen as we're walking back the canonical chain
                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
                return Ok(None);
            }

            if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
            {
                current_hash = block.recovered_block().parent_hash();
                new_chain.push(block);
            } else {
                // This shouldn't happen as we've already walked this path
                warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
                return Ok(None);
            }
        }
        new_chain.reverse();
        old_chain.reverse();

        Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
    }

    /// Determines whether the given block is part of a fork by checking that both of these
    /// conditions hold:
    /// * walking back from the target hash shows that the target is not part of an extension of
    ///   the canonical chain.
    /// * walking back from the current head shows that the target hash is not already part of the
    ///   canonical chain.
    fn is_fork(&self, target_hash: B256) -> ProviderResult<bool> {
        // verify that the given hash is not part of an extension of the canon chain.
        let canonical_head = self.state.tree_state.canonical_head();
        let mut current_hash = target_hash;
        while let Some(current_block) = self.sealed_header_by_hash(current_hash)? {
            if current_block.hash() == canonical_head.hash {
                return Ok(false)
            }
            // We already passed the canonical head
            if current_block.number() <= canonical_head.number {
                break
            }
            current_hash = current_block.parent_hash();
        }

        // verify that the given hash is not already part of canonical chain stored in memory
        if self.canonical_in_memory_state.header_by_hash(target_hash).is_some() {
            return Ok(false)
        }

        // verify that the given hash is not already part of persisted canonical chain
        if self.provider.block_number(target_hash)?.is_some() {
            return Ok(false)
        }

        Ok(true)
    }

    /// Returns the persisting kind for the input block.
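    ///
    /// A sketch of how callers branch on the result (illustrative):
    ///
    /// ```ignore
    /// match self.persisting_kind_for(block.header()) {
    ///     PersistingKind::NotPersisting => { /* no persistence task is running */ }
    ///     PersistingKind::PersistingDescendant => { /* block builds on the blocks being saved */ }
    ///     PersistingKind::PersistingNotDescendant => { /* block is unrelated to the saved range */ }
    /// }
    /// ```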
    fn persisting_kind_for(&self, block: &N::BlockHeader) -> PersistingKind {
        // Check that we're currently persisting.
        let Some(action) = self.persistence_state.current_action() else {
            return PersistingKind::NotPersisting
        };
        // Check that the persistence action is saving blocks, not removing them.
        let CurrentPersistenceAction::SavingBlocks { highest } = action else {
            return PersistingKind::PersistingNotDescendant
        };

        // The block being validated can only be a descendant if its number is higher than that of
        // the highest block being persisted. Otherwise, it's likely a fork of a lower block.
        if block.number() > highest.number && self.state.tree_state.is_descendant(*highest, block) {
            return PersistingKind::PersistingDescendant
        }

        // In all other cases, the block is not a descendant.
        PersistingKind::PersistingNotDescendant
    }

    /// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
    /// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
    /// chain.
    ///
    /// These responses should adhere to the [Engine API Spec for
    /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
    ///
    /// Returns an error if an internal error occurred like a database error.
    #[instrument(level = "trace", skip_all, fields(head = %state.head_block_hash, safe = %state.safe_block_hash, finalized = %state.finalized_block_hash), target = "engine::tree")]
    fn on_forkchoice_updated(
        &mut self,
        state: ForkchoiceState,
        attrs: Option<T::PayloadAttributes>,
        version: EngineApiMessageVersion,
    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
        trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
        self.metrics.engine.forkchoice_updated_messages.increment(1);
        self.canonical_in_memory_state.on_forkchoice_update_received();

        if let Some(on_updated) = self.pre_validate_forkchoice_update(state)? {
            return Ok(TreeOutcome::new(on_updated))
        }

        let valid_outcome = |head| {
            TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
                PayloadStatusEnum::Valid,
                Some(head),
            )))
        };

        // Process the forkchoice update by trying to make the head block canonical
        //
        // We can only process this forkchoice update if:
        // - we have the `head` block
        // - the head block is part of a chain that is connected to the canonical chain. This
        //   includes reorgs.
        //
        // Performing a FCU involves:
        // - marking the FCU's head block as canonical
        // - updating in memory state to reflect the new canonical chain
        // - updating canonical state trackers
        // - emitting a canonicalization event for the new chain (including reorg)
        // - if we have payload attributes, delegate them to the payload service

        // 1. ensure we have a new head block
        if self.state.tree_state.canonical_block_hash() == state.head_block_hash {
            trace!(target: "engine::tree", "fcu head hash is already canonical");

            // update the safe and finalized blocks and ensure their values are valid
            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
                // safe or finalized hashes are invalid
                return Ok(TreeOutcome::new(outcome))
            }

            // we still need to process payload attributes if the head is already canonical
            if let Some(attr) = attrs {
                let tip = self
                    .block_by_hash(self.state.tree_state.canonical_block_hash())?
                    .ok_or_else(|| {
                        // If we can't find the canonical block, then something is wrong and we need
                        // to return an error
                        ProviderError::HeaderNotFound(state.head_block_hash.into())
                    })?;
                let updated = self.process_payload_attributes(attr, tip.header(), state, version);
                return Ok(TreeOutcome::new(updated))
            }

            // the head block is already canonical
            return Ok(valid_outcome(state.head_block_hash))
        }

        // 2. check if the head is already part of the canonical chain
        if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
            debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");

            // For OpStack the proposers are allowed to reorg their own chain at will, so we need to
            // always trigger a new payload job if requested.
            if self.engine_kind.is_opstack() {
                if let Some(attr) = attrs {
                    debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
                    let updated =
                        self.process_payload_attributes(attr, &canonical_header, state, version);
                    return Ok(TreeOutcome::new(updated))
                }
            }

            // 2. Client software MAY skip an update of the forkchoice state and MUST NOT begin a
            //    payload build process if `forkchoiceState.headBlockHash` references a `VALID`
            //    ancestor of the head of canonical chain, i.e. the ancestor passed payload
            //    validation process and deemed `VALID`. In the case of such an event, client
            //    software MUST return `{payloadStatus: {status: VALID, latestValidHash:
            //    forkchoiceState.headBlockHash, validationError: null}, payloadId: null}`

            // the head block is already canonical, so we're not triggering a payload job and can
            // return right away
            return Ok(valid_outcome(state.head_block_hash))
        }

        // 3. ensure we can apply a new chain update for the head block
        if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
            let tip = chain_update.tip().clone_sealed_header();
            self.on_canonical_chain_update(chain_update);

            // update the safe and finalized blocks and ensure their values are valid
            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
                // safe or finalized hashes are invalid
                return Ok(TreeOutcome::new(outcome))
            }

            if let Some(attr) = attrs {
                let updated = self.process_payload_attributes(attr, &tip, state, version);
                return Ok(TreeOutcome::new(updated))
            }

            return Ok(valid_outcome(state.head_block_hash))
        }

        // 4. we don't have the block to perform the update
        // we assume the FCU is valid and at least the head is missing,
        // so we need to start syncing to it
        //
        // find the appropriate target to sync to, if we don't have the safe block hash then we
        // start syncing to the safe block via backfill first
        let target = if self.state.forkchoice_state_tracker.is_empty() &&
            // check that safe block is valid and missing
            !state.safe_block_hash.is_zero() &&
            self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
        {
            debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
            state.safe_block_hash
        } else {
            state.head_block_hash
        };

        let target = self.lowest_buffered_ancestor_or(target);
        trace!(target: "engine::tree", %target, "downloading missing block");

        Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
            PayloadStatusEnum::Syncing,
        )))
        .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
    }

    /// Attempts to receive the next engine request.
    ///
    /// If there's currently no persistence action in progress, this will block until a new request
    /// is received. If there's a persistence action in progress, this will try to receive the
    /// next request with a timeout to not block indefinitely and return `Ok(None)` if no request is
    /// received in time.
    ///
    /// Returns an error if the engine channel is disconnected.
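    ///
    /// A stand-alone sketch of the same timeout pattern on a std `mpsc` channel (illustrative):
    ///
    /// ```ignore
    /// match rx.recv_timeout(std::time::Duration::from_millis(500)) {
    ///     Ok(msg) => { /* got a request */ }
    ///     Err(RecvTimeoutError::Timeout) => { /* nothing yet; go poll persistence */ }
    ///     Err(RecvTimeoutError::Disconnected) => { /* all senders dropped; shut down */ }
    /// }
    /// ```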
    #[expect(clippy::type_complexity)]
    fn try_recv_engine_message(
        &self,
    ) -> Result<Option<FromEngine<EngineApiRequest<T, N>, N::Block>>, RecvError> {
        if self.persistence_state.in_progress() || self.backup.in_progress() {
            // try to receive the next request with a timeout to not block indefinitely
            match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
                Ok(msg) => Ok(Some(msg)),
                Err(err) => match err {
                    RecvTimeoutError::Timeout => Ok(None),
                    RecvTimeoutError::Disconnected => Err(RecvError),
                },
            }
        } else {
            self.incoming.recv().map(Some)
        }
    }

    /// Helper method to remove blocks and set the persistence state. This ensures we keep track of
    /// the current persistence action while we're removing blocks.
    fn remove_blocks(&mut self, new_tip_num: u64) {
        debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
        if new_tip_num < self.persistence_state.last_persisted_block.number {
            debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
            let (tx, rx) = oneshot::channel();
            let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
            self.persistence_state.start_remove(new_tip_num, rx);
        }
    }

    /// Helper method to save blocks and set the persistence state. This ensures we keep track of
    /// the current persistence action while we're saving blocks.
    fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlockWithTrieUpdates<N>>) {
        if blocks_to_persist.is_empty() {
            debug!(target: "engine::tree", "Returned empty set of blocks to persist");
            return
        }

        // NOTE: checked non-empty above
        let highest_num_hash = blocks_to_persist
            .iter()
            .max_by_key(|block| block.recovered_block().number())
            .map(|b| b.recovered_block().num_hash())
            .expect("Checked non-empty persisting blocks");

        debug!(target: "engine::tree", blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
        let (tx, rx) = oneshot::channel();
        let _ = self.persistence.save_blocks(blocks_to_persist, tx);

        self.persistence_state.start_save(highest_num_hash, rx);
    }

    /// Attempts to advance the persistence state.
    ///
    /// If we're currently awaiting a response this will try to receive the response (non-blocking)
    /// or send a new persistence action if necessary.
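    ///
    /// A sketch of the non-blocking poll over the oneshot receiver used below (illustrative):
    ///
    /// ```ignore
    /// match rx.try_recv() {
    ///     Ok(result) => { /* persistence finished; record it */ }
    ///     Err(TryRecvError::Empty) => { /* still running; put `rx` back and retry later */ }
    ///     Err(TryRecvError::Closed) => { /* persistence task died; fatal */ }
    /// }
    /// ```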
    fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
        if self.persistence_state.in_progress() {
            let (mut rx, start_time, current_action) = self
                .persistence_state
                .rx
                .take()
                .expect("if a persistence task is in progress Receiver must be Some");
            // Check if persistence has completed
            match rx.try_recv() {
                Ok(last_persisted_hash_num) => {
                    self.metrics.engine.persistence_duration.record(start_time.elapsed());
                    let Some(BlockNumHash {
                        hash: last_persisted_block_hash,
                        number: last_persisted_block_number,
                    }) = last_persisted_hash_num
                    else {
                        // if this happened, then we persisted no blocks because we sent an
                        // empty vec of blocks
                        warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
                        return Ok(())
                    };

                    debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, "Finished persisting, calling finish");
                    self.persistence_state
                        .finish(last_persisted_block_hash, last_persisted_block_number);
                    self.on_new_persisted_block()?;
                }
                Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
                Err(TryRecvError::Empty) => {
                    self.persistence_state.rx = Some((rx, start_time, current_action))
                }
            }
        }

        if !self.persistence_state.in_progress() {
            if let Some(new_tip_num) = self.find_disk_reorg()? {
                self.remove_blocks(new_tip_num)
            } else if self.should_persist() {
                let blocks_to_persist = self.get_canonical_blocks_to_persist();
                self.persist_blocks(blocks_to_persist);
            }
        }

        Ok(())
    }

    fn advance_backup(&mut self) -> Result<(), AdvancePersistenceError> {
        debug!(target: "engine::tree", "advance_backup called");
        if !self.backup.in_progress() && self.should_backup() {
            debug!(target: "engine::tree", "sending backup action");
            let (tx, rx) = oneshot::channel();
            let _ = self.backup.sender.send(BackupAction::BackupAtBlock(
                self.persistence_state.last_persisted_block,
                tx,
            ));
            self.backup.start(rx);
        }

        if self.backup.in_progress() {
            let (mut rx, start_time) = self
                .backup
                .rx
                .take()
                .expect("if a backup task is in progress Receiver must be Some");
            // Check if the backup has completed
            match rx.try_recv() {
                Ok(last_backup_hash_num) => {
                    let Some(BlockNumHash {
                        hash: last_backup_block_hash,
                        number: last_backup_block_number,
                    }) = last_backup_hash_num
                    else {
                        warn!(target: "engine::tree", "Backup task completed but did not backup any blocks");
                        return Ok(())
                    };

                    debug!(target: "engine::tree", ?last_backup_hash_num, "Finished backup, calling finish");
                    self.backup.finish(BlockNumHash::new(
                        last_backup_block_number,
                        last_backup_block_hash,
                    ));
                }
                Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
                Err(TryRecvError::Empty) => self.backup.rx = Some((rx, start_time)),
            }
        }
        Ok(())
    }

    /// Handles a message from the engine.
    fn on_engine_message(
        &mut self,
        msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
    ) -> Result<(), InsertBlockFatalError> {
        match msg {
            FromEngine::Event(event) => match event {
                FromOrchestrator::BackfillSyncStarted => {
                    debug!(target: "engine::tree", "received backfill sync started event");
                    self.backfill_sync_state = BackfillSyncState::Active;
                }
                FromOrchestrator::BackfillSyncFinished(ctrl) => {
                    self.on_backfill_sync_finished(ctrl)?;
                }
            },
            FromEngine::Request(request) => {
                match request {
                    EngineApiRequest::InsertExecutedBlock(block) => {
                        let block_num_hash = block.recovered_block().num_hash();
                        if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
                            // outdated block that can be skipped
                            return Ok(())
                        }

                        debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
                        let now = Instant::now();

                        // if the parent is the canonical head, we can insert the block as the
                        // pending block
                        if self.state.tree_state.canonical_block_hash() ==
                            block.recovered_block().parent_hash()
                        {
                            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
                            self.canonical_in_memory_state.set_pending_block(block.clone());
                        }

                        self.state.tree_state.insert_executed(block.clone());
                        self.metrics.engine.inserted_already_executed_blocks.increment(1);
                        self.emit_event(EngineApiEvent::BeaconConsensus(
                            BeaconConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
                        ));
                    }
                    EngineApiRequest::Beacon(request) => {
                        match request {
                            BeaconEngineMessage::ForkchoiceUpdated {
                                state,
                                payload_attrs,
                                tx,
                                version,
                            } => {
                                let mut output =
                                    self.on_forkchoice_updated(state, payload_attrs, version);

                                if let Ok(res) = &mut output {
                                    // track last received forkchoice state
                                    self.state
                                        .forkchoice_state_tracker
                                        .set_latest(state, res.outcome.forkchoice_status());

                                    // emit an event about the handled FCU
                                    self.emit_event(BeaconConsensusEngineEvent::ForkchoiceUpdated(
                                        state,
                                        res.outcome.forkchoice_status(),
                                    ));

                                    // handle the event if any
                                    self.on_maybe_tree_event(res.event.take())?;
                                }

                                if let Err(err) =
                                    tx.send(output.map(|o| o.outcome).map_err(Into::into))
                                {
                                    self.metrics
                                        .engine
                                        .failed_forkchoice_updated_response_deliveries
                                        .increment(1);
                                    error!(target: "engine::tree", "Failed to send event: {err:?}");
                                }
                            }
                            BeaconEngineMessage::NewPayload { payload, tx } => {
                                debug!("receiving beacon engine message: payload: {:?}", payload);
                                let mut output = self.on_new_payload(payload);

                                let maybe_event =
                                    output.as_mut().ok().and_then(|out| out.event.take());

                                // emit response
                                if let Err(err) =
                                    tx.send(output.map(|o| o.outcome).map_err(|e| {
                                        BeaconOnNewPayloadError::Internal(Box::new(e))
                                    }))
                                {
                                    error!(target: "engine::tree", "Failed to send event: {err:?}");
                                    self.metrics
                                        .engine
                                        .failed_new_payload_response_deliveries
                                        .increment(1);
                                }

                                // handle the event if any
                                self.on_maybe_tree_event(maybe_event)?;
                            }
                        }
                    }
                }
            }
            FromEngine::DownloadedBlocks(blocks) => {
                if let Some(event) = self.on_downloaded(blocks)? {
                    self.on_tree_event(event)?;
                }
            }
        }
        Ok(())
    }

    /// Invoked when the backfill sync has finished syncing to the target.
1204    ///
1205    /// At this point we consider the block synced to the backfill target.
1206    ///
1207    /// Checks the tracked finalized block against the block on disk and requests another backfill
1208    /// run if the distance to the tip exceeds the threshold for another backfill run.
1209    ///
1210    /// This will also do the necessary housekeeping of the tree state, this includes:
1211    ///  - removing all blocks below the backfill height
1212    ///  - resetting the canonical in-memory state
1213    ///
1214    /// In case backfill resulted in an unwind, this will clear the tree state above the unwind
1215    /// target block.
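    ///
    /// For example (a hypothetical run): if backfill finished at height 1_000, in-memory state
    /// at or below 1_000 is pruned, block 1_000 becomes both the canonical and the persisted
    /// head, and another backfill run is only requested if the tracked finalized block is still
    /// far ahead of that height.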
    fn on_backfill_sync_finished(
        &mut self,
        ctrl: ControlFlow,
    ) -> Result<(), InsertBlockFatalError> {
        debug!(target: "engine::tree", "received backfill sync finished event");
        self.backfill_sync_state = BackfillSyncState::Idle;

        // backfill height is the block number that the backfill finished at
        let mut backfill_height = ctrl.block_number();

        // Pipeline unwound: memorize the invalid block and wait for the CL to provide the next
        // sync target.
        if let ControlFlow::Unwind { bad_block, target } = &ctrl {
            warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
            // update the `invalid_headers` cache with the new invalid header
            self.state.invalid_headers.insert(**bad_block);

            // if this was an unwind then the target is the new height
            backfill_height = Some(*target);
        }

        // if the backfill height is unknown, there is nothing to clean up
        let Some(backfill_height) = backfill_height else { return Ok(()) };

        // state housekeeping after backfill sync
        // remove all executed blocks below the backfill height
        //
        // We set the `finalized_num` to `Some(backfill_height)` to ensure we remove all state
        // before that
        let Some(backfill_num_hash) = self
            .provider
            .block_hash(backfill_height)?
            .map(|hash| BlockNumHash { hash, number: backfill_height })
        else {
            debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
            return Ok(())
        };

        if ctrl.is_unwind() {
            // the node reset so we need to clear everything above that height so that backfill
            // height is the new canonical block.
            self.state.tree_state.reset(backfill_num_hash)
        } else {
            self.state.tree_state.remove_until(
                backfill_num_hash,
                self.persistence_state.last_persisted_block.hash,
                Some(backfill_num_hash),
            );
        }

        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
        self.metrics.tree.canonical_chain_height.set(backfill_height as f64);

        // remove all buffered blocks below the backfill height
        self.state.buffer.remove_old_blocks(backfill_height);
        // we remove all entries because now we're synced to the backfill target and consider this
        // the canonical chain
        self.canonical_in_memory_state.clear_state();

        if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
            // update the tracked chain height, after backfill sync both the canonical height and
            // persisted height are the same
            self.state.tree_state.set_canonical_head(new_head.num_hash());
            self.persistence_state.finish(new_head.hash(), new_head.number());

            // update the tracked canonical head
            self.canonical_in_memory_state.set_canonical_head(new_head);
        }

        // check if we need to run backfill again by comparing the most recent finalized height to
        // the backfill height
        let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
        else {
            return Ok(())
        };
        if sync_target_state.finalized_block_hash.is_zero() {
            // no finalized block, can't check distance
            return Ok(())
        }
        // get the block number of the finalized block, if we have it
        let newest_finalized = self
            .state
            .buffer
            .block(&sync_target_state.finalized_block_hash)
            .map(|block| block.number());

        // The block number that the backfill finished at; if either the progress or the newest
        // finalized block is None, then we can't check the distance anyway.
        //
        // If both are Some, we perform another distance check and return the desired
        // backfill target
        if let Some(backfill_target) =
            ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
                // determine whether we should run backfill again, i.e. whether the remaining
                // gap is still large enough to warrant another backfill run
                self.backfill_sync_target(progress, finalized_number, None)
            })
        {
            // request another backfill run
            self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
                backfill_target.into(),
            )));
            return Ok(())
        };

        // try to close the gap by executing buffered blocks that are child blocks of the new head
        self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
    }

    /// Attempts to make the given target canonical.
    ///
    /// This will update the tracked canonical in memory state and do the necessary housekeeping.
    fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
        if let Some(chain_update) = self.on_new_head(target)? {
            self.on_canonical_chain_update(chain_update);
        }

        Ok(())
    }

    /// Convenience function to handle an optional tree event.
    fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
        if let Some(event) = event {
            self.on_tree_event(event)?;
        }

        Ok(())
    }

    /// Handles a tree event.
    ///
    /// Returns an error if a [`TreeAction::MakeCanonical`] results in a fatal error.
    fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
        match event {
            TreeEvent::TreeAction(action) => match action {
                TreeAction::MakeCanonical { sync_target_head } => {
                    self.make_canonical(sync_target_head)?;
                }
            },
            TreeEvent::BackfillAction(action) => {
                self.emit_event(EngineApiEvent::BackfillAction(action));
            }
            TreeEvent::Download(action) => {
                self.emit_event(EngineApiEvent::Download(action));
            }
        }

        Ok(())
    }

    /// Emits an outgoing event to the engine.
    fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
        let event = event.into();

        if event.is_backfill_action() {
            debug_assert_eq!(
                self.backfill_sync_state,
                BackfillSyncState::Idle,
                "backfill action should only be emitted when backfill is idle"
            );

            if self.persistence_state.in_progress() {
                // backfill sync and persisting data are mutually exclusive, so we can't start
                // backfill while we're still persisting
                debug!(target: "engine::tree", "skipping backfill while persistence task is active");
                return
            }

            self.backfill_sync_state = BackfillSyncState::Pending;
            self.metrics.engine.pipeline_runs.increment(1);
            debug!(target: "engine::tree", "emitting backfill action event");
        }

        let _ = self.outgoing.send(event).inspect_err(
            |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
        );
    }

    /// Returns true if the number of canonical blocks above the last persisted block exceeds
    /// the persistence threshold and backfill is not running.
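    ///
    /// A worked example (illustrative numbers): with a persistence threshold of 2, a last
    /// persisted block at height 90 and a canonical head at 93, `93 - 90 = 3 > 2`, so this
    /// returns true as long as backfill is idle.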
    pub const fn should_persist(&self) -> bool {
        if !self.backfill_sync_state.is_idle() {
            // can't persist if backfill is running
            return false
        }

        let min_block = self.persistence_state.last_persisted_block.number;
        self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
            self.config.persistence_threshold()
    }

    /// Returns true if a backup of in-memory state should be initiated.
    ///
    /// Note: backups are currently not triggered automatically; this always returns false.
    fn should_backup(&self) -> bool {
        debug!(target: "engine::tree", "checking if we should backup");
        false
    }

    /// Returns a batch of consecutive canonical blocks to persist in the range
    /// `(last_persisted_number .. canonical_head - memory_block_buffer_target]`. The expected
    /// order is oldest -> newest.
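    ///
    /// For example (hypothetical numbers): with `last_persisted_number = 10`, a canonical head
    /// at height 20 and a memory block buffer target of 2, this walks back from the head and
    /// returns blocks 11..=18, oldest first, leaving the two newest blocks in memory only.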
    fn get_canonical_blocks_to_persist(&self) -> Vec<ExecutedBlockWithTrieUpdates<N>> {
        let mut blocks_to_persist = Vec::new();
        let mut current_hash = self.state.tree_state.canonical_block_hash();
        let last_persisted_number = self.persistence_state.last_persisted_block.number;

        let canonical_head_number = self.state.tree_state.canonical_block_number();

        let target_number =
            canonical_head_number.saturating_sub(self.config.memory_block_buffer_target());

        debug!(target: "engine::tree", ?last_persisted_number, ?canonical_head_number, ?target_number, ?current_hash, "Returning canonical blocks to persist");
        while let Some(block) = self.state.tree_state.blocks_by_hash.get(&current_hash) {
            if block.recovered_block().number() <= last_persisted_number {
                break;
            }

            if block.recovered_block().number() <= target_number {
                blocks_to_persist.push(block.clone());
            }

            current_hash = block.recovered_block().parent_hash();
        }

        // reverse the order so that the oldest block comes first
        blocks_to_persist.reverse();

        blocks_to_persist
    }

    /// This clears the blocks from the in-memory tree state that have been persisted to the
    /// database.
    ///
    /// This also updates the canonical in-memory state to reflect the newest persisted block
    /// height.
    ///
    /// Assumes that `finish` has been called on the `persistence_state` at least once
    fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
        // If we have an on-disk reorg, we need to handle it first before touching the in-memory
        // state.
        if let Some(remove_above) = self.find_disk_reorg()? {
            self.remove_blocks(remove_above);
            return Ok(())
        }

        let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
        self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
        self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
            number: self.persistence_state.last_persisted_block.number,
            hash: self.persistence_state.last_persisted_block.hash,
        });
        Ok(())
    }

    /// Return an [`ExecutedBlock`] from database or in-memory state by hash.
    ///
    /// NOTE: This cannot fetch [`ExecutedBlock`]s for _finalized_ blocks, instead it can only
    /// fetch [`ExecutedBlock`]s for _canonical_ blocks, or blocks from sidechains that the node
    /// has in memory.
    ///
    /// For finalized blocks, this will return `None`.
    fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
        trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
        // check memory first
        if let Some(block) = self.state.tree_state.executed_block_by_hash(hash).cloned() {
            return Ok(Some(block.block))
        }

        let (block, senders) = self
            .provider
            .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
            .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
            .split_sealed();
        let execution_output = self
            .provider
            .get_state(block.header().number())?
            .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
        let hashed_state = self.provider.hashed_post_state(execution_output.state());

        Ok(Some(ExecutedBlock {
            recovered_block: Arc::new(RecoveredBlock::new_sealed(block, senders)),
            execution_output: Arc::new(execution_output),
            hashed_state: Arc::new(hashed_state),
        }))
    }

    /// Return a sealed header from database or in-memory state by hash.
    fn sealed_header_by_hash(
        &self,
        hash: B256,
    ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
        // check memory first
        let block = self
            .state
            .tree_state
            .block_by_hash(hash)
            .map(|block| block.as_ref().clone_sealed_header());

        if block.is_some() {
            Ok(block)
        } else {
            self.provider.sealed_header_by_hash(hash)
        }
    }

    /// Return block from database or in-memory state by hash.
    fn block_by_hash(&self, hash: B256) -> ProviderResult<Option<N::Block>> {
        // check database first
        let mut block = self.provider.block_by_hash(hash)?;
        if block.is_none() {
            // Note: it's fine to return the unsealed block because the caller already has
            // the hash
            block = self
                .state
                .tree_state
                .block_by_hash(hash)
                // TODO: clone for compatibility. should we return an Arc here?
                .map(|block| block.as_ref().clone().into_block());
        }
        Ok(block)
    }

    /// Return the parent hash of the lowest buffered ancestor for the requested block, if there
    /// are any buffered ancestors. If there are no buffered ancestors, and the block itself does
    /// not exist in the buffer, this returns the hash that is passed in.
    ///
    /// Returns the parent hash of the block itself if the block is buffered and has no other
    /// buffered ancestors.
    fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
        self.state
            .buffer
            .lowest_ancestor(&hash)
            .map(|block| block.parent_hash())
            .unwrap_or(hash)
    }

    /// If validation fails, the response MUST contain the latest valid hash:
    ///
    ///   - The block hash of the ancestor of the invalid payload satisfying the following two
    ///     conditions:
    ///     - It is fully validated and deemed VALID
    ///     - Any other ancestor of the invalid payload with a higher blockNumber is INVALID
    ///   - 0x0000000000000000000000000000000000000000000000000000000000000000 if the above
    ///     conditions are satisfied by a `PoW` block.
    ///   - null if client software cannot determine the ancestor of the invalid payload satisfying
    ///     the above conditions.
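    ///
    /// For example (hypothetical chain): if this is called with hash `b5`, and `b5`, `b4` and
    /// `b3` are all tracked in the invalid-headers cache, the walk steps back to `b2`; if `b2`
    /// is known to the tree or the database, `Some(b2)` is returned, otherwise `None`.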
    fn latest_valid_hash_for_invalid_payload(
        &mut self,
        parent_hash: B256,
    ) -> ProviderResult<Option<B256>> {
        // Check if parent exists in side chain or in canonical chain.
        if self.block_by_hash(parent_hash)?.is_some() {
            return Ok(Some(parent_hash))
        }

        // iterate over ancestors in the invalid cache
        // until we encounter the first valid ancestor
        let mut current_hash = parent_hash;
        let mut current_block = self.state.invalid_headers.get(&current_hash);
        while let Some(block_with_parent) = current_block {
            current_hash = block_with_parent.parent;
            current_block = self.state.invalid_headers.get(&current_hash);

            // If `current_block` is `None`, then `current_hash` has no invalid ancestor in the
            // cache; check whether the block itself is present in the blockchain tree
            if current_block.is_none() && self.block_by_hash(current_hash)?.is_some() {
                return Ok(Some(current_hash))
            }
        }
        Ok(None)
    }

    /// Prepares the invalid payload response for the given hash, checking the
    /// database for the parent hash and populating the payload status with the latest valid hash
    /// according to the engine api spec.
    fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
        // Edge case: the `latestValid` field is the zero hash if the parent block is the terminal
        // PoW block, which we need to identify by looking at the parent's block difficulty
        if let Some(parent) = self.block_by_hash(parent_hash)? {
            if !parent.header().difficulty().is_zero() {
                parent_hash = B256::ZERO;
            }
        }

        let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
        Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
            validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
        })
        .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
    }

    /// Returns true if the given hash is the last received sync target block.
    ///
    /// See [`ForkchoiceStateTracker::sync_target_state`]
    fn is_sync_target_head(&self, block_hash: B256) -> bool {
        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
            return target.head_block_hash == block_hash
        }
        false
    }

    /// Checks if the given `check` hash points to an invalid header, inserting the given `head`
    /// block into the invalid header cache if the `check` hash has a known invalid ancestor.
    ///
    /// Returns a payload status response according to the engine API spec if the block is known to
    /// be invalid.
    fn check_invalid_ancestor_with_head(
        &mut self,
        check: B256,
        head: &SealedBlock<N::Block>,
    ) -> ProviderResult<Option<PayloadStatus>> {
        // check if the check hash was previously marked as invalid
        let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };

        // populate the latest valid hash field
        let status = self.prepare_invalid_response(header.parent)?;

        // insert the head block into the invalid header cache
        self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), header);
        self.emit_event(BeaconConsensusEngineEvent::InvalidBlock(Box::new(head.clone())));

        Ok(Some(status))
    }

    /// Checks if the given `head` points to an invalid header, which requires a specific response
    /// to a forkchoice update.
    fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
        // check if the head was previously marked as invalid
        let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
        // populate the latest valid hash field
        Ok(Some(self.prepare_invalid_response(header.parent)?))
    }

    /// Validate if block is correct and satisfies all the consensus rules that concern the header
    /// and block body itself.
    fn validate_block(&self, block: &RecoveredBlock<N::Block>) -> Result<(), ConsensusError> {
        if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
            error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
            return Err(e)
        }

        if let Err(e) = self.consensus.validate_block_pre_execution(block.sealed_block()) {
            error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
            return Err(e)
        }

        Ok(())
    }

    /// Attempts to connect any buffered blocks that are connected to the given parent hash.
    #[instrument(level = "trace", skip(self), target = "engine::tree")]
    fn try_connect_buffered_blocks(
        &mut self,
        parent: BlockNumHash,
    ) -> Result<(), InsertBlockFatalError> {
        let blocks = self.state.buffer.remove_block_with_children(&parent.hash);

        if blocks.is_empty() {
            // nothing to append
            return Ok(())
        }

        let now = Instant::now();
        let block_count = blocks.len();
        for child in blocks {
            let child_num_hash = child.num_hash();
            match self.insert_block(child) {
                Ok(res) => {
                    debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
                    if self.is_sync_target_head(child_num_hash.hash) &&
                        matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
                    {
                        self.make_canonical(child_num_hash.hash)?;
                    }
                }
                Err(err) => {
                    debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
                    if let Err(fatal) = self.on_insert_block_error(err) {
                        warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
                        return Err(fatal)
                    }
                }
            }
        }

        debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
        Ok(())
    }

    /// Pre-validates the block and inserts it into the buffer.
    fn buffer_block(
        &mut self,
        block: RecoveredBlock<N::Block>,
    ) -> Result<(), InsertBlockError<N::Block>> {
        if let Err(err) = self.validate_block(&block) {
            return Err(InsertBlockError::consensus_error(err, block.into_sealed_block()))
        }
        self.state.buffer.insert_block(block);
        Ok(())
    }

    /// Returns true if the distance from the local tip to the block is greater than the configured
    /// threshold.
    ///
    /// If the `local_tip` is greater than the `block`, then this will return false.
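    ///
    /// For example, if the pipeline-run threshold were 32 (an illustrative value for
    /// `MIN_BLOCKS_FOR_PIPELINE_RUN`): a local tip at 100 and a block at 150 gives a distance
    /// of 50, which exceeds the threshold, so this returns true.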
    #[inline]
    const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
        block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
    }

    /// Returns how far the local tip is from the given block. If the local tip is at the same
    /// height or its block number is greater than the given block, this returns None.
    #[inline]
    const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
        if block > local_tip {
            Some(block - local_tip)
        } else {
            None
        }
    }

    /// Returns the target hash to sync to if the distance from the local tip to the block is
    /// greater than the threshold and we're not synced to the finalized block yet (if we've seen
    /// that block already).
    ///
    /// If this is invoked after a new block has been downloaded, the downloaded block could be the
    /// (missing) finalized block.
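    ///
    /// For example (hypothetical state): with a canonical tip at height 100, a tracked
    /// finalized block at height 10_000 and no local copy of that finalized block, the distance
    /// check trips and the finalized hash is returned as the next backfill target.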
    fn backfill_sync_target(
        &self,
        canonical_tip_num: u64,
        target_block_number: u64,
        downloaded_block: Option<BlockNumHash>,
    ) -> Option<B256> {
        let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();

        // check if the distance exceeds the threshold for backfill sync
        let mut exceeds_backfill_threshold =
            self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number);

        // check if the downloaded block is the tracked finalized block
        if let Some(buffered_finalized) = sync_target_state
            .as_ref()
            .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
        {
            // if we have buffered the finalized block, we should check how far
            // we're off
            exceeds_backfill_threshold =
                self.exceeds_backfill_run_threshold(canonical_tip_num, buffered_finalized.number());
        }

        // If this is invoked after we downloaded a block we can check if this block is the
        // finalized block
        if let (Some(downloaded_block), Some(ref state)) = (downloaded_block, sync_target_state) {
            if downloaded_block.hash == state.finalized_block_hash {
                // we downloaded the finalized block and can now check how far we're off
                exceeds_backfill_threshold =
                    self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number);
            }
        }

        // if the number of missing blocks is greater than the max, trigger backfill
        if exceeds_backfill_threshold {
            if let Some(state) = sync_target_state {
                // if we have already canonicalized the finalized block, we should skip backfill
                match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
                    Err(err) => {
                        warn!(target: "engine::tree", %err, "Failed to get finalized block header");
                    }
                    Ok(None) => {
                        // ensure the finalized block is known (not the zero hash)
                        if !state.finalized_block_hash.is_zero() {
                            // we don't have the block yet and the distance exceeds the allowed
                            // threshold
                            return Some(state.finalized_block_hash)
                        }

                        // OPTIMISTIC SYNCING
                        //
                        // This can happen when the node is doing an optimistic sync, where the CL
                        // has no knowledge of the finalized hash, but is expecting the EL to sync
                        // as high as possible before finalizing.
                        //
                        // This usually doesn't happen on ETH mainnet since CLs use the more
                        // secure checkpoint syncing.
                        //
                        // However, optimism chains will do this. The risk of a reorg is however
                        // low.
                        debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
                        return Some(state.head_block_hash)
                    }
                    Ok(Some(_)) => {
                        // we're fully synced to the finalized block
                    }
                }
            }
        }

        None
    }

    /// This method tries to detect whether on-disk and in-memory states have diverged. This can
    /// happen if a reorg occurs while we are persisting a block.
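    ///
    /// A sketch of the reorg case (hypothetical blocks and heights):
    ///
    /// ```text
    /// in-memory canonical: a1 -> a2 -> a3          (head = a3, height 3)
    /// persisted on disk:   a1 -> b2 -> b3 -> b4    (tip  = b4, height 4)
    /// ```
    ///
    /// Walking the persisted chain back to height 3 and then walking both chains back in
    /// lockstep finds the common ancestor `a1`, so this returns `Some(1)` and everything above
    /// height 1 is later removed from disk.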
    fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
        let mut canonical = self.state.tree_state.current_canonical_head;
        let mut persisted = self.persistence_state.last_persisted_block;

        let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
            Ok(self
                .sealed_header_by_hash(num_hash.hash)?
                .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
                .parent_num_hash())
        };

        // Happy path, canonical chain is ahead or equal to persisted chain.
        // Walk canonical chain back to make sure that it connects to persisted chain.
        while canonical.number > persisted.number {
            canonical = parent_num_hash(canonical)?;
        }

        // If we've reached persisted tip by walking the canonical chain back, everything is fine.
        if canonical == persisted {
            return Ok(None);
        }

        // At this point, we know that `persisted` block can't be reached by walking the canonical
        // chain back. In this case we need to truncate it to the first canonical block it connects
        // to.

        // Firstly, walk back until we reach the same height as `canonical`.
        while persisted.number > canonical.number {
            persisted = parent_num_hash(persisted)?;
        }

        debug_assert_eq!(persisted.number, canonical.number);

        // Now walk both chains back until we find a common ancestor.
        while persisted.hash != canonical.hash {
            canonical = parent_num_hash(canonical)?;
            persisted = parent_num_hash(persisted)?;
        }

        debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");

        Ok(Some(persisted.number))
    }

    /// Invoked when the canonical chain has been updated.
    ///
    /// This is invoked on a valid forkchoice update, or if we can make the target block canonical.
    fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
        trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update");
        let start = Instant::now();

        // update the tracked canonical head
        self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());

        let tip = chain_update.tip().clone_sealed_header();
        let notification = chain_update.to_chain_notification();

        // reinsert any missing reorged blocks
        if let NewCanonicalChain::Reorg { new, old } = &chain_update {
            let new_first = new.first().map(|first| first.recovered_block().num_hash());
            let old_first = old.first().map(|first| first.recovered_block().num_hash());
            trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");

            self.update_reorg_metrics(old.len());
            self.reinsert_reorged_blocks(new.clone());
            // Try reinserting the reorged canonical chain. This is only possible if we have
            // `persisted_trie_updates` for those blocks.
            let old = old
                .iter()
                .filter_map(|block| {
                    let (_, trie) = self
                        .state
                        .tree_state
                        .persisted_trie_updates
                        .get(&block.recovered_block.hash())
                        .cloned()?;
                    Some(ExecutedBlockWithTrieUpdates { block: block.clone(), trie })
                })
                .collect::<Vec<_>>();
            self.reinsert_reorged_blocks(old);
        }

        // update the tracked in-memory state with the new chain
        self.canonical_in_memory_state.update_chain(chain_update);
        self.canonical_in_memory_state.set_canonical_head(tip.clone());

        // Update metrics based on new tip
        self.metrics.tree.canonical_chain_height.set(tip.number() as f64);

        // sends an event to all active listeners about the new canonical chain
        self.canonical_in_memory_state.notify_canon_state(notification);

        // emit event
        self.emit_event(BeaconConsensusEngineEvent::CanonicalChainCommitted(
            Box::new(tip),
            start.elapsed(),
        ));
    }

    /// This updates metrics based on the given reorg length.
    fn update_reorg_metrics(&self, old_chain_length: usize) {
        self.metrics.tree.reorgs.increment(1);
        self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
    }

    /// This reinserts any blocks in the new chain that do not already exist in the tree
    fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlockWithTrieUpdates<N>>) {
        for block in new_chain {
            if self
                .state
                .tree_state
                .executed_block_by_hash(block.recovered_block().hash())
                .is_none()
            {
                trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
                self.state.tree_state.insert_executed(block);
            }
        }
    }

    /// Invoke the invalid block hook if this is a new invalid block.
    fn on_invalid_block(
        &mut self,
        parent_header: &SealedHeader<N::BlockHeader>,
        block: &RecoveredBlock<N::Block>,
        output: &BlockExecutionOutput<N::Receipt>,
        trie_updates: Option<(&TrieUpdates, B256)>,
    ) {
        if self.state.invalid_headers.get(&block.hash()).is_some() {
            // we already marked this block as invalid
            return;
        }
        self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
    }

    /// This handles downloaded blocks that are shown to be disconnected from the canonical chain.
    ///
    /// This mainly compares the missing parent of the downloaded block with the current canonical
    /// tip, and decides whether or not backfill sync should be triggered.
    fn on_disconnected_downloaded_block(
        &self,
        downloaded_block: BlockNumHash,
        missing_parent: BlockNumHash,
        head: BlockNumHash,
    ) -> Option<TreeEvent> {
        // compare the missing parent with the canonical tip
        if let Some(target) =
            self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
        {
            trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
            return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
        }

        // continue downloading the missing parent
        //
        // this happens if either:
        //  * the missing parent block num < canonical tip num
        //    * this case represents a missing block on a fork that is shorter than the canonical
        //      chain
        //  * the missing parent block num >= canonical tip num, but the number of missing blocks is
        //    less than the backfill threshold
        //    * this case represents a potentially long range of blocks to download and execute
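        //
        // For example (hypothetical numbers): with a canonical tip at 100, a missing parent at
        // 105 is only 5 blocks away, so we request the block range; a missing parent at 90 is
        // behind the tip, so only the single missing block is requested.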
        let request = if let Some(distance) =
            self.distance_from_local_tip(head.number, missing_parent.number)
        {
            trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
            DownloadRequest::BlockRange(missing_parent.hash, distance)
        } else {
            trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
            // This happens when the missing parent is on an outdated
            // sidechain and we can only download the missing block itself
            DownloadRequest::single_block(missing_parent.hash)
        };

        Some(TreeEvent::Download(request))
    }

    /// Invoked with a block downloaded from the network
    ///
    /// Returns an event with the appropriate action to take, such as:
    ///  - download more missing blocks
    ///  - try to canonicalize the target if the `block` is the tracked target (head) block.
    #[instrument(level = "trace", skip_all, fields(block_hash = %block.hash(), block_num = %block.number(),), target = "engine::tree")]
    fn on_downloaded_block(
        &mut self,
        block: RecoveredBlock<N::Block>,
    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
        let block_num_hash = block.num_hash();
        let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
        if self
            .check_invalid_ancestor_with_head(lowest_buffered_ancestor, block.sealed_block())?
            .is_some()
        {
            return Ok(None)
        }

        if !self.backfill_sync_state.is_idle() {
            return Ok(None)
        }

        // try to append the block
        match self.insert_block(block) {
            Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
                if self.is_sync_target_head(block_num_hash.hash) {
                    trace!(target: "engine::tree", "appended downloaded sync target block");

                    // we just inserted the current sync target block, we can try to make it
                    // canonical
                    return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
                        sync_target_head: block_num_hash.hash,
                    })))
                }
                trace!(target: "engine::tree", "appended downloaded block");
                self.try_connect_buffered_blocks(block_num_hash)?;
            }
            Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
                // block is not connected to the canonical head, we need to download
                // its missing branch first
                return Ok(self.on_disconnected_downloaded_block(
                    block_num_hash,
                    missing_ancestor,
                    head,
                ))
            }
            Ok(InsertPayloadOk::AlreadySeen(_)) => {
                trace!(target: "engine::tree", "downloaded block already executed");
            }
            Err(err) => {
                debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
                if let Err(fatal) = self.on_insert_block_error(err) {
                    warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
                    return Err(fatal)
                }
            }
        }
        Ok(None)
    }

    fn insert_block(
        &mut self,
        block: RecoveredBlock<N::Block>,
    ) -> Result<InsertPayloadOk, InsertBlockError<N::Block>> {
        match self.insert_block_inner(block) {
            Ok(result) => Ok(result),
            Err((kind, block)) => Err(InsertBlockError::new(block.into_sealed_block(), kind)),
        }
    }

    fn insert_block_inner(
        &mut self,
        block: RecoveredBlock<N::Block>,
    ) -> Result<InsertPayloadOk, (InsertBlockErrorKind, RecoveredBlock<N::Block>)> {
        /// A helper macro that returns the block in case there was an error
        macro_rules! ensure_ok {
            ($expr:expr) => {
                match $expr {
                    Ok(val) => val,
                    Err(e) => return Err((e.into(), block)),
                }
            };
        }

        let block_num_hash = block.num_hash();
        debug!(target: "engine::tree", block=?block_num_hash, parent = ?block.parent_hash(), state_root = ?block.state_root(), "Inserting new block into tree");

        if ensure_ok!(self.block_by_hash(block.hash())).is_some() {
            return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid))
        }

        let start = Instant::now();

        trace!(target: "engine::tree", block=?block_num_hash, "Validating block consensus");

        // validate block consensus rules
        ensure_ok!(self.validate_block(&block));

        trace!(target: "engine::tree", block=?block_num_hash, parent=?block.parent_hash(), "Fetching block state provider");
        let Some(provider_builder) = ensure_ok!(self.state_provider_builder(block.parent_hash()))
        else {
            // we don't have the state required to execute this block, so we buffer it and find
            // the missing parent block
            let missing_ancestor = self
                .state
                .buffer
                .lowest_ancestor(&block.parent_hash())
                .map(|block| block.parent_num_hash())
                .unwrap_or_else(|| block.parent_num_hash());

            self.state.buffer.insert_block(block);

            return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
                head: self.state.tree_state.current_canonical_head,
                missing_ancestor,
            }))
        };

        // now validate against the parent
        let Some(parent_block) = ensure_ok!(self.sealed_header_by_hash(block.parent_hash())) else {
            return Err((
                InsertBlockErrorKind::Provider(ProviderError::HeaderNotFound(
                    block.parent_hash().into(),
                )),
                block,
            ))
        };

        if let Err(e) =
            self.consensus.validate_header_against_parent(block.sealed_header(), &parent_block)
        {
            warn!(target: "engine::tree", ?block, "Failed to validate header {} against parent: {e}", block.hash());
            return Err((e.into(), block))
        }

        let state_provider = ensure_ok!(provider_builder.build());

        // We only run the parallel state root if we are not currently persisting any blocks or
        // persisting blocks that are all ancestors of the one we are executing.
        //
        // If we're committing ancestor blocks, then: any trie updates being committed are a subset
        // of the in-memory trie updates collected before fetching reverts. So any diff in
        // reverts (pre vs post commit) is already covered by the in-memory trie updates we
        // collect in `compute_state_root_parallel`.
        //
        // See https://github.com/paradigmxyz/reth/issues/12688 for more details
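        //
        // Roughly (hypothetical heights): if blocks 5..=8 are being persisted and we are
        // executing block 9 on top of 8, the persisting blocks are all ancestors and the
        // parallel path stays safe; if block 9 instead forks off block 4, we skip the parallel
        // computation and fall back to the synchronous state root below.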
        let persisting_kind = self.persisting_kind_for(block.header());
        let run_parallel_state_root = persisting_kind.can_run_parallel_state_root();

        // use prewarming background task
        let header = block.clone_sealed_header();
        let txs = block.clone_transactions_recovered().collect();
        let mut handle = if run_parallel_state_root && self.config.use_state_root_task() {
            // use background tasks for state root calc
            let consistent_view =
                ensure_ok!(ConsistentDbView::new_with_latest_tip(self.provider.clone()));

            // Compute trie input
            let trie_input_start = Instant::now();
            let res = self.compute_trie_input(
                persisting_kind,
                consistent_view.clone(),
                block.header().parent_hash(),
            );
            let trie_input = match res {
                Ok(val) => val,
                Err(e) => return Err((InsertBlockErrorKind::Other(Box::new(e)), block)),
            };

            self.metrics
                .block_validation
                .trie_input_duration
                .record(trie_input_start.elapsed().as_secs_f64());

            self.payload_processor.spawn(
                header,
                txs,
                provider_builder,
                consistent_view,
                trie_input,
                &self.config,
            )
        } else {
            self.payload_processor.spawn_cache_exclusive(header, txs, provider_builder)
        };

        // Use cached state provider before executing, used in execution after prewarming threads
        // complete
        let state_provider = CachedStateProvider::new_with_caches(
            state_provider,
            handle.caches(),
            handle.cache_metrics(),
        );

        let (output, execution_finish) = if self.config.state_provider_metrics() {
            let state_provider = InstrumentedStateProvider::from_state_provider(&state_provider);
            let (output, execution_finish) =
                ensure_ok!(self.execute_block(&state_provider, &block, &handle));
            state_provider.record_total_latency();
            (output, execution_finish)
        } else {
            let (output, execution_finish) =
                ensure_ok!(self.execute_block(&state_provider, &block, &handle));
            (output, execution_finish)
        };

        // after executing the block we can stop executing transactions
        handle.stop_prewarming_execution();

        if let Err(err) = self.consensus.validate_block_post_execution(&block, &output) {
            // call post-block hook
            self.on_invalid_block(&parent_block, &block, &output, None);
            return Err((err.into(), block))
        }

        let hashed_state = self.provider.hashed_post_state(&output.state);

        if let Err(err) = self
            .payload_validator
            .validate_block_post_execution_with_hashed_state(&hashed_state, &block)
        {
            // call post-block hook
            self.on_invalid_block(&parent_block, &block, &output, None);
            return Err((err.into(), block))
        }

        debug!(target: "engine::tree", block=?block_num_hash, "Calculating block state root");

        let root_time = Instant::now();

        let mut maybe_state_root = None;

        if run_parallel_state_root {
            // if the new payload extends the current canonical chain, we attempt to use the
            // background task, or we compute the state root in parallel
2233            if self.config.use_state_root_task() {
2234                match handle.state_root() {
2235                    Ok(StateRootComputeOutcome { state_root, trie_updates }) => {
2236                        let elapsed = execution_finish.elapsed();
2237                        info!(target: "engine::tree", ?state_root, ?elapsed, "State root task finished");
2238                        // we double check the state root here for good measure
2239                        if state_root == block.header().state_root() {
2240                            maybe_state_root = Some((state_root, trie_updates, elapsed))
2241                        } else {
2242                            warn!(
2243                                target: "engine::tree",
2244                                ?state_root,
2245                                block_state_root = ?block.header().state_root(),
2246                                "State root task returned incorrect state root"
2247                            );
2248                        }
2249                    }
2250                    Err(error) => {
2251                        debug!(target: "engine::tree", %error, "Background parallel state root computation failed");
2252                    }
2253                }
2254            } else {
2255                match self.compute_state_root_parallel(
2256                    persisting_kind,
2257                    block.header().parent_hash(),
2258                    &hashed_state,
2259                ) {
2260                    Ok(result) => {
2261                        info!(
2262                            target: "engine::tree",
2263                            block = ?block_num_hash,
2264                            regular_state_root = ?result.0,
2265                            "Regular root task finished"
2266                        );
2267                        maybe_state_root = Some((result.0, result.1, root_time.elapsed()));
2268                    }
2269                    Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(error))) => {
2270                        debug!(target: "engine::tree", %error, "Parallel state root computation failed consistency check, falling back");
2271                    }
2272                    Err(error) => return Err((InsertBlockErrorKind::Other(Box::new(error)), block)),
2273                }
2274            }
2275        }
2276
2277        let (state_root, trie_output, root_elapsed) = if let Some(maybe_state_root) =
2278            maybe_state_root
2279        {
2280            maybe_state_root
2281        } else {
2282            // fallback is to compute the state root regularly in sync
2283            warn!(target: "engine::tree", block=?block_num_hash, ?persisting_kind, "Failed to compute state root in parallel");
2284            self.metrics.block_validation.state_root_parallel_fallback_total.increment(1);
2285            let (root, updates) =
2286                ensure_ok!(state_provider.state_root_with_updates(hashed_state.clone()));
2287            (root, updates, root_time.elapsed())
2288        };
2289
2290        self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
2291        debug!(target: "engine::tree", ?root_elapsed, block=?block_num_hash, "Calculated state root");
2292
2293        // ensure state root matches
2294        if state_root != block.header().state_root() {
2295            // call post-block hook
2296            self.on_invalid_block(&parent_block, &block, &output, Some((&trie_output, state_root)));
2297            return Err((
2298                ConsensusError::BodyStateRootDiff(
2299                    GotExpected { got: state_root, expected: block.header().state_root() }.into(),
2300                )
2301                .into(),
2302                block,
2303            ))
2304        }
2305
2306        // terminate prewarming task with good state output
2307        handle.terminate_caching(Some(output.state.clone()));
2308
2309        let executed: ExecutedBlockWithTrieUpdates<N> = ExecutedBlockWithTrieUpdates {
2310            block: ExecutedBlock {
2311                recovered_block: Arc::new(block),
2312                execution_output: Arc::new(ExecutionOutcome::from((output, block_num_hash.number))),
2313                hashed_state: Arc::new(hashed_state),
2314            },
2315            trie: Arc::new(trie_output),
2316        };
2317
2318        // if the parent is the canonical head, we can insert the block as the pending block
2319        if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
2320        {
2321            debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
2322            self.canonical_in_memory_state.set_pending_block(executed.clone());
2323        }
2324
2325        self.state.tree_state.insert_executed(executed.clone());
2326        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
2327
2328        // emit insert event
2329        let elapsed = start.elapsed();
2330        let is_fork = match self.is_fork(block_num_hash.hash) {
2331            Ok(val) => val,
2332            Err(e) => return Err((e.into(), executed.block.recovered_block().clone())),
2333        };
2334        let engine_event = if is_fork {
2335            BeaconConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
2336        } else {
2337            BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
2338        };
2339        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
2340
2341        debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
2342        Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
2343    }
2344
2345    /// Executes a block with the given state provider
2346    fn execute_block<S: StateProvider>(
2347        &mut self,
2348        state_provider: S,
2349        block: &RecoveredBlock<N::Block>,
2350        handle: &PayloadHandle,
2351    ) -> Result<(BlockExecutionOutput<N::Receipt>, Instant), InsertBlockErrorKind> {
2352        debug!(target: "engine::tree", block=?block.num_hash(), "Executing block");
2353        let mut db = State::builder()
2354            .with_database(StateProviderDatabase::new(&state_provider))
2355            .with_bundle_update()
2356            .without_state_clear()
2357            .build();
2358
2359        // seismic upstream merge: we do not enable precompile cache since it breaks our stateful
2360        // precompiles
2361        let executor = self.evm_config.executor_for_block(&mut db, block);
2362        // if self.config.precompile_cache_enabled() {
2363        //     executor.evm_mut().precompiles_mut().map_precompiles(|address, precompile| {
2364        //         CachedPrecompile::wrap(
2365        //             precompile,
2366        //             self.precompile_cache_map.cache_for_address(*address),
2367        //             *self.evm_config.evm_env(block.header()).spec_id(),
2368        //         )
2369        //     });
2370        // }
2371
2372        let execution_start = Instant::now();
2373        let output = self.metrics.executor.execute_metered(
2374            executor,
2375            block,
2376            Box::new(handle.state_hook()),
2377        )?;
2378        let execution_finish = Instant::now();
2379        let execution_time = execution_finish.duration_since(execution_start);
2380        debug!(target: "engine::tree", elapsed = ?execution_time, number=?block.number(), "Executed block");
2381        Ok((output, execution_finish))
2382    }
2383
2384    /// Compute state root for the given hashed post state in parallel.
2385    ///
2386    /// # Returns
2387    ///
2388    /// Returns `Ok(_)` if computed successfully.
2389    /// Returns `Err(_)` if error was encountered during computation.
2390    /// `Err(ProviderError::ConsistentView(_))` can be safely ignored and fallback computation
2391    /// should be used instead.
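    ///
    /// A sketch of the intended caller-side fallback, as described above (illustrative only;
    /// the synchronous fallback computation is elided):
    ///
    /// ```ignore
    /// match self.compute_state_root_parallel(persisting_kind, parent_hash, &hashed_state) {
    ///     Ok((state_root, trie_updates)) => {
    ///         // use the parallel result
    ///     }
    ///     Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(_))) => {
    ///         // safe to ignore: fall back to a synchronous state root computation,
    ///         // e.g. via the state provider for `parent_hash`
    ///     }
    ///     Err(err) => return Err(err.into()),
    /// }
    /// ```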
2392    fn compute_state_root_parallel(
2393        &self,
2394        persisting_kind: PersistingKind,
2395        parent_hash: B256,
2396        hashed_state: &HashedPostState,
2397    ) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
2398        let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
2399
2400        let mut input =
2401            self.compute_trie_input(persisting_kind, consistent_view.clone(), parent_hash)?;
2402        // Extend with the block we are validating the root for.
2403        input.append_ref(hashed_state);
2404
2405        ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates()
2406    }
2407
2408    /// Computes the trie input at the provided parent hash.
2409    ///
2410    /// The goal of this function is to take in-memory blocks and generate a [`TrieInput`] that
2411    /// serves as an overlay to the database blocks.
2412    ///
2413    /// It works as follows:
2414    /// 1. Collect in-memory blocks that are descendants of the provided parent hash using
2415    ///    [`TreeState::blocks_by_hash`].
2416    /// 2. If persistence is in progress and the block we're computing the trie input for is a
2417    ///    descendant of the currently persisting blocks, we must ensure that the in-memory
2418    ///    blocks do not overlap with database blocks that may already have been persisted.
2419    ///    To do that, we filter out in-memory blocks that are at or below the highest
2420    ///    database block.
2421    /// 3. Once in-memory blocks are collected and optionally filtered, we compute the
2422    ///    [`HashedPostState`] from them.
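    ///
    /// For example (illustrative numbers): if the database tip is block 10 and the in-memory
    /// chain above the parent hash contains blocks 9..=12, step 2 filters out blocks 9 and 10,
    /// and the anchor becomes block 10, the parent of the oldest remaining in-memory block (11).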
2423    fn compute_trie_input(
2424        &self,
2425        persisting_kind: PersistingKind,
2426        consistent_view: ConsistentDbView<P>,
2427        parent_hash: B256,
2428    ) -> Result<TrieInput, ParallelStateRootError> {
2429        let mut input = TrieInput::default();
2430
2431        let provider = consistent_view.provider_ro()?;
2432        let best_block_number = provider.best_block_number()?;
2433
2434        let (mut historical, mut blocks) = self
2435            .state
2436            .tree_state
2437            .blocks_by_hash(parent_hash)
2438            .map_or_else(|| (parent_hash.into(), vec![]), |(hash, blocks)| (hash.into(), blocks));
2439
2440        // If the current block is a descendant of the currently persisting blocks, then we need to
2441        // filter in-memory blocks, so that none of them are already persisted in the database.
2442        if persisting_kind.is_descendant() {
2443            // Iterate over the blocks from oldest to newest.
2444            while let Some(block) = blocks.last() {
2445                let recovered_block = block.recovered_block();
2446                if recovered_block.number() <= best_block_number {
2447                    // Remove blocks that are lower than or equal to the highest database
2448                    // block.
2449                    blocks.pop();
2450                } else {
2451                    // If the block is higher than the best block number, stop filtering, as it's
2452                    // the first block that's not in the database.
2453                    break
2454                }
2455            }
2456
2457            historical = if let Some(block) = blocks.last() {
2458                // If there are any in-memory blocks left after filtering, set the anchor to the
2459                // parent of the oldest block.
2460                (block.recovered_block().number() - 1).into()
2461            } else {
2462                // Otherwise, set the anchor to the original provided parent hash.
2463                parent_hash.into()
2464            };
2465        }
2466
2467        if blocks.is_empty() {
2468            debug!(target: "engine::tree", %parent_hash, "Parent found on disk");
2469        } else {
2470            debug!(target: "engine::tree", %parent_hash, %historical, blocks = blocks.len(), "Parent found in memory");
2471        }
2472
2473        // Convert the historical anchor (hash or number) to a block number.
2474        let block_number = provider
2475            .convert_hash_or_number(historical)?
2476            .ok_or_else(|| ProviderError::BlockHashNotFound(historical.as_hash().unwrap()))?;
2477
2478        // Retrieve revert state for historical block.
2479        let revert_state = if block_number == best_block_number {
2480            // We do not check against the `last_block_number` here because
2481            // `HashedPostState::from_reverts` only uses the database tables, and not static files.
2482            debug!(target: "engine::tree", block_number, best_block_number, "Empty revert state");
2483            HashedPostState::default()
2484        } else {
2485            let revert_state = HashedPostState::from_reverts::<
2486                <P::StateCommitment as StateCommitment>::KeyHasher,
2487            >(provider.tx_ref(), block_number + 1)
2488            .map_err(ProviderError::from)?;
2489            debug!(
2490                target: "engine::tree",
2491                block_number,
2492                best_block_number,
2493                accounts = revert_state.accounts.len(),
2494                storages = revert_state.storages.len(),
2495                "Non-empty revert state"
2496            );
2497            revert_state
2498        };
2499        input.append(revert_state);
2500
2501        // Extend with contents of parent in-memory blocks.
2502        for block in blocks.iter().rev() {
2503            input.append_cached_ref(block.trie_updates(), block.hashed_state())
2504        }
2505
2506        Ok(input)
2507    }
2508
2509    /// Handles an error that occurred while inserting a block.
2510    ///
2511    /// If this is a validation error, this will mark the block as invalid.
2512    ///
2513    /// Returns the proper payload status response if the block is invalid.
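    ///
    /// For a validation error, the returned response has roughly this shape (illustrative,
    /// mirroring the construction at the end of this method):
    ///
    /// ```ignore
    /// PayloadStatus::new(
    ///     PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
    ///     latest_valid_hash, // resolved via `latest_valid_hash_for_invalid_payload`
    /// )
    /// ```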
2514    fn on_insert_block_error(
2515        &mut self,
2516        error: InsertBlockError<N::Block>,
2517    ) -> Result<PayloadStatus, InsertBlockFatalError> {
2518        let (block, error) = error.split();
2519
2520        // if the block is invalid, we check the validation error. Otherwise we return the
2521        // fatal error.
2522        let validation_err = error.ensure_validation_error()?;
2523
2524        // If the error was due to an invalid payload, the payload is added to the
2525        // invalid headers cache and `Ok` with [PayloadStatusEnum::Invalid] is
2526        // returned.
2527        warn!(
2528            target: "engine::tree",
2529            invalid_hash=%block.hash(),
2530            invalid_number=block.number(),
2531            %validation_err,
2532            "Invalid block error on new payload",
2533        );
2534        let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
2535
2536        // keep track of the invalid header
2537        self.state.invalid_headers.insert(block.block_with_parent());
2538        self.emit_event(EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::InvalidBlock(
2539            Box::new(block),
2540        )));
2541        Ok(PayloadStatus::new(
2542            PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
2543            latest_valid_hash,
2544        ))
2545    }
2546
2547    /// Attempts to find the header for the given block hash if it is canonical.
2548    pub fn find_canonical_header(
2549        &self,
2550        hash: B256,
2551    ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
2552        let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2553
2554        if canonical.is_none() {
2555            canonical = self.provider.header(&hash)?.map(|header| SealedHeader::new(header, hash));
2556        }
2557
2558        Ok(canonical)
2559    }
2560
2561    /// Updates the tracked finalized block if we have it.
2562    fn update_finalized_block(
2563        &self,
2564        finalized_block_hash: B256,
2565    ) -> Result<(), OnForkChoiceUpdated> {
2566        if finalized_block_hash.is_zero() {
2567            return Ok(())
2568        }
2569
2570        match self.find_canonical_header(finalized_block_hash) {
2571            Ok(None) => {
2572                debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2573                // if the finalized block is not known, we can't update the finalized block
2574                return Err(OnForkChoiceUpdated::invalid_state())
2575            }
2576            Ok(Some(finalized)) => {
2577                if Some(finalized.num_hash()) !=
2578                    self.canonical_in_memory_state.get_finalized_num_hash()
2579                {
2580                    // we also persist the finalized block on disk so we can reload it on
2581                    // restart; this is required by optimism, which queries the finalized block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2582                    let _ = self.persistence.save_finalized_block_number(finalized.number());
2583                    self.canonical_in_memory_state.set_finalized(finalized);
2584                }
2585            }
2586            Err(err) => {
2587                error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
2588            }
2589        }
2590
2591        Ok(())
2592    }
2593
2594    /// Updates the tracked safe block if we have it.
2595    fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
2596        if safe_block_hash.is_zero() {
2597            return Ok(())
2598        }
2599
2600        match self.find_canonical_header(safe_block_hash) {
2601            Ok(None) => {
2602                debug!(target: "engine::tree", "Safe block not found in canonical chain");
2603                // if the safe block is not known, we can't update the safe block
2604                return Err(OnForkChoiceUpdated::invalid_state())
2605            }
2606            Ok(Some(safe)) => {
2607                if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
2608                    // we also persist the safe block on disk so we can reload it on
2609                    // restart; this is required by optimism, which queries the safe block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2610                    let _ = self.persistence.save_safe_block_number(safe.number());
2611                    self.canonical_in_memory_state.set_safe(safe);
2612                }
2613            }
2614            Err(err) => {
2615                error!(target: "engine::tree", %err, "Failed to fetch safe block header");
2616            }
2617        }
2618
2619        Ok(())
2620    }
2621
2622    /// Ensures that the given forkchoice state is consistent, assuming the head block has been
2623    /// made canonical.
2624    ///
2625    /// If the forkchoice state is consistent, this will return Ok(()). Otherwise, this will
2626    /// return an instance of [`OnForkChoiceUpdated`] that is INVALID.
2627    ///
2628    /// This also updates the safe and finalized blocks in the [`CanonicalInMemoryState`], if they
2629    /// are consistent with the head block.
2630    fn ensure_consistent_forkchoice_state(
2631        &self,
2632        state: ForkchoiceState,
2633    ) -> Result<(), OnForkChoiceUpdated> {
2634        // Ensure that the finalized block, if not zero, is known and in the canonical chain
2635        // after the head block is canonicalized.
2636        //
2637        // This ensures that the finalized block is consistent with the head block, i.e. the
2638        // finalized block is an ancestor of the head block.
2639        self.update_finalized_block(state.finalized_block_hash)?;
2640
2641        // Also ensure that the safe block, if not zero, is known and in the canonical chain
2642        // after the head block is canonicalized.
2643        //
2644        // This ensures that the safe block is consistent with the head block, i.e. the safe
2645        // block is an ancestor of the head block.
2646        self.update_safe_block(state.safe_block_hash)
2647    }
2648
2649    /// Pre-validate forkchoice update and check whether it can be processed.
2650    ///
2651    /// This method returns an early update outcome if validation fails, or if
2652    /// the node is syncing and the update cannot be processed at the moment.
2653    fn pre_validate_forkchoice_update(
2654        &mut self,
2655        state: ForkchoiceState,
2656    ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
2657        if state.head_block_hash.is_zero() {
2658            return Ok(Some(OnForkChoiceUpdated::invalid_state()))
2659        }
2660
2661        // check if the new head hash is connected to any ancestor that we previously marked as
2662        // invalid
2663        let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
2664        if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
2665            return Ok(Some(OnForkChoiceUpdated::with_invalid(status)))
2666        }
2667
2668        if !self.backfill_sync_state.is_idle() {
2669            // We can only process new forkchoice updates if the pipeline is idle, since it requires
2670            // exclusive access to the database
2671            trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
2672            return Ok(Some(OnForkChoiceUpdated::syncing()))
2673        }
2674
2675        Ok(None)
2676    }
2677
2678    /// Validates the payload attributes with respect to the header and fork choice state.
2679    ///
2680    /// Note: At this point, the fork choice update is considered to be VALID; however, we can still
2681    /// return an error if the payload attributes are invalid.
2682    fn process_payload_attributes(
2683        &self,
2684        attrs: T::PayloadAttributes,
2685        head: &N::BlockHeader,
2686        state: ForkchoiceState,
2687        version: EngineApiMessageVersion,
2688    ) -> OnForkChoiceUpdated {
2689        if let Err(err) =
2690            self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
2691        {
2692            warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
2693            return OnForkChoiceUpdated::invalid_payload_attributes()
2694        }
2695
2696        // 8. Client software MUST begin a payload build process building on top of
2697        //    forkchoiceState.headBlockHash and identified via buildProcessId value if
2698        //    payloadAttributes is not null and the forkchoice state has been updated successfully.
2699        //    The build process is specified in the Payload building section.
2700        match <T::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
2701            state.head_block_hash,
2702            attrs,
2703            version as u8,
2704        ) {
2705            Ok(attributes) => {
2706                // send the payload attributes to the builder and return the receiver for the
2707                // pending payload id; initiating the payload job is handled asynchronously
2708                let pending_payload_id = self.payload_builder.send_new_payload(attributes);
2709
2710                // Client software MUST respond to this method call in the following way:
2711                // {
2712                //      payloadStatus: {
2713                //          status: VALID,
2714                //          latestValidHash: forkchoiceState.headBlockHash,
2715                //          validationError: null
2716                //      },
2717                //      payloadId: buildProcessId
2718                // }
2719                //
2720                // if the payload is deemed VALID and the build process has begun.
2721                OnForkChoiceUpdated::updated_with_pending_payload_id(
2722                    PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
2723                    pending_payload_id,
2724                )
2725            }
2726            Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
2727        }
2728    }
2729
2730    /// Remove all blocks up to __and including__ the given block number.
2731    ///
2732    /// If a finalized hash is provided, the only non-canonical blocks which will be removed are
2733    /// those which have a fork point at or below the finalized hash.
2734    ///
2735    /// Canonical blocks below the upper bound will still be removed.
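    ///
    /// For example (illustrative): with an upper bound at block 100 and a finalized hash at
    /// block 90, all canonical blocks up to and including 100 are removed, while a fork is only
    /// removed if its fork point is at or below block 90.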
2736    pub(crate) fn remove_before(
2737        &mut self,
2738        upper_bound: BlockNumHash,
2739        finalized_hash: Option<B256>,
2740    ) -> ProviderResult<()> {
2741        // first fetch the finalized block number and then call the remove_before method on
2742        // tree_state
2743        let num = if let Some(hash) = finalized_hash {
2744            self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
2745        } else {
2746            None
2747        };
2748
2749        self.state.tree_state.remove_until(
2750            upper_bound,
2751            self.persistence_state.last_persisted_block.hash,
2752            num,
2753        );
2754        Ok(())
2755    }
2756
2757    /// Returns a builder for creating state providers for the given hash.
2758    ///
2759    /// This is an optimization for parallel execution contexts where we want to avoid
2760    /// creating state providers in the critical path.
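    ///
    /// A sketch of the intended use (illustrative; assumes the builder's `build` method):
    ///
    /// ```ignore
    /// // Create the cheap builder up front, outside the critical path...
    /// let builder = self.state_provider_builder(parent_hash)?.expect("parent must be known");
    /// // ...and materialize the actual state provider later, e.g. on a worker thread.
    /// let state_provider = builder.build()?;
    /// ```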
2761    pub fn state_provider_builder(
2762        &self,
2763        hash: B256,
2764    ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
2765    where
2766        P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone,
2767    {
2768        if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
2769            debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
2770            // the block leads back to the canonical chain
2771            return Ok(Some(StateProviderBuilder::new(
2772                self.provider.clone(),
2773                historical,
2774                Some(blocks),
2775            )))
2776        }
2777
2778        // Check if the block is persisted
2779        if let Some(header) = self.provider.header(&hash)? {
2780            debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
2781            // For persisted blocks, we create a builder that will fetch state directly from the
2782            // database
2783            return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
2784        }
2785
2786        debug!(target: "engine::tree", %hash, "no canonical state found for block");
2787        Ok(None)
2788    }
2789}
2790
2791/// Block inclusion can be valid, accepted, or invalid. Invalid blocks are returned as an error
2792/// variant.
2793///
2794/// If we don't know the block's parent, we return `Disconnected`, as we can't determine whether
2795/// the block is valid or not.
2796#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2797pub enum BlockStatus {
2798    /// The block is valid and extends the canonical chain.
2799    Valid,
2800    /// The block may be valid and has an unknown missing ancestor.
2801    Disconnected {
2802        /// Current canonical head.
2803        head: BlockNumHash,
2804        /// The lowest ancestor block that is not connected to the canonical chain.
2805        missing_ancestor: BlockNumHash,
2806    },
2807}
2808
2809/// How a payload was inserted if it was valid.
2810///
2811/// If the payload was valid but has already been seen, [`InsertPayloadOk::AlreadySeen(_)`] is
2812/// returned; otherwise [`InsertPayloadOk::Inserted(_)`] is returned.
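///
/// A sketch of how a caller may branch on the outcome (illustrative):
///
/// ```ignore
/// match outcome {
///     InsertPayloadOk::Inserted(BlockStatus::Valid) => {
///         // newly inserted block that extends the canonical chain
///     }
///     InsertPayloadOk::AlreadySeen(_) => {
///         // duplicate payload; nothing new was inserted
///     }
///     InsertPayloadOk::Inserted(BlockStatus::Disconnected { missing_ancestor, .. }) => {
///         // buffer the block and request a download of `missing_ancestor`
///     }
/// }
/// ```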
2813#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2814pub enum InsertPayloadOk {
2815    /// The payload was valid, but we have already seen it.
2816    AlreadySeen(BlockStatus),
2817    /// The payload was valid and inserted into the tree.
2818    Inserted(BlockStatus),
2819}
2820
2821/// Whether or not the blocks are currently persisting and the input block is a descendant.
2822#[derive(Debug, Clone, Copy)]
2823pub enum PersistingKind {
2824    /// The blocks are not currently persisting.
2825    NotPersisting,
2826    /// The blocks are currently persisting but the input block is not a descendant.
2827    PersistingNotDescendant,
2828    /// The blocks are currently persisting and the input block is a descendant.
2829    PersistingDescendant,
2830}
2831
2832impl PersistingKind {
2833    /// Returns true if the parallel state root can be run.
2834    ///
2835    /// We only run the parallel state root if we are not currently persisting any blocks or
2836    /// persisting blocks that are all ancestors of the one we are calculating the state root for.
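    ///
    /// For example (illustrative, ignored doctest):
    ///
    /// ```ignore
    /// assert!(PersistingKind::NotPersisting.can_run_parallel_state_root());
    /// assert!(PersistingKind::PersistingDescendant.can_run_parallel_state_root());
    /// assert!(!PersistingKind::PersistingNotDescendant.can_run_parallel_state_root());
    /// ```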
2837    pub const fn can_run_parallel_state_root(&self) -> bool {
2838        matches!(self, Self::NotPersisting | Self::PersistingDescendant)
2839    }
2840
2841    /// Returns true if the blocks are currently being persisted and the input block is a
2842    /// descendant.
2843    pub const fn is_descendant(&self) -> bool {
2844        matches!(self, Self::PersistingDescendant)
2845    }
2846}
2847#[cfg(test)]
2848mod tests {
2849    use super::*;
2850    use crate::persistence::PersistenceAction;
2851    use alloy_consensus::Header;
2852    use alloy_primitives::{
2853        map::{HashMap, HashSet},
2854        Bytes, B256,
2855    };
2856    use alloy_rlp::Decodable;
2857    use alloy_rpc_types_engine::{
2858        CancunPayloadFields, ExecutionData, ExecutionPayloadSidecar, ExecutionPayloadV1,
2859        ExecutionPayloadV3,
2860    };
2861    use assert_matches::assert_matches;
2862    use reth_chain_state::{test_utils::TestBlockBuilder, BlockState};
2863    use reth_chainspec::{ChainSpec, HOLESKY, MAINNET};
2864    use reth_engine_primitives::ForkchoiceStatus;
2865    use reth_ethereum_consensus::EthBeaconConsensus;
2866    use reth_ethereum_engine_primitives::EthEngineTypes;
2867    use reth_ethereum_primitives::{Block, EthPrimitives};
2868    use reth_evm_ethereum::MockEvmConfig;
2869    use reth_node_ethereum::EthereumEngineValidator;
2870    use reth_primitives_traits::Block as _;
2871    use reth_provider::test_utils::MockEthProvider;
2872    use reth_trie::{updates::TrieUpdates, HashedPostState};
2873    use std::{
2874        collections::BTreeMap,
2875        str::FromStr,
2876        sync::mpsc::{channel, Sender},
2877    };
2878
2879    // seismic imports not used by upstream
2880    use reth_node_core::dirs::MaybePlatformPath;
2881
2882    /// This is a test channel that holds back any value sent through it until it is `release`d.
2883    ///
2884    /// If `release` is signaled before a value is sent, the next sent value is forwarded immediately.
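    ///
    /// A typical use in a test (illustrative):
    ///
    /// ```ignore
    /// let (tx, rx, handle) = TestChannel::spawn_channel();
    /// tx.send(42).unwrap(); // intercepted by the channel task, not yet visible on `rx`
    /// handle.release();     // signal the task to forward the next value
    /// assert_eq!(rx.recv().unwrap(), 42);
    /// ```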
2885    struct TestChannel<T> {
2886        /// If an item is sent to this channel, an item will be released in the wrapped channel
2887        release: Receiver<()>,
2888        /// The sender channel
2889        tx: Sender<T>,
2890        /// The receiver channel
2891        rx: Receiver<T>,
2892    }
2893
2894    impl<T: Send + 'static> TestChannel<T> {
2895        /// Creates a new test channel
2896        fn spawn_channel() -> (Sender<T>, Receiver<T>, TestChannelHandle) {
2897            let (original_tx, original_rx) = channel();
2898            let (wrapped_tx, wrapped_rx) = channel();
2899            let (release_tx, release_rx) = channel();
2900            let handle = TestChannelHandle::new(release_tx);
2901            let test_channel = Self { release: release_rx, tx: wrapped_tx, rx: original_rx };
2902            // spawn the task that listens and releases stuff
2903            std::thread::spawn(move || test_channel.intercept_loop());
2904            (original_tx, wrapped_rx, handle)
2905        }
2906
2907        /// Runs the intercept loop, waiting for the handle to release a value
2908        fn intercept_loop(&self) {
2909            while self.release.recv() == Ok(()) {
2910                let Ok(value) = self.rx.recv() else { return };
2911
2912                let _ = self.tx.send(value);
2913            }
2914        }
2915    }
2916
2917    struct TestChannelHandle {
2918        /// The sender to use for releasing values
2919        release: Sender<()>,
2920    }
2921
2922    impl TestChannelHandle {
2923        /// Returns a [`TestChannelHandle`]
2924        const fn new(release: Sender<()>) -> Self {
2925            Self { release }
2926        }
2927
2928        /// Signals to the channel task that a value should be released
2929        #[expect(dead_code)]
2930        fn release(&self) {
2931            let _ = self.release.send(());
2932        }
2933    }
2934
2935    struct TestHarness {
2936        tree: EngineApiTreeHandler<
2937            EthPrimitives,
2938            MockEthProvider,
2939            EthEngineTypes,
2940            EthereumEngineValidator,
2941            MockEvmConfig,
2942        >,
2943        to_tree_tx: Sender<FromEngine<EngineApiRequest<EthEngineTypes, EthPrimitives>, Block>>,
2944        from_tree_rx: UnboundedReceiver<EngineApiEvent>,
2945        blocks: Vec<ExecutedBlockWithTrieUpdates>,
2946        action_rx: Receiver<PersistenceAction>,
2947        evm_config: MockEvmConfig,
2948        block_builder: TestBlockBuilder,
2949        provider: MockEthProvider,
2950    }
2951
2952    impl TestHarness {
2953        fn new(chain_spec: Arc<ChainSpec>) -> Self {
2954            let (action_tx, action_rx) = channel();
2955            Self::with_persistence_channel(chain_spec, action_tx, action_rx)
2956        }
2957
2958        #[expect(dead_code)]
2959        fn with_test_channel(chain_spec: Arc<ChainSpec>) -> (Self, TestChannelHandle) {
2960            let (action_tx, action_rx, handle) = TestChannel::spawn_channel();
2961            (Self::with_persistence_channel(chain_spec, action_tx, action_rx), handle)
2962        }
2963
2964        fn with_persistence_channel(
2965            chain_spec: Arc<ChainSpec>,
2966            action_tx: Sender<PersistenceAction>,
2967            action_rx: Receiver<PersistenceAction>,
2968        ) -> Self {
2969            let persistence_handle = PersistenceHandle::new(action_tx);
2970
2971            let backup_handle = BackupHandle::spawn_service(MaybePlatformPath::chain_default(
2972                chain_spec.chain.clone(),
2973            ));
2974
2975            let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));
2976
2977            let provider = MockEthProvider::default();
2978
2979            let payload_validator = EthereumEngineValidator::new(chain_spec.clone());
2980
2981            let (from_tree_tx, from_tree_rx) = unbounded_channel();
2982
2983            let header = chain_spec.genesis_header().clone();
2984            let header = SealedHeader::seal_slow(header);
2985            let engine_api_tree_state =
2986                EngineApiTreeState::new(10, 10, header.num_hash(), EngineApiKind::Ethereum);
2987            let canonical_in_memory_state = CanonicalInMemoryState::with_head(header, None, None);
2988
2989            let (to_payload_service, _payload_command_rx) = unbounded_channel();
2990            let payload_builder = PayloadBuilderHandle::new(to_payload_service);
2991
2992            let evm_config = MockEvmConfig::default();
2993
2994            let tree = EngineApiTreeHandler::new(
2995                provider.clone(),
2996                consensus,
2997                payload_validator,
2998                from_tree_tx,
2999                engine_api_tree_state,
3000                canonical_in_memory_state,
3001                persistence_handle,
3002                PersistenceState::default(),
3003                payload_builder,
3004                // TODO: fix tests for state root task https://github.com/paradigmxyz/reth/issues/14376
3005                // always assume enough parallelism for tests
3006                TreeConfig::default()
3007                    .with_legacy_state_root(true)
3008                    .with_has_enough_parallelism(true),
3009                EngineApiKind::Ethereum,
3010                evm_config.clone(),
3011                backup_handle,
3012            );
3013
3014            let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
3015            Self {
3016                to_tree_tx: tree.incoming_tx.clone(),
3017                tree,
3018                from_tree_rx,
3019                blocks: vec![],
3020                action_rx,
3021                evm_config,
3022                block_builder,
3023                provider,
3024            }
3025        }
3026
3027        fn with_blocks(mut self, blocks: Vec<ExecutedBlockWithTrieUpdates>) -> Self {
3028            let mut blocks_by_hash = HashMap::default();
3029            let mut blocks_by_number = BTreeMap::new();
3030            let mut state_by_hash = HashMap::default();
3031            let mut hash_by_number = BTreeMap::new();
3032            let mut parent_to_child: HashMap<B256, HashSet<B256>> = HashMap::default();
3033            let mut parent_hash = B256::ZERO;
3034
3035            for block in &blocks {
3036                let sealed_block = block.recovered_block();
3037                let hash = sealed_block.hash();
3038                let number = sealed_block.number;
3039                blocks_by_hash.insert(hash, block.clone());
3040                blocks_by_number.entry(number).or_insert_with(Vec::new).push(block.clone());
3041                state_by_hash.insert(hash, Arc::new(BlockState::new(block.clone())));
3042                hash_by_number.insert(number, hash);
3043                parent_to_child.entry(parent_hash).or_default().insert(hash);
3044                parent_hash = hash;
3045            }
3046
3047            self.tree.state.tree_state = TreeState {
3048                blocks_by_hash,
3049                blocks_by_number,
3050                current_canonical_head: blocks.last().unwrap().recovered_block().num_hash(),
3051                parent_to_child,
3052                persisted_trie_updates: HashMap::default(),
3053                engine_kind: EngineApiKind::Ethereum,
3054            };
3055
3056            let last_executed_block = blocks.last().unwrap().clone();
3057            let pending = Some(BlockState::new(last_executed_block));
3058            self.tree.canonical_in_memory_state =
3059                CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending, None, None);
3060
3061            self.blocks = blocks.clone();
3062
3063            let recovered_blocks =
3064                blocks.iter().map(|b| b.recovered_block().clone()).collect::<Vec<_>>();
3065
3066            self.persist_blocks(recovered_blocks);
3067
3068            self
3069        }
3070
3071        const fn with_backfill_state(mut self, state: BackfillSyncState) -> Self {
3072            self.tree.backfill_sync_state = state;
3073            self
3074        }
3075
3076        fn extend_execution_outcome(
3077            &self,
3078            execution_outcomes: impl IntoIterator<Item = impl Into<ExecutionOutcome>>,
3079        ) {
3080            self.evm_config.extend(execution_outcomes);
3081        }
3082
3083        fn insert_block(
3084            &mut self,
3085            block: RecoveredBlock<reth_ethereum_primitives::Block>,
3086        ) -> Result<InsertPayloadOk, InsertBlockError<Block>> {
3087            let execution_outcome = self.block_builder.get_execution_outcome(block.clone());
3088            self.extend_execution_outcome([execution_outcome]);
3089            self.tree.provider.add_state_root(block.state_root);
3090            self.tree.insert_block(block)
3091        }
3092
3093        async fn fcu_to(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
3094            let fcu_status = fcu_status.into();
3095
3096            self.send_fcu(block_hash, fcu_status).await;
3097
3098            self.check_fcu(block_hash, fcu_status).await;
3099        }
3100
3101        async fn send_fcu(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
3102            let fcu_state = self.fcu_state(block_hash);
3103
3104            let (tx, rx) = oneshot::channel();
3105            self.tree
3106                .on_engine_message(FromEngine::Request(
3107                    BeaconEngineMessage::ForkchoiceUpdated {
3108                        state: fcu_state,
3109                        payload_attrs: None,
3110                        tx,
3111                        version: EngineApiMessageVersion::default(),
3112                    }
3113                    .into(),
3114                ))
3115                .unwrap();
3116
3117            let response = rx.await.unwrap().unwrap().await.unwrap();
3118            match fcu_status.into() {
3119                ForkchoiceStatus::Valid => assert!(response.payload_status.is_valid()),
3120                ForkchoiceStatus::Syncing => assert!(response.payload_status.is_syncing()),
3121                ForkchoiceStatus::Invalid => assert!(response.payload_status.is_invalid()),
3122            }
3123        }
3124
3125        async fn check_fcu(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
3126            let fcu_state = self.fcu_state(block_hash);
3127
3128            // check for ForkchoiceUpdated event
3129            let event = self.from_tree_rx.recv().await.unwrap();
3130            match event {
3131                EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkchoiceUpdated(
3132                    state,
3133                    status,
3134                )) => {
3135                    assert_eq!(state, fcu_state);
3136                    assert_eq!(status, fcu_status.into());
3137                }
3138                _ => panic!("Unexpected event: {event:#?}"),
3139            }
3140        }
3141
3142        const fn fcu_state(&self, block_hash: B256) -> ForkchoiceState {
3143            ForkchoiceState {
3144                head_block_hash: block_hash,
3145                safe_block_hash: block_hash,
3146                finalized_block_hash: block_hash,
3147            }
3148        }
3149
3150        async fn send_new_payload(
3151            &mut self,
3152            block: RecoveredBlock<reth_ethereum_primitives::Block>,
3153        ) {
3154            let payload = ExecutionPayloadV3::from_block_unchecked(
3155                block.hash(),
3156                &block.clone_sealed_block().into_block(),
3157            );
3158            self.tree
3159                .on_new_payload(ExecutionData {
3160                    payload: payload.into(),
3161                    sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
3162                        parent_beacon_block_root: block.parent_beacon_block_root.unwrap(),
3163                        versioned_hashes: vec![],
3164                    }),
3165                })
3166                .unwrap();
3167        }
3168
3169        async fn insert_chain(
3170            &mut self,
3171            chain: impl IntoIterator<Item = RecoveredBlock<reth_ethereum_primitives::Block>> + Clone,
3172        ) {
3173            for block in chain.clone() {
3174                self.insert_block(block.clone()).unwrap();
3175            }
3176            self.check_canon_chain_insertion(chain).await;
3177        }
3178
3179        async fn check_canon_commit(&mut self, hash: B256) {
3180            let event = self.from_tree_rx.recv().await.unwrap();
3181            match event {
3182                EngineApiEvent::BeaconConsensus(
3183                    BeaconConsensusEngineEvent::CanonicalChainCommitted(header, _),
3184                ) => {
3185                    assert_eq!(header.hash(), hash);
3186                }
3187                _ => panic!("Unexpected event: {event:#?}"),
3188            }
3189        }
3190
3191        async fn check_fork_chain_insertion(
3192            &mut self,
3193            chain: impl IntoIterator<Item = RecoveredBlock<reth_ethereum_primitives::Block>> + Clone,
3194        ) {
3195            for block in chain {
3196                self.check_fork_block_added(block.hash()).await;
3197            }
3198        }
3199
3200        async fn check_canon_chain_insertion(
3201            &mut self,
3202            chain: impl IntoIterator<Item = RecoveredBlock<reth_ethereum_primitives::Block>> + Clone,
3203        ) {
3204            for block in chain.clone() {
3205                self.check_canon_block_added(block.hash()).await;
3206            }
3207        }
3208
3209        async fn check_canon_block_added(&mut self, expected_hash: B256) {
3210            let event = self.from_tree_rx.recv().await.unwrap();
3211            match event {
3212                EngineApiEvent::BeaconConsensus(
3213                    BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, _),
3214                ) => {
3215                    assert_eq!(executed.recovered_block.hash(), expected_hash);
3216                }
3217                _ => panic!("Unexpected event: {event:#?}"),
3218            }
3219        }
3220
3221        async fn check_fork_block_added(&mut self, expected_hash: B256) {
3222            let event = self.from_tree_rx.recv().await.unwrap();
3223            match event {
3224                EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkBlockAdded(
3225                    executed,
3226                    _,
3227                )) => {
3228                    assert_eq!(executed.recovered_block.hash(), expected_hash);
3229                }
3230                _ => panic!("Unexpected event: {event:#?}"),
3231            }
3232        }
3233
3234        async fn check_invalid_block(&mut self, expected_hash: B256) {
3235            let event = self.from_tree_rx.recv().await.unwrap();
3236            match event {
3237                EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::InvalidBlock(
3238                    block,
3239                )) => {
3240                    assert_eq!(block.hash(), expected_hash);
3241                }
3242                _ => panic!("Unexpected event: {event:#?}"),
3243            }
3244        }
3245
3246        fn persist_blocks(&self, blocks: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>) {
3247            let mut block_data: Vec<(B256, Block)> = Vec::with_capacity(blocks.len());
3248            let mut headers_data: Vec<(B256, Header)> = Vec::with_capacity(blocks.len());
3249
3250            for block in &blocks {
3251                block_data.push((block.hash(), block.clone_block()));
3252                headers_data.push((block.hash(), block.header().clone()));
3253            }
3254
3255            self.provider.extend_blocks(block_data);
3256            self.provider.extend_headers(headers_data);
3257        }
3258
3259        fn setup_range_insertion_for_valid_chain(
3260            &mut self,
3261            chain: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>,
3262        ) {
3263            self.setup_range_insertion_for_chain(chain, None)
3264        }
3265
3266        fn setup_range_insertion_for_invalid_chain(
3267            &mut self,
3268            chain: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>,
3269            index: usize,
3270        ) {
3271            self.setup_range_insertion_for_chain(chain, Some(index))
3272        }
3273
3274        fn setup_range_insertion_for_chain(
3275            &mut self,
3276            chain: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>,
3277            invalid_index: Option<usize>,
3278        ) {
3279            // set up execution outcomes for the chain; the blocks will be
3280            // executed starting from the oldest, so we need to reverse.
3281            let mut chain_rev = chain;
3282            chain_rev.reverse();
3283
3284            let mut execution_outcomes = Vec::with_capacity(chain_rev.len());
3285            for (index, block) in chain_rev.iter().enumerate() {
3286                let execution_outcome = self.block_builder.get_execution_outcome(block.clone());
3287                let state_root = if invalid_index == Some(index) {
3288                    B256::random()
3289                } else {
3290                    block.state_root
3291                };
3292                self.tree.provider.add_state_root(state_root);
3293                execution_outcomes.push(execution_outcome);
3294            }
3295            self.extend_execution_outcome(execution_outcomes);
3296        }
3297
3298        fn check_canon_head(&self, head_hash: B256) {
3299            assert_eq!(self.tree.state.tree_state.canonical_head().hash, head_hash);
3300        }
3301    }
3302
3303    #[test]
3304    fn test_tree_persist_block_batch() {
3305        let tree_config = TreeConfig::default();
3306        let chain_spec = MAINNET.clone();
3307        let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone());
3308
3309        // we need more than tree_config.persistence_threshold() + 1 blocks to
3310        // trigger the persistence task.
3311        let blocks: Vec<_> = test_block_builder
3312            .get_executed_blocks(1..tree_config.persistence_threshold() + 2)
3313            .collect();
3314        let mut test_harness = TestHarness::new(chain_spec).with_blocks(blocks);
3315
3316        let mut blocks = vec![];
3317        for idx in 0..tree_config.max_execute_block_batch_size() * 2 {
3318            blocks.push(test_block_builder.generate_random_block(idx as u64, B256::random()));
3319        }
3320
3321        test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(blocks)).unwrap();
3322
3323        // process the message
3324        let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
3325        test_harness.tree.on_engine_message(msg).unwrap();
3326
3327        // we now should receive the other batch
3328        let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
3329        match msg {
3330            FromEngine::DownloadedBlocks(blocks) => {
3331                assert_eq!(blocks.len(), tree_config.max_execute_block_batch_size());
3332            }
3333            _ => panic!("unexpected message: {msg:#?}"),
3334        }
3335    }
3336
3337    #[tokio::test]
3338    async fn test_tree_persist_blocks() {
3339        let tree_config = TreeConfig::default();
3340        let chain_spec = MAINNET.clone();
3341        let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone());
3342
3343        // we need more than tree_config.persistence_threshold() + 1 blocks to
3344        // trigger the persistence task.
3345        let blocks: Vec<_> = test_block_builder
3346            .get_executed_blocks(1..tree_config.persistence_threshold() + 2)
3347            .collect();
3348        let test_harness = TestHarness::new(chain_spec).with_blocks(blocks.clone());
3349        std::thread::Builder::new()
3350            .name("Tree Task".to_string())
3351            .spawn(|| test_harness.tree.run())
3352            .unwrap();
3353
3354        // send a message to the tree to enter the main loop.
3355        test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap();
3356
3357        let received_action =
3358            test_harness.action_rx.recv().expect("Failed to receive save blocks action");
3359        if let PersistenceAction::SaveBlocks(saved_blocks, _) = received_action {
3360            // only blocks.len() - tree_config.memory_block_buffer_target() blocks will be
3361            // persisted
3362            let expected_persist_len =
3363                blocks.len() - tree_config.memory_block_buffer_target() as usize;
3364            assert_eq!(saved_blocks.len(), expected_persist_len);
3365            assert_eq!(saved_blocks, blocks[..expected_persist_len]);
3366        } else {
3367            panic!("unexpected action received {received_action:?}");
3368        }
3369    }
3370
3371    #[tokio::test]
3372    async fn test_in_memory_state_trait_impl() {
3373        let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(0..10).collect();
3374        let test_harness = TestHarness::new(MAINNET.clone()).with_blocks(blocks.clone());
3375
3376        for executed_block in blocks {
3377            let sealed_block = executed_block.recovered_block();
3378
3379            let expected_state = BlockState::new(executed_block.clone());
3380
3381            let actual_state_by_hash = test_harness
3382                .tree
3383                .canonical_in_memory_state
3384                .state_by_hash(sealed_block.hash())
3385                .unwrap();
3386            assert_eq!(expected_state, *actual_state_by_hash);
3387
3388            let actual_state_by_number = test_harness
3389                .tree
3390                .canonical_in_memory_state
3391                .state_by_number(sealed_block.number)
3392                .unwrap();
3393            assert_eq!(expected_state, *actual_state_by_number);
3394        }
3395    }
3396
3397    #[tokio::test]
3398    async fn test_engine_request_during_backfill() {
3399        let tree_config = TreeConfig::default();
3400        let blocks: Vec<_> = TestBlockBuilder::eth()
3401            .get_executed_blocks(0..tree_config.persistence_threshold())
3402            .collect();
3403        let mut test_harness = TestHarness::new(MAINNET.clone())
3404            .with_blocks(blocks)
3405            .with_backfill_state(BackfillSyncState::Active);
3406
3407        let (tx, rx) = oneshot::channel();
3408        test_harness
3409            .tree
3410            .on_engine_message(FromEngine::Request(
3411                BeaconEngineMessage::ForkchoiceUpdated {
3412                    state: ForkchoiceState {
3413                        head_block_hash: B256::random(),
3414                        safe_block_hash: B256::random(),
3415                        finalized_block_hash: B256::random(),
3416                    },
3417                    payload_attrs: None,
3418                    tx,
3419                    version: EngineApiMessageVersion::default(),
3420                }
3421                .into(),
3422            ))
3423            .unwrap();
3424
3425        let resp = rx.await.unwrap().unwrap().await.unwrap();
3426        assert!(resp.payload_status.is_syncing());
3427    }
3428
3429    #[test]
3430    fn test_disconnected_payload() {
3431        let s = include_str!("../../test-data/holesky/2.rlp");
3432        let data = Bytes::from_str(s).unwrap();
3433        let block = Block::decode(&mut data.as_ref()).unwrap();
3434        let sealed = block.seal_slow();
3435        let hash = sealed.hash();
3436        let payload = ExecutionPayloadV1::from_block_unchecked(hash, &sealed.clone().into_block());
3437
3438        let mut test_harness = TestHarness::new(HOLESKY.clone());
3439
3440        let outcome = test_harness
3441            .tree
3442            .on_new_payload(ExecutionData {
3443                payload: payload.into(),
3444                sidecar: ExecutionPayloadSidecar::none(),
3445            })
3446            .unwrap();
3447        assert!(outcome.outcome.is_syncing());
3448
3449        // ensure block is buffered
3450        let buffered = test_harness.tree.state.buffer.block(&hash).unwrap();
3451        assert_eq!(buffered.clone_sealed_block(), sealed);
3452    }
3453
3454    #[test]
3455    fn test_disconnected_block() {
3456        let s = include_str!("../../test-data/holesky/2.rlp");
3457        let data = Bytes::from_str(s).unwrap();
3458        let block = Block::decode(&mut data.as_ref()).unwrap();
3459        let sealed = block.seal_slow().try_recover().unwrap();
3460
3461        let mut test_harness = TestHarness::new(HOLESKY.clone());
3462
3463        let outcome = test_harness.tree.insert_block(sealed.clone()).unwrap();
3464        assert_eq!(
3465            outcome,
3466            InsertPayloadOk::Inserted(BlockStatus::Disconnected {
3467                head: test_harness.tree.state.tree_state.current_canonical_head,
3468                missing_ancestor: sealed.parent_num_hash()
3469            })
3470        );
3471    }
3472
3473    #[tokio::test]
3474    async fn test_holesky_payload() {
3475        let s = include_str!("../../test-data/holesky/1.rlp");
3476        let data = Bytes::from_str(s).unwrap();
3477        let block: Block = Block::decode(&mut data.as_ref()).unwrap();
3478        let sealed = block.seal_slow();
3479        let payload =
3480            ExecutionPayloadV1::from_block_unchecked(sealed.hash(), &sealed.clone().into_block());
3481
3482        let mut test_harness =
3483            TestHarness::new(HOLESKY.clone()).with_backfill_state(BackfillSyncState::Active);
3484
3485        let (tx, rx) = oneshot::channel();
3486        test_harness
3487            .tree
3488            .on_engine_message(FromEngine::Request(
3489                BeaconEngineMessage::NewPayload {
3490                    payload: ExecutionData {
3491                        payload: payload.clone().into(),
3492                        sidecar: ExecutionPayloadSidecar::none(),
3493                    },
3494                    tx,
3495                }
3496                .into(),
3497            ))
3498            .unwrap();
3499
3500        let resp = rx.await.unwrap().unwrap();
3501        assert!(resp.is_syncing());
3502    }
3503
3504    #[tokio::test]
3505    async fn test_tree_state_on_new_head_reorg() {
3506        reth_tracing::init_test_tracing();
3507        let chain_spec = MAINNET.clone();
3508
3509        // Set persistence_threshold to 1
3510        let mut test_harness = TestHarness::new(chain_spec);
3511        test_harness.tree.config = test_harness
3512            .tree
3513            .config
3514            .with_persistence_threshold(1)
3515            .with_memory_block_buffer_target(1);
3516        let mut test_block_builder = TestBlockBuilder::eth();
3517        let blocks: Vec<_> = test_block_builder.get_executed_blocks(1..6).collect();
3518
3519        for block in &blocks {
3520            test_harness.tree.state.tree_state.insert_executed(block.clone());
3521        }
3522
3523        // set block 3 as the current canonical head
3524        test_harness
3525            .tree
3526            .state
3527            .tree_state
3528            .set_canonical_head(blocks[2].recovered_block().num_hash());
3529
3530        // create a fork from block 2
3531        let fork_block_3 = test_block_builder
3532            .get_executed_block_with_number(3, blocks[1].recovered_block().hash());
3533        let fork_block_4 = test_block_builder
3534            .get_executed_block_with_number(4, fork_block_3.recovered_block().hash());
3535        let fork_block_5 = test_block_builder
3536            .get_executed_block_with_number(5, fork_block_4.recovered_block().hash());
3537
3538        test_harness.tree.state.tree_state.insert_executed(fork_block_3.clone());
3539        test_harness.tree.state.tree_state.insert_executed(fork_block_4.clone());
3540        test_harness.tree.state.tree_state.insert_executed(fork_block_5.clone());
3541
3542        // normal (non-reorg) case
3543        let result = test_harness.tree.on_new_head(blocks[4].recovered_block().hash()).unwrap();
3544        assert!(matches!(result, Some(NewCanonicalChain::Commit { .. })));
3545        if let Some(NewCanonicalChain::Commit { new }) = result {
3546            assert_eq!(new.len(), 2);
3547            assert_eq!(new[0].recovered_block().hash(), blocks[3].recovered_block().hash());
3548            assert_eq!(new[1].recovered_block().hash(), blocks[4].recovered_block().hash());
3549        }
3550
3551        // should be a None persistence action before we advance persistence
3552        let current_action = test_harness.tree.persistence_state.current_action();
3553        assert_eq!(current_action, None);
3554
3555        // let's attempt to persist and check that it attempts to save blocks
3556        //
3557        // since in-memory block buffer target and persistence_threshold are both 1, this should
3558        // save all but the current tip of the canonical chain (up to blocks[1])
3559        test_harness.tree.advance_persistence().unwrap();
3560        let current_action = test_harness.tree.persistence_state.current_action().cloned();
3561        assert_eq!(
3562            current_action,
3563            Some(CurrentPersistenceAction::SavingBlocks {
3564                highest: blocks[1].recovered_block().num_hash()
3565            })
3566        );
3567
3568        // get rid of the prev action
3569        let received_action = test_harness.action_rx.recv().unwrap();
3570        let PersistenceAction::SaveBlocks(saved_blocks, sender) = received_action else {
3571            panic!("received wrong action");
3572        };
3573        assert_eq!(saved_blocks, vec![blocks[0].clone(), blocks[1].clone()]);
3574
3575        // send the response so we can advance again
3576        sender.send(Some(blocks[1].recovered_block().num_hash())).unwrap();
3577
3578        // we should be persisting blocks[1] because we threw out the prev action
3579        let current_action = test_harness.tree.persistence_state.current_action().cloned();
3580        assert_eq!(
3581            current_action,
3582            Some(CurrentPersistenceAction::SavingBlocks {
3583                highest: blocks[1].recovered_block().num_hash()
3584            })
3585        );
3586
3587        // after advancing persistence, we should be at `None` for the next action
3588        test_harness.tree.advance_persistence().unwrap();
3589        let current_action = test_harness.tree.persistence_state.current_action().cloned();
3590        assert_eq!(current_action, None);
3591
3592        // reorg case
3593        let result = test_harness.tree.on_new_head(fork_block_5.recovered_block().hash()).unwrap();
3594        assert!(matches!(result, Some(NewCanonicalChain::Reorg { .. })));
3595
3596        if let Some(NewCanonicalChain::Reorg { new, old }) = result {
3597            assert_eq!(new.len(), 3);
3598            assert_eq!(new[0].recovered_block().hash(), fork_block_3.recovered_block().hash());
3599            assert_eq!(new[1].recovered_block().hash(), fork_block_4.recovered_block().hash());
3600            assert_eq!(new[2].recovered_block().hash(), fork_block_5.recovered_block().hash());
3601
3602            assert_eq!(old.len(), 1);
3603            assert_eq!(old[0].recovered_block().hash(), blocks[2].recovered_block().hash());
3604        }
3605
3606        // The canonical block has not changed, so we will not get any active persistence action
3607        test_harness.tree.advance_persistence().unwrap();
3608        let current_action = test_harness.tree.persistence_state.current_action().cloned();
3609        assert_eq!(current_action, None);
3610
3611        // Let's change the canonical head and advance persistence
3612        test_harness
3613            .tree
3614            .state
3615            .tree_state
3616            .set_canonical_head(fork_block_5.recovered_block().num_hash());
3617
3618        // The canonical block has changed now, we should get fork_block_4 due to the persistence
3619        // threshold and in memory block buffer target
3620        test_harness.tree.advance_persistence().unwrap();
3621        let current_action = test_harness.tree.persistence_state.current_action().cloned();
3622        assert_eq!(
3623            current_action,
3624            Some(CurrentPersistenceAction::SavingBlocks {
3625                highest: fork_block_4.recovered_block().num_hash()
3626            })
3627        );
3628    }
3629
    #[test]
    fn test_tree_state_on_new_head_deep_fork() {
        reth_tracing::init_test_tracing();

        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec);
        let mut test_block_builder = TestBlockBuilder::eth();

        let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..5).collect();

        for block in &blocks {
            test_harness.tree.state.tree_state.insert_executed(block.clone());
        }

        // set the last block as the current canonical head
        let last_block = blocks.last().unwrap().recovered_block().clone();

        test_harness.tree.state.tree_state.set_canonical_head(last_block.num_hash());

        // create two sibling fork chains from last_block
        let chain_a = test_block_builder.create_fork(&last_block, 10);
        let chain_b = test_block_builder.create_fork(&last_block, 10);

        for block in &chain_a {
            test_harness.tree.state.tree_state.insert_executed(ExecutedBlockWithTrieUpdates {
                block: ExecutedBlock {
                    recovered_block: Arc::new(block.clone()),
                    execution_output: Arc::new(ExecutionOutcome::default()),
                    hashed_state: Arc::new(HashedPostState::default()),
                },
                trie: Arc::new(TrieUpdates::default()),
            });
        }
        test_harness.tree.state.tree_state.set_canonical_head(chain_a.last().unwrap().num_hash());

        for block in &chain_b {
            test_harness.tree.state.tree_state.insert_executed(ExecutedBlockWithTrieUpdates {
                block: ExecutedBlock {
                    recovered_block: Arc::new(block.clone()),
                    execution_output: Arc::new(ExecutionOutcome::default()),
                    hashed_state: Arc::new(HashedPostState::default()),
                },
                trie: Arc::new(TrieUpdates::default()),
            });
        }

        // for each block in chain_b, reorg to it and then back to the canonical chain
        let mut expected_new = Vec::new();
        for block in &chain_b {
            // reorg to the chain_b block
            let result = test_harness.tree.on_new_head(block.hash()).unwrap();
            assert_matches!(result, Some(NewCanonicalChain::Reorg { .. }));

            expected_new.push(block);
            if let Some(NewCanonicalChain::Reorg { new, old }) = result {
                assert_eq!(new.len(), expected_new.len());
                for (index, block) in expected_new.iter().enumerate() {
                    assert_eq!(new[index].recovered_block().hash(), block.hash());
                }

                assert_eq!(old.len(), chain_a.len());
                for (index, block) in chain_a.iter().enumerate() {
                    assert_eq!(old[index].recovered_block().hash(), block.hash());
                }
            }

            // set the last block of chain a as the canonical head again
            test_harness.tree.on_new_head(chain_a.last().unwrap().hash()).unwrap();
        }
    }

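    // With a canonical head at 9, a memory block buffer target of 3, and block 3 as the
    // last persisted block, the persistence window is blocks 4..=6: 9 - 3 - 3 = 3 blocks.
    // Non-canonical (fork) blocks at the same heights must be skipped.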
    #[tokio::test]
    async fn test_get_canonical_blocks_to_persist() {
        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec);
        let mut test_block_builder = TestBlockBuilder::eth();

        let canonical_head_number = 9;
        let blocks: Vec<_> =
            test_block_builder.get_executed_blocks(0..canonical_head_number + 1).collect();
        test_harness = test_harness.with_blocks(blocks.clone());

        let last_persisted_block_number = 3;
        test_harness.tree.persistence_state.last_persisted_block =
            blocks[last_persisted_block_number as usize].recovered_block().num_hash();

        let persistence_threshold = 4;
        let memory_block_buffer_target = 3;
        test_harness.tree.config = TreeConfig::default()
            .with_persistence_threshold(persistence_threshold)
            .with_memory_block_buffer_target(memory_block_buffer_target);

        let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist();

        let expected_blocks_to_persist_length: usize =
            (canonical_head_number - memory_block_buffer_target - last_persisted_block_number)
                .try_into()
                .unwrap();

        assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);
        for (i, item) in
            blocks_to_persist.iter().enumerate().take(expected_blocks_to_persist_length)
        {
            assert_eq!(item.recovered_block().number, last_persisted_block_number + i as u64 + 1);
        }

        // make sure only canonical blocks are included
        let fork_block = test_block_builder.get_executed_block_with_number(4, B256::random());
        let fork_block_hash = fork_block.recovered_block().hash();
        test_harness.tree.state.tree_state.insert_executed(fork_block);

        assert!(test_harness.tree.state.tree_state.block_by_hash(fork_block_hash).is_some());

        let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist();
        assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);

        // check that the fork block is not included in the blocks to persist
        assert!(!blocks_to_persist.iter().any(|b| b.recovered_block().hash() == fork_block_hash));

        // check that the original block 4 is still included
        assert!(blocks_to_persist.iter().any(|b| b.recovered_block().number == 4 &&
            b.recovered_block().hash() == blocks[4].recovered_block().hash()));

        // check that if we advance persistence, the persistence action is the correct value
        test_harness.tree.advance_persistence().expect("advancing persistence should succeed");
        assert_eq!(
            test_harness.tree.persistence_state.current_action().cloned(),
            Some(CurrentPersistenceAction::SavingBlocks {
                highest: blocks_to_persist.last().unwrap().recovered_block().num_hash()
            })
        );
    }

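    // An FCU to an unknown head must answer `SYNCING` and emit a download request for
    // the missing block so it can be fetched from the network.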
    #[tokio::test]
    async fn test_engine_tree_fcu_missing_head() {
        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone());

        let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..5).collect();
        test_harness = test_harness.with_blocks(blocks);

        let missing_block = test_block_builder
            .generate_random_block(6, test_harness.blocks.last().unwrap().recovered_block().hash());

        test_harness.fcu_to(missing_block.hash(), PayloadStatusEnum::Syncing).await;

        // after the FCU we receive an EngineApiEvent::Download event to get the missing block
        let event = test_harness.from_tree_rx.recv().await.unwrap();
        match event {
            EngineApiEvent::Download(DownloadRequest::BlockSet(actual_block_set)) => {
                let expected_block_set = HashSet::from_iter([missing_block.hash()]);
                assert_eq!(actual_block_set, expected_block_set);
            }
            _ => panic!("Unexpected event: {event:#?}"),
        }
    }

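    // Blocks that extend the canonical chain after a valid FCU should become canonical
    // on insertion, without triggering a reorg.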
    #[tokio::test]
    async fn test_engine_tree_fcu_canon_chain_insertion() {
        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
        test_harness = test_harness.with_blocks(base_chain.clone());

        test_harness
            .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
            .await;

        // extend main chain
        let main_chain = test_harness.block_builder.create_fork(base_chain[0].recovered_block(), 3);

        test_harness.insert_chain(main_chain).await;
    }

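    // A reorg where all fork blocks are already in the tree: an FCU to the fork tip
    // must commit the fork as the new canonical chain and update the canonical head.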
    #[tokio::test]
    async fn test_engine_tree_fcu_reorg_with_all_blocks() {
        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        let main_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..5).collect();
        test_harness = test_harness.with_blocks(main_chain.clone());

        let fork_chain = test_harness.block_builder.create_fork(main_chain[2].recovered_block(), 3);
        let fork_chain_last_hash = fork_chain.last().unwrap().hash();

        // add fork blocks to the tree
        for block in &fork_chain {
            test_harness.insert_block(block.clone()).unwrap();
        }

        test_harness.send_fcu(fork_chain_last_hash, ForkchoiceStatus::Valid).await;

        // check for ForkBlockAdded events; we expect fork_chain.len() blocks added
        test_harness.check_fork_chain_insertion(fork_chain.clone()).await;

        // check for the CanonicalChainCommitted event
        test_harness.check_canon_commit(fork_chain_last_hash).await;

        test_harness.check_fcu(fork_chain_last_hash, ForkchoiceStatus::Valid).await;

        // the new head is the tip of the fork chain
        test_harness.check_canon_head(fork_chain_last_hash);
    }

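    // When the FCU target is too far ahead, the tree first requests the target block;
    // once backfill finishes, it asks for the remaining range between the backfill tip
    // and the downloaded head.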
    #[tokio::test]
    async fn test_engine_tree_live_sync_transition_required_blocks_requested() {
        reth_tracing::init_test_tracing();

        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
        test_harness = test_harness.with_blocks(base_chain.clone());

        test_harness
            .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
            .await;

        // extend the main chain with enough blocks to trigger a pipeline run, but don't
        // insert them
        let main_chain = test_harness
            .block_builder
            .create_fork(base_chain[0].recovered_block(), MIN_BLOCKS_FOR_PIPELINE_RUN + 10);

        let main_chain_last_hash = main_chain.last().unwrap().hash();
        test_harness.send_fcu(main_chain_last_hash, ForkchoiceStatus::Syncing).await;

        test_harness.check_fcu(main_chain_last_hash, ForkchoiceStatus::Syncing).await;

        // create the event for backfill finished
        let backfill_finished_block_number = MIN_BLOCKS_FOR_PIPELINE_RUN + 1;
        let backfill_finished = FromOrchestrator::BackfillSyncFinished(ControlFlow::Continue {
            block_number: backfill_finished_block_number,
        });

        let backfill_tip_block = main_chain[(backfill_finished_block_number - 1) as usize].clone();
        // add the block to the mock provider to enable persistence cleanup
        test_harness.provider.add_block(backfill_tip_block.hash(), backfill_tip_block.into_block());
        test_harness.tree.on_engine_message(FromEngine::Event(backfill_finished)).unwrap();

        let event = test_harness.from_tree_rx.recv().await.unwrap();
        match event {
            EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
                assert_eq!(hash_set, HashSet::from_iter([main_chain_last_hash]));
            }
            _ => panic!("Unexpected event: {event:#?}"),
        }

        test_harness
            .tree
            .on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain
                .last()
                .unwrap()
                .clone()]))
            .unwrap();

        let event = test_harness.from_tree_rx.recv().await.unwrap();
        match event {
            EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => {
                assert_eq!(
                    total_blocks,
                    (main_chain.len() - backfill_finished_block_number as usize - 1) as u64
                );
                assert_eq!(initial_hash, main_chain.last().unwrap().parent_hash);
            }
            _ => panic!("Unexpected event: {event:#?}"),
        }
    }

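    // Full live-sync transition: an FCU beyond the pipeline threshold triggers a block
    // download, then a backfill run, then a range download for the remaining blocks,
    // until the downloaded chain finally becomes canonical.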
    #[tokio::test]
    async fn test_engine_tree_live_sync_transition_eventually_canonical() {
        reth_tracing::init_test_tracing();

        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());
        test_harness.tree.config = test_harness.tree.config.with_max_execute_block_batch_size(100);

        // create the base chain and set up the test harness with it
        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
        test_harness = test_harness.with_blocks(base_chain.clone());

        // fcu to the tip of the base chain
        test_harness
            .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
            .await;

        // create the main chain as an extension of the base chain, with enough blocks to
        // trigger backfill sync
        let main_chain = test_harness
            .block_builder
            .create_fork(base_chain[0].recovered_block(), MIN_BLOCKS_FOR_PIPELINE_RUN + 10);

        let main_chain_last = main_chain.last().unwrap();
        let main_chain_last_hash = main_chain_last.hash();
        let main_chain_backfill_target =
            main_chain.get(MIN_BLOCKS_FOR_PIPELINE_RUN as usize).unwrap();
        let main_chain_backfill_target_hash = main_chain_backfill_target.hash();

        // fcu to the main chain block that should trigger backfill sync
        test_harness.send_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await;
        test_harness.check_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await;

        // check the download request for the target
        let event = test_harness.from_tree_rx.recv().await.unwrap();
        match event {
            EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
                assert_eq!(hash_set, HashSet::from_iter([main_chain_backfill_target_hash]));
            }
            _ => panic!("Unexpected event: {event:#?}"),
        }

        // tell the engine the requested block was downloaded
        test_harness
            .tree
            .on_engine_message(FromEngine::DownloadedBlocks(vec![
                main_chain_backfill_target.clone()
            ]))
            .unwrap();

        // check that backfill is triggered
        let event = test_harness.from_tree_rx.recv().await.unwrap();
        match event {
            EngineApiEvent::BackfillAction(BackfillAction::Start(
                reth_stages::PipelineTarget::Sync(target_hash),
            )) => {
                assert_eq!(target_hash, main_chain_backfill_target_hash);
            }
            _ => panic!("Unexpected event: {event:#?}"),
        }

        // persist the main chain blocks, as the backfill operation would do
        let backfilled_chain: Vec<_> =
            main_chain.clone().drain(0..(MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize).collect();
        test_harness.persist_blocks(backfilled_chain.clone());

        test_harness.setup_range_insertion_for_valid_chain(backfilled_chain);

        // send the message marking backfill as finished
        test_harness
            .tree
            .on_engine_message(FromEngine::Event(FromOrchestrator::BackfillSyncFinished(
                ControlFlow::Continue { block_number: main_chain_backfill_target.number },
            )))
            .unwrap();

        // send fcu to the tip of the main chain
        test_harness.fcu_to(main_chain_last_hash, ForkchoiceStatus::Syncing).await;

        let event = test_harness.from_tree_rx.recv().await.unwrap();
        match event {
            EngineApiEvent::Download(DownloadRequest::BlockSet(target_hash)) => {
                assert_eq!(target_hash, HashSet::from_iter([main_chain_last_hash]));
            }
            _ => panic!("Unexpected event: {event:#?}"),
        }

        // tell the engine the main chain tip was downloaded
        test_harness
            .tree
            .on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain_last.clone()]))
            .unwrap();

        // check the download range request
        let event = test_harness.from_tree_rx.recv().await.unwrap();
        match event {
            EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => {
                assert_eq!(
                    total_blocks,
                    (main_chain.len() - MIN_BLOCKS_FOR_PIPELINE_RUN as usize - 2) as u64
                );
                assert_eq!(initial_hash, main_chain_last.parent_hash);
            }
            _ => panic!("Unexpected event: {event:#?}"),
        }

        let remaining: Vec<_> = main_chain
            .clone()
            .drain((MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize..main_chain.len())
            .collect();

        test_harness.setup_range_insertion_for_valid_chain(remaining.clone());

        // tell the engine the block range was downloaded
        test_harness
            .tree
            .on_engine_message(FromEngine::DownloadedBlocks(remaining.clone()))
            .unwrap();

        test_harness.check_canon_chain_insertion(remaining).await;

        // check the canonical chain committed event with the hash of the latest block
        test_harness.check_canon_commit(main_chain_last_hash).await;

        // the new head is the tip of the main chain
        test_harness.check_canon_head(main_chain_last_hash);
    }

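    // Two successive FCUs along an already-inserted chain: each one should only extend
    // the canonical chain (commit events, no reorg) up to the requested block.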
    #[tokio::test]
    async fn test_engine_tree_live_sync_fcu_extends_canon_chain() {
        reth_tracing::init_test_tracing();

        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        // create the base chain and set up the test harness with it
        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
        test_harness = test_harness.with_blocks(base_chain.clone());

        // fcu to the tip of the base chain
        test_harness
            .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
            .await;

        // create the main chain as an extension of the base chain
        let main_chain =
            test_harness.block_builder.create_fork(base_chain[0].recovered_block(), 10);
        // pick a target in the middle of the main chain
        let target = main_chain.get(5).unwrap();
        let target_hash = target.hash();
        let main_last = main_chain.last().unwrap();
        let main_last_hash = main_last.hash();

        // insert the main chain
        test_harness.insert_chain(main_chain).await;

        // send fcu to the target
        test_harness.send_fcu(target_hash, ForkchoiceStatus::Valid).await;

        test_harness.check_canon_commit(target_hash).await;
        test_harness.check_fcu(target_hash, ForkchoiceStatus::Valid).await;

        // send fcu to the main chain tip
        test_harness.send_fcu(main_last_hash, ForkchoiceStatus::Valid).await;

        test_harness.check_canon_commit(main_last_hash).await;
        test_harness.check_fcu(main_last_hash, ForkchoiceStatus::Valid).await;
        test_harness.check_canon_head(main_last_hash);
    }

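    // With the canonical head pinned to an old block, two competing forks are inserted
    // via newPayload; after an FCU to chain B's tip, chain A must be classified as a
    // fork by `is_fork`.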
    #[tokio::test]
    async fn test_engine_tree_valid_forks_with_older_canonical_head() {
        reth_tracing::init_test_tracing();

        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        // create the base chain and set up the test harness with it
        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
        test_harness = test_harness.with_blocks(base_chain.clone());

        let old_head = base_chain.first().unwrap().recovered_block();

        // extend the base chain
        let extension_chain = test_harness.block_builder.create_fork(old_head, 5);
        let fork_block = extension_chain.last().unwrap().clone_sealed_block();

        test_harness.setup_range_insertion_for_valid_chain(extension_chain.clone());
        test_harness.insert_chain(extension_chain).await;

        // fcu to old_head
        test_harness.fcu_to(old_head.hash(), ForkchoiceStatus::Valid).await;

        // create two competing chains starting from fork_block
        let chain_a = test_harness.block_builder.create_fork(&fork_block, 10);
        let chain_b = test_harness.block_builder.create_fork(&fork_block, 10);

        // insert chain A blocks using newPayload
        test_harness.setup_range_insertion_for_valid_chain(chain_a.clone());
        for block in &chain_a {
            test_harness.send_new_payload(block.clone()).await;
        }

        test_harness.check_canon_chain_insertion(chain_a.clone()).await;

        // insert chain B blocks using newPayload
        test_harness.setup_range_insertion_for_valid_chain(chain_b.clone());
        for block in &chain_b {
            test_harness.send_new_payload(block.clone()).await;
        }

        test_harness.check_canon_chain_insertion(chain_b.clone()).await;

        // send FCU to make the tip of chain B the new head
        let chain_b_tip_hash = chain_b.last().unwrap().hash();
        test_harness.send_fcu(chain_b_tip_hash, ForkchoiceStatus::Valid).await;

        // check for the CanonicalChainCommitted event
        test_harness.check_canon_commit(chain_b_tip_hash).await;

        // verify the FCU was processed
        test_harness.check_fcu(chain_b_tip_hash, ForkchoiceStatus::Valid).await;

        // verify the new canonical head
        test_harness.check_canon_head(chain_b_tip_hash);

        // verify that chain A is now considered a fork
        assert!(test_harness.tree.is_fork(chain_a.last().unwrap().hash()).unwrap());
    }

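    // A payload whose parent is unknown gets buffered; once the missing parent arrives,
    // the buffered child must be drained from the buffer and connected to the canon
    // chain in order.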
    #[tokio::test]
    async fn test_engine_tree_buffered_blocks_are_eventually_connected() {
        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
        test_harness = test_harness.with_blocks(base_chain.clone());

        // side chain consisting of two blocks; the last one is inserted first so that it
        // is forced to be buffered
        let side_chain =
            test_harness.block_builder.create_fork(base_chain.last().unwrap().recovered_block(), 2);

        // buffer the last block of the side chain
        let buffered_block = side_chain.last().unwrap();
        let buffered_block_hash = buffered_block.hash();

        test_harness.setup_range_insertion_for_valid_chain(vec![buffered_block.clone()]);
        test_harness.send_new_payload(buffered_block.clone()).await;

        assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_some());

        let non_buffered_block = side_chain.first().unwrap();
        let non_buffered_block_hash = non_buffered_block.hash();

        // insert the block that continues the canon chain; it should not be buffered
        test_harness.setup_range_insertion_for_valid_chain(vec![non_buffered_block.clone()]);
        test_harness.send_new_payload(non_buffered_block.clone()).await;
        assert!(test_harness.tree.state.buffer.block(&non_buffered_block_hash).is_none());

        // the previously buffered block should be connected now
        assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_none());

        // both blocks are added to the canon chain in order
        test_harness.check_canon_block_added(non_buffered_block_hash).await;
        test_harness.check_canon_block_added(buffered_block_hash).await;
    }

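    // Same competing-forks setup, but chain A contains an invalid block: descendants of
    // the invalid block are marked invalid, an FCU to chain A's tip returns an invalid
    // status, and the canonical head stays on chain B.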
    #[tokio::test]
    async fn test_engine_tree_valid_and_invalid_forks_with_older_canonical_head() {
        reth_tracing::init_test_tracing();

        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        // create the base chain and set up the test harness with it
        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
        test_harness = test_harness.with_blocks(base_chain.clone());

        let old_head = base_chain.first().unwrap().recovered_block();

        // extend the base chain
        let extension_chain = test_harness.block_builder.create_fork(old_head, 5);
        let fork_block = extension_chain.last().unwrap().clone_sealed_block();
        test_harness.insert_chain(extension_chain).await;

        // fcu to old_head
        test_harness.fcu_to(old_head.hash(), ForkchoiceStatus::Valid).await;

        // create two competing chains starting from fork_block, one of them invalid
        let total_fork_elements = 10;
        let chain_a = test_harness.block_builder.create_fork(&fork_block, total_fork_elements);
        let chain_b = test_harness.block_builder.create_fork(&fork_block, total_fork_elements);

        // insert chain B blocks using newPayload
        test_harness.setup_range_insertion_for_valid_chain(chain_b.clone());
        for block in &chain_b {
            test_harness.send_new_payload(block.clone()).await;
            test_harness.send_fcu(block.hash(), ForkchoiceStatus::Valid).await;
            test_harness.check_canon_block_added(block.hash()).await;
            test_harness.check_canon_commit(block.hash()).await;
            test_harness.check_fcu(block.hash(), ForkchoiceStatus::Valid).await;
        }

        // insert chain A blocks using newPayload; one of the blocks will be invalid
        let invalid_index = 3;
        test_harness.setup_range_insertion_for_invalid_chain(chain_a.clone(), invalid_index);
        for block in &chain_a {
            test_harness.send_new_payload(block.clone()).await;
        }

        // check fork chain insertion up to the invalid block, taking the reversed
        // ordering of `invalid_index` into account
        test_harness
            .check_fork_chain_insertion(
                chain_a[..chain_a.len() - invalid_index - 1].iter().cloned(),
            )
            .await;
        for block in &chain_a[chain_a.len() - invalid_index - 1..] {
            test_harness.check_invalid_block(block.hash()).await;
        }

        // send FCU to the tip of chain A and expect it to be reported as invalid
        let chain_a_tip_hash = chain_a.last().unwrap().hash();
        test_harness.fcu_to(chain_a_tip_hash, ForkchoiceStatus::Invalid).await;

        // chain B was already made canonical block by block above; verify that the
        // invalid FCU to chain A did not change the canonical head
        let chain_b_tip_hash = chain_b.last().unwrap().hash();
        test_harness.check_canon_head(chain_b_tip_hash);
    }

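    // A side chain contains an invalid block; an FCU targeting a descendant of that
    // block must be answered with an invalid status even though the target itself was
    // never executed.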
    #[tokio::test]
    async fn test_engine_tree_reorg_with_missing_ancestor_expecting_valid() {
        reth_tracing::init_test_tracing();
        let chain_spec = MAINNET.clone();
        let mut test_harness = TestHarness::new(chain_spec.clone());

        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..6).collect();
        test_harness = test_harness.with_blocks(base_chain.clone());

        // create a side chain with an invalid block
        let side_chain = test_harness
            .block_builder
            .create_fork(base_chain.last().unwrap().recovered_block(), 15);
        let invalid_index = 9;

        test_harness.setup_range_insertion_for_invalid_chain(side_chain.clone(), invalid_index);

        for (index, block) in side_chain.iter().enumerate() {
            test_harness.send_new_payload(block.clone()).await;

            if index < side_chain.len() - invalid_index - 1 {
                test_harness.send_fcu(block.hash(), ForkchoiceStatus::Valid).await;
            }
        }

        // try a forkchoice update to a block after the invalid one; it must be answered
        // with an invalid status
        let fork_tip_hash = side_chain.last().unwrap().hash();
        test_harness.send_fcu(fork_tip_hash, ForkchoiceStatus::Invalid).await;
    }
}