reth_engine_tree/tree/
mod.rs

1use crate::{
2    backfill::{BackfillAction, BackfillSyncState},
3    backup::{BackupAction, BackupHandle},
4    chain::FromOrchestrator,
5    engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
6    persistence::PersistenceHandle,
7    tree::metrics::EngineApiMetrics,
8};
9use alloy_consensus::BlockHeader;
10use alloy_eips::BlockNumHash;
11use alloy_primitives::{
12    map::{HashMap, HashSet},
13    BlockNumber, B256, U256,
14};
15use alloy_rpc_types_engine::{
16    ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState, PayloadStatus, PayloadStatusEnum,
17    PayloadValidationError,
18};
19use reth_beacon_consensus::{
20    BeaconConsensusEngineEvent, InvalidHeaderCache, MIN_BLOCKS_FOR_PIPELINE_RUN,
21};
22use reth_blockchain_tree::{
23    error::{InsertBlockErrorKindTwo, InsertBlockErrorTwo, InsertBlockFatalError},
24    BlockBuffer, BlockStatus2, InsertPayloadOk2,
25};
26use reth_chain_state::{
27    CanonicalInMemoryState, ExecutedBlock, MemoryOverlayStateProvider, NewCanonicalChain,
28};
29use reth_consensus::{Consensus, FullConsensus, PostExecutionInput};
30use reth_engine_primitives::{
31    BeaconEngineMessage, BeaconOnNewPayloadError, EngineApiMessageVersion, EngineTypes,
32    EngineValidator, ForkchoiceStateTracker, OnForkChoiceUpdated,
33};
34use reth_errors::{ConsensusError, ProviderResult};
35use reth_evm::execute::BlockExecutorProvider;
36use reth_payload_builder::PayloadBuilderHandle;
37use reth_payload_builder_primitives::PayloadBuilder;
38use reth_payload_primitives::PayloadBuilderAttributes;
39use reth_primitives::{
40    EthPrimitives, GotExpected, NodePrimitives, SealedBlockFor, SealedBlockWithSenders,
41    SealedHeader,
42};
43use reth_primitives_traits::Block;
44use reth_provider::{
45    providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, ExecutionOutcome,
46    HashedPostStateProvider, ProviderError, StateCommitmentProvider, StateProviderBox,
47    StateProviderFactory, StateReader, StateRootProvider, TransactionVariant,
48};
49use reth_revm::database::StateProviderDatabase;
50use reth_stages_api::ControlFlow;
51use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
52use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
53use revm_primitives::EvmState;
54use std::{
55    cmp::Ordering,
56    collections::{btree_map, hash_map, BTreeMap, VecDeque},
57    fmt::Debug,
58    ops::Bound,
59    sync::{
60        mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
61        Arc,
62    },
63    time::Instant,
64};
65use tokio::sync::{
66    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
67    oneshot::{self, error::TryRecvError},
68};
69use tracing::*;
70
71pub mod config;
72mod invalid_block_hook;
73mod metrics;
74mod persistence_state;
75pub use config::TreeConfig;
76pub use invalid_block_hook::{InvalidBlockHooks, NoopInvalidBlockHook};
77pub use persistence_state::PersistenceState;
78pub use reth_engine_primitives::InvalidBlockHook;
79
80pub mod root;
81
82/// Keeps track of the state of the tree.
83///
84/// ## Invariants
85///
86/// - This only stores blocks that are connected to the canonical chain.
87/// - All executed blocks are valid and have been executed.
88#[derive(Debug, Default)]
89pub struct TreeState<N: NodePrimitives = EthPrimitives> {
90    /// __All__ unique executed blocks by block hash that are connected to the canonical chain.
91    ///
92    /// This includes blocks of all forks.
93    blocks_by_hash: HashMap<B256, ExecutedBlock<N>>,
94    /// Executed blocks grouped by their respective block number.
95    ///
96    /// This maps unique block number to all known blocks for that height.
97    ///
98    /// Note: there can be multiple blocks at the same height due to forks.
99    blocks_by_number: BTreeMap<BlockNumber, Vec<ExecutedBlock<N>>>,
100    /// Map of any parent block hash to its children.
101    parent_to_child: HashMap<B256, HashSet<B256>>,
102    /// Map of hash to trie updates for canonical blocks that are persisted but not finalized.
103    ///
104    /// Contains the block number for easy removal.
105    persisted_trie_updates: HashMap<B256, (BlockNumber, Arc<TrieUpdates>)>,
106    /// Currently tracked canonical head of the chain.
107    current_canonical_head: BlockNumHash,
108}
109
110impl<N: NodePrimitives> TreeState<N> {
111    /// Returns a new, empty tree state that points to the given canonical head.
112    fn new(current_canonical_head: BlockNumHash) -> Self {
113        Self {
114            blocks_by_hash: HashMap::default(),
115            blocks_by_number: BTreeMap::new(),
116            current_canonical_head,
117            parent_to_child: HashMap::default(),
118            persisted_trie_updates: HashMap::default(),
119        }
120    }
121
122    /// Returns the number of executed blocks stored.
123    fn block_count(&self) -> usize {
124        self.blocks_by_hash.len()
125    }
126
127    /// Returns the [`ExecutedBlock`] by hash.
128    fn executed_block_by_hash(&self, hash: B256) -> Option<&ExecutedBlock<N>> {
129        self.blocks_by_hash.get(&hash)
130    }
131
132    /// Returns the block by hash.
133    fn block_by_hash(&self, hash: B256) -> Option<Arc<SealedBlockFor<N::Block>>> {
134        self.blocks_by_hash.get(&hash).map(|b| b.block.clone())
135    }
136
137    /// Returns all available blocks for the given hash that lead back to the canonical chain, from
138    /// newest to oldest. And the parent hash of the oldest block that is missing from the buffer.
139    ///
140    /// Returns `None` if the block for the given hash is not found.
141    fn blocks_by_hash(&self, hash: B256) -> Option<(B256, Vec<ExecutedBlock<N>>)> {
142        let block = self.blocks_by_hash.get(&hash).cloned()?;
143        let mut parent_hash = block.block().parent_hash();
144        let mut blocks = vec![block];
145        while let Some(executed) = self.blocks_by_hash.get(&parent_hash) {
146            parent_hash = executed.block.parent_hash();
147            blocks.push(executed.clone());
148        }
149
150        Some((parent_hash, blocks))
151    }
152
153    /// Insert executed block into the state.
154    fn insert_executed(&mut self, executed: ExecutedBlock<N>) {
155        let hash = executed.block.hash();
156        let parent_hash = executed.block.parent_hash();
157        let block_number = executed.block.number();
158
159        if self.blocks_by_hash.contains_key(&hash) {
160            return;
161        }
162
163        self.blocks_by_hash.insert(hash, executed.clone());
164
165        self.blocks_by_number.entry(block_number).or_default().push(executed);
166
167        self.parent_to_child.entry(parent_hash).or_default().insert(hash);
168
169        if let Some(existing_blocks) = self.blocks_by_number.get(&block_number) {
170            if existing_blocks.len() > 1 {
171                self.parent_to_child.entry(parent_hash).or_default().insert(hash);
172            }
173        }
174
175        for children in self.parent_to_child.values_mut() {
176            children.retain(|child| self.blocks_by_hash.contains_key(child));
177        }
178    }
179
180    /// Remove single executed block by its hash.
181    ///
182    /// ## Returns
183    ///
184    /// The removed block and the block hashes of its children.
185    fn remove_by_hash(&mut self, hash: B256) -> Option<(ExecutedBlock<N>, HashSet<B256>)> {
186        let executed = self.blocks_by_hash.remove(&hash)?;
187
188        // Remove this block from collection of children of its parent block.
189        let parent_entry = self.parent_to_child.entry(executed.block.parent_hash());
190        if let hash_map::Entry::Occupied(mut entry) = parent_entry {
191            entry.get_mut().remove(&hash);
192
193            if entry.get().is_empty() {
194                entry.remove();
195            }
196        }
197
198        // Remove point to children of this block.
199        let children = self.parent_to_child.remove(&hash).unwrap_or_default();
200
201        // Remove this block from `blocks_by_number`.
202        let block_number_entry = self.blocks_by_number.entry(executed.block.number());
203        if let btree_map::Entry::Occupied(mut entry) = block_number_entry {
204            // We have to find the index of the block since it exists in a vec
205            if let Some(index) = entry.get().iter().position(|b| b.block.hash() == hash) {
206                entry.get_mut().swap_remove(index);
207
208                // If there are no blocks left then remove the entry for this block
209                if entry.get().is_empty() {
210                    entry.remove();
211                }
212            }
213        }
214
215        Some((executed, children))
216    }
217
218    /// Returns whether or not the hash is part of the canonical chain.
219    pub(crate) fn is_canonical(&self, hash: B256) -> bool {
220        let mut current_block = self.current_canonical_head.hash;
221        if current_block == hash {
222            return true
223        }
224
225        while let Some(executed) = self.blocks_by_hash.get(&current_block) {
226            current_block = executed.block.parent_hash();
227            if current_block == hash {
228                return true
229            }
230        }
231
232        false
233    }
234
235    /// Removes canonical blocks below the upper bound, only if the last persisted hash is
236    /// part of the canonical chain.
237    pub(crate) fn remove_canonical_until(
238        &mut self,
239        upper_bound: BlockNumber,
240        last_persisted_hash: B256,
241    ) {
242        debug!(target: "engine::tree", ?upper_bound, ?last_persisted_hash, "Removing canonical blocks from the tree");
243
244        // If the last persisted hash is not canonical, then we don't want to remove any canonical
245        // blocks yet.
246        if !self.is_canonical(last_persisted_hash) {
247            return
248        }
249
250        // First, let's walk back the canonical chain and remove canonical blocks lower than the
251        // upper bound
252        let mut current_block = self.current_canonical_head.hash;
253        while let Some(executed) = self.blocks_by_hash.get(&current_block) {
254            current_block = executed.block.parent_hash();
255            if executed.block.number() <= upper_bound {
256                debug!(target: "engine::tree", num_hash=?executed.block.num_hash(), "Attempting to remove block walking back from the head");
257                if let Some((removed, _)) = self.remove_by_hash(executed.block.hash()) {
258                    debug!(target: "engine::tree", num_hash=?removed.block.num_hash(), "Removed block walking back from the head");
259                    // finally, move the trie updates
260                    self.persisted_trie_updates
261                        .insert(removed.block.hash(), (removed.block.number(), removed.trie));
262                }
263            }
264        }
265        debug!(target: "engine::tree", ?upper_bound, ?last_persisted_hash, "Removed canonical blocks from the tree");
266    }
267
268    /// Removes all blocks that are below the finalized block, as well as removing non-canonical
269    /// sidechains that fork from below the finalized block.
270    pub(crate) fn prune_finalized_sidechains(&mut self, finalized_num_hash: BlockNumHash) {
271        let BlockNumHash { number: finalized_num, hash: finalized_hash } = finalized_num_hash;
272
273        // We remove disconnected sidechains in three steps:
274        // * first, remove everything with a block number __below__ the finalized block.
275        // * next, we populate a vec with parents __at__ the finalized block.
276        // * finally, we iterate through the vec, removing children until the vec is empty
277        // (BFS).
278
279        // We _exclude_ the finalized block because we will be dealing with the blocks __at__
280        // the finalized block later.
281        let blocks_to_remove = self
282            .blocks_by_number
283            .range((Bound::Unbounded, Bound::Excluded(finalized_num)))
284            .flat_map(|(_, blocks)| blocks.iter().map(|b| b.block.hash()))
285            .collect::<Vec<_>>();
286        for hash in blocks_to_remove {
287            if let Some((removed, _)) = self.remove_by_hash(hash) {
288                debug!(target: "engine::tree", num_hash=?removed.block.num_hash(), "Removed finalized sidechain block");
289            }
290        }
291
292        // remove trie updates that are below the finalized block
293        self.persisted_trie_updates.retain(|_, (block_num, _)| *block_num > finalized_num);
294
295        // The only block that should remain at the `finalized` number now, is the finalized
296        // block, if it exists.
297        //
298        // For all other blocks, we  first put their children into this vec.
299        // Then, we will iterate over them, removing them, adding their children, etc etc,
300        // until the vec is empty.
301        let mut blocks_to_remove = self.blocks_by_number.remove(&finalized_num).unwrap_or_default();
302
303        // re-insert the finalized hash if we removed it
304        if let Some(position) =
305            blocks_to_remove.iter().position(|b| b.block.hash() == finalized_hash)
306        {
307            let finalized_block = blocks_to_remove.swap_remove(position);
308            self.blocks_by_number.insert(finalized_num, vec![finalized_block]);
309        }
310
311        let mut blocks_to_remove =
312            blocks_to_remove.into_iter().map(|e| e.block.hash()).collect::<VecDeque<_>>();
313        while let Some(block) = blocks_to_remove.pop_front() {
314            if let Some((removed, children)) = self.remove_by_hash(block) {
315                debug!(target: "engine::tree", num_hash=?removed.block.num_hash(), "Removed finalized sidechain child block");
316                blocks_to_remove.extend(children);
317            }
318        }
319    }
320
321    /// Remove all blocks up to __and including__ the given block number.
322    ///
323    /// If a finalized hash is provided, the only non-canonical blocks which will be removed are
324    /// those which have a fork point at or below the finalized hash.
325    ///
326    /// Canonical blocks below the upper bound will still be removed.
327    ///
328    /// NOTE: if the finalized block is greater than the upper bound, the only blocks that will be
329    /// removed are canonical blocks and sidechains that fork below the `upper_bound`. This is the
330    /// same behavior as if the `finalized_num` were `Some(upper_bound)`.
331    pub(crate) fn remove_until(
332        &mut self,
333        upper_bound: BlockNumHash,
334        last_persisted_hash: B256,
335        finalized_num_hash: Option<BlockNumHash>,
336    ) {
337        debug!(target: "engine::tree", ?upper_bound, ?finalized_num_hash, "Removing blocks from the tree");
338
339        // If the finalized num is ahead of the upper bound, and exists, we need to instead ensure
340        // that the only blocks removed, are canonical blocks less than the upper bound
341        let finalized_num_hash = finalized_num_hash.map(|mut finalized| {
342            if upper_bound.number < finalized.number {
343                finalized = upper_bound;
344                debug!(target: "engine::tree", ?finalized, "Adjusted upper bound");
345            }
346            finalized
347        });
348
349        // We want to do two things:
350        // * remove canonical blocks that are persisted
351        // * remove forks whose root are below the finalized block
352        // We can do this in 2 steps:
353        // * remove all canonical blocks below the upper bound
354        // * fetch the number of the finalized hash, removing any sidechains that are __below__ the
355        // finalized block
356        self.remove_canonical_until(upper_bound.number, last_persisted_hash);
357
358        // Now, we have removed canonical blocks (assuming the upper bound is above the finalized
359        // block) and only have sidechains below the finalized block.
360        if let Some(finalized_num_hash) = finalized_num_hash {
361            self.prune_finalized_sidechains(finalized_num_hash);
362        }
363    }
364
365    /// Updates the canonical head to the given block.
366    fn set_canonical_head(&mut self, new_head: BlockNumHash) {
367        self.current_canonical_head = new_head;
368    }
369
370    /// Returns the tracked canonical head.
371    const fn canonical_head(&self) -> &BlockNumHash {
372        &self.current_canonical_head
373    }
374
375    /// Returns the block hash of the canonical head.
376    const fn canonical_block_hash(&self) -> B256 {
377        self.canonical_head().hash
378    }
379
380    /// Returns the block number of the canonical head.
381    const fn canonical_block_number(&self) -> BlockNumber {
382        self.canonical_head().number
383    }
384}
385
386/// Tracks the state of the engine api internals.
387///
388/// This type is not shareable.
389#[derive(Debug)]
390pub struct EngineApiTreeState<N: NodePrimitives> {
391    /// Tracks the state of the blockchain tree.
392    tree_state: TreeState<N>,
393    /// Tracks the forkchoice state updates received by the CL.
394    forkchoice_state_tracker: ForkchoiceStateTracker,
395    /// Buffer of detached blocks.
396    buffer: BlockBuffer<N::Block>,
397    /// Tracks the header of invalid payloads that were rejected by the engine because they're
398    /// invalid.
399    invalid_headers: InvalidHeaderCache,
400}
401
402impl<N: NodePrimitives> EngineApiTreeState<N> {
403    fn new(
404        block_buffer_limit: u32,
405        max_invalid_header_cache_length: u32,
406        canonical_block: BlockNumHash,
407    ) -> Self {
408        Self {
409            invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
410            buffer: BlockBuffer::new(block_buffer_limit),
411            tree_state: TreeState::new(canonical_block),
412            forkchoice_state_tracker: ForkchoiceStateTracker::default(),
413        }
414    }
415}
416
417/// The outcome of a tree operation.
418#[derive(Debug)]
419pub struct TreeOutcome<T> {
420    /// The outcome of the operation.
421    pub outcome: T,
422    /// An optional event to tell the caller to do something.
423    pub event: Option<TreeEvent>,
424}
425
426impl<T> TreeOutcome<T> {
427    /// Create new tree outcome.
428    pub const fn new(outcome: T) -> Self {
429        Self { outcome, event: None }
430    }
431
432    /// Set event on the outcome.
433    pub fn with_event(mut self, event: TreeEvent) -> Self {
434        self.event = Some(event);
435        self
436    }
437}
438
439/// Events that are triggered by Tree Chain
440#[derive(Debug)]
441pub enum TreeEvent {
442    /// Tree action is needed.
443    TreeAction(TreeAction),
444    /// Backfill action is needed.
445    BackfillAction(BackfillAction),
446    /// Block download is needed.
447    Download(DownloadRequest),
448}
449
450impl TreeEvent {
451    /// Returns true if the event is a backfill action.
452    const fn is_backfill_action(&self) -> bool {
453        matches!(self, Self::BackfillAction(_))
454    }
455}
456
457/// The actions that can be performed on the tree.
458#[derive(Debug)]
459pub enum TreeAction {
460    /// Make target canonical.
461    MakeCanonical {
462        /// The sync target head hash
463        sync_target_head: B256,
464    },
465}
466
467/// The engine API tree handler implementation.
468///
469/// This type is responsible for processing engine API requests, maintaining the canonical state and
470/// emitting events.
471pub struct EngineApiTreeHandler<N, P, E, T, V>
472where
473    N: NodePrimitives,
474    T: EngineTypes,
475{
476    provider: P,
477    executor_provider: E,
478    consensus: Arc<dyn FullConsensus<N>>,
479    payload_validator: V,
480    /// Keeps track of internals such as executed and buffered blocks.
481    state: EngineApiTreeState<N>,
482    /// The half for sending messages to the engine.
483    ///
484    /// This is kept so that we can queue in messages to ourself that we can process later, for
485    /// example distributing workload across multiple messages that would otherwise take too long
486    /// to process. E.g. we might receive a range of downloaded blocks and we want to process
487    /// them one by one so that we can handle incoming engine API in between and don't become
488    /// unresponsive. This can happen during live sync transition where we're trying to close the
489    /// gap (up to 3 epochs of blocks in the worst case).
490    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
491    /// Incoming engine API requests.
492    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
493    /// Outgoing events that are emitted to the handler.
494    outgoing: UnboundedSender<EngineApiEvent<N>>,
495    /// Channels to the persistence layer.
496    persistence: PersistenceHandle<N>,
497    /// Tracks the state changes of the persistence task.
498    persistence_state: PersistenceState,
499    /// Flag indicating the state of the node's backfill synchronization process.
500    backfill_sync_state: BackfillSyncState,
501    /// Keeps track of the state of the canonical chain that isn't persisted yet.
502    /// This is intended to be accessed from external sources, such as rpc.
503    canonical_in_memory_state: CanonicalInMemoryState<N>,
504    /// Handle to the payload builder that will receive payload attributes for valid forkchoice
505    /// updates
506    payload_builder: PayloadBuilderHandle<T>,
507    /// Configuration settings.
508    config: TreeConfig,
509    /// Metrics for the engine api.
510    metrics: EngineApiMetrics,
511    /// An invalid block hook.
512    invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
513    /// The engine API variant of this handler
514    engine_kind: EngineApiKind,
515    /// The backup handler
516    backup: BackupHandle,
517}
518
519impl<N, P: Debug, E: Debug, T: EngineTypes + Debug, V: Debug> std::fmt::Debug
520    for EngineApiTreeHandler<N, P, E, T, V>
521where
522    N: NodePrimitives,
523{
524    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
525        f.debug_struct("EngineApiTreeHandler")
526            .field("provider", &self.provider)
527            .field("executor_provider", &self.executor_provider)
528            .field("consensus", &self.consensus)
529            .field("payload_validator", &self.payload_validator)
530            .field("state", &self.state)
531            .field("incoming_tx", &self.incoming_tx)
532            .field("persistence", &self.persistence)
533            .field("persistence_state", &self.persistence_state)
534            .field("backfill_sync_state", &self.backfill_sync_state)
535            .field("canonical_in_memory_state", &self.canonical_in_memory_state)
536            .field("payload_builder", &self.payload_builder)
537            .field("config", &self.config)
538            .field("metrics", &self.metrics)
539            .field("invalid_block_hook", &format!("{:p}", self.invalid_block_hook))
540            .field("engine_kind", &self.engine_kind)
541            .finish()
542    }
543}
544
545impl<N, P, E, T, V> EngineApiTreeHandler<N, P, E, T, V>
546where
547    N: NodePrimitives,
548    P: DatabaseProviderFactory
549        + BlockReader<Block = N::Block, Header = N::BlockHeader>
550        + StateProviderFactory
551        + StateReader<Receipt = N::Receipt>
552        + StateCommitmentProvider
553        + HashedPostStateProvider
554        + Clone
555        + 'static,
556    <P as DatabaseProviderFactory>::Provider:
557        BlockReader<Block = N::Block, Header = N::BlockHeader>,
558    E: BlockExecutorProvider<Primitives = N>,
559    T: EngineTypes,
560    V: EngineValidator<T, Block = N::Block>,
561{
562    /// Creates a new [`EngineApiTreeHandler`].
563    #[expect(clippy::too_many_arguments)]
564    pub fn new(
565        provider: P,
566        executor_provider: E,
567        consensus: Arc<dyn FullConsensus<N>>,
568        payload_validator: V,
569        outgoing: UnboundedSender<EngineApiEvent<N>>,
570        state: EngineApiTreeState<N>,
571        canonical_in_memory_state: CanonicalInMemoryState<N>,
572        persistence: PersistenceHandle<N>,
573        persistence_state: PersistenceState,
574        payload_builder: PayloadBuilderHandle<T>,
575        config: TreeConfig,
576        engine_kind: EngineApiKind,
577        backup: BackupHandle,
578    ) -> Self {
579        let (incoming_tx, incoming) = std::sync::mpsc::channel();
580
581        Self {
582            provider,
583            executor_provider,
584            consensus,
585            payload_validator,
586            incoming,
587            outgoing,
588            persistence,
589            persistence_state,
590            backfill_sync_state: BackfillSyncState::Idle,
591            state,
592            canonical_in_memory_state,
593            payload_builder,
594            config,
595            metrics: Default::default(),
596            incoming_tx,
597            invalid_block_hook: Box::new(NoopInvalidBlockHook),
598            engine_kind,
599            backup,
600        }
601    }
602
603    /// Sets the invalid block hook.
604    fn set_invalid_block_hook(&mut self, invalid_block_hook: Box<dyn InvalidBlockHook<N>>) {
605        self.invalid_block_hook = invalid_block_hook;
606    }
607
608    /// Creates a new [`EngineApiTreeHandler`] instance and spawns it in its
609    /// own thread.
610    ///
611    /// Returns the sender through which incoming requests can be sent to the task and the receiver
612    /// end of a [`EngineApiEvent`] unbounded channel to receive events from the engine.
613    #[expect(clippy::complexity)]
614    pub fn spawn_new(
615        provider: P,
616        executor_provider: E,
617        consensus: Arc<dyn FullConsensus<N>>,
618        payload_validator: V,
619        persistence: PersistenceHandle<N>,
620        payload_builder: PayloadBuilderHandle<T>,
621        canonical_in_memory_state: CanonicalInMemoryState<N>,
622        config: TreeConfig,
623        invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
624        kind: EngineApiKind,
625        backup: BackupHandle,
626    ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
627    {
628        let best_block_number = provider.best_block_number().unwrap_or(0);
629        let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
630
631        let persistence_state = PersistenceState {
632            last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
633            rx: None,
634            remove_above_state: VecDeque::new(),
635        };
636
637        let (tx, outgoing) = unbounded_channel();
638        let state = EngineApiTreeState::new(
639            config.block_buffer_limit(),
640            config.max_invalid_header_cache_length(),
641            header.num_hash(),
642        );
643
644        let mut task = Self::new(
645            provider,
646            executor_provider,
647            consensus,
648            payload_validator,
649            tx,
650            state,
651            canonical_in_memory_state,
652            persistence,
653            persistence_state,
654            payload_builder,
655            config,
656            kind,
657            backup,
658        );
659        task.set_invalid_block_hook(invalid_block_hook);
660        let incoming = task.incoming_tx.clone();
661        std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap();
662        (incoming, outgoing)
663    }
664
665    /// Returns a new [`Sender`] to send messages to this type.
666    pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
667        self.incoming_tx.clone()
668    }
669
670    /// Run the engine API handler.
671    ///
672    /// This will block the current thread and process incoming messages.
673    pub fn run(mut self) {
674        loop {
675            match self.try_recv_engine_message() {
676                Ok(Some(msg)) => {
677                    debug!(target: "engine::tree", %msg, "received new engine message");
678                    if let Err(fatal) = self.on_engine_message(msg) {
679                        error!(target: "engine::tree", %fatal, "insert block fatal error");
680                        return
681                    }
682                }
683                Ok(None) => {
684                    debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
685                }
686                Err(_err) => {
687                    error!(target: "engine::tree", "Engine channel disconnected");
688                    return
689                }
690            }
691
692            if let Err(err) = self.advance_persistence() {
693                error!(target: "engine::tree", %err, "Advancing persistence failed");
694                return
695            }
696            if let Err(err) = self.advance_backup() {
697                error!(target: "engine::tree", %err, "Advancing backup failed");
698                return
699            }
700        }
701    }
702
703    /// Invoked when previously requested blocks were downloaded.
704    ///
705    /// If the block count exceeds the configured batch size we're allowed to execute at once, this
706    /// will execute the first batch and send the remaining blocks back through the channel so that
707    /// block request processing isn't blocked for a long time.
708    fn on_downloaded(
709        &mut self,
710        mut blocks: Vec<SealedBlockWithSenders<N::Block>>,
711    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
712        if blocks.is_empty() {
713            // nothing to execute
714            return Ok(None)
715        }
716
717        trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
718        let batch = self.config.max_execute_block_batch_size().min(blocks.len());
719        for block in blocks.drain(..batch) {
720            if let Some(event) = self.on_downloaded_block(block)? {
721                let needs_backfill = event.is_backfill_action();
722                self.on_tree_event(event)?;
723                if needs_backfill {
724                    // can exit early if backfill is needed
725                    return Ok(None)
726                }
727            }
728        }
729
730        // if we still have blocks to execute, send them as a followup request
731        if !blocks.is_empty() {
732            let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
733        }
734
735        Ok(None)
736    }
737
738    /// When the Consensus layer receives a new block via the consensus gossip protocol,
739    /// the transactions in the block are sent to the execution layer in the form of a
740    /// [`ExecutionPayload`]. The Execution layer executes the transactions and validates the
741    /// state in the block header, then passes validation data back to Consensus layer, that
742    /// adds the block to the head of its own blockchain and attests to it. The block is then
743    /// broadcast over the consensus p2p network in the form of a "Beacon block".
744    ///
745    /// These responses should adhere to the [Engine API Spec for
746    /// `engine_newPayload`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification).
747    ///
748    /// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
749    /// returns an error if an internal error occurred.
750    #[instrument(level = "trace", skip_all, fields(block_hash = %payload.block_hash(), block_num = %payload.block_number(),), target = "engine::tree")]
751    fn on_new_payload(
752        &mut self,
753        payload: ExecutionPayload,
754        sidecar: ExecutionPayloadSidecar,
755    ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
756        trace!(target: "engine::tree", "invoked new payload");
757        self.metrics.engine.new_payload_messages.increment(1);
758
759        // Ensures that the given payload does not violate any consensus rules that concern the
760        // block's layout, like:
761        //    - missing or invalid base fee
762        //    - invalid extra data
763        //    - invalid transactions
764        //    - incorrect hash
765        //    - the versioned hashes passed with the payload do not exactly match transaction
766        //      versioned hashes
767        //    - the block does not contain blob transactions if it is pre-cancun
768        //
769        // This validates the following engine API rule:
770        //
771        // 3. Given the expected array of blob versioned hashes client software **MUST** run its
772        //    validation by taking the following steps:
773        //
774        //   1. Obtain the actual array by concatenating blob versioned hashes lists
775        //      (`tx.blob_versioned_hashes`) of each [blob
776        //      transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included
777        //      in the payload, respecting the order of inclusion. If the payload has no blob
778        //      transactions the expected array **MUST** be `[]`.
779        //
780        //   2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage |
781        //      null}` if the expected and the actual arrays don't match.
782        //
783        // This validation **MUST** be instantly run in all cases even during active sync process.
784        let parent_hash = payload.parent_hash();
785        let block = match self.payload_validator.ensure_well_formed_payload(payload, sidecar) {
786            Ok(block) => block,
787            Err(error) => {
788                error!(target: "engine::tree", %error, "Invalid payload");
789                // we need to convert the error to a payload status (response to the CL)
790
791                let latest_valid_hash =
792                    if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
793                        // Engine-API rules:
794                        // > `latestValidHash: null` if the blockHash validation has failed (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/shanghai.md?plain=1#L113>)
795                        // > `latestValidHash: null` if the expected and the actual arrays don't match (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md?plain=1#L103>)
796                        None
797                    } else {
798                        self.latest_valid_hash_for_invalid_payload(parent_hash)?
799                    };
800
801                let status = PayloadStatusEnum::from(error);
802                return Ok(TreeOutcome::new(PayloadStatus::new(status, latest_valid_hash)))
803            }
804        };
805
806        let block_hash = block.hash();
807        let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
808        if lowest_buffered_ancestor == block_hash {
809            lowest_buffered_ancestor = block.parent_hash();
810        }
811
812        // now check the block itself
813        if let Some(status) =
814            self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, block_hash)?
815        {
816            return Ok(TreeOutcome::new(status))
817        }
818
819        let status = if self.backfill_sync_state.is_idle() {
820            let mut latest_valid_hash = None;
821            let num_hash = block.num_hash();
822            match self.insert_block_without_senders(block) {
823                Ok(status) => {
824                    let status = match status {
825                        InsertPayloadOk2::Inserted(BlockStatus2::Valid) => {
826                            latest_valid_hash = Some(block_hash);
827                            self.try_connect_buffered_blocks(num_hash)?;
828                            PayloadStatusEnum::Valid
829                        }
830                        InsertPayloadOk2::AlreadySeen(BlockStatus2::Valid) => {
831                            latest_valid_hash = Some(block_hash);
832                            PayloadStatusEnum::Valid
833                        }
834                        InsertPayloadOk2::Inserted(BlockStatus2::Disconnected { .. }) |
835                        InsertPayloadOk2::AlreadySeen(BlockStatus2::Disconnected { .. }) => {
836                            // not known to be invalid, but we don't know anything else
837                            PayloadStatusEnum::Syncing
838                        }
839                    };
840
841                    PayloadStatus::new(status, latest_valid_hash)
842                }
843                Err(error) => self.on_insert_block_error(error)?,
844            }
845        } else if let Err(error) = self.buffer_block_without_senders(block) {
846            self.on_insert_block_error(error)?
847        } else {
848            PayloadStatus::from_status(PayloadStatusEnum::Syncing)
849        };
850
851        let mut outcome = TreeOutcome::new(status);
852        if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
853            // if the block is valid and it is the sync target head, make it canonical
854            outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
855                sync_target_head: block_hash,
856            }));
857        }
858
859        Ok(outcome)
860    }
861
862    /// Returns the new chain for the given head.
863    ///
864    /// This also handles reorgs.
865    ///
866    /// Note: This does not update the tracked state and instead returns the new chain based on the
867    /// given head.
868    fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
869        // get the executed new head block
870        let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
871            return Ok(None)
872        };
873
874        let new_head_number = new_head_block.block.number();
875        let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;
876
877        let mut new_chain = vec![new_head_block.clone()];
878        let mut current_hash = new_head_block.block.parent_hash();
879        let mut current_number = new_head_number - 1;
880
881        // Walk back the new chain until we reach a block we know about
882        //
883        // This is only done for in-memory blocks, because we should not have persisted any blocks
884        // that are _above_ the current canonical head.
885        while current_number > current_canonical_number {
886            if let Some(block) = self.executed_block_by_hash(current_hash)? {
887                current_hash = block.block.parent_hash();
888                current_number -= 1;
889                new_chain.push(block);
890            } else {
891                warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
892                // This should never happen as we're walking back a chain that should connect to
893                // the canonical chain
894                return Ok(None);
895            }
896        }
897
898        // If we have reached the current canonical head by walking back from the target, then we
899        // know this represents an extension of the canonical chain.
900        if current_hash == self.state.tree_state.current_canonical_head.hash {
901            new_chain.reverse();
902
903            // Simple extension of the current chain
904            return Ok(Some(NewCanonicalChain::Commit { new: new_chain }));
905        }
906
907        // We have a reorg. Walk back both chains to find the fork point.
908        let mut old_chain = Vec::new();
909        let mut old_hash = self.state.tree_state.current_canonical_head.hash;
910
911        // If the canonical chain is ahead of the new chain,
912        // gather all blocks until new head number.
913        while current_canonical_number > current_number {
914            if let Some(block) = self.executed_block_by_hash(old_hash)? {
915                old_chain.push(block.clone());
916                old_hash = block.block.header.parent_hash();
917                current_canonical_number -= 1;
918            } else {
919                // This shouldn't happen as we're walking back the canonical chain
920                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
921                return Ok(None);
922            }
923        }
924
925        // Both new and old chain pointers are now at the same height.
926        debug_assert_eq!(current_number, current_canonical_number);
927
928        // Walk both chains from specified hashes at same height until
929        // a common ancestor (fork block) is reached.
930        while old_hash != current_hash {
931            if let Some(block) = self.executed_block_by_hash(old_hash)? {
932                old_hash = block.block.header.parent_hash();
933                old_chain.push(block);
934            } else {
935                // This shouldn't happen as we're walking back the canonical chain
936                warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
937                return Ok(None);
938            }
939
940            if let Some(block) = self.executed_block_by_hash(current_hash)? {
941                current_hash = block.block.parent_hash();
942                new_chain.push(block);
943            } else {
944                // This shouldn't happen as we've already walked this path
945                warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
946                return Ok(None);
947            }
948        }
949        new_chain.reverse();
950        old_chain.reverse();
951
952        Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
953    }
954
955    /// Determines if the given block is part of a fork by checking that these
956    /// conditions are true:
957    /// * walking back from the target hash to verify that the target hash is not part of an
958    ///   extension of the canonical chain.
959    /// * walking back from the current head to verify that the target hash is not already part of
960    ///   the canonical chain.
961    fn is_fork(&self, target_hash: B256) -> ProviderResult<bool> {
962        // verify that the given hash is not part of an extension of the canon chain.
963        let canonical_head = self.state.tree_state.canonical_head();
964        let mut current_hash = target_hash;
965        while let Some(current_block) = self.sealed_header_by_hash(current_hash)? {
966            if current_block.hash() == canonical_head.hash {
967                return Ok(false)
968            }
969            // We already passed the canonical head
970            if current_block.number() <= canonical_head.number {
971                break
972            }
973            current_hash = current_block.parent_hash();
974        }
975
976        // verify that the given hash is not already part of canonical chain stored in memory
977        if self.canonical_in_memory_state.header_by_hash(target_hash).is_some() {
978            return Ok(false)
979        }
980
981        // verify that the given hash is not already part of persisted canonical chain
982        if self.provider.block_number(target_hash)?.is_some() {
983            return Ok(false)
984        }
985
986        Ok(true)
987    }
988
989    /// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
990    /// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
991    /// chain.
992    ///
993    /// These responses should adhere to the [Engine API Spec for
994    /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
995    ///
996    /// Returns an error if an internal error occurred like a database error.
997    #[instrument(level = "trace", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash), target = "engine::tree")]
998    fn on_forkchoice_updated(
999        &mut self,
1000        state: ForkchoiceState,
1001        attrs: Option<T::PayloadAttributes>,
1002        version: EngineApiMessageVersion,
1003    ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
1004        trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
1005        self.metrics.engine.forkchoice_updated_messages.increment(1);
1006        self.canonical_in_memory_state.on_forkchoice_update_received();
1007
1008        if let Some(on_updated) = self.pre_validate_forkchoice_update(state)? {
1009            return Ok(TreeOutcome::new(on_updated))
1010        }
1011
1012        let valid_outcome = |head| {
1013            TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
1014                PayloadStatusEnum::Valid,
1015                Some(head),
1016            )))
1017        };
1018
1019        // Process the forkchoice update by trying to make the head block canonical
1020        //
1021        // We can only process this forkchoice update if:
1022        // - we have the `head` block
1023        // - the head block is part of a chain that is connected to the canonical chain. This
1024        //   includes reorgs.
1025        //
1026        // Performing a FCU involves:
1027        // - marking the FCU's head block as canonical
1028        // - updating in memory state to reflect the new canonical chain
1029        // - updating canonical state trackers
1030        // - emitting a canonicalization event for the new chain (including reorg)
1031        // - if we have payload attributes, delegate them to the payload service
1032
1033        // 1. ensure we have a new head block
1034        if self.state.tree_state.canonical_block_hash() == state.head_block_hash {
1035            trace!(target: "engine::tree", "fcu head hash is already canonical");
1036
1037            // update the safe and finalized blocks and ensure their values are valid
1038            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1039                // safe or finalized hashes are invalid
1040                return Ok(TreeOutcome::new(outcome))
1041            }
1042
1043            // we still need to process payload attributes if the head is already canonical
1044            if let Some(attr) = attrs {
1045                let tip = self
1046                    .block_by_hash(self.state.tree_state.canonical_block_hash())?
1047                    .ok_or_else(|| {
1048                        // If we can't find the canonical block, then something is wrong and we need
1049                        // to return an error
1050                        ProviderError::HeaderNotFound(state.head_block_hash.into())
1051                    })?;
1052                let updated = self.process_payload_attributes(attr, tip.header(), state, version);
1053                return Ok(TreeOutcome::new(updated))
1054            }
1055
1056            // the head block is already canonical
1057            return Ok(valid_outcome(state.head_block_hash))
1058        }
1059
1060        // 2. ensure we can apply a new chain update for the head block
1061        if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
1062            let tip = chain_update.tip().header.clone();
1063            self.on_canonical_chain_update(chain_update);
1064
1065            // update the safe and finalized blocks and ensure their values are valid
1066            if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
1067                // safe or finalized hashes are invalid
1068                return Ok(TreeOutcome::new(outcome))
1069            }
1070
1071            if let Some(attr) = attrs {
1072                let updated = self.process_payload_attributes(attr, &tip, state, version);
1073                return Ok(TreeOutcome::new(updated))
1074            }
1075
1076            return Ok(valid_outcome(state.head_block_hash))
1077        }
1078
1079        // 3. check if the head is already part of the canonical chain
1080        if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
1081            debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");
1082
1083            // For OpStack the proposers are allowed to reorg their own chain at will, so we need to
1084            // always trigger a new payload job if requested.
1085            if self.engine_kind.is_opstack() {
1086                if let Some(attr) = attrs {
1087                    debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
1088                    let updated =
1089                        self.process_payload_attributes(attr, &canonical_header, state, version);
1090                    return Ok(TreeOutcome::new(updated))
1091                }
1092            }
1093
1094            // 2. Client software MAY skip an update of the forkchoice state and MUST NOT begin a
1095            //    payload build process if `forkchoiceState.headBlockHash` references a `VALID`
1096            //    ancestor of the head of canonical chain, i.e. the ancestor passed payload
1097            //    validation process and deemed `VALID`. In the case of such an event, client
1098            //    software MUST return `{payloadStatus: {status: VALID, latestValidHash:
1099            //    forkchoiceState.headBlockHash, validationError: null}, payloadId: null}`
1100
1101            // the head block is already canonical, so we're not triggering a payload job and can
1102            // return right away
1103            return Ok(valid_outcome(state.head_block_hash))
1104        }
1105
1106        // 4. we don't have the block to perform the update
1107        // we assume the FCU is valid and at least the head is missing,
1108        // so we need to start syncing to it
1109        //
1110        // find the appropriate target to sync to, if we don't have the safe block hash then we
1111        // start syncing to the safe block via backfill first
1112        let target = if self.state.forkchoice_state_tracker.is_empty() &&
1113            // check that safe block is valid and missing
1114            !state.safe_block_hash.is_zero() &&
1115            self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
1116        {
1117            debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
1118            state.safe_block_hash
1119        } else {
1120            state.head_block_hash
1121        };
1122
1123        let target = self.lowest_buffered_ancestor_or(target);
1124        trace!(target: "engine::tree", %target, "downloading missing block");
1125
1126        Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
1127            PayloadStatusEnum::Syncing,
1128        )))
1129        .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
1130    }
1131
1132    /// Attempts to receive the next engine request.
1133    ///
1134    /// If there's currently no persistence action in progress, this will block until a new request
1135    /// is received. If there's a persistence action in progress, this will try to receive the
1136    /// next request with a timeout to not block indefinitely and return `Ok(None)` if no request is
1137    /// received in time.
1138    ///
1139    /// Returns an error if the engine channel is disconnected.
1140    #[expect(clippy::type_complexity)]
1141    fn try_recv_engine_message(
1142        &self,
1143    ) -> Result<Option<FromEngine<EngineApiRequest<T, N>, N::Block>>, RecvError> {
1144        if self.persistence_state.in_progress() || self.backup.in_progress() {
1145            // try to receive the next request with a timeout to not block indefinitely
1146            match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
1147                Ok(msg) => Ok(Some(msg)),
1148                Err(err) => match err {
1149                    RecvTimeoutError::Timeout => Ok(None),
1150                    RecvTimeoutError::Disconnected => Err(RecvError),
1151                },
1152            }
1153        } else {
1154            self.incoming.recv().map(Some)
1155        }
1156    }
1157
1158    /// Attempts to advance the persistence state.
1159    ///
1160    /// If we're currently awaiting a response this will try to receive the response (non-blocking)
1161    /// or send a new persistence action if necessary.
1162    fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1163        if !self.persistence_state.in_progress() {
1164            if let Some(new_tip_num) = self.persistence_state.remove_above_state.pop_front() {
1165                debug!(target: "engine::tree", ?new_tip_num, remove_state=?self.persistence_state.remove_above_state, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1166                if new_tip_num < self.persistence_state.last_persisted_block.number {
1167                    debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1168                    let (tx, rx) = oneshot::channel();
1169                    let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1170                    self.persistence_state.start(rx);
1171                }
1172            } else if self.should_persist() {
1173                let blocks_to_persist = self.get_canonical_blocks_to_persist();
1174                if blocks_to_persist.is_empty() {
1175                    debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1176                } else {
1177                    debug!(target: "engine::tree", blocks = ?blocks_to_persist.iter().map(|block| block.block.num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1178                    let (tx, rx) = oneshot::channel();
1179                    let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1180                    self.persistence_state.start(rx);
1181                }
1182            }
1183        }
1184
1185        if self.persistence_state.in_progress() {
1186            let (mut rx, start_time) = self
1187                .persistence_state
1188                .rx
1189                .take()
1190                .expect("if a persistence task is in progress Receiver must be Some");
1191            // Check if persistence has complete
1192            match rx.try_recv() {
1193                Ok(last_persisted_hash_num) => {
1194                    self.metrics.engine.persistence_duration.record(start_time.elapsed());
1195                    let Some(BlockNumHash {
1196                        hash: last_persisted_block_hash,
1197                        number: last_persisted_block_number,
1198                    }) = last_persisted_hash_num
1199                    else {
1200                        // if this happened, then we persisted no blocks because we sent an
1201                        // empty vec of blocks
1202                        warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1203                        return Ok(())
1204                    };
1205
1206                    debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, "Finished persisting, calling finish");
1207                    self.persistence_state
1208                        .finish(last_persisted_block_hash, last_persisted_block_number);
1209                    self.on_new_persisted_block()?;
1210                }
1211                Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
1212                Err(TryRecvError::Empty) => self.persistence_state.rx = Some((rx, start_time)),
1213            }
1214        }
1215        Ok(())
1216    }
1217
1218    fn advance_backup(&mut self) -> Result<(), AdvancePersistenceError> {
1219        debug!(target: "engine::tree", "advance_backup called");
1220        if !self.backup.in_progress() {
1221            if self.should_backup() {
1222                debug!(target: "engine::tree", "sending backup action");
1223                let (tx, rx) = oneshot::channel();
1224                let _ = self.backup.sender.send(BackupAction::BackupAtBlock(
1225                    self.persistence_state.last_persisted_block,
1226                    tx,
1227                ));
1228                self.backup.start(rx);
1229            }
1230        }
1231
1232        if self.backup.in_progress() {
1233            let (mut rx, start_time) = self
1234                .backup
1235                .rx
1236                .take()
1237                .expect("if a backup task is in progress Receiver must be Some");
1238            // Check if persistence has complete
1239            match rx.try_recv() {
1240                Ok(last_backup_hash_num) => {
1241                    let Some(BlockNumHash {
1242                        hash: last_backup_block_hash,
1243                        number: last_backup_block_number,
1244                    }) = last_backup_hash_num
1245                    else {
1246                        warn!(target: "engine::tree", "Backup task completed but did not backup any blocks");
1247                        return Ok(())
1248                    };
1249
1250                    debug!(target: "engine::tree", ?last_backup_hash_num, "Finished backup, calling finish");
1251                    self.backup.finish(BlockNumHash::new(
1252                        last_backup_block_number,
1253                        last_backup_block_hash,
1254                    ));
1255                }
1256                Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
1257                Err(TryRecvError::Empty) => self.backup.rx = Some((rx, start_time)),
1258            }
1259        }
1260        Ok(())
1261    }
1262
1263    /// Handles a message from the engine.
1264    fn on_engine_message(
1265        &mut self,
1266        msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
1267    ) -> Result<(), InsertBlockFatalError> {
1268        match msg {
1269            FromEngine::Event(event) => match event {
1270                FromOrchestrator::BackfillSyncStarted => {
1271                    debug!(target: "engine::tree", "received backfill sync started event");
1272                    self.backfill_sync_state = BackfillSyncState::Active;
1273                }
1274                FromOrchestrator::BackfillSyncFinished(ctrl) => {
1275                    self.on_backfill_sync_finished(ctrl)?;
1276                }
1277            },
1278            FromEngine::Request(request) => {
1279                match request {
1280                    EngineApiRequest::InsertExecutedBlock(block) => {
1281                        debug!(target: "engine::tree", block=?block.block().num_hash(), "inserting already executed block");
1282                        let now = Instant::now();
1283                        let sealed_block = block.block.clone();
1284                        self.state.tree_state.insert_executed(block);
1285                        self.metrics.engine.inserted_already_executed_blocks.increment(1);
1286
1287                        self.emit_event(EngineApiEvent::BeaconConsensus(
1288                            BeaconConsensusEngineEvent::CanonicalBlockAdded(
1289                                sealed_block,
1290                                now.elapsed(),
1291                            ),
1292                        ));
1293                    }
1294                    EngineApiRequest::Beacon(request) => {
1295                        match request {
1296                            BeaconEngineMessage::ForkchoiceUpdated {
1297                                state,
1298                                payload_attrs,
1299                                tx,
1300                                version,
1301                            } => {
1302                                let mut output =
1303                                    self.on_forkchoice_updated(state, payload_attrs, version);
1304
1305                                if let Ok(res) = &mut output {
1306                                    // track last received forkchoice state
1307                                    self.state
1308                                        .forkchoice_state_tracker
1309                                        .set_latest(state, res.outcome.forkchoice_status());
1310
1311                                    // emit an event about the handled FCU
1312                                    self.emit_event(BeaconConsensusEngineEvent::ForkchoiceUpdated(
1313                                        state,
1314                                        res.outcome.forkchoice_status(),
1315                                    ));
1316
1317                                    // handle the event if any
1318                                    self.on_maybe_tree_event(res.event.take())?;
1319                                }
1320
1321                                if let Err(err) =
1322                                    tx.send(output.map(|o| o.outcome).map_err(Into::into))
1323                                {
1324                                    self.metrics
1325                                        .engine
1326                                        .failed_forkchoice_updated_response_deliveries
1327                                        .increment(1);
1328                                    error!(target: "engine::tree", "Failed to send event: {err:?}");
1329                                }
1330                            }
1331                            BeaconEngineMessage::NewPayload { payload, sidecar, tx } => {
1332                                let output = self.on_new_payload(payload, sidecar);
1333                                if let Err(err) =
1334                                    tx.send(output.map(|o| o.outcome).map_err(|e| {
1335                                        BeaconOnNewPayloadError::Internal(Box::new(e))
1336                                    }))
1337                                {
1338                                    error!(target: "engine::tree", "Failed to send event: {err:?}");
1339                                    self.metrics
1340                                        .engine
1341                                        .failed_new_payload_response_deliveries
1342                                        .increment(1);
1343                                }
1344                            }
1345                            BeaconEngineMessage::TransitionConfigurationExchanged => {
1346                                // triggering this hook will record that we received a request from
1347                                // the CL
1348                                self.canonical_in_memory_state
1349                                    .on_transition_configuration_exchanged();
1350                            }
1351                        }
1352                    }
1353                }
1354            }
1355            FromEngine::DownloadedBlocks(blocks) => {
1356                if let Some(event) = self.on_downloaded(blocks)? {
1357                    self.on_tree_event(event)?;
1358                }
1359            }
1360        }
1361        Ok(())
1362    }
1363
1364    /// Invoked if the backfill sync has finished to target.
1365    ///
1366    /// At this point we consider the block synced to the backfill target.
1367    ///
1368    /// Checks the tracked finalized block against the block on disk and requests another backfill
1369    /// run if the distance to the tip exceeds the threshold for another backfill run.
1370    ///
1371    /// This will also do the necessary housekeeping of the tree state, this includes:
1372    ///  - removing all blocks below the backfill height
1373    ///  - resetting the canonical in-memory state
1374    fn on_backfill_sync_finished(
1375        &mut self,
1376        ctrl: ControlFlow,
1377    ) -> Result<(), InsertBlockFatalError> {
1378        debug!(target: "engine::tree", "received backfill sync finished event");
1379        self.backfill_sync_state = BackfillSyncState::Idle;
1380
1381        // Pipeline unwound, memorize the invalid block and wait for CL for next sync target.
1382        if let ControlFlow::Unwind { bad_block, .. } = ctrl {
1383            warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
1384            // update the `invalid_headers` cache with the new invalid header
1385            self.state.invalid_headers.insert(*bad_block);
1386            return Ok(())
1387        }
1388
1389        // backfill height is the block number that the backfill finished at
1390        let Some(backfill_height) = ctrl.block_number() else { return Ok(()) };
1391
1392        // state house keeping after backfill sync
1393        // remove all executed blocks below the backfill height
1394        //
1395        // We set the `finalized_num` to `Some(backfill_height)` to ensure we remove all state
1396        // before that
1397        let backfill_num_hash = self
1398            .provider
1399            .block_hash(backfill_height)?
1400            .map(|hash| BlockNumHash { hash, number: backfill_height });
1401
1402        self.state.tree_state.remove_until(
1403            backfill_num_hash
1404                .expect("after backfill the block target hash should be present in the db"),
1405            self.persistence_state.last_persisted_block.hash,
1406            backfill_num_hash,
1407        );
1408        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
1409        self.metrics.tree.canonical_chain_height.set(backfill_height as f64);
1410
1411        // remove all buffered blocks below the backfill height
1412        self.state.buffer.remove_old_blocks(backfill_height);
1413        // we remove all entries because now we're synced to the backfill target and consider this
1414        // the canonical chain
1415        self.canonical_in_memory_state.clear_state();
1416
1417        if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
1418            // update the tracked chain height, after backfill sync both the canonical height and
1419            // persisted height are the same
1420            self.state.tree_state.set_canonical_head(new_head.num_hash());
1421            self.persistence_state.finish(new_head.hash(), new_head.number());
1422
1423            // update the tracked canonical head
1424            self.canonical_in_memory_state.set_canonical_head(new_head);
1425        }
1426
1427        // check if we need to run backfill again by comparing the most recent finalized height to
1428        // the backfill height
1429        let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
1430        else {
1431            return Ok(())
1432        };
1433        if sync_target_state.finalized_block_hash.is_zero() {
1434            // no finalized block, can't check distance
1435            return Ok(())
1436        }
1437        // get the block number of the finalized block, if we have it
1438        let newest_finalized = self
1439            .state
1440            .buffer
1441            .block(&sync_target_state.finalized_block_hash)
1442            .map(|block| block.number());
1443
1444        // The block number that the backfill finished at - if the progress or newest
1445        // finalized is None then we can't check the distance anyways.
1446        //
1447        // If both are Some, we perform another distance check and return the desired
1448        // backfill target
1449        if let Some(backfill_target) =
1450            ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
1451                // Determines whether or not we should run backfill again, in case
1452                // the new gap is still large enough and requires running backfill again
1453                self.backfill_sync_target(progress, finalized_number, None)
1454            })
1455        {
1456            // request another backfill run
1457            self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
1458                backfill_target.into(),
1459            )));
1460            return Ok(())
1461        };
1462
1463        // try to close the gap by executing buffered blocks that are child blocks of the new head
1464        self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
1465    }
1466
1467    /// Attempts to make the given target canonical.
1468    ///
1469    /// This will update the tracked canonical in memory state and do the necessary housekeeping.
1470    fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1471        if let Some(chain_update) = self.on_new_head(target)? {
1472            self.on_canonical_chain_update(chain_update);
1473        }
1474
1475        Ok(())
1476    }
1477
1478    /// Convenience function to handle an optional tree event.
1479    fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1480        if let Some(event) = event {
1481            self.on_tree_event(event)?;
1482        }
1483
1484        Ok(())
1485    }
1486
1487    /// Handles a tree event.
1488    fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1489        match event {
1490            TreeEvent::TreeAction(action) => match action {
1491                TreeAction::MakeCanonical { sync_target_head } => {
1492                    self.make_canonical(sync_target_head)?;
1493                }
1494            },
1495            TreeEvent::BackfillAction(action) => {
1496                self.emit_event(EngineApiEvent::BackfillAction(action));
1497            }
1498            TreeEvent::Download(action) => {
1499                self.emit_event(EngineApiEvent::Download(action));
1500            }
1501        }
1502
1503        Ok(())
1504    }
1505
1506    /// Emits an outgoing event to the engine.
1507    fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1508        let event = event.into();
1509
1510        if event.is_backfill_action() {
1511            debug_assert_eq!(
1512                self.backfill_sync_state,
1513                BackfillSyncState::Idle,
1514                "backfill action should only be emitted when backfill is idle"
1515            );
1516
1517            if self.persistence_state.in_progress() {
1518                // backfill sync and persisting data are mutually exclusive, so we can't start
1519                // backfill while we're still persisting
1520                debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
1521                return
1522            }
1523
1524            self.backfill_sync_state = BackfillSyncState::Pending;
1525            self.metrics.engine.pipeline_runs.increment(1);
1526            debug!(target: "engine::tree", "emitting backfill action event");
1527        }
1528
1529        let _ = self.outgoing.send(event).inspect_err(
1530            |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1531        );
1532    }
1533
1534    /// Returns true if the canonical chain length minus the last persisted
1535    /// block is greater than or equal to the persistence threshold and
1536    /// backfill is not running.
1537    const fn should_persist(&self) -> bool {
1538        if !self.backfill_sync_state.is_idle() {
1539            // can't persist if backfill is running
1540            return false
1541        }
1542
1543        let min_block = self.persistence_state.last_persisted_block.number;
1544        self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1545            self.config.persistence_threshold()
1546    }
1547
1548    /// Returns true if the canonical chain length minus the last persisted
1549    /// block is greater than or equal to the backup threshold and
1550    /// backfill is not running.
1551    fn should_backup(&self) -> bool {
1552        debug!(target: "engine::tree", "checking if we should backup");
1553        return false;
1554    }
1555
1556    /// Returns a batch of consecutive canonical blocks to persist in the range
1557    /// `(last_persisted_number .. canonical_head - threshold]` . The expected
1558    /// order is oldest -> newest.
1559    fn get_canonical_blocks_to_persist(&self) -> Vec<ExecutedBlock<N>> {
1560        let mut blocks_to_persist = Vec::new();
1561        let mut current_hash = self.state.tree_state.canonical_block_hash();
1562        let last_persisted_number = self.persistence_state.last_persisted_block.number;
1563
1564        let canonical_head_number = self.state.tree_state.canonical_block_number();
1565
1566        let target_number =
1567            canonical_head_number.saturating_sub(self.config.memory_block_buffer_target());
1568
1569        debug!(target: "engine::tree", ?last_persisted_number, ?canonical_head_number, ?target_number, ?current_hash, "Returning canonical blocks to persist");
1570        while let Some(block) = self.state.tree_state.blocks_by_hash.get(&current_hash) {
1571            if block.block.number() <= last_persisted_number {
1572                break;
1573            }
1574
1575            if block.block.number() <= target_number {
1576                blocks_to_persist.push(block.clone());
1577            }
1578
1579            current_hash = block.block.parent_hash();
1580        }
1581
1582        // reverse the order so that the oldest block comes first
1583        blocks_to_persist.reverse();
1584
1585        blocks_to_persist
1586    }
1587
1588    /// This clears the blocks from the in-memory tree state that have been persisted to the
1589    /// database.
1590    ///
1591    /// This also updates the canonical in-memory state to reflect the newest persisted block
1592    /// height.
1593    ///
1594    /// Assumes that `finish` has been called on the `persistence_state` at least once
1595    fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
1596        let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
1597        self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
1598        self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
1599            number: self.persistence_state.last_persisted_block.number,
1600            hash: self.persistence_state.last_persisted_block.hash,
1601        });
1602        Ok(())
1603    }
1604
1605    /// Return an [`ExecutedBlock`] from database or in-memory state by hash.
1606    ///
1607    /// NOTE: This cannot fetch [`ExecutedBlock`]s for _finalized_ blocks, instead it can only
1608    /// fetch [`ExecutedBlock`]s for _canonical_ blocks, or blocks from sidechains that the node
1609    /// has in memory.
1610    ///
1611    /// For finalized blocks, this will return `None`.
1612    fn executed_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
1613        trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
1614        // check memory first
1615        let block = self.state.tree_state.executed_block_by_hash(hash).cloned();
1616
1617        if block.is_some() {
1618            return Ok(block)
1619        }
1620
1621        let Some((_, updates)) = self.state.tree_state.persisted_trie_updates.get(&hash) else {
1622            return Ok(None)
1623        };
1624
1625        let SealedBlockWithSenders { block, senders } = self
1626            .provider
1627            .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
1628            .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?;
1629        let execution_output = self
1630            .provider
1631            .get_state(block.number())?
1632            .ok_or_else(|| ProviderError::StateForNumberNotFound(block.number()))?;
1633        let hashed_state = self.provider.hashed_post_state(execution_output.state());
1634
1635        Ok(Some(ExecutedBlock {
1636            block: Arc::new(block),
1637            senders: Arc::new(senders),
1638            trie: updates.clone(),
1639            execution_output: Arc::new(execution_output),
1640            hashed_state: Arc::new(hashed_state),
1641        }))
1642    }
1643
1644    /// Return sealed block from database or in-memory state by hash.
1645    fn sealed_header_by_hash(
1646        &self,
1647        hash: B256,
1648    ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
1649        // check memory first
1650        let block =
1651            self.state.tree_state.block_by_hash(hash).map(|block| block.as_ref().clone().header);
1652
1653        if block.is_some() {
1654            Ok(block)
1655        } else {
1656            self.provider.sealed_header_by_hash(hash)
1657        }
1658    }
1659
1660    /// Return block from database or in-memory state by hash.
1661    fn block_by_hash(&self, hash: B256) -> ProviderResult<Option<N::Block>> {
1662        // check database first
1663        let mut block = self.provider.block_by_hash(hash)?;
1664        if block.is_none() {
1665            // Note: it's fine to return the unsealed block because the caller already has
1666            // the hash
1667            block = self
1668                .state
1669                .tree_state
1670                .block_by_hash(hash)
1671                // TODO: clone for compatibility. should we return an Arc here?
1672                .map(|block| block.as_ref().clone().unseal());
1673        }
1674        Ok(block)
1675    }
1676
1677    /// Returns the state provider for the requested block hash.
1678    ///
1679    /// This merges the state of all blocks that are part of the chain that the requested block is
1680    /// the head of and are not yet persisted on disk. This includes all blocks that connect back to
1681    /// a canonical block on disk.
1682    ///
1683    /// Returns `None` if the state for the requested hash is not found, this happens if the
1684    /// requested state belongs to a block that is not connected to the canonical chain.
1685    ///
1686    /// Returns an error if we failed to fetch the state from the database.
1687    fn state_provider(&self, hash: B256) -> ProviderResult<Option<StateProviderBox>> {
1688        if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
1689            debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory");
1690            // the block leads back to the canonical chain
1691            let historical = self.provider.state_by_block_hash(historical)?;
1692            return Ok(Some(Box::new(MemoryOverlayStateProvider::new(historical, blocks))))
1693        }
1694
1695        // the hash could belong to an unknown block or a persisted block
1696        if let Some(header) = self.provider.header(&hash)? {
1697            debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database");
1698            // the block is known and persisted
1699            let historical = self.provider.state_by_block_hash(hash)?;
1700            return Ok(Some(historical))
1701        }
1702
1703        debug!(target: "engine::tree", %hash, "no canonical state found for block");
1704
1705        Ok(None)
1706    }
1707
1708    /// Return the parent hash of the lowest buffered ancestor for the requested block, if there
1709    /// are any buffered ancestors. If there are no buffered ancestors, and the block itself does
1710    /// not exist in the buffer, this returns the hash that is passed in.
1711    ///
1712    /// Returns the parent hash of the block itself if the block is buffered and has no other
1713    /// buffered ancestors.
1714    fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
1715        self.state
1716            .buffer
1717            .lowest_ancestor(&hash)
1718            .map(|block| block.parent_hash())
1719            .unwrap_or_else(|| hash)
1720    }
1721
1722    /// If validation fails, the response MUST contain the latest valid hash:
1723    ///
1724    ///   - The block hash of the ancestor of the invalid payload satisfying the following two
1725    ///     conditions:
1726    ///     - It is fully validated and deemed VALID
1727    ///     - Any other ancestor of the invalid payload with a higher blockNumber is INVALID
1728    ///   - 0x0000000000000000000000000000000000000000000000000000000000000000 if the above
1729    ///     conditions are satisfied by a `PoW` block.
1730    ///   - null if client software cannot determine the ancestor of the invalid payload satisfying
1731    ///     the above conditions.
1732    fn latest_valid_hash_for_invalid_payload(
1733        &mut self,
1734        parent_hash: B256,
1735    ) -> ProviderResult<Option<B256>> {
1736        // Check if parent exists in side chain or in canonical chain.
1737        if self.block_by_hash(parent_hash)?.is_some() {
1738            return Ok(Some(parent_hash))
1739        }
1740
1741        // iterate over ancestors in the invalid cache
1742        // until we encounter the first valid ancestor
1743        let mut current_hash = parent_hash;
1744        let mut current_block = self.state.invalid_headers.get(&current_hash);
1745        while let Some(block_with_parent) = current_block {
1746            current_hash = block_with_parent.parent;
1747            current_block = self.state.invalid_headers.get(&current_hash);
1748
1749            // If current_header is None, then the current_hash does not have an invalid
1750            // ancestor in the cache, check its presence in blockchain tree
1751            if current_block.is_none() && self.block_by_hash(current_hash)?.is_some() {
1752                return Ok(Some(current_hash))
1753            }
1754        }
1755        Ok(None)
1756    }
1757
1758    /// Prepares the invalid payload response for the given hash, checking the
1759    /// database for the parent hash and populating the payload status with the latest valid hash
1760    /// according to the engine api spec.
1761    fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
1762        // Edge case: the `latestValid` field is the zero hash if the parent block is the terminal
1763        // PoW block, which we need to identify by looking at the parent's block difficulty
1764        if let Some(parent) = self.block_by_hash(parent_hash)? {
1765            if !parent.header().difficulty().is_zero() {
1766                parent_hash = B256::ZERO;
1767            }
1768        }
1769
1770        let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
1771        Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
1772            validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
1773        })
1774        .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
1775    }
1776
1777    /// Returns true if the given hash is the last received sync target block.
1778    ///
1779    /// See [`ForkchoiceStateTracker::sync_target_state`]
1780    fn is_sync_target_head(&self, block_hash: B256) -> bool {
1781        if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
1782            return target.head_block_hash == block_hash
1783        }
1784        false
1785    }
1786
1787    /// Checks if the given `check` hash points to an invalid header, inserting the given `head`
1788    /// block into the invalid header cache if the `check` hash has a known invalid ancestor.
1789    ///
1790    /// Returns a payload status response according to the engine API spec if the block is known to
1791    /// be invalid.
1792    fn check_invalid_ancestor_with_head(
1793        &mut self,
1794        check: B256,
1795        head: B256,
1796    ) -> ProviderResult<Option<PayloadStatus>> {
1797        // check if the check hash was previously marked as invalid
1798        let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
1799
1800        // populate the latest valid hash field
1801        let status = self.prepare_invalid_response(header.parent)?;
1802
1803        // insert the head block into the invalid header cache
1804        self.state.invalid_headers.insert_with_invalid_ancestor(head, header);
1805
1806        Ok(Some(status))
1807    }
1808
1809    /// Checks if the given `head` points to an invalid header, which requires a specific response
1810    /// to a forkchoice update.
1811    fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
1812        // check if the head was previously marked as invalid
1813        let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
1814        // populate the latest valid hash field
1815        Ok(Some(self.prepare_invalid_response(header.parent)?))
1816    }
1817
1818    /// Validate if block is correct and satisfies all the consensus rules that concern the header
1819    /// and block body itself.
1820    fn validate_block(
1821        &self,
1822        block: &SealedBlockWithSenders<N::Block>,
1823    ) -> Result<(), ConsensusError> {
1824        if let Err(e) = self.consensus.validate_header_with_total_difficulty(block, U256::MAX) {
1825            error!(
1826                target: "engine::tree",
1827                ?block,
1828                "Failed to validate total difficulty for block {}: {e}",
1829                block.header.hash()
1830            );
1831            return Err(e)
1832        }
1833
1834        if let Err(e) = self.consensus.validate_header(block) {
1835            error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.header.hash());
1836            return Err(e)
1837        }
1838
1839        if let Err(e) = self.consensus.validate_block_pre_execution(block) {
1840            error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.header.hash());
1841            return Err(e)
1842        }
1843
1844        Ok(())
1845    }
1846
1847    /// Attempts to connect any buffered blocks that are connected to the given parent hash.
1848    #[instrument(level = "trace", skip(self), target = "engine::tree")]
1849    fn try_connect_buffered_blocks(
1850        &mut self,
1851        parent: BlockNumHash,
1852    ) -> Result<(), InsertBlockFatalError> {
1853        let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
1854
1855        if blocks.is_empty() {
1856            // nothing to append
1857            return Ok(())
1858        }
1859
1860        let now = Instant::now();
1861        let block_count = blocks.len();
1862        for child in blocks {
1863            let child_num_hash = child.num_hash();
1864            match self.insert_block(child) {
1865                Ok(res) => {
1866                    debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
1867                    if self.is_sync_target_head(child_num_hash.hash) &&
1868                        matches!(res, InsertPayloadOk2::Inserted(BlockStatus2::Valid))
1869                    {
1870                        self.make_canonical(child_num_hash.hash)?;
1871                    }
1872                }
1873                Err(err) => {
1874                    debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
1875                    if let Err(fatal) = self.on_insert_block_error(err) {
1876                        warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
1877                        return Err(fatal)
1878                    }
1879                }
1880            }
1881        }
1882
1883        debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
1884        Ok(())
1885    }
1886
1887    /// Attempts to recover the block's senders and then buffers it.
1888    ///
1889    /// Returns an error if sender recovery failed or inserting into the buffer failed.
1890    fn buffer_block_without_senders(
1891        &mut self,
1892        block: SealedBlockFor<N::Block>,
1893    ) -> Result<(), InsertBlockErrorTwo<N::Block>> {
1894        match block.try_seal_with_senders() {
1895            Ok(block) => self.buffer_block(block),
1896            Err(block) => Err(InsertBlockErrorTwo::sender_recovery_error(block)),
1897        }
1898    }
1899
1900    /// Pre-validates the block and inserts it into the buffer.
1901    fn buffer_block(
1902        &mut self,
1903        block: SealedBlockWithSenders<N::Block>,
1904    ) -> Result<(), InsertBlockErrorTwo<N::Block>> {
1905        if let Err(err) = self.validate_block(&block) {
1906            return Err(InsertBlockErrorTwo::consensus_error(err, block.block))
1907        }
1908        self.state.buffer.insert_block(block);
1909        Ok(())
1910    }
1911
1912    /// Returns true if the distance from the local tip to the block is greater than the configured
1913    /// threshold.
1914    ///
1915    /// If the `local_tip` is greater than the `block`, then this will return false.
1916    #[inline]
1917    const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
1918        block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
1919    }
1920
1921    /// Returns how far the local tip is from the given block. If the local tip is at the same
1922    /// height or its block number is greater than the given block, this returns None.
1923    #[inline]
1924    const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
1925        if block > local_tip {
1926            Some(block - local_tip)
1927        } else {
1928            None
1929        }
1930    }
1931
1932    /// Returns the target hash to sync to if the distance from the local tip to the block is
1933    /// greater than the threshold and we're not synced to the finalized block yet (if we've seen
1934    /// that block already).
1935    ///
1936    /// If this is invoked after a new block has been downloaded, the downloaded block could be the
1937    /// (missing) finalized block.
1938    fn backfill_sync_target(
1939        &self,
1940        canonical_tip_num: u64,
1941        target_block_number: u64,
1942        downloaded_block: Option<BlockNumHash>,
1943    ) -> Option<B256> {
1944        let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();
1945
1946        // check if the distance exceeds the threshold for backfill sync
1947        let mut exceeds_backfill_threshold =
1948            self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number);
1949
1950        // check if the downloaded block is the tracked finalized block
1951        if let Some(buffered_finalized) = sync_target_state
1952            .as_ref()
1953            .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
1954        {
1955            // if we have buffered the finalized block, we should check how far
1956            // we're off
1957            exceeds_backfill_threshold =
1958                self.exceeds_backfill_run_threshold(canonical_tip_num, buffered_finalized.number());
1959        }
1960
1961        // If this is invoked after we downloaded a block we can check if this block is the
1962        // finalized block
1963        if let (Some(downloaded_block), Some(ref state)) = (downloaded_block, sync_target_state) {
1964            if downloaded_block.hash == state.finalized_block_hash {
1965                // we downloaded the finalized block and can now check how far we're off
1966                exceeds_backfill_threshold =
1967                    self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number);
1968            }
1969        }
1970
1971        // if the number of missing blocks is greater than the max, trigger backfill
1972        if exceeds_backfill_threshold {
1973            if let Some(state) = sync_target_state {
1974                // if we have already canonicalized the finalized block, we should skip backfill
1975                match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
1976                    Err(err) => {
1977                        warn!(target: "engine::tree", %err, "Failed to get finalized block header");
1978                    }
1979                    Ok(None) => {
1980                        // ensure the finalized block is known (not the zero hash)
1981                        if !state.finalized_block_hash.is_zero() {
1982                            // we don't have the block yet and the distance exceeds the allowed
1983                            // threshold
1984                            return Some(state.finalized_block_hash)
1985                        }
1986
1987                        // OPTIMISTIC SYNCING
1988                        //
1989                        // It can happen when the node is doing an
1990                        // optimistic sync, where the CL has no knowledge of the finalized hash,
1991                        // but is expecting the EL to sync as high
1992                        // as possible before finalizing.
1993                        //
1994                        // This usually doesn't happen on ETH mainnet since CLs use the more
1995                        // secure checkpoint syncing.
1996                        //
1997                        // However, optimism chains will do this. The risk of a reorg is however
1998                        // low.
1999                        debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
2000                        return Some(state.head_block_hash)
2001                    }
2002                    Ok(Some(_)) => {
2003                        // we're fully synced to the finalized block
2004                    }
2005                }
2006            }
2007        }
2008
2009        None
2010    }
2011
2012    /// This determines whether or not we should remove blocks from the chain, based on a canonical
2013    /// chain update.
2014    ///
2015    /// If the chain update is a reorg:
2016    /// * is the new chain behind the last persisted block, or
2017    /// * if the root of the new chain is at the same height as the last persisted block, is it a
2018    ///   different block
2019    ///
2020    /// If either of these are true, then this returns the height of the first block. Otherwise,
2021    /// this returns [`None`]. This should be used to check whether or not we should be sending a
2022    /// remove command to the persistence task.
2023    fn find_disk_reorg(&self, chain_update: &NewCanonicalChain<N>) -> Option<u64> {
2024        let NewCanonicalChain::Reorg { new, old: _ } = chain_update else { return None };
2025
2026        let BlockNumHash { number: new_num, hash: new_hash } =
2027            new.first().map(|block| block.block.num_hash())?;
2028
2029        match new_num.cmp(&self.persistence_state.last_persisted_block.number) {
2030            Ordering::Greater => {
2031                // new number is above the last persisted block so the reorg can be performed
2032                // entirely in memory
2033                None
2034            }
2035            Ordering::Equal => {
2036                // new number is the same, if the hash is the same then we should not need to remove
2037                // any blocks
2038                (self.persistence_state.last_persisted_block.hash != new_hash).then_some(new_num)
2039            }
2040            Ordering::Less => {
2041                // this means we are below the last persisted block and must remove on disk blocks
2042                Some(new_num)
2043            }
2044        }
2045    }
2046
2047    /// Invoked when we the canonical chain has been updated.
2048    ///
2049    /// This is invoked on a valid forkchoice update, or if we can make the target block canonical.
2050    fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
2051        trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks =  %chain_update.reorged_block_count(), "applying new chain update");
2052        let start = Instant::now();
2053
2054        // schedule a remove_above call if we have an on-disk reorg
2055        if let Some(height) = self.find_disk_reorg(&chain_update) {
2056            // calculate the new tip by subtracting one from the lowest part of the chain
2057            let new_tip_num = height.saturating_sub(1);
2058            self.persistence_state.schedule_removal(new_tip_num);
2059        }
2060
2061        // update the tracked canonical head
2062        self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
2063
2064        let tip = chain_update.tip().header.clone();
2065        let notification = chain_update.to_chain_notification();
2066
2067        // reinsert any missing reorged blocks
2068        if let NewCanonicalChain::Reorg { new, old } = &chain_update {
2069            let new_first = new.first().map(|first| first.block.num_hash());
2070            let old_first = old.first().map(|first| first.block.num_hash());
2071            trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
2072
2073            self.update_reorg_metrics(old.len());
2074            self.reinsert_reorged_blocks(new.clone());
2075            self.reinsert_reorged_blocks(old.clone());
2076        }
2077
2078        // update the tracked in-memory state with the new chain
2079        self.canonical_in_memory_state.update_chain(chain_update);
2080        self.canonical_in_memory_state.set_canonical_head(tip.clone());
2081
2082        // Update metrics based on new tip
2083        self.metrics.tree.canonical_chain_height.set(tip.number() as f64);
2084
2085        // sends an event to all active listeners about the new canonical chain
2086        self.canonical_in_memory_state.notify_canon_state(notification);
2087
2088        // emit event
2089        self.emit_event(BeaconConsensusEngineEvent::CanonicalChainCommitted(
2090            Box::new(tip),
2091            start.elapsed(),
2092        ));
2093    }
2094
2095    /// This updates metrics based on the given reorg length.
2096    fn update_reorg_metrics(&self, old_chain_length: usize) {
2097        self.metrics.tree.reorgs.increment(1);
2098        self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2099    }
2100
2101    /// This reinserts any blocks in the new chain that do not already exist in the tree
2102    fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
2103        for block in new_chain {
2104            if self.state.tree_state.executed_block_by_hash(block.block.hash()).is_none() {
2105                trace!(target: "engine::tree", num=?block.block.number(), hash=?block.block.hash(), "Reinserting block into tree state");
2106                self.state.tree_state.insert_executed(block);
2107            }
2108        }
2109    }
2110
2111    /// This handles downloaded blocks that are shown to be disconnected from the canonical chain.
2112    ///
2113    /// This mainly compares the missing parent of the downloaded block with the current canonical
2114    /// tip, and decides whether or not backfill sync should be triggered.
2115    fn on_disconnected_downloaded_block(
2116        &self,
2117        downloaded_block: BlockNumHash,
2118        missing_parent: BlockNumHash,
2119        head: BlockNumHash,
2120    ) -> Option<TreeEvent> {
2121        // compare the missing parent with the canonical tip
2122        if let Some(target) =
2123            self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2124        {
2125            trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2126            return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2127        }
2128
2129        // continue downloading the missing parent
2130        //
2131        // this happens if either:
2132        //  * the missing parent block num < canonical tip num
2133        //    * this case represents a missing block on a fork that is shorter than the canonical
2134        //      chain
2135        //  * the missing parent block num >= canonical tip num, but the number of missing blocks is
2136        //    less than the backfill threshold
2137        //    * this case represents a potentially long range of blocks to download and execute
2138        let request = if let Some(distance) =
2139            self.distance_from_local_tip(head.number, missing_parent.number)
2140        {
2141            trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2142            DownloadRequest::BlockRange(missing_parent.hash, distance)
2143        } else {
2144            trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2145            // This happens when the missing parent is on an outdated
2146            // sidechain and we can only download the missing block itself
2147            DownloadRequest::single_block(missing_parent.hash)
2148        };
2149
2150        Some(TreeEvent::Download(request))
2151    }
2152
2153    /// Invoked with a block downloaded from the network
2154    ///
2155    /// Returns an event with the appropriate action to take, such as:
2156    ///  - download more missing blocks
2157    ///  - try to canonicalize the target if the `block` is the tracked target (head) block.
2158    #[instrument(level = "trace", skip_all, fields(block_hash = %block.hash(), block_num = %block.number(),), target = "engine::tree")]
2159    fn on_downloaded_block(
2160        &mut self,
2161        block: SealedBlockWithSenders<N::Block>,
2162    ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2163        let block_num_hash = block.num_hash();
2164        let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2165        if self
2166            .check_invalid_ancestor_with_head(lowest_buffered_ancestor, block_num_hash.hash)?
2167            .is_some()
2168        {
2169            return Ok(None)
2170        }
2171
2172        if !self.backfill_sync_state.is_idle() {
2173            return Ok(None)
2174        }
2175
2176        // try to append the block
2177        match self.insert_block(block) {
2178            Ok(InsertPayloadOk2::Inserted(BlockStatus2::Valid)) => {
2179                if self.is_sync_target_head(block_num_hash.hash) {
2180                    trace!(target: "engine::tree", "appended downloaded sync target block");
2181
2182                    // we just inserted the current sync target block, we can try to make it
2183                    // canonical
2184                    return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2185                        sync_target_head: block_num_hash.hash,
2186                    })))
2187                }
2188                trace!(target: "engine::tree", "appended downloaded block");
2189                self.try_connect_buffered_blocks(block_num_hash)?;
2190            }
2191            Ok(InsertPayloadOk2::Inserted(BlockStatus2::Disconnected {
2192                head,
2193                missing_ancestor,
2194            })) => {
2195                // block is not connected to the canonical head, we need to download
2196                // its missing branch first
2197                return Ok(self.on_disconnected_downloaded_block(
2198                    block_num_hash,
2199                    missing_ancestor,
2200                    head,
2201                ))
2202            }
2203            Ok(InsertPayloadOk2::AlreadySeen(_)) => {
2204                trace!(target: "engine::tree", "downloaded block already executed");
2205            }
2206            Err(err) => {
2207                debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2208                if let Err(fatal) = self.on_insert_block_error(err) {
2209                    warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2210                    return Err(fatal)
2211                }
2212            }
2213        }
2214        Ok(None)
2215    }
2216
2217    fn insert_block_without_senders(
2218        &mut self,
2219        block: SealedBlockFor<N::Block>,
2220    ) -> Result<InsertPayloadOk2, InsertBlockErrorTwo<N::Block>> {
2221        match block.try_seal_with_senders() {
2222            Ok(block) => self.insert_block(block),
2223            Err(block) => Err(InsertBlockErrorTwo::sender_recovery_error(block)),
2224        }
2225    }
2226
2227    fn insert_block(
2228        &mut self,
2229        block: SealedBlockWithSenders<N::Block>,
2230    ) -> Result<InsertPayloadOk2, InsertBlockErrorTwo<N::Block>> {
2231        self.insert_block_inner(block.clone())
2232            .map_err(|kind| InsertBlockErrorTwo::new(block.block, kind))
2233    }
2234
2235    fn insert_block_inner(
2236        &mut self,
2237        block: SealedBlockWithSenders<N::Block>,
2238    ) -> Result<InsertPayloadOk2, InsertBlockErrorKindTwo> {
2239        debug!(target: "engine::tree", block=?block.num_hash(), parent = ?block.parent_hash(), state_root = ?block.state_root(), "Inserting new block into tree");
2240
2241        if self.block_by_hash(block.hash())?.is_some() {
2242            return Ok(InsertPayloadOk2::AlreadySeen(BlockStatus2::Valid))
2243        }
2244
2245        let start = Instant::now();
2246
2247        trace!(target: "engine::tree", block=?block.num_hash(), "Validating block consensus");
2248        // validate block consensus rules
2249        self.validate_block(&block)?;
2250
2251        trace!(target: "engine::tree", block=?block.num_hash(), parent=?block.parent_hash(), "Fetching block state provider");
2252        let Some(state_provider) = self.state_provider(block.parent_hash())? else {
2253            // we don't have the state required to execute this block, buffering it and find the
2254            // missing parent block
2255            let missing_ancestor = self
2256                .state
2257                .buffer
2258                .lowest_ancestor(&block.parent_hash())
2259                .map(|block| block.parent_num_hash())
2260                .unwrap_or_else(|| block.parent_num_hash());
2261
2262            self.state.buffer.insert_block(block);
2263
2264            return Ok(InsertPayloadOk2::Inserted(BlockStatus2::Disconnected {
2265                head: self.state.tree_state.current_canonical_head,
2266                missing_ancestor,
2267            }))
2268        };
2269
2270        // now validate against the parent
2271        let parent_block = self.sealed_header_by_hash(block.parent_hash())?.ok_or_else(|| {
2272            InsertBlockErrorKindTwo::Provider(ProviderError::HeaderNotFound(
2273                block.parent_hash().into(),
2274            ))
2275        })?;
2276        if let Err(e) = self.consensus.validate_header_against_parent(&block, &parent_block) {
2277            warn!(target: "engine::tree", ?block, "Failed to validate header {} against parent: {e}", block.header.hash());
2278            return Err(e.into())
2279        }
2280
2281        trace!(target: "engine::tree", block=?block.num_hash(), "Executing block");
2282        let executor = self.executor_provider.executor(StateProviderDatabase::new(&state_provider));
2283
2284        let block_number = block.number();
2285        let block_hash = block.hash();
2286        let sealed_block = Arc::new(block.block.clone());
2287        let block = block.unseal();
2288
2289        let exec_time = Instant::now();
2290
2291        // TODO: create StateRootTask with the receiving end of a channel and
2292        // pass the sending end of the channel to the state hook.
2293        let noop_state_hook = |_state: &EvmState| {};
2294        let output = self.metrics.executor.execute_metered(
2295            executor,
2296            (&block, U256::MAX).into(),
2297            Box::new(noop_state_hook),
2298        )?;
2299
2300        trace!(target: "engine::tree", elapsed=?exec_time.elapsed(), ?block_number, "Executed block");
2301
2302        if let Err(err) = self.consensus.validate_block_post_execution(
2303            &block,
2304            PostExecutionInput::new(&output.receipts, &output.requests),
2305        ) {
2306            // call post-block hook
2307            self.invalid_block_hook.on_invalid_block(
2308                &parent_block,
2309                &block.seal_slow(),
2310                &output,
2311                None,
2312            );
2313            return Err(err.into())
2314        }
2315
2316        let hashed_state = self.provider.hashed_post_state(&output.state);
2317
2318        trace!(target: "engine::tree", block=?sealed_block.num_hash(), "Calculating block state root");
2319        let root_time = Instant::now();
2320        let mut state_root_result = None;
2321
2322        // TODO: switch to calculate state root using `StateRootTask`.
2323
2324        // We attempt to compute state root in parallel if we are currently not persisting anything
2325        // to database. This is safe, because the database state cannot change until we
2326        // finish parallel computation. It is important that nothing is being persisted as
2327        // we are computing in parallel, because we initialize a different database transaction
2328        // per thread and it might end up with a different view of the database.
2329        let persistence_in_progress = self.persistence_state.in_progress();
2330        if !persistence_in_progress {
2331            state_root_result = match self
2332                .compute_state_root_parallel(block.header().parent_hash(), &hashed_state)
2333            {
2334                Ok((state_root, trie_output)) => Some((state_root, trie_output)),
2335                Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(error))) => {
2336                    debug!(target: "engine", %error, "Parallel state root computation failed consistency check, falling back");
2337                    None
2338                }
2339                Err(error) => return Err(InsertBlockErrorKindTwo::Other(Box::new(error))),
2340            };
2341        }
2342
2343        let (state_root, trie_output) = if let Some(result) = state_root_result {
2344            result
2345        } else {
2346            debug!(target: "engine::tree", block=?sealed_block.num_hash(), persistence_in_progress, "Failed to compute state root in parallel");
2347            state_provider.state_root_with_updates(hashed_state.clone())?
2348        };
2349
2350        if state_root != block.header().state_root() {
2351            // call post-block hook
2352            self.invalid_block_hook.on_invalid_block(
2353                &parent_block,
2354                &block.clone().seal_slow(),
2355                &output,
2356                Some((&trie_output, state_root)),
2357            );
2358            return Err(ConsensusError::BodyStateRootDiff(
2359                GotExpected { got: state_root, expected: block.header().state_root() }.into(),
2360            )
2361            .into())
2362        }
2363
2364        let root_elapsed = root_time.elapsed();
2365        self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
2366        debug!(target: "engine::tree", ?root_elapsed, block=?sealed_block.num_hash(), "Calculated state root");
2367
2368        let executed: ExecutedBlock<N> = ExecutedBlock {
2369            block: sealed_block.clone(),
2370            senders: Arc::new(block.senders),
2371            execution_output: Arc::new(ExecutionOutcome::from((output, block_number))),
2372            hashed_state: Arc::new(hashed_state),
2373            trie: Arc::new(trie_output),
2374        };
2375
2376        if self.state.tree_state.canonical_block_hash() == executed.block().parent_hash() {
2377            debug!(target: "engine::tree", pending = ?executed.block().num_hash() ,"updating pending block");
2378            // if the parent is the canonical head, we can insert the block as the pending block
2379            self.canonical_in_memory_state.set_pending_block(executed.clone());
2380        }
2381
2382        self.state.tree_state.insert_executed(executed);
2383        self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
2384
2385        // emit insert event
2386        let elapsed = start.elapsed();
2387        let engine_event = if self.is_fork(block_hash)? {
2388            BeaconConsensusEngineEvent::ForkBlockAdded(sealed_block, elapsed)
2389        } else {
2390            BeaconConsensusEngineEvent::CanonicalBlockAdded(sealed_block, elapsed)
2391        };
2392        self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
2393
2394        debug!(target: "engine::tree", block=?BlockNumHash::new(block_number, block_hash), "Finished inserting block");
2395        Ok(InsertPayloadOk2::Inserted(BlockStatus2::Valid))
2396    }
2397
2398    /// Compute state root for the given hashed post state in parallel.
2399    ///
2400    /// # Returns
2401    ///
2402    /// Returns `Ok(_)` if computed successfully.
2403    /// Returns `Err(_)` if error was encountered during computation.
2404    /// `Err(ProviderError::ConsistentView(_))` can be safely ignored and fallback computation
2405    /// should be used instead.
2406    fn compute_state_root_parallel(
2407        &self,
2408        parent_hash: B256,
2409        hashed_state: &HashedPostState,
2410    ) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
2411        // TODO: when we switch to calculate state root using `StateRootTask` this
2412        // method can be still useful to calculate the required `TrieInput` to
2413        // create the task.
2414        let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
2415        let mut input = TrieInput::default();
2416
2417        if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(parent_hash) {
2418            debug!(target: "engine::tree", %parent_hash, %historical, "Calculating state root in parallel, parent found in memory");
2419            // Retrieve revert state for historical block.
2420            let revert_state = consistent_view.revert_state(historical)?;
2421            input.append(revert_state);
2422
2423            // Extend with contents of parent in-memory blocks.
2424            for block in blocks.iter().rev() {
2425                input.append_cached_ref(block.trie_updates(), block.hashed_state())
2426            }
2427        } else {
2428            // The block attaches to canonical persisted parent.
2429            debug!(target: "engine::tree", %parent_hash, "Calculating state root in parallel, parent found in disk");
2430            let revert_state = consistent_view.revert_state(parent_hash)?;
2431            input.append(revert_state);
2432        }
2433
2434        // Extend with block we are validating root for.
2435        input.append_ref(hashed_state);
2436
2437        ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates()
2438    }
2439
2440    /// Handles an error that occurred while inserting a block.
2441    ///
2442    /// If this is a validation error this will mark the block as invalid.
2443    ///
2444    /// Returns the proper payload status response if the block is invalid.
2445    fn on_insert_block_error(
2446        &mut self,
2447        error: InsertBlockErrorTwo<N::Block>,
2448    ) -> Result<PayloadStatus, InsertBlockFatalError> {
2449        let (block, error) = error.split();
2450
2451        // if invalid block, we check the validation error. Otherwise return the fatal
2452        // error.
2453        let validation_err = error.ensure_validation_error()?;
2454
2455        // If the error was due to an invalid payload, the payload is added to the
2456        // invalid headers cache and `Ok` with [PayloadStatusEnum::Invalid] is
2457        // returned.
2458        warn!(target: "engine::tree", invalid_hash=?block.hash(), invalid_number=?block.number(), %validation_err, "Invalid block error on new payload");
2459        let latest_valid_hash = if validation_err.is_block_pre_merge() {
2460            // zero hash must be returned if block is pre-merge
2461            Some(B256::ZERO)
2462        } else {
2463            self.latest_valid_hash_for_invalid_payload(block.parent_hash())?
2464        };
2465
2466        // keep track of the invalid header
2467        self.state.invalid_headers.insert(block.header.block_with_parent());
2468        Ok(PayloadStatus::new(
2469            PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
2470            latest_valid_hash,
2471        ))
2472    }
2473
2474    /// Attempts to find the header for the given block hash if it is canonical.
2475    pub fn find_canonical_header(
2476        &self,
2477        hash: B256,
2478    ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
2479        let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2480
2481        if canonical.is_none() {
2482            canonical = self.provider.header(&hash)?.map(|header| SealedHeader::new(header, hash));
2483        }
2484
2485        Ok(canonical)
2486    }
2487
2488    /// Updates the tracked finalized block if we have it.
2489    fn update_finalized_block(
2490        &self,
2491        finalized_block_hash: B256,
2492    ) -> Result<(), OnForkChoiceUpdated> {
2493        if finalized_block_hash.is_zero() {
2494            return Ok(())
2495        }
2496
2497        match self.find_canonical_header(finalized_block_hash) {
2498            Ok(None) => {
2499                debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2500                // if the finalized block is not known, we can't update the finalized block
2501                return Err(OnForkChoiceUpdated::invalid_state())
2502            }
2503            Ok(Some(finalized)) => {
2504                if Some(finalized.num_hash()) !=
2505                    self.canonical_in_memory_state.get_finalized_num_hash()
2506                {
2507                    // we're also persisting the finalized block on disk so we can reload it on
2508                    // restart this is required by optimism which queries the finalized block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2509                    let _ = self.persistence.save_finalized_block_number(finalized.number());
2510                    self.canonical_in_memory_state.set_finalized(finalized);
2511                }
2512            }
2513            Err(err) => {
2514                error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
2515            }
2516        }
2517
2518        Ok(())
2519    }
2520
2521    /// Updates the tracked safe block if we have it
2522    fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
2523        if safe_block_hash.is_zero() {
2524            return Ok(())
2525        }
2526
2527        match self.find_canonical_header(safe_block_hash) {
2528            Ok(None) => {
2529                debug!(target: "engine::tree", "Safe block not found in canonical chain");
2530                // if the safe block is not known, we can't update the safe block
2531                return Err(OnForkChoiceUpdated::invalid_state())
2532            }
2533            Ok(Some(safe)) => {
2534                if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
2535                    // we're also persisting the safe block on disk so we can reload it on
2536                    // restart this is required by optimism which queries the safe block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
2537                    let _ = self.persistence.save_safe_block_number(safe.number());
2538                    self.canonical_in_memory_state.set_safe(safe);
2539                }
2540            }
2541            Err(err) => {
2542                error!(target: "engine::tree", %err, "Failed to fetch safe block header");
2543            }
2544        }
2545
2546        Ok(())
2547    }
2548
2549    /// Ensures that the given forkchoice state is consistent, assuming the head block has been
2550    /// made canonical.
2551    ///
2552    /// If the forkchoice state is consistent, this will return Ok(()). Otherwise, this will
2553    /// return an instance of [`OnForkChoiceUpdated`] that is INVALID.
2554    ///
2555    /// This also updates the safe and finalized blocks in the [`CanonicalInMemoryState`], if they
2556    /// are consistent with the head block.
2557    fn ensure_consistent_forkchoice_state(
2558        &self,
2559        state: ForkchoiceState,
2560    ) -> Result<(), OnForkChoiceUpdated> {
2561        // Ensure that the finalized block, if not zero, is known and in the canonical chain
2562        // after the head block is canonicalized.
2563        //
2564        // This ensures that the finalized block is consistent with the head block, i.e. the
2565        // finalized block is an ancestor of the head block.
2566        self.update_finalized_block(state.finalized_block_hash)?;
2567
2568        // Also ensure that the safe block, if not zero, is known and in the canonical chain
2569        // after the head block is canonicalized.
2570        //
2571        // This ensures that the safe block is consistent with the head block, i.e. the safe
2572        // block is an ancestor of the head block.
2573        self.update_safe_block(state.safe_block_hash)
2574    }
2575
2576    /// Pre-validate forkchoice update and check whether it can be processed.
2577    ///
2578    /// This method returns the update outcome if validation fails or
2579    /// the node is syncing and the update cannot be processed at the moment.
2580    fn pre_validate_forkchoice_update(
2581        &mut self,
2582        state: ForkchoiceState,
2583    ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
2584        if state.head_block_hash.is_zero() {
2585            return Ok(Some(OnForkChoiceUpdated::invalid_state()))
2586        }
2587
2588        // check if the new head hash is connected to any ancestor that we previously marked as
2589        // invalid
2590        let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
2591        if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
2592            return Ok(Some(OnForkChoiceUpdated::with_invalid(status)))
2593        }
2594
2595        if !self.backfill_sync_state.is_idle() {
2596            // We can only process new forkchoice updates if the pipeline is idle, since it requires
2597            // exclusive access to the database
2598            trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
2599            return Ok(Some(OnForkChoiceUpdated::syncing()))
2600        }
2601
2602        Ok(None)
2603    }
2604
2605    /// Validates the payload attributes with respect to the header and fork choice state.
2606    ///
2607    /// Note: At this point, the fork choice update is considered to be VALID, however, we can still
2608    /// return an error if the payload attributes are invalid.
2609    fn process_payload_attributes(
2610        &self,
2611        attrs: T::PayloadAttributes,
2612        head: &N::BlockHeader,
2613        state: ForkchoiceState,
2614        version: EngineApiMessageVersion,
2615    ) -> OnForkChoiceUpdated {
2616        if let Err(err) =
2617            self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
2618        {
2619            warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
2620            return OnForkChoiceUpdated::invalid_payload_attributes()
2621        }
2622
2623        // 8. Client software MUST begin a payload build process building on top of
2624        //    forkchoiceState.headBlockHash and identified via buildProcessId value if
2625        //    payloadAttributes is not null and the forkchoice state has been updated successfully.
2626        //    The build process is specified in the Payload building section.
2627        match <T::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
2628            state.head_block_hash,
2629            attrs,
2630            version as u8,
2631        ) {
2632            Ok(attributes) => {
2633                // send the payload to the builder and return the receiver for the pending payload
2634                // id, initiating payload job is handled asynchronously
2635                let pending_payload_id = self.payload_builder.send_new_payload(attributes);
2636
2637                // Client software MUST respond to this method call in the following way:
2638                // {
2639                //      payloadStatus: {
2640                //          status: VALID,
2641                //          latestValidHash: forkchoiceState.headBlockHash,
2642                //          validationError: null
2643                //      },
2644                //      payloadId: buildProcessId
2645                // }
2646                //
2647                // if the payload is deemed VALID and the build process has begun.
2648                OnForkChoiceUpdated::updated_with_pending_payload_id(
2649                    PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
2650                    pending_payload_id,
2651                )
2652            }
2653            Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
2654        }
2655    }
2656
2657    /// Remove all blocks up to __and including__ the given block number.
2658    ///
2659    /// If a finalized hash is provided, the only non-canonical blocks which will be removed are
2660    /// those which have a fork point at or below the finalized hash.
2661    ///
2662    /// Canonical blocks below the upper bound will still be removed.
2663    pub(crate) fn remove_before(
2664        &mut self,
2665        upper_bound: BlockNumHash,
2666        finalized_hash: Option<B256>,
2667    ) -> ProviderResult<()> {
2668        // first fetch the finalized block number and then call the remove_before method on
2669        // tree_state
2670        let num = if let Some(hash) = finalized_hash {
2671            self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
2672        } else {
2673            None
2674        };
2675
2676        self.state.tree_state.remove_until(
2677            upper_bound,
2678            self.persistence_state.last_persisted_block.hash,
2679            num,
2680        );
2681        Ok(())
2682    }
2683}
2684
2685/// This is an error that can come from advancing persistence. Either this can be a
2686/// [`TryRecvError`], or this can be a [`ProviderError`]
2687#[derive(Debug, thiserror::Error)]
2688pub enum AdvancePersistenceError {
2689    /// An error that can be from failing to receive a value from persistence
2690    #[error(transparent)]
2691    RecvError(#[from] TryRecvError),
2692    /// A provider error
2693    #[error(transparent)]
2694    Provider(#[from] ProviderError),
2695}
2696
2697#[cfg(test)]
2698mod tests {
2699    use super::*;
2700    use crate::persistence::PersistenceAction;
2701    use alloy_consensus::Header;
2702    use alloy_primitives::Bytes;
2703    use alloy_rlp::Decodable;
2704    use alloy_rpc_types_engine::{CancunPayloadFields, ExecutionPayloadSidecar};
2705    use assert_matches::assert_matches;
2706    use reth_beacon_consensus::EthBeaconConsensus;
2707    use reth_chain_state::{test_utils::TestBlockBuilder, BlockState};
2708    use reth_chainspec::{ChainSpec, HOLESKY, MAINNET};
2709    use reth_engine_primitives::ForkchoiceStatus;
2710    use reth_ethereum_engine_primitives::{EthEngineTypes, EthereumEngineValidator};
2711    use reth_evm::test_utils::MockExecutorProvider;
2712    use reth_node_core::dirs::MaybePlatformPath;
2713    use reth_primitives::{Block, BlockExt, EthPrimitives};
2714    use reth_provider::test_utils::MockEthProvider;
2715    use reth_rpc_types_compat::engine::{block_to_payload_v1, payload::block_to_payload_v3};
2716    use reth_trie::updates::TrieUpdates;
2717    use std::{
2718        str::FromStr,
2719        sync::mpsc::{channel, Sender},
2720    };
2721
2722    /// This is a test channel that allows you to `release` any value that is in the channel.
2723    ///
2724    /// If nothing has been sent, then the next value will be immediately sent.
2725    #[allow(dead_code)]
2726    struct TestChannel<T> {
2727        /// If an item is sent to this channel, an item will be released in the wrapped channel
2728        release: Receiver<()>,
2729        /// The sender channel
2730        tx: Sender<T>,
2731        /// The receiver channel
2732        rx: Receiver<T>,
2733    }
2734
2735    impl<T: Send + 'static> TestChannel<T> {
2736        /// Creates a new test channel
2737        #[allow(dead_code)]
2738        fn spawn_channel() -> (Sender<T>, Receiver<T>, TestChannelHandle) {
2739            let (original_tx, original_rx) = channel();
2740            let (wrapped_tx, wrapped_rx) = channel();
2741            let (release_tx, release_rx) = channel();
2742            let handle = TestChannelHandle::new(release_tx);
2743            let test_channel = Self { release: release_rx, tx: wrapped_tx, rx: original_rx };
2744            // spawn the task that listens and releases stuff
2745            std::thread::spawn(move || test_channel.intercept_loop());
2746            (original_tx, wrapped_rx, handle)
2747        }
2748
2749        /// Runs the intercept loop, waiting for the handle to release a value
2750        fn intercept_loop(&self) {
2751            while self.release.recv() == Ok(()) {
2752                let Ok(value) = self.rx.recv() else { return };
2753
2754                let _ = self.tx.send(value);
2755            }
2756        }
2757    }
2758
2759    struct TestChannelHandle {
2760        /// The sender to use for releasing values
2761        release: Sender<()>,
2762    }
2763
2764    impl TestChannelHandle {
2765        /// Returns a [`TestChannelHandle`]
2766        const fn new(release: Sender<()>) -> Self {
2767            Self { release }
2768        }
2769
2770        /// Signals to the channel task that a value should be released
2771        #[allow(dead_code)]
2772        fn release(&self) {
2773            let _ = self.release.send(());
2774        }
2775    }
2776
2777    struct TestHarness {
2778        tree: EngineApiTreeHandler<
2779            EthPrimitives,
2780            MockEthProvider,
2781            MockExecutorProvider,
2782            EthEngineTypes,
2783            EthereumEngineValidator,
2784        >,
2785        to_tree_tx: Sender<FromEngine<EngineApiRequest<EthEngineTypes, EthPrimitives>, Block>>,
2786        from_tree_rx: UnboundedReceiver<EngineApiEvent>,
2787        blocks: Vec<ExecutedBlock>,
2788        action_rx: Receiver<PersistenceAction>,
2789        executor_provider: MockExecutorProvider,
2790        block_builder: TestBlockBuilder,
2791        provider: MockEthProvider,
2792    }
2793
2794    impl TestHarness {
2795        fn new(chain_spec: Arc<ChainSpec>) -> Self {
2796            let (action_tx, action_rx) = channel();
2797            Self::with_persistence_channel(chain_spec, action_tx, action_rx)
2798        }
2799
2800        #[allow(dead_code)]
2801        fn with_test_channel(chain_spec: Arc<ChainSpec>) -> (Self, TestChannelHandle) {
2802            let (action_tx, action_rx, handle) = TestChannel::spawn_channel();
2803            (Self::with_persistence_channel(chain_spec, action_tx, action_rx), handle)
2804        }
2805
2806        fn with_persistence_channel(
2807            chain_spec: Arc<ChainSpec>,
2808            action_tx: Sender<PersistenceAction>,
2809            action_rx: Receiver<PersistenceAction>,
2810        ) -> Self {
2811            let persistence_handle = PersistenceHandle::new(action_tx);
2812
2813            let backup_handle = BackupHandle::spawn_service(MaybePlatformPath::chain_default(
2814                chain_spec.chain.clone(),
2815            ));
2816
2817            let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));
2818
2819            let provider = MockEthProvider::default();
2820            let executor_provider = MockExecutorProvider::default();
2821
2822            let payload_validator = EthereumEngineValidator::new(chain_spec.clone());
2823
2824            let (from_tree_tx, from_tree_rx) = unbounded_channel();
2825
2826            let header = chain_spec.genesis_header().clone();
2827            let header = SealedHeader::seal(header);
2828            let engine_api_tree_state = EngineApiTreeState::new(10, 10, header.num_hash());
2829            let canonical_in_memory_state = CanonicalInMemoryState::with_head(header, None, None);
2830
2831            let (to_payload_service, _payload_command_rx) = unbounded_channel();
2832            let payload_builder = PayloadBuilderHandle::new(to_payload_service);
2833
2834            let tree = EngineApiTreeHandler::new(
2835                provider.clone(),
2836                executor_provider.clone(),
2837                consensus,
2838                payload_validator,
2839                from_tree_tx,
2840                engine_api_tree_state,
2841                canonical_in_memory_state,
2842                persistence_handle,
2843                PersistenceState::default(),
2844                payload_builder,
2845                TreeConfig::default(),
2846                EngineApiKind::Ethereum,
2847                backup_handle,
2848            );
2849
2850            let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
2851            Self {
2852                to_tree_tx: tree.incoming_tx.clone(),
2853                tree,
2854                from_tree_rx,
2855                blocks: vec![],
2856                action_rx,
2857                executor_provider,
2858                block_builder,
2859                provider,
2860            }
2861        }
2862
2863        fn with_blocks(mut self, blocks: Vec<ExecutedBlock>) -> Self {
2864            let mut blocks_by_hash = HashMap::default();
2865            let mut blocks_by_number = BTreeMap::new();
2866            let mut state_by_hash = HashMap::default();
2867            let mut hash_by_number = BTreeMap::new();
2868            let mut parent_to_child: HashMap<B256, HashSet<B256>> = HashMap::default();
2869            let mut parent_hash = B256::ZERO;
2870
2871            for block in &blocks {
2872                let sealed_block = block.block();
2873                let hash = sealed_block.hash();
2874                let number = sealed_block.number;
2875                blocks_by_hash.insert(hash, block.clone());
2876                blocks_by_number.entry(number).or_insert_with(Vec::new).push(block.clone());
2877                state_by_hash.insert(hash, Arc::new(BlockState::new(block.clone())));
2878                hash_by_number.insert(number, hash);
2879                parent_to_child.entry(parent_hash).or_default().insert(hash);
2880                parent_hash = hash;
2881            }
2882
2883            self.tree.state.tree_state = TreeState {
2884                blocks_by_hash,
2885                blocks_by_number,
2886                current_canonical_head: blocks.last().unwrap().block().num_hash(),
2887                parent_to_child,
2888                persisted_trie_updates: HashMap::default(),
2889            };
2890
2891            let last_executed_block = blocks.last().unwrap().clone();
2892            let pending = Some(BlockState::new(last_executed_block));
2893            self.tree.canonical_in_memory_state =
2894                CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending, None, None);
2895
2896            self.blocks = blocks.clone();
2897            self.persist_blocks(
2898                blocks
2899                    .into_iter()
2900                    .map(|b| SealedBlockWithSenders {
2901                        block: (*b.block).clone(),
2902                        senders: b.senders.to_vec(),
2903                    })
2904                    .collect(),
2905            );
2906
2907            self
2908        }
2909
2910        const fn with_backfill_state(mut self, state: BackfillSyncState) -> Self {
2911            self.tree.backfill_sync_state = state;
2912            self
2913        }
2914
2915        fn extend_execution_outcome(
2916            &self,
2917            execution_outcomes: impl IntoIterator<Item = impl Into<ExecutionOutcome>>,
2918        ) {
2919            self.executor_provider.extend(execution_outcomes);
2920        }
2921
2922        fn insert_block(
2923            &mut self,
2924            block: SealedBlockWithSenders,
2925        ) -> Result<InsertPayloadOk2, InsertBlockErrorTwo<Block>> {
2926            let execution_outcome = self.block_builder.get_execution_outcome(block.clone());
2927            self.extend_execution_outcome([execution_outcome]);
2928            self.tree.provider.add_state_root(block.state_root);
2929            self.tree.insert_block(block)
2930        }
2931
2932        async fn fcu_to(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
2933            let fcu_status = fcu_status.into();
2934
2935            self.send_fcu(block_hash, fcu_status).await;
2936
2937            self.check_fcu(block_hash, fcu_status).await;
2938        }
2939
2940        async fn send_fcu(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
2941            let fcu_state = self.fcu_state(block_hash);
2942
2943            let (tx, rx) = oneshot::channel();
2944            self.tree
2945                .on_engine_message(FromEngine::Request(
2946                    BeaconEngineMessage::ForkchoiceUpdated {
2947                        state: fcu_state,
2948                        payload_attrs: None,
2949                        tx,
2950                        version: EngineApiMessageVersion::default(),
2951                    }
2952                    .into(),
2953                ))
2954                .unwrap();
2955
2956            let response = rx.await.unwrap().unwrap().await.unwrap();
2957            match fcu_status.into() {
2958                ForkchoiceStatus::Valid => assert!(response.payload_status.is_valid()),
2959                ForkchoiceStatus::Syncing => assert!(response.payload_status.is_syncing()),
2960                ForkchoiceStatus::Invalid => assert!(response.payload_status.is_invalid()),
2961            }
2962        }
2963
2964        async fn check_fcu(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
2965            let fcu_state = self.fcu_state(block_hash);
2966
2967            // check for ForkchoiceUpdated event
2968            let event = self.from_tree_rx.recv().await.unwrap();
2969            match event {
2970                EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkchoiceUpdated(
2971                    state,
2972                    status,
2973                )) => {
2974                    assert_eq!(state, fcu_state);
2975                    assert_eq!(status, fcu_status.into());
2976                }
2977                _ => panic!("Unexpected event: {:#?}", event),
2978            }
2979        }
2980
2981        const fn fcu_state(&self, block_hash: B256) -> ForkchoiceState {
2982            ForkchoiceState {
2983                head_block_hash: block_hash,
2984                safe_block_hash: block_hash,
2985                finalized_block_hash: block_hash,
2986            }
2987        }
2988
2989        async fn send_new_payload(&mut self, block: SealedBlockWithSenders) {
2990            let payload = block_to_payload_v3(block.block.clone());
2991            self.tree
2992                .on_new_payload(
2993                    payload.into(),
2994                    ExecutionPayloadSidecar::v3(CancunPayloadFields {
2995                        parent_beacon_block_root: block.parent_beacon_block_root.unwrap(),
2996                        versioned_hashes: vec![],
2997                    }),
2998                )
2999                .unwrap();
3000        }
3001
3002        async fn insert_chain(
3003            &mut self,
3004            chain: impl IntoIterator<Item = SealedBlockWithSenders> + Clone,
3005        ) {
3006            for block in chain.clone() {
3007                self.insert_block(block.clone()).unwrap();
3008            }
3009            self.check_canon_chain_insertion(chain).await;
3010        }
3011
3012        async fn check_canon_commit(&mut self, hash: B256) {
3013            let event = self.from_tree_rx.recv().await.unwrap();
3014            match event {
3015                EngineApiEvent::BeaconConsensus(
3016                    BeaconConsensusEngineEvent::CanonicalChainCommitted(header, _),
3017                ) => {
3018                    assert_eq!(header.hash(), hash);
3019                }
3020                _ => panic!("Unexpected event: {:#?}", event),
3021            }
3022        }
3023
3024        async fn check_fork_chain_insertion(
3025            &mut self,
3026            chain: impl IntoIterator<Item = SealedBlockWithSenders> + Clone,
3027        ) {
3028            for block in chain {
3029                self.check_fork_block_added(block.block.hash()).await;
3030            }
3031        }
3032
3033        async fn check_canon_chain_insertion(
3034            &mut self,
3035            chain: impl IntoIterator<Item = SealedBlockWithSenders> + Clone,
3036        ) {
3037            for block in chain.clone() {
3038                self.check_canon_block_added(block.hash()).await;
3039            }
3040        }
3041
3042        async fn check_canon_block_added(&mut self, expected_hash: B256) {
3043            let event = self.from_tree_rx.recv().await.unwrap();
3044            match event {
3045                EngineApiEvent::BeaconConsensus(
3046                    BeaconConsensusEngineEvent::CanonicalBlockAdded(block, _),
3047                ) => {
3048                    assert_eq!(block.hash(), expected_hash);
3049                }
3050                _ => panic!("Unexpected event: {:#?}", event),
3051            }
3052        }
3053
3054        async fn check_fork_block_added(&mut self, expected_hash: B256) {
3055            let event = self.from_tree_rx.recv().await.unwrap();
3056            match event {
3057                EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkBlockAdded(
3058                    block,
3059                    _,
3060                )) => {
3061                    assert!(block.hash() == expected_hash);
3062                }
3063                _ => panic!("Unexpected event: {:#?}", event),
3064            }
3065        }
3066
3067        fn persist_blocks(&self, blocks: Vec<SealedBlockWithSenders>) {
3068            let mut block_data: Vec<(B256, Block)> = Vec::with_capacity(blocks.len());
3069            let mut headers_data: Vec<(B256, Header)> = Vec::with_capacity(blocks.len());
3070
3071            for block in &blocks {
3072                let unsealed_block = block.clone().unseal();
3073                block_data.push((block.hash(), unsealed_block.clone().block));
3074                headers_data.push((block.hash(), unsealed_block.header.clone()));
3075            }
3076
3077            self.provider.extend_blocks(block_data);
3078            self.provider.extend_headers(headers_data);
3079        }
3080
3081        fn setup_range_insertion_for_valid_chain(&mut self, chain: Vec<SealedBlockWithSenders>) {
3082            self.setup_range_insertion_for_chain(chain, None)
3083        }
3084
3085        fn setup_range_insertion_for_invalid_chain(
3086            &mut self,
3087            chain: Vec<SealedBlockWithSenders>,
3088            index: usize,
3089        ) {
3090            self.setup_range_insertion_for_chain(chain, Some(index))
3091        }
3092
3093        fn setup_range_insertion_for_chain(
3094            &mut self,
3095            chain: Vec<SealedBlockWithSenders>,
3096            invalid_index: Option<usize>,
3097        ) {
3098            // setting up execution outcomes for the chain, the blocks will be
3099            // executed starting from the oldest, so we need to reverse.
3100            let mut chain_rev = chain;
3101            chain_rev.reverse();
3102
3103            let mut execution_outcomes = Vec::with_capacity(chain_rev.len());
3104            for (index, block) in chain_rev.iter().enumerate() {
3105                let execution_outcome = self.block_builder.get_execution_outcome(block.clone());
3106                let state_root = if invalid_index.is_some() && invalid_index.unwrap() == index {
3107                    B256::random()
3108                } else {
3109                    block.state_root
3110                };
3111                self.tree.provider.add_state_root(state_root);
3112                execution_outcomes.push(execution_outcome);
3113            }
3114            self.extend_execution_outcome(execution_outcomes);
3115        }
3116
3117        fn check_canon_head(&self, head_hash: B256) {
3118            assert_eq!(self.tree.state.tree_state.canonical_head().hash, head_hash);
3119        }
3120    }
3121
3122    #[test]
3123    fn test_tree_persist_block_batch() {
3124        let tree_config = TreeConfig::default();
3125        let chain_spec = MAINNET.clone();
3126        let mut test_block_builder =
3127            TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
3128
3129        // we need more than tree_config.persistence_threshold() +1 blocks to
3130        // trigger the persistence task.
3131        let blocks: Vec<_> = test_block_builder
3132            .get_executed_blocks(1..tree_config.persistence_threshold() + 2)
3133            .collect();
3134        let mut test_harness = TestHarness::new(chain_spec).with_blocks(blocks);
3135
3136        let mut blocks = vec![];
3137        for idx in 0..tree_config.max_execute_block_batch_size() * 2 {
3138            blocks.push(test_block_builder.generate_random_block(idx as u64, B256::random()));
3139        }
3140
3141        test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(blocks)).unwrap();
3142
3143        // process the message
3144        let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
3145        test_harness.tree.on_engine_message(msg).unwrap();
3146
3147        // we now should receive the other batch
3148        let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
3149        match msg {
3150            FromEngine::DownloadedBlocks(blocks) => {
3151                assert_eq!(blocks.len(), tree_config.max_execute_block_batch_size());
3152            }
3153            _ => panic!("unexpected message: {:#?}", msg),
3154        }
3155    }
3156
3157    #[tokio::test]
3158    async fn test_tree_persist_blocks() {
3159        let tree_config = TreeConfig::default();
3160        let chain_spec = MAINNET.clone();
3161        let mut test_block_builder =
3162            TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
3163
3164        // we need more than tree_config.persistence_threshold() +1 blocks to
3165        // trigger the persistence task.
3166        let blocks: Vec<_> = test_block_builder
3167            .get_executed_blocks(1..tree_config.persistence_threshold() + 2)
3168            .collect();
3169        let test_harness = TestHarness::new(chain_spec).with_blocks(blocks.clone());
3170        std::thread::Builder::new()
3171            .name("Tree Task".to_string())
3172            .spawn(|| test_harness.tree.run())
3173            .unwrap();
3174
3175        // send a message to the tree to enter the main loop.
3176        test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap();
3177
3178        let received_action =
3179            test_harness.action_rx.recv().expect("Failed to receive save blocks action");
3180        if let PersistenceAction::SaveBlocks(saved_blocks, _) = received_action {
3181            // only blocks.len() - tree_config.memory_block_buffer_target() will be
3182            // persisted
3183            let expected_persist_len =
3184                blocks.len() - tree_config.memory_block_buffer_target() as usize;
3185            assert_eq!(saved_blocks.len(), expected_persist_len);
3186            assert_eq!(saved_blocks, blocks[..expected_persist_len]);
3187        } else {
3188            panic!("unexpected action received {received_action:?}");
3189        }
3190    }
3191
3192    #[tokio::test]
3193    async fn test_in_memory_state_trait_impl() {
3194        let blocks: Vec<_> = TestBlockBuilder::default().get_executed_blocks(0..10).collect();
3195        let test_harness = TestHarness::new(MAINNET.clone()).with_blocks(blocks.clone());
3196
3197        for executed_block in blocks {
3198            let sealed_block = executed_block.block();
3199
3200            let expected_state = BlockState::new(executed_block.clone());
3201
3202            let actual_state_by_hash = test_harness
3203                .tree
3204                .canonical_in_memory_state
3205                .state_by_hash(sealed_block.hash())
3206                .unwrap();
3207            assert_eq!(expected_state, *actual_state_by_hash);
3208
3209            let actual_state_by_number = test_harness
3210                .tree
3211                .canonical_in_memory_state
3212                .state_by_number(sealed_block.number)
3213                .unwrap();
3214            assert_eq!(expected_state, *actual_state_by_number);
3215        }
3216    }
3217
3218    #[tokio::test]
3219    async fn test_engine_request_during_backfill() {
3220        let tree_config = TreeConfig::default();
3221        let blocks: Vec<_> = TestBlockBuilder::default()
3222            .get_executed_blocks(0..tree_config.persistence_threshold())
3223            .collect();
3224        let mut test_harness = TestHarness::new(MAINNET.clone())
3225            .with_blocks(blocks)
3226            .with_backfill_state(BackfillSyncState::Active);
3227
3228        let (tx, rx) = oneshot::channel();
3229        test_harness
3230            .tree
3231            .on_engine_message(FromEngine::Request(
3232                BeaconEngineMessage::ForkchoiceUpdated {
3233                    state: ForkchoiceState {
3234                        head_block_hash: B256::random(),
3235                        safe_block_hash: B256::random(),
3236                        finalized_block_hash: B256::random(),
3237                    },
3238                    payload_attrs: None,
3239                    tx,
3240                    version: EngineApiMessageVersion::default(),
3241                }
3242                .into(),
3243            ))
3244            .unwrap();
3245
3246        let resp = rx.await.unwrap().unwrap().await.unwrap();
3247        assert!(resp.payload_status.is_syncing());
3248    }
3249
3250    #[test]
3251    fn test_disconnected_payload() {
3252        let s = include_str!("../../test-data/holesky/2.rlp");
3253        let data = Bytes::from_str(s).unwrap();
3254        let block = Block::decode(&mut data.as_ref()).unwrap();
3255        let sealed = block.seal_slow();
3256        let hash = sealed.hash();
3257        let payload = block_to_payload_v1(sealed.clone());
3258
3259        let mut test_harness = TestHarness::new(HOLESKY.clone());
3260
3261        let outcome = test_harness
3262            .tree
3263            .on_new_payload(payload.into(), ExecutionPayloadSidecar::none())
3264            .unwrap();
3265        assert!(outcome.outcome.is_syncing());
3266
3267        // ensure block is buffered
3268        let buffered = test_harness.tree.state.buffer.block(&hash).unwrap();
3269        assert_eq!(buffered.block, sealed);
3270    }
3271
3272    #[test]
3273    fn test_disconnected_block() {
3274        let s = include_str!("../../test-data/holesky/2.rlp");
3275        let data = Bytes::from_str(s).unwrap();
3276        let block = Block::decode(&mut data.as_ref()).unwrap();
3277        let sealed = block.seal_slow();
3278
3279        let mut test_harness = TestHarness::new(HOLESKY.clone());
3280
3281        let outcome = test_harness.tree.insert_block_without_senders(sealed.clone()).unwrap();
3282        assert_eq!(
3283            outcome,
3284            InsertPayloadOk2::Inserted(BlockStatus2::Disconnected {
3285                head: test_harness.tree.state.tree_state.current_canonical_head,
3286                missing_ancestor: sealed.parent_num_hash()
3287            })
3288        );
3289    }
3290
3291    #[tokio::test]
3292    async fn test_holesky_payload() {
3293        let s = include_str!("../../test-data/holesky/1.rlp");
3294        let data = Bytes::from_str(s).unwrap();
3295        let block = Block::decode(&mut data.as_ref()).unwrap();
3296        let sealed = block.seal_slow();
3297        let payload = block_to_payload_v1(sealed);
3298
3299        let mut test_harness =
3300            TestHarness::new(HOLESKY.clone()).with_backfill_state(BackfillSyncState::Active);
3301
3302        let (tx, rx) = oneshot::channel();
3303        test_harness
3304            .tree
3305            .on_engine_message(FromEngine::Request(
3306                BeaconEngineMessage::NewPayload {
3307                    payload: payload.clone().into(),
3308                    sidecar: ExecutionPayloadSidecar::none(),
3309                    tx,
3310                }
3311                .into(),
3312            ))
3313            .unwrap();
3314
3315        let resp = rx.await.unwrap().unwrap();
3316        assert!(resp.is_syncing());
3317    }
3318
3319    #[tokio::test]
3320    async fn test_tree_state_insert_executed() {
3321        let mut tree_state = TreeState::new(BlockNumHash::default());
3322        let blocks: Vec<_> = TestBlockBuilder::default().get_executed_blocks(1..4).collect();
3323
3324        tree_state.insert_executed(blocks[0].clone());
3325        tree_state.insert_executed(blocks[1].clone());
3326
3327        assert_eq!(
3328            tree_state.parent_to_child.get(&blocks[0].block.hash()),
3329            Some(&HashSet::from_iter([blocks[1].block.hash()]))
3330        );
3331
3332        assert!(!tree_state.parent_to_child.contains_key(&blocks[1].block.hash()));
3333
3334        tree_state.insert_executed(blocks[2].clone());
3335
3336        assert_eq!(
3337            tree_state.parent_to_child.get(&blocks[1].block.hash()),
3338            Some(&HashSet::from_iter([blocks[2].block.hash()]))
3339        );
3340        assert!(tree_state.parent_to_child.contains_key(&blocks[1].block.hash()));
3341
3342        assert!(!tree_state.parent_to_child.contains_key(&blocks[2].block.hash()));
3343    }
3344
3345    #[tokio::test]
3346    async fn test_tree_state_insert_executed_with_reorg() {
3347        let mut tree_state = TreeState::new(BlockNumHash::default());
3348        let mut test_block_builder = TestBlockBuilder::default();
3349        let blocks: Vec<_> = test_block_builder.get_executed_blocks(1..6).collect();
3350
3351        for block in &blocks {
3352            tree_state.insert_executed(block.clone());
3353        }
3354        assert_eq!(tree_state.blocks_by_hash.len(), 5);
3355
3356        let fork_block_3 =
3357            test_block_builder.get_executed_block_with_number(3, blocks[1].block.hash());
3358        let fork_block_4 =
3359            test_block_builder.get_executed_block_with_number(4, fork_block_3.block.hash());
3360        let fork_block_5 =
3361            test_block_builder.get_executed_block_with_number(5, fork_block_4.block.hash());
3362
3363        tree_state.insert_executed(fork_block_3.clone());
3364        tree_state.insert_executed(fork_block_4.clone());
3365        tree_state.insert_executed(fork_block_5.clone());
3366
3367        assert_eq!(tree_state.blocks_by_hash.len(), 8);
3368        assert_eq!(tree_state.blocks_by_number[&3].len(), 2); // two blocks at height 3 (original and fork)
3369        assert_eq!(tree_state.parent_to_child[&blocks[1].block.hash()].len(), 2); // block 2 should have two children
3370
3371        // verify that we can insert the same block again without issues
3372        tree_state.insert_executed(fork_block_4.clone());
3373        assert_eq!(tree_state.blocks_by_hash.len(), 8);
3374
3375        assert!(tree_state.parent_to_child[&fork_block_3.block.hash()]
3376            .contains(&fork_block_4.block.hash()));
3377        assert!(tree_state.parent_to_child[&fork_block_4.block.hash()]
3378            .contains(&fork_block_5.block.hash()));
3379
3380        assert_eq!(tree_state.blocks_by_number[&4].len(), 2);
3381        assert_eq!(tree_state.blocks_by_number[&5].len(), 2);
3382    }
3383
3384    #[tokio::test]
3385    async fn test_tree_state_remove_before() {
3386        let start_num_hash = BlockNumHash::default();
3387        let mut tree_state = TreeState::new(start_num_hash);
3388        let blocks: Vec<_> = TestBlockBuilder::default().get_executed_blocks(1..6).collect();
3389
3390        for block in &blocks {
3391            tree_state.insert_executed(block.clone());
3392        }
3393
3394        let last = blocks.last().unwrap();
3395
3396        // set the canonical head
3397        tree_state.set_canonical_head(last.block.num_hash());
3398
3399        // inclusive bound, so we should remove anything up to and including 2
3400        tree_state.remove_until(
3401            BlockNumHash::new(2, blocks[1].block.hash()),
3402            start_num_hash.hash,
3403            Some(blocks[1].block.num_hash()),
3404        );
3405
3406        assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].block.hash()));
3407        assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].block.hash()));
3408        assert!(!tree_state.blocks_by_number.contains_key(&1));
3409        assert!(!tree_state.blocks_by_number.contains_key(&2));
3410
3411        assert!(tree_state.blocks_by_hash.contains_key(&blocks[2].block.hash()));
3412        assert!(tree_state.blocks_by_hash.contains_key(&blocks[3].block.hash()));
3413        assert!(tree_state.blocks_by_hash.contains_key(&blocks[4].block.hash()));
3414        assert!(tree_state.blocks_by_number.contains_key(&3));
3415        assert!(tree_state.blocks_by_number.contains_key(&4));
3416        assert!(tree_state.blocks_by_number.contains_key(&5));
3417
3418        assert!(!tree_state.parent_to_child.contains_key(&blocks[0].block.hash()));
3419        assert!(!tree_state.parent_to_child.contains_key(&blocks[1].block.hash()));
3420        assert!(tree_state.parent_to_child.contains_key(&blocks[2].block.hash()));
3421        assert!(tree_state.parent_to_child.contains_key(&blocks[3].block.hash()));
3422        assert!(!tree_state.parent_to_child.contains_key(&blocks[4].block.hash()));
3423
3424        assert_eq!(
3425            tree_state.parent_to_child.get(&blocks[2].block.hash()),
3426            Some(&HashSet::from_iter([blocks[3].block.hash()]))
3427        );
3428        assert_eq!(
3429            tree_state.parent_to_child.get(&blocks[3].block.hash()),
3430            Some(&HashSet::from_iter([blocks[4].block.hash()]))
3431        );
3432    }
3433
3434    #[tokio::test]
3435    async fn test_tree_state_remove_before_finalized() {
3436        let start_num_hash = BlockNumHash::default();
3437        let mut tree_state = TreeState::new(start_num_hash);
3438        let blocks: Vec<_> = TestBlockBuilder::default().get_executed_blocks(1..6).collect();
3439
3440        for block in &blocks {
3441            tree_state.insert_executed(block.clone());
3442        }
3443
3444        let last = blocks.last().unwrap();
3445
3446        // set the canonical head
3447        tree_state.set_canonical_head(last.block.num_hash());
3448
3449        // we should still remove everything up to and including 2
3450        tree_state.remove_until(
3451            BlockNumHash::new(2, blocks[1].block.hash()),
3452            start_num_hash.hash,
3453            None,
3454        );
3455
3456        assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].block.hash()));
3457        assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].block.hash()));
3458        assert!(!tree_state.blocks_by_number.contains_key(&1));
3459        assert!(!tree_state.blocks_by_number.contains_key(&2));
3460
3461        assert!(tree_state.blocks_by_hash.contains_key(&blocks[2].block.hash()));
3462        assert!(tree_state.blocks_by_hash.contains_key(&blocks[3].block.hash()));
3463        assert!(tree_state.blocks_by_hash.contains_key(&blocks[4].block.hash()));
3464        assert!(tree_state.blocks_by_number.contains_key(&3));
3465        assert!(tree_state.blocks_by_number.contains_key(&4));
3466        assert!(tree_state.blocks_by_number.contains_key(&5));
3467
3468        assert!(!tree_state.parent_to_child.contains_key(&blocks[0].block.hash()));
3469        assert!(!tree_state.parent_to_child.contains_key(&blocks[1].block.hash()));
3470        assert!(tree_state.parent_to_child.contains_key(&blocks[2].block.hash()));
3471        assert!(tree_state.parent_to_child.contains_key(&blocks[3].block.hash()));
3472        assert!(!tree_state.parent_to_child.contains_key(&blocks[4].block.hash()));
3473
3474        assert_eq!(
3475            tree_state.parent_to_child.get(&blocks[2].block.hash()),
3476            Some(&HashSet::from_iter([blocks[3].block.hash()]))
3477        );
3478        assert_eq!(
3479            tree_state.parent_to_child.get(&blocks[3].block.hash()),
3480            Some(&HashSet::from_iter([blocks[4].block.hash()]))
3481        );
3482    }
3483
3484    #[tokio::test]
3485    async fn test_tree_state_remove_before_lower_finalized() {
3486        let start_num_hash = BlockNumHash::default();
3487        let mut tree_state = TreeState::new(start_num_hash);
3488        let blocks: Vec<_> = TestBlockBuilder::default().get_executed_blocks(1..6).collect();
3489
3490        for block in &blocks {
3491            tree_state.insert_executed(block.clone());
3492        }
3493
3494        let last = blocks.last().unwrap();
3495
3496        // set the canonical head
3497        tree_state.set_canonical_head(last.block.num_hash());
3498
3499        // we have no forks so we should still remove anything up to and including 2
3500        tree_state.remove_until(
3501            BlockNumHash::new(2, blocks[1].block.hash()),
3502            start_num_hash.hash,
3503            Some(blocks[0].block.num_hash()),
3504        );
3505
3506        assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].block.hash()));
3507        assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].block.hash()));
3508        assert!(!tree_state.blocks_by_number.contains_key(&1));
3509        assert!(!tree_state.blocks_by_number.contains_key(&2));
3510
3511        assert!(tree_state.blocks_by_hash.contains_key(&blocks[2].block.hash()));
3512        assert!(tree_state.blocks_by_hash.contains_key(&blocks[3].block.hash()));
3513        assert!(tree_state.blocks_by_hash.contains_key(&blocks[4].block.hash()));
3514        assert!(tree_state.blocks_by_number.contains_key(&3));
3515        assert!(tree_state.blocks_by_number.contains_key(&4));
3516        assert!(tree_state.blocks_by_number.contains_key(&5));
3517
3518        assert!(!tree_state.parent_to_child.contains_key(&blocks[0].block.hash()));
3519        assert!(!tree_state.parent_to_child.contains_key(&blocks[1].block.hash()));
3520        assert!(tree_state.parent_to_child.contains_key(&blocks[2].block.hash()));
3521        assert!(tree_state.parent_to_child.contains_key(&blocks[3].block.hash()));
3522        assert!(!tree_state.parent_to_child.contains_key(&blocks[4].block.hash()));
3523
3524        assert_eq!(
3525            tree_state.parent_to_child.get(&blocks[2].block.hash()),
3526            Some(&HashSet::from_iter([blocks[3].block.hash()]))
3527        );
3528        assert_eq!(
3529            tree_state.parent_to_child.get(&blocks[3].block.hash()),
3530            Some(&HashSet::from_iter([blocks[4].block.hash()]))
3531        );
3532    }
3533
3534    #[tokio::test]
3535    async fn test_tree_state_on_new_head() {
3536        let chain_spec = MAINNET.clone();
3537        let mut test_harness = TestHarness::new(chain_spec);
3538        let mut test_block_builder = TestBlockBuilder::default();
3539
3540        let blocks: Vec<_> = test_block_builder.get_executed_blocks(1..6).collect();
3541
3542        for block in &blocks {
3543            test_harness.tree.state.tree_state.insert_executed(block.clone());
3544        }
3545
3546        // set block 3 as the current canonical head
3547        test_harness.tree.state.tree_state.set_canonical_head(blocks[2].block.num_hash());
3548
3549        // create a fork from block 2
3550        let fork_block_3 =
3551            test_block_builder.get_executed_block_with_number(3, blocks[1].block.hash());
3552        let fork_block_4 =
3553            test_block_builder.get_executed_block_with_number(4, fork_block_3.block.hash());
3554        let fork_block_5 =
3555            test_block_builder.get_executed_block_with_number(5, fork_block_4.block.hash());
3556
3557        test_harness.tree.state.tree_state.insert_executed(fork_block_3.clone());
3558        test_harness.tree.state.tree_state.insert_executed(fork_block_4.clone());
3559        test_harness.tree.state.tree_state.insert_executed(fork_block_5.clone());
3560
3561        // normal (non-reorg) case
3562        let result = test_harness.tree.on_new_head(blocks[4].block.hash()).unwrap();
3563        assert!(matches!(result, Some(NewCanonicalChain::Commit { .. })));
3564        if let Some(NewCanonicalChain::Commit { new }) = result {
3565            assert_eq!(new.len(), 2);
3566            assert_eq!(new[0].block.hash(), blocks[3].block.hash());
3567            assert_eq!(new[1].block.hash(), blocks[4].block.hash());
3568        }
3569
3570        // reorg case
3571        let result = test_harness.tree.on_new_head(fork_block_5.block.hash()).unwrap();
3572        assert!(matches!(result, Some(NewCanonicalChain::Reorg { .. })));
3573        if let Some(NewCanonicalChain::Reorg { new, old }) = result {
3574            assert_eq!(new.len(), 3);
3575            assert_eq!(new[0].block.hash(), fork_block_3.block.hash());
3576            assert_eq!(new[1].block.hash(), fork_block_4.block.hash());
3577            assert_eq!(new[2].block.hash(), fork_block_5.block.hash());
3578
3579            assert_eq!(old.len(), 1);
3580            assert_eq!(old[0].block.hash(), blocks[2].block.hash());
3581        }
3582    }
3583
3584    #[tokio::test]
3585    async fn test_tree_state_on_new_head_deep_fork() {
3586        reth_tracing::init_test_tracing();
3587
3588        let chain_spec = MAINNET.clone();
3589        let mut test_harness = TestHarness::new(chain_spec);
3590        let mut test_block_builder = TestBlockBuilder::default();
3591
3592        let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..5).collect();
3593
3594        for block in &blocks {
3595            test_harness.tree.state.tree_state.insert_executed(block.clone());
3596        }
3597
3598        // set last block as the current canonical head
3599        let last_block = blocks.last().unwrap().block.clone();
3600
3601        test_harness.tree.state.tree_state.set_canonical_head(last_block.num_hash());
3602
3603        // create a fork chain from last_block
3604        let chain_a = test_block_builder.create_fork(&last_block, 10);
3605        let chain_b = test_block_builder.create_fork(&last_block, 10);
3606
3607        for block in &chain_a {
3608            test_harness.tree.state.tree_state.insert_executed(ExecutedBlock {
3609                block: Arc::new(block.block.clone()),
3610                senders: Arc::new(block.senders.clone()),
3611                execution_output: Arc::new(ExecutionOutcome::default()),
3612                hashed_state: Arc::new(HashedPostState::default()),
3613                trie: Arc::new(TrieUpdates::default()),
3614            });
3615        }
3616        test_harness.tree.state.tree_state.set_canonical_head(chain_a.last().unwrap().num_hash());
3617
3618        for block in &chain_b {
3619            test_harness.tree.state.tree_state.insert_executed(ExecutedBlock {
3620                block: Arc::new(block.block.clone()),
3621                senders: Arc::new(block.senders.clone()),
3622                execution_output: Arc::new(ExecutionOutcome::default()),
3623                hashed_state: Arc::new(HashedPostState::default()),
3624                trie: Arc::new(TrieUpdates::default()),
3625            });
3626        }
3627
3628        // for each block in chain_b, reorg to it and then back to canonical
3629        let mut expected_new = Vec::new();
3630        for block in &chain_b {
3631            // reorg to chain from block b
3632            let result = test_harness.tree.on_new_head(block.block.hash()).unwrap();
3633            assert_matches!(result, Some(NewCanonicalChain::Reorg { .. }));
3634
3635            expected_new.push(block);
3636            if let Some(NewCanonicalChain::Reorg { new, old }) = result {
3637                assert_eq!(new.len(), expected_new.len());
3638                for (index, block) in expected_new.iter().enumerate() {
3639                    assert_eq!(new[index].block.hash(), block.block.hash());
3640                }
3641
3642                assert_eq!(old.len(), chain_a.len());
3643                for (index, block) in chain_a.iter().enumerate() {
3644                    assert_eq!(old[index].block.hash(), block.block.hash());
3645                }
3646            }
3647
3648            // set last block of chain a as canonical head
3649            test_harness.tree.on_new_head(chain_a.last().unwrap().hash()).unwrap();
3650        }
3651    }
3652
3653    #[tokio::test]
3654    async fn test_get_canonical_blocks_to_persist() {
3655        let chain_spec = MAINNET.clone();
3656        let mut test_harness = TestHarness::new(chain_spec);
3657        let mut test_block_builder = TestBlockBuilder::default();
3658
3659        let canonical_head_number = 9;
3660        let blocks: Vec<_> =
3661            test_block_builder.get_executed_blocks(0..canonical_head_number + 1).collect();
3662        test_harness = test_harness.with_blocks(blocks.clone());
3663
3664        let last_persisted_block_number = 3;
3665        test_harness.tree.persistence_state.last_persisted_block.number =
3666            last_persisted_block_number;
3667
3668        let persistence_threshold = 4;
3669        let memory_block_buffer_target = 3;
3670        test_harness.tree.config = TreeConfig::default()
3671            .with_persistence_threshold(persistence_threshold)
3672            .with_memory_block_buffer_target(memory_block_buffer_target);
3673
3674        let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist();
3675
3676        let expected_blocks_to_persist_length: usize =
3677            (canonical_head_number - memory_block_buffer_target - last_persisted_block_number)
3678                .try_into()
3679                .unwrap();
3680
3681        assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);
3682        for (i, item) in
3683            blocks_to_persist.iter().enumerate().take(expected_blocks_to_persist_length)
3684        {
3685            assert_eq!(item.block.number, last_persisted_block_number + i as u64 + 1);
3686        }
3687
3688        // make sure only canonical blocks are included
3689        let fork_block = test_block_builder.get_executed_block_with_number(4, B256::random());
3690        let fork_block_hash = fork_block.block.hash();
3691        test_harness.tree.state.tree_state.insert_executed(fork_block);
3692
3693        assert!(test_harness.tree.state.tree_state.block_by_hash(fork_block_hash).is_some());
3694
3695        let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist();
3696        assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);
3697
3698        // check that the fork block is not included in the blocks to persist
3699        assert!(!blocks_to_persist.iter().any(|b| b.block.hash() == fork_block_hash));
3700
3701        // check that the original block 4 is still included
3702        assert!(blocks_to_persist
3703            .iter()
3704            .any(|b| b.block.number == 4 && b.block.hash() == blocks[4].block.hash()));
3705    }
3706
3707    #[tokio::test]
3708    async fn test_engine_tree_fcu_missing_head() {
3709        let chain_spec = MAINNET.clone();
3710        let mut test_harness = TestHarness::new(chain_spec.clone());
3711
3712        let mut test_block_builder =
3713            TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
3714
3715        let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..5).collect();
3716        test_harness = test_harness.with_blocks(blocks);
3717
3718        let missing_block = test_block_builder
3719            .generate_random_block(6, test_harness.blocks.last().unwrap().block().hash());
3720
3721        test_harness.fcu_to(missing_block.hash(), PayloadStatusEnum::Syncing).await;
3722
3723        // after FCU we receive an EngineApiEvent::Download event to get the missing block.
3724        let event = test_harness.from_tree_rx.recv().await.unwrap();
3725        match event {
3726            EngineApiEvent::Download(DownloadRequest::BlockSet(actual_block_set)) => {
3727                let expected_block_set = HashSet::from_iter([missing_block.hash()]);
3728                assert_eq!(actual_block_set, expected_block_set);
3729            }
3730            _ => panic!("Unexpected event: {:#?}", event),
3731        }
3732    }
3733
3734    #[tokio::test]
3735    async fn test_engine_tree_fcu_canon_chain_insertion() {
3736        let chain_spec = MAINNET.clone();
3737        let mut test_harness = TestHarness::new(chain_spec.clone());
3738
3739        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
3740        test_harness = test_harness.with_blocks(base_chain.clone());
3741
3742        test_harness
3743            .fcu_to(base_chain.last().unwrap().block().hash(), ForkchoiceStatus::Valid)
3744            .await;
3745
3746        // extend main chain
3747        let main_chain = test_harness.block_builder.create_fork(base_chain[0].block(), 3);
3748
3749        test_harness.insert_chain(main_chain).await;
3750    }
3751
3752    #[tokio::test]
3753    async fn test_engine_tree_fcu_reorg_with_all_blocks() {
3754        let chain_spec = MAINNET.clone();
3755        let mut test_harness = TestHarness::new(chain_spec.clone());
3756
3757        let main_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..5).collect();
3758        test_harness = test_harness.with_blocks(main_chain.clone());
3759
3760        let fork_chain = test_harness.block_builder.create_fork(main_chain[2].block(), 3);
3761        let fork_chain_last_hash = fork_chain.last().unwrap().hash();
3762
3763        // add fork blocks to the tree
3764        for block in &fork_chain {
3765            test_harness.insert_block(block.clone()).unwrap();
3766        }
3767
3768        test_harness.send_fcu(fork_chain_last_hash, ForkchoiceStatus::Valid).await;
3769
3770        // check for ForkBlockAdded events, we expect fork_chain.len() blocks added
3771        test_harness.check_fork_chain_insertion(fork_chain.clone()).await;
3772
3773        // check for CanonicalChainCommitted event
3774        test_harness.check_canon_commit(fork_chain_last_hash).await;
3775
3776        test_harness.check_fcu(fork_chain_last_hash, ForkchoiceStatus::Valid).await;
3777
3778        // new head is the tip of the fork chain
3779        test_harness.check_canon_head(fork_chain_last_hash);
3780    }
3781
3782    #[tokio::test]
3783    async fn test_engine_tree_live_sync_transition_required_blocks_requested() {
3784        reth_tracing::init_test_tracing();
3785
3786        let chain_spec = MAINNET.clone();
3787        let mut test_harness = TestHarness::new(chain_spec.clone());
3788
3789        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
3790        test_harness = test_harness.with_blocks(base_chain.clone());
3791
3792        test_harness
3793            .fcu_to(base_chain.last().unwrap().block().hash(), ForkchoiceStatus::Valid)
3794            .await;
3795
3796        // extend main chain with enough blocks to trigger pipeline run but don't insert them
3797        let main_chain = test_harness
3798            .block_builder
3799            .create_fork(base_chain[0].block(), MIN_BLOCKS_FOR_PIPELINE_RUN + 10);
3800
3801        let main_chain_last_hash = main_chain.last().unwrap().hash();
3802        test_harness.send_fcu(main_chain_last_hash, ForkchoiceStatus::Syncing).await;
3803
3804        test_harness.check_fcu(main_chain_last_hash, ForkchoiceStatus::Syncing).await;
3805
3806        // create event for backfill finished
3807        let backfill_finished_block_number = MIN_BLOCKS_FOR_PIPELINE_RUN + 1;
3808        let backfill_finished = FromOrchestrator::BackfillSyncFinished(ControlFlow::Continue {
3809            block_number: backfill_finished_block_number,
3810        });
3811
3812        let backfill_tip_block = main_chain[(backfill_finished_block_number - 1) as usize].clone();
3813        // add block to mock provider to enable persistence clean up.
3814        test_harness
3815            .provider
3816            .add_block(backfill_tip_block.hash(), backfill_tip_block.block.unseal());
3817        test_harness.tree.on_engine_message(FromEngine::Event(backfill_finished)).unwrap();
3818
3819        let event = test_harness.from_tree_rx.recv().await.unwrap();
3820        match event {
3821            EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
3822                assert_eq!(hash_set, HashSet::from_iter([main_chain_last_hash]));
3823            }
3824            _ => panic!("Unexpected event: {:#?}", event),
3825        }
3826
3827        test_harness
3828            .tree
3829            .on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain
3830                .last()
3831                .unwrap()
3832                .clone()]))
3833            .unwrap();
3834
3835        let event = test_harness.from_tree_rx.recv().await.unwrap();
3836        match event {
3837            EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => {
3838                assert_eq!(
3839                    total_blocks,
3840                    (main_chain.len() - backfill_finished_block_number as usize - 1) as u64
3841                );
3842                assert_eq!(initial_hash, main_chain.last().unwrap().parent_hash);
3843            }
3844            _ => panic!("Unexpected event: {:#?}", event),
3845        }
3846    }
3847
3848    #[tokio::test]
3849    async fn test_engine_tree_live_sync_transition_eventually_canonical() {
3850        reth_tracing::init_test_tracing();
3851
3852        let chain_spec = MAINNET.clone();
3853        let mut test_harness = TestHarness::new(chain_spec.clone());
3854        test_harness.tree.config = test_harness.tree.config.with_max_execute_block_batch_size(100);
3855
3856        // create base chain and setup test harness with it
3857        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
3858        test_harness = test_harness.with_blocks(base_chain.clone());
3859
3860        // fcu to the tip of base chain
3861        test_harness
3862            .fcu_to(base_chain.last().unwrap().block().hash(), ForkchoiceStatus::Valid)
3863            .await;
3864
3865        // create main chain, extension of base chain, with enough blocks to
3866        // trigger backfill sync
3867        let main_chain = test_harness
3868            .block_builder
3869            .create_fork(base_chain[0].block(), MIN_BLOCKS_FOR_PIPELINE_RUN + 10);
3870
3871        let main_chain_last = main_chain.last().unwrap();
3872        let main_chain_last_hash = main_chain_last.hash();
3873        let main_chain_backfill_target =
3874            main_chain.get(MIN_BLOCKS_FOR_PIPELINE_RUN as usize).unwrap();
3875        let main_chain_backfill_target_hash = main_chain_backfill_target.hash();
3876
3877        // fcu to the element of main chain that should trigger backfill sync
3878        test_harness.send_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await;
3879        test_harness.check_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await;
3880
3881        // check download request for target
3882        let event = test_harness.from_tree_rx.recv().await.unwrap();
3883        match event {
3884            EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
3885                assert_eq!(hash_set, HashSet::from_iter([main_chain_backfill_target_hash]));
3886            }
3887            _ => panic!("Unexpected event: {:#?}", event),
3888        }
3889
3890        // send message to tell the engine the requested block was downloaded
3891        test_harness
3892            .tree
3893            .on_engine_message(FromEngine::DownloadedBlocks(vec![
3894                main_chain_backfill_target.clone()
3895            ]))
3896            .unwrap();
3897
3898        // check that backfill is triggered
3899        let event = test_harness.from_tree_rx.recv().await.unwrap();
3900        match event {
3901            EngineApiEvent::BackfillAction(BackfillAction::Start(
3902                reth_stages::PipelineTarget::Sync(target_hash),
3903            )) => {
3904                assert_eq!(target_hash, main_chain_backfill_target_hash);
3905            }
3906            _ => panic!("Unexpected event: {:#?}", event),
3907        }
3908
3909        // persist blocks of main chain, same as the backfill operation would do
3910        let backfilled_chain: Vec<_> =
3911            main_chain.clone().drain(0..(MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize).collect();
3912        test_harness.persist_blocks(backfilled_chain.clone());
3913
3914        test_harness.setup_range_insertion_for_valid_chain(backfilled_chain);
3915
3916        // send message to mark backfill finished
3917        test_harness
3918            .tree
3919            .on_engine_message(FromEngine::Event(FromOrchestrator::BackfillSyncFinished(
3920                ControlFlow::Continue { block_number: main_chain_backfill_target.number },
3921            )))
3922            .unwrap();
3923
3924        // send fcu to the tip of main
3925        test_harness.fcu_to(main_chain_last_hash, ForkchoiceStatus::Syncing).await;
3926
3927        let event = test_harness.from_tree_rx.recv().await.unwrap();
3928        match event {
3929            EngineApiEvent::Download(DownloadRequest::BlockSet(target_hash)) => {
3930                assert_eq!(target_hash, HashSet::from_iter([main_chain_last_hash]));
3931            }
3932            _ => panic!("Unexpected event: {:#?}", event),
3933        }
3934
3935        // tell engine main chain tip downloaded
3936        test_harness
3937            .tree
3938            .on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain_last.clone()]))
3939            .unwrap();
3940
3941        // check download range request
3942        let event = test_harness.from_tree_rx.recv().await.unwrap();
3943        match event {
3944            EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => {
3945                assert_eq!(
3946                    total_blocks,
3947                    (main_chain.len() - MIN_BLOCKS_FOR_PIPELINE_RUN as usize - 2) as u64
3948                );
3949                assert_eq!(initial_hash, main_chain_last.parent_hash);
3950            }
3951            _ => panic!("Unexpected event: {:#?}", event),
3952        }
3953
3954        let remaining: Vec<_> = main_chain
3955            .clone()
3956            .drain((MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize..main_chain.len())
3957            .collect();
3958
3959        test_harness.setup_range_insertion_for_valid_chain(remaining.clone());
3960
3961        // tell engine block range downloaded
3962        test_harness
3963            .tree
3964            .on_engine_message(FromEngine::DownloadedBlocks(remaining.clone()))
3965            .unwrap();
3966
3967        test_harness.check_canon_chain_insertion(remaining).await;
3968
3969        // check canonical chain committed event with the hash of the latest block
3970        test_harness.check_canon_commit(main_chain_last_hash).await;
3971
3972        // new head is the tip of the main chain
3973        test_harness.check_canon_head(main_chain_last_hash);
3974    }
3975
3976    #[tokio::test]
3977    async fn test_engine_tree_live_sync_fcu_extends_canon_chain() {
3978        reth_tracing::init_test_tracing();
3979
3980        let chain_spec = MAINNET.clone();
3981        let mut test_harness = TestHarness::new(chain_spec.clone());
3982
3983        // create base chain and setup test harness with it
3984        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
3985        test_harness = test_harness.with_blocks(base_chain.clone());
3986
3987        // fcu to the tip of base chain
3988        test_harness
3989            .fcu_to(base_chain.last().unwrap().block().hash(), ForkchoiceStatus::Valid)
3990            .await;
3991
3992        // create main chain, extension of base chain
3993        let main_chain = test_harness.block_builder.create_fork(base_chain[0].block(), 10);
3994        // determine target in the middle of main hain
3995        let target = main_chain.get(5).unwrap();
3996        let target_hash = target.hash();
3997        let main_last = main_chain.last().unwrap();
3998        let main_last_hash = main_last.hash();
3999
4000        // insert main chain
4001        test_harness.insert_chain(main_chain).await;
4002
4003        // send fcu to target
4004        test_harness.send_fcu(target_hash, ForkchoiceStatus::Valid).await;
4005
4006        test_harness.check_canon_commit(target_hash).await;
4007        test_harness.check_fcu(target_hash, ForkchoiceStatus::Valid).await;
4008
4009        // send fcu to main tip
4010        test_harness.send_fcu(main_last_hash, ForkchoiceStatus::Valid).await;
4011
4012        test_harness.check_canon_commit(main_last_hash).await;
4013        test_harness.check_fcu(main_last_hash, ForkchoiceStatus::Valid).await;
4014        test_harness.check_canon_head(main_last_hash);
4015    }
4016
4017    #[tokio::test]
4018    async fn test_engine_tree_valid_forks_with_older_canonical_head() {
4019        reth_tracing::init_test_tracing();
4020
4021        let chain_spec = MAINNET.clone();
4022        let mut test_harness = TestHarness::new(chain_spec.clone());
4023
4024        // create base chain and setup test harness with it
4025        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
4026        test_harness = test_harness.with_blocks(base_chain.clone());
4027
4028        let old_head = base_chain.first().unwrap().block();
4029
4030        // extend base chain
4031        let extension_chain = test_harness.block_builder.create_fork(old_head, 5);
4032        let fork_block = extension_chain.last().unwrap().block.clone();
4033
4034        test_harness.setup_range_insertion_for_valid_chain(extension_chain.clone());
4035        test_harness.insert_chain(extension_chain).await;
4036
4037        // fcu to old_head
4038        test_harness.fcu_to(old_head.hash(), ForkchoiceStatus::Valid).await;
4039
4040        // create two competing chains starting from fork_block
4041        let chain_a = test_harness.block_builder.create_fork(&fork_block, 10);
4042        let chain_b = test_harness.block_builder.create_fork(&fork_block, 10);
4043
4044        // insert chain A blocks using newPayload
4045        test_harness.setup_range_insertion_for_valid_chain(chain_a.clone());
4046        for block in &chain_a {
4047            test_harness.send_new_payload(block.clone()).await;
4048        }
4049
4050        test_harness.check_canon_chain_insertion(chain_a.clone()).await;
4051
4052        // insert chain B blocks using newPayload
4053        test_harness.setup_range_insertion_for_valid_chain(chain_b.clone());
4054        for block in &chain_b {
4055            test_harness.send_new_payload(block.clone()).await;
4056        }
4057
4058        test_harness.check_canon_chain_insertion(chain_b.clone()).await;
4059
4060        // send FCU to make the tip of chain B the new head
4061        let chain_b_tip_hash = chain_b.last().unwrap().hash();
4062        test_harness.send_fcu(chain_b_tip_hash, ForkchoiceStatus::Valid).await;
4063
4064        // check for CanonicalChainCommitted event
4065        test_harness.check_canon_commit(chain_b_tip_hash).await;
4066
4067        // verify FCU was processed
4068        test_harness.check_fcu(chain_b_tip_hash, ForkchoiceStatus::Valid).await;
4069
4070        // verify the new canonical head
4071        test_harness.check_canon_head(chain_b_tip_hash);
4072
4073        // verify that chain A is now considered a fork
4074        assert!(test_harness.tree.is_fork(chain_a.last().unwrap().hash()).unwrap());
4075    }
4076
4077    #[tokio::test]
4078    async fn test_engine_tree_buffered_blocks_are_eventually_connected() {
4079        let chain_spec = MAINNET.clone();
4080        let mut test_harness = TestHarness::new(chain_spec.clone());
4081
4082        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
4083        test_harness = test_harness.with_blocks(base_chain.clone());
4084
4085        // side chain consisting of two blocks, the last will be inserted first
4086        // so that we force it to be buffered
4087        let side_chain =
4088            test_harness.block_builder.create_fork(base_chain.last().unwrap().block(), 2);
4089
4090        // buffer last block of side chain
4091        let buffered_block = side_chain.last().unwrap();
4092        let buffered_block_hash = buffered_block.hash();
4093
4094        test_harness.setup_range_insertion_for_valid_chain(vec![buffered_block.clone()]);
4095        test_harness.send_new_payload(buffered_block.clone()).await;
4096
4097        assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_some());
4098
4099        let non_buffered_block = side_chain.first().unwrap();
4100        let non_buffered_block_hash = non_buffered_block.hash();
4101
4102        // insert block that continues the canon chain, should not be buffered
4103        test_harness.setup_range_insertion_for_valid_chain(vec![non_buffered_block.clone()]);
4104        test_harness.send_new_payload(non_buffered_block.clone()).await;
4105        assert!(test_harness.tree.state.buffer.block(&non_buffered_block_hash).is_none());
4106
4107        // the previously buffered block should be connected now
4108        assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_none());
4109
4110        // both blocks are added to the canon chain in order
4111        test_harness.check_canon_block_added(non_buffered_block_hash).await;
4112        test_harness.check_canon_block_added(buffered_block_hash).await;
4113    }
4114
4115    #[tokio::test]
4116    async fn test_engine_tree_valid_and_invalid_forks_with_older_canonical_head() {
4117        reth_tracing::init_test_tracing();
4118
4119        let chain_spec = MAINNET.clone();
4120        let mut test_harness = TestHarness::new(chain_spec.clone());
4121
4122        // create base chain and setup test harness with it
4123        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
4124        test_harness = test_harness.with_blocks(base_chain.clone());
4125
4126        let old_head = base_chain.first().unwrap().block();
4127
4128        // extend base chain
4129        let extension_chain = test_harness.block_builder.create_fork(old_head, 5);
4130        let fork_block = extension_chain.last().unwrap().block.clone();
4131        test_harness.insert_chain(extension_chain).await;
4132
4133        // fcu to old_head
4134        test_harness.fcu_to(old_head.hash(), ForkchoiceStatus::Valid).await;
4135
4136        // create two competing chains starting from fork_block, one of them invalid
4137        let total_fork_elements = 10;
4138        let chain_a = test_harness.block_builder.create_fork(&fork_block, total_fork_elements);
4139        let chain_b = test_harness.block_builder.create_fork(&fork_block, total_fork_elements);
4140
4141        // insert chain B blocks using newPayload
4142        test_harness.setup_range_insertion_for_valid_chain(chain_b.clone());
4143        for block in &chain_b {
4144            test_harness.send_new_payload(block.clone()).await;
4145            test_harness.send_fcu(block.hash(), ForkchoiceStatus::Valid).await;
4146            test_harness.check_canon_block_added(block.hash()).await;
4147            test_harness.check_canon_commit(block.hash()).await;
4148            test_harness.check_fcu(block.hash(), ForkchoiceStatus::Valid).await;
4149        }
4150
4151        // insert chain A blocks using newPayload, one of the blocks will be invalid
4152        let invalid_index = 3;
4153        test_harness.setup_range_insertion_for_invalid_chain(chain_a.clone(), invalid_index);
4154        for block in &chain_a {
4155            test_harness.send_new_payload(block.clone()).await;
4156        }
4157
4158        // check canon chain insertion up to the invalid index and taking into
4159        // account reversed ordering
4160        test_harness
4161            .check_fork_chain_insertion(
4162                chain_a[..chain_a.len() - invalid_index - 1].iter().cloned(),
4163            )
4164            .await;
4165
4166        // send FCU to make the tip of chain A, expect invalid
4167        let chain_a_tip_hash = chain_a.last().unwrap().hash();
4168        test_harness.fcu_to(chain_a_tip_hash, ForkchoiceStatus::Invalid).await;
4169
4170        // send FCU to make the tip of chain B the new head
4171        let chain_b_tip_hash = chain_b.last().unwrap().hash();
4172
4173        // verify the new canonical head
4174        test_harness.check_canon_head(chain_b_tip_hash);
4175
4176        // verify the canonical head didn't change
4177        test_harness.check_canon_head(chain_b_tip_hash);
4178    }
4179
4180    #[tokio::test]
4181    async fn test_engine_tree_reorg_with_missing_ancestor_expecting_valid() {
4182        reth_tracing::init_test_tracing();
4183        let chain_spec = MAINNET.clone();
4184        let mut test_harness = TestHarness::new(chain_spec.clone());
4185
4186        let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..6).collect();
4187        test_harness = test_harness.with_blocks(base_chain.clone());
4188
4189        // create a side chain with an invalid block
4190        let side_chain =
4191            test_harness.block_builder.create_fork(base_chain.last().unwrap().block(), 15);
4192        let invalid_index = 9;
4193
4194        test_harness.setup_range_insertion_for_invalid_chain(side_chain.clone(), invalid_index);
4195
4196        for (index, block) in side_chain.iter().enumerate() {
4197            test_harness.send_new_payload(block.clone()).await;
4198
4199            if index < side_chain.len() - invalid_index - 1 {
4200                test_harness.send_fcu(block.block.hash(), ForkchoiceStatus::Valid).await;
4201            }
4202        }
4203
4204        // Try to do a forkchoice update to a block after the invalid one
4205        let fork_tip_hash = side_chain.last().unwrap().hash();
4206        test_harness.send_fcu(fork_tip_hash, ForkchoiceStatus::Invalid).await;
4207    }
4208}