1use crate::{
2 backfill::{BackfillAction, BackfillSyncState},
3 backup::{BackupAction, BackupHandle},
4 chain::FromOrchestrator,
5 engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
6 persistence::PersistenceHandle,
7 tree::{
8 cached_state::CachedStateProvider, executor::WorkloadExecutor, metrics::EngineApiMetrics,
9 },
10};
11use alloy_consensus::BlockHeader;
12use alloy_eips::{merge::EPOCH_SLOTS, BlockNumHash, NumHash};
13use alloy_primitives::B256;
14use alloy_rpc_types_engine::{
15 ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
16};
17use error::{InsertBlockError, InsertBlockErrorKind, InsertBlockFatalError};
18use instrumented_state::InstrumentedStateProvider;
19use payload_processor::sparse_trie::StateRootComputeOutcome;
20use persistence_state::CurrentPersistenceAction;
21use precompile_cache::PrecompileCacheMap;
22use reth_chain_state::{
23 CanonicalInMemoryState, ExecutedBlock, ExecutedBlockWithTrieUpdates,
24 MemoryOverlayStateProvider, NewCanonicalChain,
25};
26use reth_consensus::{Consensus, FullConsensus};
27pub use reth_engine_primitives::InvalidBlockHook;
28use reth_engine_primitives::{
29 BeaconConsensusEngineEvent, BeaconEngineMessage, BeaconOnNewPayloadError, EngineValidator,
30 ExecutionPayload, ForkchoiceStateTracker, OnForkChoiceUpdated,
31};
32use reth_errors::{ConsensusError, ProviderResult};
33use reth_evm::{ConfigureEvm, SpecFor};
34use reth_payload_builder::PayloadBuilderHandle;
35use reth_payload_primitives::{EngineApiMessageVersion, PayloadBuilderAttributes, PayloadTypes};
36use reth_primitives_traits::{
37 Block, GotExpected, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader,
38};
39use reth_provider::{
40 providers::ConsistentDbView, BlockNumReader, BlockReader, DBProvider, DatabaseProviderFactory,
41 ExecutionOutcome, HashedPostStateProvider, ProviderError, StateCommitmentProvider,
42 StateProvider, StateProviderBox, StateProviderFactory, StateReader, StateRootProvider,
43 TransactionVariant,
44};
45use reth_revm::{database::StateProviderDatabase, State};
46use reth_stages_api::ControlFlow;
47use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
48use reth_trie_db::{DatabaseHashedPostState, StateCommitment};
49use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
50use state::TreeState;
51use std::{
52 fmt::Debug,
53 sync::{
54 mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
55 Arc,
56 },
57 time::Instant,
58};
59use tokio::sync::{
60 mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
61 oneshot::{self, error::TryRecvError},
62};
63use tracing::*;
64
65mod block_buffer;
66mod cached_state;
67pub mod error;
68mod instrumented_state;
69mod invalid_block_hook;
70mod invalid_headers;
71mod metrics;
72mod payload_processor;
73mod persistence_state;
74#[allow(unused)]
75pub mod precompile_cache;
76#[expect(unused)]
78mod trie_updates;
79
80use crate::tree::error::AdvancePersistenceError;
81pub use block_buffer::BlockBuffer;
82pub use invalid_block_hook::{InvalidBlockHooks, NoopInvalidBlockHook};
83pub use invalid_headers::InvalidHeaderCache;
84pub use payload_processor::*;
85pub use persistence_state::PersistenceState;
86pub use reth_engine_primitives::TreeConfig;
87use reth_evm::execute::BlockExecutionOutput;
88
89pub mod state;
90
91pub(crate) const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
101
102#[derive(Clone, Debug)]
104pub struct StateProviderBuilder<N: NodePrimitives, P> {
105 provider_factory: P,
107 historical: B256,
109 overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
111}
112
113impl<N: NodePrimitives, P> StateProviderBuilder<N, P> {
114 pub const fn new(
117 provider_factory: P,
118 historical: B256,
119 overlay: Option<Vec<ExecutedBlockWithTrieUpdates<N>>>,
120 ) -> Self {
121 Self { provider_factory, historical, overlay }
122 }
123}
124
125impl<N: NodePrimitives, P> StateProviderBuilder<N, P>
126where
127 P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone,
128{
129 pub fn build(&self) -> ProviderResult<StateProviderBox> {
131 let mut provider = self.provider_factory.state_by_block_hash(self.historical)?;
132 if let Some(overlay) = self.overlay.clone() {
133 provider = Box::new(MemoryOverlayStateProvider::new(provider, overlay))
134 }
135 Ok(provider)
136 }
137}
138
139#[derive(Debug)]
143pub struct EngineApiTreeState<N: NodePrimitives> {
144 tree_state: TreeState<N>,
146 forkchoice_state_tracker: ForkchoiceStateTracker,
148 buffer: BlockBuffer<N::Block>,
150 invalid_headers: InvalidHeaderCache,
153}
154
155impl<N: NodePrimitives> EngineApiTreeState<N> {
156 fn new(
157 block_buffer_limit: u32,
158 max_invalid_header_cache_length: u32,
159 canonical_block: BlockNumHash,
160 engine_kind: EngineApiKind,
161 ) -> Self {
162 Self {
163 invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
164 buffer: BlockBuffer::new(block_buffer_limit),
165 tree_state: TreeState::new(canonical_block, engine_kind),
166 forkchoice_state_tracker: ForkchoiceStateTracker::default(),
167 }
168 }
169}
170
171#[derive(Debug)]
173pub struct TreeOutcome<T> {
174 pub outcome: T,
176 pub event: Option<TreeEvent>,
178}
179
180impl<T> TreeOutcome<T> {
181 pub const fn new(outcome: T) -> Self {
183 Self { outcome, event: None }
184 }
185
186 pub fn with_event(mut self, event: TreeEvent) -> Self {
188 self.event = Some(event);
189 self
190 }
191}
192
193#[derive(Debug)]
195pub enum TreeEvent {
196 TreeAction(TreeAction),
198 BackfillAction(BackfillAction),
200 Download(DownloadRequest),
202}
203
204impl TreeEvent {
205 const fn is_backfill_action(&self) -> bool {
207 matches!(self, Self::BackfillAction(_))
208 }
209}
210
211#[derive(Debug)]
213pub enum TreeAction {
214 MakeCanonical {
216 sync_target_head: B256,
218 },
219}
220
221pub struct EngineApiTreeHandler<N, P, T, V, C>
226where
227 N: NodePrimitives,
228 T: PayloadTypes,
229 C: ConfigureEvm<Primitives = N> + 'static,
230{
231 provider: P,
232 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
233 payload_validator: V,
234 state: EngineApiTreeState<N>,
236 incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
245 incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
247 outgoing: UnboundedSender<EngineApiEvent<N>>,
249 persistence: PersistenceHandle<N>,
251 persistence_state: PersistenceState,
253 backfill_sync_state: BackfillSyncState,
255 canonical_in_memory_state: CanonicalInMemoryState<N>,
258 payload_builder: PayloadBuilderHandle<T>,
261 config: TreeConfig,
263 metrics: EngineApiMetrics,
265 invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
267 engine_kind: EngineApiKind,
269 payload_processor: PayloadProcessor<N, C>,
271 evm_config: C,
273 #[allow(unused)]
275 precompile_cache_map: PrecompileCacheMap<SpecFor<C>>,
276 backup: BackupHandle,
278}
279
280impl<N, P: Debug, T: PayloadTypes + Debug, V: Debug, C> std::fmt::Debug
281 for EngineApiTreeHandler<N, P, T, V, C>
282where
283 N: NodePrimitives,
284 C: Debug + ConfigureEvm<Primitives = N>,
285{
286 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
287 f.debug_struct("EngineApiTreeHandler")
288 .field("provider", &self.provider)
289 .field("consensus", &self.consensus)
290 .field("payload_validator", &self.payload_validator)
291 .field("state", &self.state)
292 .field("incoming_tx", &self.incoming_tx)
293 .field("persistence", &self.persistence)
294 .field("persistence_state", &self.persistence_state)
295 .field("backfill_sync_state", &self.backfill_sync_state)
296 .field("canonical_in_memory_state", &self.canonical_in_memory_state)
297 .field("payload_builder", &self.payload_builder)
298 .field("config", &self.config)
299 .field("metrics", &self.metrics)
300 .field("invalid_block_hook", &format!("{:p}", self.invalid_block_hook))
301 .field("engine_kind", &self.engine_kind)
302 .field("payload_processor", &self.payload_processor)
303 .field("evm_config", &self.evm_config)
304 .finish()
305 }
306}
307
308impl<N, P, T, V, C> EngineApiTreeHandler<N, P, T, V, C>
309where
310 N: NodePrimitives,
311 P: DatabaseProviderFactory
312 + BlockReader<Block = N::Block, Header = N::BlockHeader>
313 + StateProviderFactory
314 + StateReader<Receipt = N::Receipt>
315 + StateCommitmentProvider
316 + HashedPostStateProvider
317 + Clone
318 + 'static,
319 <P as DatabaseProviderFactory>::Provider:
320 BlockReader<Block = N::Block, Header = N::BlockHeader>,
321 C: ConfigureEvm<Primitives = N> + 'static,
322 T: PayloadTypes,
323 V: EngineValidator<T, Block = N::Block>,
324{
325 #[expect(clippy::too_many_arguments)]
327 pub fn new(
328 provider: P,
329 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
330 payload_validator: V,
331 outgoing: UnboundedSender<EngineApiEvent<N>>,
332 state: EngineApiTreeState<N>,
333 canonical_in_memory_state: CanonicalInMemoryState<N>,
334 persistence: PersistenceHandle<N>,
335 persistence_state: PersistenceState,
336 payload_builder: PayloadBuilderHandle<T>,
337 config: TreeConfig,
338 engine_kind: EngineApiKind,
339 evm_config: C,
340 backup: BackupHandle,
341 ) -> Self {
342 let (incoming_tx, incoming) = std::sync::mpsc::channel();
343
344 let precompile_cache_map = PrecompileCacheMap::default();
345
346 let payload_processor = PayloadProcessor::new(
347 WorkloadExecutor::default(),
348 evm_config.clone(),
349 &config,
350 precompile_cache_map.clone(),
351 );
352
353 Self {
354 provider,
355 consensus,
356 payload_validator,
357 incoming,
358 outgoing,
359 persistence,
360 persistence_state,
361 backfill_sync_state: BackfillSyncState::Idle,
362 state,
363 canonical_in_memory_state,
364 payload_builder,
365 config,
366 metrics: Default::default(),
367 incoming_tx,
368 invalid_block_hook: Box::new(NoopInvalidBlockHook),
369 engine_kind,
370 payload_processor,
371 evm_config,
372 precompile_cache_map,
373 backup,
374 }
375 }
376
377 fn set_invalid_block_hook(&mut self, invalid_block_hook: Box<dyn InvalidBlockHook<N>>) {
379 self.invalid_block_hook = invalid_block_hook;
380 }
381
382 #[expect(clippy::complexity)]
388 pub fn spawn_new(
389 provider: P,
390 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
391 payload_validator: V,
392 persistence: PersistenceHandle<N>,
393 payload_builder: PayloadBuilderHandle<T>,
394 canonical_in_memory_state: CanonicalInMemoryState<N>,
395 config: TreeConfig,
396 invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
397 kind: EngineApiKind,
398 evm_config: C,
399 backup: BackupHandle,
400 ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
401 {
402 let best_block_number = provider.best_block_number().unwrap_or(0);
403 let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
404
405 let persistence_state = PersistenceState {
406 last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
407 rx: None,
408 };
409
410 let (tx, outgoing) = unbounded_channel();
411 let state = EngineApiTreeState::new(
412 config.block_buffer_limit(),
413 config.max_invalid_header_cache_length(),
414 header.num_hash(),
415 kind,
416 );
417
418 let mut task = Self::new(
419 provider,
420 consensus,
421 payload_validator,
422 tx,
423 state,
424 canonical_in_memory_state,
425 persistence,
426 persistence_state,
427 payload_builder,
428 config,
429 kind,
430 evm_config,
431 backup,
432 );
433 task.set_invalid_block_hook(invalid_block_hook);
434 let incoming = task.incoming_tx.clone();
435 std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap();
436 (incoming, outgoing)
437 }
438
439 pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
441 self.incoming_tx.clone()
442 }
443
444 pub fn run(mut self) {
448 loop {
449 match self.try_recv_engine_message() {
450 Ok(Some(msg)) => {
451 debug!(target: "engine::tree", %msg, "received new engine message");
452 if let Err(fatal) = self.on_engine_message(msg) {
453 error!(target: "engine::tree", %fatal, "insert block fatal error");
454 return
455 }
456 }
457 Ok(None) => {
458 debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
459 }
460 Err(_err) => {
461 error!(target: "engine::tree", "Engine channel disconnected");
462 return
463 }
464 }
465
466 if let Err(err) = self.advance_persistence() {
467 error!(target: "engine::tree", %err, "Advancing persistence failed");
468 return
469 }
470 if let Err(err) = self.advance_backup() {
471 error!(target: "engine::tree", %err, "Advancing backup failed");
472 return
473 }
474 }
475 }
476
477 fn on_downloaded(
483 &mut self,
484 mut blocks: Vec<RecoveredBlock<N::Block>>,
485 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
486 if blocks.is_empty() {
487 return Ok(None)
489 }
490
491 trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
492 let batch = self.config.max_execute_block_batch_size().min(blocks.len());
493 for block in blocks.drain(..batch) {
494 if let Some(event) = self.on_downloaded_block(block)? {
495 let needs_backfill = event.is_backfill_action();
496 self.on_tree_event(event)?;
497 if needs_backfill {
498 return Ok(None)
500 }
501 }
502 }
503
504 if !blocks.is_empty() {
506 let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
507 }
508
509 Ok(None)
510 }
511
512 #[instrument(level = "trace", skip_all, fields(block_hash = %payload.block_hash(), block_num = %payload.block_number(),), target = "engine::tree")]
526 fn on_new_payload(
527 &mut self,
528 payload: T::ExecutionData,
529 ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
530 trace!(target: "engine::tree", "invoked new payload");
531 self.metrics.engine.new_payload_messages.increment(1);
532
533 let parent_hash = payload.parent_hash();
559 debug!("on_new_payload: payload: {:?}", payload);
560 let block = match self.payload_validator.ensure_well_formed_payload(payload) {
561 Ok(block) => block,
562 Err(error) => {
563 error!(target: "engine::tree", %error, "Invalid payload");
564 let latest_valid_hash =
567 if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
568 None
572 } else {
573 self.latest_valid_hash_for_invalid_payload(parent_hash)?
574 };
575
576 let status = PayloadStatusEnum::from(error);
577 return Ok(TreeOutcome::new(PayloadStatus::new(status, latest_valid_hash)))
578 }
579 };
580
581 let block_hash = block.hash();
582 let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
583 if lowest_buffered_ancestor == block_hash {
584 lowest_buffered_ancestor = block.parent_hash();
585 }
586
587 if let Some(status) =
589 self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, &block)?
590 {
591 return Ok(TreeOutcome::new(status))
592 }
593
594 let status = if self.backfill_sync_state.is_idle() {
595 let mut latest_valid_hash = None;
596 let num_hash = block.num_hash();
597 match self.insert_block(block) {
598 Ok(status) => {
599 let status = match status {
600 InsertPayloadOk::Inserted(BlockStatus::Valid) => {
601 latest_valid_hash = Some(block_hash);
602 self.try_connect_buffered_blocks(num_hash)?;
603 PayloadStatusEnum::Valid
604 }
605 InsertPayloadOk::AlreadySeen(BlockStatus::Valid) => {
606 latest_valid_hash = Some(block_hash);
607 PayloadStatusEnum::Valid
608 }
609 InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
610 InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
611 PayloadStatusEnum::Syncing
613 }
614 };
615
616 PayloadStatus::new(status, latest_valid_hash)
617 }
618 Err(error) => self.on_insert_block_error(error)?,
619 }
620 } else if let Err(error) = self.buffer_block(block) {
621 self.on_insert_block_error(error)?
622 } else {
623 PayloadStatus::from_status(PayloadStatusEnum::Syncing)
624 };
625
626 let mut outcome = TreeOutcome::new(status);
627 if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
629 if self.state.tree_state.canonical_block_hash() != block_hash {
631 outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
632 sync_target_head: block_hash,
633 }));
634 }
635 }
636
637 Ok(outcome)
638 }
639
640 fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
647 let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
649 return Ok(None)
650 };
651
652 let new_head_number = new_head_block.recovered_block().number();
653 let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;
654
655 let mut new_chain = vec![new_head_block.clone()];
656 let mut current_hash = new_head_block.recovered_block().parent_hash();
657 let mut current_number = new_head_number - 1;
658
659 while current_number > current_canonical_number {
664 if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
665 {
666 current_hash = block.recovered_block().parent_hash();
667 current_number -= 1;
668 new_chain.push(block);
669 } else {
670 warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
671 return Ok(None);
674 }
675 }
676
677 if current_hash == self.state.tree_state.current_canonical_head.hash {
680 new_chain.reverse();
681
682 return Ok(Some(NewCanonicalChain::Commit { new: new_chain }));
684 }
685
686 let mut old_chain = Vec::new();
688 let mut old_hash = self.state.tree_state.current_canonical_head.hash;
689
690 while current_canonical_number > current_number {
693 if let Some(block) = self.canonical_block_by_hash(old_hash)? {
694 old_chain.push(block.clone());
695 old_hash = block.recovered_block().parent_hash();
696 current_canonical_number -= 1;
697 } else {
698 warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
700 return Ok(None);
701 }
702 }
703
704 debug_assert_eq!(current_number, current_canonical_number);
706
707 while old_hash != current_hash {
710 if let Some(block) = self.canonical_block_by_hash(old_hash)? {
711 old_hash = block.recovered_block().parent_hash();
712 old_chain.push(block);
713 } else {
714 warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
716 return Ok(None);
717 }
718
719 if let Some(block) = self.state.tree_state.executed_block_by_hash(current_hash).cloned()
720 {
721 current_hash = block.recovered_block().parent_hash();
722 new_chain.push(block);
723 } else {
724 warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
726 return Ok(None);
727 }
728 }
729 new_chain.reverse();
730 old_chain.reverse();
731
732 Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
733 }
734
735 fn is_fork(&self, target_hash: B256) -> ProviderResult<bool> {
742 let canonical_head = self.state.tree_state.canonical_head();
744 let mut current_hash = target_hash;
745 while let Some(current_block) = self.sealed_header_by_hash(current_hash)? {
746 if current_block.hash() == canonical_head.hash {
747 return Ok(false)
748 }
749 if current_block.number() <= canonical_head.number {
751 break
752 }
753 current_hash = current_block.parent_hash();
754 }
755
756 if self.canonical_in_memory_state.header_by_hash(target_hash).is_some() {
758 return Ok(false)
759 }
760
761 if self.provider.block_number(target_hash)?.is_some() {
763 return Ok(false)
764 }
765
766 Ok(true)
767 }
768
769 fn persisting_kind_for(&self, block: &N::BlockHeader) -> PersistingKind {
771 let Some(action) = self.persistence_state.current_action() else {
773 return PersistingKind::NotPersisting
774 };
775 let CurrentPersistenceAction::SavingBlocks { highest } = action else {
777 return PersistingKind::PersistingNotDescendant
778 };
779
780 if block.number() > highest.number && self.state.tree_state.is_descendant(*highest, block) {
783 return PersistingKind::PersistingDescendant
784 }
785
786 PersistingKind::PersistingNotDescendant
788 }
789
790 #[instrument(level = "trace", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash), target = "engine::tree")]
799 fn on_forkchoice_updated(
800 &mut self,
801 state: ForkchoiceState,
802 attrs: Option<T::PayloadAttributes>,
803 version: EngineApiMessageVersion,
804 ) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
805 trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
806 self.metrics.engine.forkchoice_updated_messages.increment(1);
807 self.canonical_in_memory_state.on_forkchoice_update_received();
808
809 if let Some(on_updated) = self.pre_validate_forkchoice_update(state)? {
810 return Ok(TreeOutcome::new(on_updated))
811 }
812
813 let valid_outcome = |head| {
814 TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
815 PayloadStatusEnum::Valid,
816 Some(head),
817 )))
818 };
819
820 if self.state.tree_state.canonical_block_hash() == state.head_block_hash {
836 trace!(target: "engine::tree", "fcu head hash is already canonical");
837
838 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
840 return Ok(TreeOutcome::new(outcome))
842 }
843
844 if let Some(attr) = attrs {
846 let tip = self
847 .block_by_hash(self.state.tree_state.canonical_block_hash())?
848 .ok_or_else(|| {
849 ProviderError::HeaderNotFound(state.head_block_hash.into())
852 })?;
853 let updated = self.process_payload_attributes(attr, tip.header(), state, version);
854 return Ok(TreeOutcome::new(updated))
855 }
856
857 return Ok(valid_outcome(state.head_block_hash))
859 }
860
861 if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
863 debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");
864
865 if self.engine_kind.is_opstack() {
868 if let Some(attr) = attrs {
869 debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
870 let updated =
871 self.process_payload_attributes(attr, &canonical_header, state, version);
872 return Ok(TreeOutcome::new(updated))
873 }
874 }
875
876 return Ok(valid_outcome(state.head_block_hash))
886 }
887
888 if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
890 let tip = chain_update.tip().clone_sealed_header();
891 self.on_canonical_chain_update(chain_update);
892
893 if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
895 return Ok(TreeOutcome::new(outcome))
897 }
898
899 if let Some(attr) = attrs {
900 let updated = self.process_payload_attributes(attr, &tip, state, version);
901 return Ok(TreeOutcome::new(updated))
902 }
903
904 return Ok(valid_outcome(state.head_block_hash))
905 }
906
907 let target = if self.state.forkchoice_state_tracker.is_empty() &&
914 !state.safe_block_hash.is_zero() &&
916 self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
917 {
918 debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
919 state.safe_block_hash
920 } else {
921 state.head_block_hash
922 };
923
924 let target = self.lowest_buffered_ancestor_or(target);
925 trace!(target: "engine::tree", %target, "downloading missing block");
926
927 Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
928 PayloadStatusEnum::Syncing,
929 )))
930 .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
931 }
932
933 #[expect(clippy::type_complexity)]
942 fn try_recv_engine_message(
943 &self,
944 ) -> Result<Option<FromEngine<EngineApiRequest<T, N>, N::Block>>, RecvError> {
945 if self.persistence_state.in_progress() || self.backup.in_progress() {
946 match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
948 Ok(msg) => Ok(Some(msg)),
949 Err(err) => match err {
950 RecvTimeoutError::Timeout => Ok(None),
951 RecvTimeoutError::Disconnected => Err(RecvError),
952 },
953 }
954 } else {
955 self.incoming.recv().map(Some)
956 }
957 }
958
959 fn remove_blocks(&mut self, new_tip_num: u64) {
962 debug!(target: "engine::tree", ?new_tip_num, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
963 if new_tip_num < self.persistence_state.last_persisted_block.number {
964 debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
965 let (tx, rx) = oneshot::channel();
966 let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
967 self.persistence_state.start_remove(new_tip_num, rx);
968 }
969 }
970
971 fn persist_blocks(&mut self, blocks_to_persist: Vec<ExecutedBlockWithTrieUpdates<N>>) {
974 if blocks_to_persist.is_empty() {
975 debug!(target: "engine::tree", "Returned empty set of blocks to persist");
976 return
977 }
978
979 let highest_num_hash = blocks_to_persist
981 .iter()
982 .max_by_key(|block| block.recovered_block().number())
983 .map(|b| b.recovered_block().num_hash())
984 .expect("Checked non-empty persisting blocks");
985
986 debug!(target: "engine::tree", blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::<Vec<_>>(), "Persisting blocks");
987 let (tx, rx) = oneshot::channel();
988 let _ = self.persistence.save_blocks(blocks_to_persist, tx);
989
990 self.persistence_state.start_save(highest_num_hash, rx);
991 }
992
993 fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
998 if self.persistence_state.in_progress() {
999 let (mut rx, start_time, current_action) = self
1000 .persistence_state
1001 .rx
1002 .take()
1003 .expect("if a persistence task is in progress Receiver must be Some");
1004 match rx.try_recv() {
1006 Ok(last_persisted_hash_num) => {
1007 self.metrics.engine.persistence_duration.record(start_time.elapsed());
1008 let Some(BlockNumHash {
1009 hash: last_persisted_block_hash,
1010 number: last_persisted_block_number,
1011 }) = last_persisted_hash_num
1012 else {
1013 warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1016 return Ok(())
1017 };
1018
1019 debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, "Finished persisting, calling finish");
1020 self.persistence_state
1021 .finish(last_persisted_block_hash, last_persisted_block_number);
1022 self.on_new_persisted_block()?;
1023 }
1024 Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
1025 Err(TryRecvError::Empty) => {
1026 self.persistence_state.rx = Some((rx, start_time, current_action))
1027 }
1028 }
1029 }
1030
1031 if !self.persistence_state.in_progress() {
1032 if let Some(new_tip_num) = self.find_disk_reorg()? {
1033 self.remove_blocks(new_tip_num)
1034 } else if self.should_persist() {
1035 let blocks_to_persist = self.get_canonical_blocks_to_persist();
1036 self.persist_blocks(blocks_to_persist);
1037 }
1038 }
1039
1040 Ok(())
1041 }
1042
1043 fn advance_backup(&mut self) -> Result<(), AdvancePersistenceError> {
1044 debug!(target: "engine::tree", "advance_backup called");
1045 if !self.backup.in_progress() {
1046 if self.should_backup() {
1047 debug!(target: "engine::tree", "sending backup action");
1048 let (tx, rx) = oneshot::channel();
1049 let _ = self.backup.sender.send(BackupAction::BackupAtBlock(
1050 self.persistence_state.last_persisted_block,
1051 tx,
1052 ));
1053 self.backup.start(rx);
1054 }
1055 }
1056
1057 if self.backup.in_progress() {
1058 let (mut rx, start_time) = self
1059 .backup
1060 .rx
1061 .take()
1062 .expect("if a backup task is in progress Receiver must be Some");
1063 match rx.try_recv() {
1065 Ok(last_backup_hash_num) => {
1066 let Some(BlockNumHash {
1067 hash: last_backup_block_hash,
1068 number: last_backup_block_number,
1069 }) = last_backup_hash_num
1070 else {
1071 warn!(target: "engine::tree", "Backup task completed but did not backup any blocks");
1072 return Ok(())
1073 };
1074
1075 debug!(target: "engine::tree", ?last_backup_hash_num, "Finished backup, calling finish");
1076 self.backup.finish(BlockNumHash::new(
1077 last_backup_block_number,
1078 last_backup_block_hash,
1079 ));
1080 }
1081 Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
1082 Err(TryRecvError::Empty) => self.backup.rx = Some((rx, start_time)),
1083 }
1084 }
1085 Ok(())
1086 }
1087
1088 fn on_engine_message(
1090 &mut self,
1091 msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
1092 ) -> Result<(), InsertBlockFatalError> {
1093 match msg {
1094 FromEngine::Event(event) => match event {
1095 FromOrchestrator::BackfillSyncStarted => {
1096 debug!(target: "engine::tree", "received backfill sync started event");
1097 self.backfill_sync_state = BackfillSyncState::Active;
1098 }
1099 FromOrchestrator::BackfillSyncFinished(ctrl) => {
1100 self.on_backfill_sync_finished(ctrl)?;
1101 }
1102 },
1103 FromEngine::Request(request) => {
1104 match request {
1105 EngineApiRequest::InsertExecutedBlock(block) => {
1106 let block_num_hash = block.recovered_block().num_hash();
1107 if block_num_hash.number <= self.state.tree_state.canonical_block_number() {
1108 return Ok(())
1110 }
1111
1112 debug!(target: "engine::tree", block=?block_num_hash, "inserting already executed block");
1113 let now = Instant::now();
1114
1115 if self.state.tree_state.canonical_block_hash() ==
1118 block.recovered_block().parent_hash()
1119 {
1120 debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
1121 self.canonical_in_memory_state.set_pending_block(block.clone());
1122 }
1123
1124 self.state.tree_state.insert_executed(block.clone());
1125 self.metrics.engine.inserted_already_executed_blocks.increment(1);
1126 self.emit_event(EngineApiEvent::BeaconConsensus(
1127 BeaconConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
1128 ));
1129 }
1130 EngineApiRequest::Beacon(request) => {
1131 match request {
1132 BeaconEngineMessage::ForkchoiceUpdated {
1133 state,
1134 payload_attrs,
1135 tx,
1136 version,
1137 } => {
1138 let mut output =
1139 self.on_forkchoice_updated(state, payload_attrs, version);
1140
1141 if let Ok(res) = &mut output {
1142 self.state
1144 .forkchoice_state_tracker
1145 .set_latest(state, res.outcome.forkchoice_status());
1146
1147 self.emit_event(BeaconConsensusEngineEvent::ForkchoiceUpdated(
1149 state,
1150 res.outcome.forkchoice_status(),
1151 ));
1152
1153 self.on_maybe_tree_event(res.event.take())?;
1155 }
1156
1157 if let Err(err) =
1158 tx.send(output.map(|o| o.outcome).map_err(Into::into))
1159 {
1160 self.metrics
1161 .engine
1162 .failed_forkchoice_updated_response_deliveries
1163 .increment(1);
1164 error!(target: "engine::tree", "Failed to send event: {err:?}");
1165 }
1166 }
1167 BeaconEngineMessage::NewPayload { payload, tx } => {
1168 debug!("receiving beacon engine message: payload: {:?}", payload);
1169 let mut output = self.on_new_payload(payload);
1170
1171 let maybe_event =
1172 output.as_mut().ok().and_then(|out| out.event.take());
1173
1174 if let Err(err) =
1176 tx.send(output.map(|o| o.outcome).map_err(|e| {
1177 BeaconOnNewPayloadError::Internal(Box::new(e))
1178 }))
1179 {
1180 error!(target: "engine::tree", "Failed to send event: {err:?}");
1181 self.metrics
1182 .engine
1183 .failed_new_payload_response_deliveries
1184 .increment(1);
1185 }
1186
1187 self.on_maybe_tree_event(maybe_event)?;
1189 }
1190 }
1191 }
1192 }
1193 }
1194 FromEngine::DownloadedBlocks(blocks) => {
1195 if let Some(event) = self.on_downloaded(blocks)? {
1196 self.on_tree_event(event)?;
1197 }
1198 }
1199 }
1200 Ok(())
1201 }
1202
1203 fn on_backfill_sync_finished(
1217 &mut self,
1218 ctrl: ControlFlow,
1219 ) -> Result<(), InsertBlockFatalError> {
1220 debug!(target: "engine::tree", "received backfill sync finished event");
1221 self.backfill_sync_state = BackfillSyncState::Idle;
1222
1223 let mut backfill_height = ctrl.block_number();
1225
1226 if let ControlFlow::Unwind { bad_block, target } = &ctrl {
1228 warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
1229 self.state.invalid_headers.insert(**bad_block);
1231
1232 backfill_height = Some(*target);
1234 }
1235
1236 let Some(backfill_height) = backfill_height else { return Ok(()) };
1238
1239 let Some(backfill_num_hash) = self
1245 .provider
1246 .block_hash(backfill_height)?
1247 .map(|hash| BlockNumHash { hash, number: backfill_height })
1248 else {
1249 debug!(target: "engine::tree", ?ctrl, "Backfill block not found");
1250 return Ok(())
1251 };
1252
1253 if ctrl.is_unwind() {
1254 self.state.tree_state.reset(backfill_num_hash)
1257 } else {
1258 self.state.tree_state.remove_until(
1259 backfill_num_hash,
1260 self.persistence_state.last_persisted_block.hash,
1261 Some(backfill_num_hash),
1262 );
1263 }
1264
1265 self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
1266 self.metrics.tree.canonical_chain_height.set(backfill_height as f64);
1267
1268 self.state.buffer.remove_old_blocks(backfill_height);
1270 self.canonical_in_memory_state.clear_state();
1273
1274 if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
1275 self.state.tree_state.set_canonical_head(new_head.num_hash());
1278 self.persistence_state.finish(new_head.hash(), new_head.number());
1279
1280 self.canonical_in_memory_state.set_canonical_head(new_head);
1282 }
1283
1284 let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
1287 else {
1288 return Ok(())
1289 };
1290 if sync_target_state.finalized_block_hash.is_zero() {
1291 return Ok(())
1293 }
1294 let newest_finalized = self
1296 .state
1297 .buffer
1298 .block(&sync_target_state.finalized_block_hash)
1299 .map(|block| block.number());
1300
1301 if let Some(backfill_target) =
1307 ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
1308 self.backfill_sync_target(progress, finalized_number, None)
1311 })
1312 {
1313 self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
1315 backfill_target.into(),
1316 )));
1317 return Ok(())
1318 };
1319
1320 self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
1322 }
1323
1324 fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1328 if let Some(chain_update) = self.on_new_head(target)? {
1329 self.on_canonical_chain_update(chain_update);
1330 }
1331
1332 Ok(())
1333 }
1334
1335 fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1337 if let Some(event) = event {
1338 self.on_tree_event(event)?;
1339 }
1340
1341 Ok(())
1342 }
1343
1344 fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1348 match event {
1349 TreeEvent::TreeAction(action) => match action {
1350 TreeAction::MakeCanonical { sync_target_head } => {
1351 self.make_canonical(sync_target_head)?;
1352 }
1353 },
1354 TreeEvent::BackfillAction(action) => {
1355 self.emit_event(EngineApiEvent::BackfillAction(action));
1356 }
1357 TreeEvent::Download(action) => {
1358 self.emit_event(EngineApiEvent::Download(action));
1359 }
1360 }
1361
1362 Ok(())
1363 }
1364
1365 fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1367 let event = event.into();
1368
1369 if event.is_backfill_action() {
1370 debug_assert_eq!(
1371 self.backfill_sync_state,
1372 BackfillSyncState::Idle,
1373 "backfill action should only be emitted when backfill is idle"
1374 );
1375
1376 if self.persistence_state.in_progress() {
1377 debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
1380 return
1381 }
1382
1383 self.backfill_sync_state = BackfillSyncState::Pending;
1384 self.metrics.engine.pipeline_runs.increment(1);
1385 debug!(target: "engine::tree", "emitting backfill action event");
1386 }
1387
1388 let _ = self.outgoing.send(event).inspect_err(
1389 |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1390 );
1391 }
1392
1393 pub const fn should_persist(&self) -> bool {
1397 if !self.backfill_sync_state.is_idle() {
1398 return false
1400 }
1401
1402 let min_block = self.persistence_state.last_persisted_block.number;
1403 self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1404 self.config.persistence_threshold()
1405 }
1406
1407 fn should_backup(&self) -> bool {
1411 debug!(target: "engine::tree", "checking if we should backup");
1412 return false;
1413 }
1414
1415 fn get_canonical_blocks_to_persist(&self) -> Vec<ExecutedBlockWithTrieUpdates<N>> {
1419 let mut blocks_to_persist = Vec::new();
1420 let mut current_hash = self.state.tree_state.canonical_block_hash();
1421 let last_persisted_number = self.persistence_state.last_persisted_block.number;
1422
1423 let canonical_head_number = self.state.tree_state.canonical_block_number();
1424
1425 let target_number =
1426 canonical_head_number.saturating_sub(self.config.memory_block_buffer_target());
1427
1428 debug!(target: "engine::tree", ?last_persisted_number, ?canonical_head_number, ?target_number, ?current_hash, "Returning canonical blocks to persist");
1429 while let Some(block) = self.state.tree_state.blocks_by_hash.get(¤t_hash) {
1430 if block.recovered_block().number() <= last_persisted_number {
1431 break;
1432 }
1433
1434 if block.recovered_block().number() <= target_number {
1435 blocks_to_persist.push(block.clone());
1436 }
1437
1438 current_hash = block.recovered_block().parent_hash();
1439 }
1440
1441 blocks_to_persist.reverse();
1443
1444 blocks_to_persist
1445 }
1446
1447 fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
1455 if let Some(remove_above) = self.find_disk_reorg()? {
1458 self.remove_blocks(remove_above);
1459 return Ok(())
1460 }
1461
1462 let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
1463 self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
1464 self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
1465 number: self.persistence_state.last_persisted_block.number,
1466 hash: self.persistence_state.last_persisted_block.hash,
1467 });
1468 Ok(())
1469 }
1470
1471 fn canonical_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
1479 trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
1480 if let Some(block) = self.state.tree_state.executed_block_by_hash(hash).cloned() {
1482 return Ok(Some(block.block))
1483 }
1484
1485 let (block, senders) = self
1486 .provider
1487 .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
1488 .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?
1489 .split_sealed();
1490 let execution_output = self
1491 .provider
1492 .get_state(block.header().number())?
1493 .ok_or_else(|| ProviderError::StateForNumberNotFound(block.header().number()))?;
1494 let hashed_state = self.provider.hashed_post_state(execution_output.state());
1495
1496 Ok(Some(ExecutedBlock {
1497 recovered_block: Arc::new(RecoveredBlock::new_sealed(block, senders)),
1498 execution_output: Arc::new(execution_output),
1499 hashed_state: Arc::new(hashed_state),
1500 }))
1501 }
1502
1503 fn sealed_header_by_hash(
1505 &self,
1506 hash: B256,
1507 ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
1508 let block = self
1510 .state
1511 .tree_state
1512 .block_by_hash(hash)
1513 .map(|block| block.as_ref().clone_sealed_header());
1514
1515 if block.is_some() {
1516 Ok(block)
1517 } else {
1518 self.provider.sealed_header_by_hash(hash)
1519 }
1520 }
1521
1522 fn block_by_hash(&self, hash: B256) -> ProviderResult<Option<N::Block>> {
1524 let mut block = self.provider.block_by_hash(hash)?;
1526 if block.is_none() {
1527 block = self
1530 .state
1531 .tree_state
1532 .block_by_hash(hash)
1533 .map(|block| block.as_ref().clone().into_block());
1535 }
1536 Ok(block)
1537 }
1538
1539 fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
1546 self.state
1547 .buffer
1548 .lowest_ancestor(&hash)
1549 .map(|block| block.parent_hash())
1550 .unwrap_or_else(|| hash)
1551 }
1552
1553 fn latest_valid_hash_for_invalid_payload(
1564 &mut self,
1565 parent_hash: B256,
1566 ) -> ProviderResult<Option<B256>> {
1567 if self.block_by_hash(parent_hash)?.is_some() {
1569 return Ok(Some(parent_hash))
1570 }
1571
1572 let mut current_hash = parent_hash;
1575 let mut current_block = self.state.invalid_headers.get(¤t_hash);
1576 while let Some(block_with_parent) = current_block {
1577 current_hash = block_with_parent.parent;
1578 current_block = self.state.invalid_headers.get(¤t_hash);
1579
1580 if current_block.is_none() && self.block_by_hash(current_hash)?.is_some() {
1583 return Ok(Some(current_hash))
1584 }
1585 }
1586 Ok(None)
1587 }
1588
1589 fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
1593 if let Some(parent) = self.block_by_hash(parent_hash)? {
1596 if !parent.header().difficulty().is_zero() {
1597 parent_hash = B256::ZERO;
1598 }
1599 }
1600
1601 let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
1602 Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
1603 validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
1604 })
1605 .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
1606 }
1607
1608 fn is_sync_target_head(&self, block_hash: B256) -> bool {
1612 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
1613 return target.head_block_hash == block_hash
1614 }
1615 false
1616 }
1617
1618 fn check_invalid_ancestor_with_head(
1624 &mut self,
1625 check: B256,
1626 head: &SealedBlock<N::Block>,
1627 ) -> ProviderResult<Option<PayloadStatus>> {
1628 let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
1630
1631 let status = self.prepare_invalid_response(header.parent)?;
1633
1634 self.state.invalid_headers.insert_with_invalid_ancestor(head.hash(), header);
1636 self.emit_event(BeaconConsensusEngineEvent::InvalidBlock(Box::new(head.clone())));
1637
1638 Ok(Some(status))
1639 }
1640
1641 fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
1644 let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
1646 Ok(Some(self.prepare_invalid_response(header.parent)?))
1648 }
1649
1650 fn validate_block(&self, block: &RecoveredBlock<N::Block>) -> Result<(), ConsensusError> {
1653 if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
1654 error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
1655 return Err(e)
1656 }
1657
1658 if let Err(e) = self.consensus.validate_block_pre_execution(block.sealed_block()) {
1659 error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
1660 return Err(e)
1661 }
1662
1663 Ok(())
1664 }
1665
1666 #[instrument(level = "trace", skip(self), target = "engine::tree")]
1668 fn try_connect_buffered_blocks(
1669 &mut self,
1670 parent: BlockNumHash,
1671 ) -> Result<(), InsertBlockFatalError> {
1672 let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
1673
1674 if blocks.is_empty() {
1675 return Ok(())
1677 }
1678
1679 let now = Instant::now();
1680 let block_count = blocks.len();
1681 for child in blocks {
1682 let child_num_hash = child.num_hash();
1683 match self.insert_block(child) {
1684 Ok(res) => {
1685 debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
1686 if self.is_sync_target_head(child_num_hash.hash) &&
1687 matches!(res, InsertPayloadOk::Inserted(BlockStatus::Valid))
1688 {
1689 self.make_canonical(child_num_hash.hash)?;
1690 }
1691 }
1692 Err(err) => {
1693 debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
1694 if let Err(fatal) = self.on_insert_block_error(err) {
1695 warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
1696 return Err(fatal)
1697 }
1698 }
1699 }
1700 }
1701
1702 debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
1703 Ok(())
1704 }
1705
1706 fn buffer_block(
1708 &mut self,
1709 block: RecoveredBlock<N::Block>,
1710 ) -> Result<(), InsertBlockError<N::Block>> {
1711 if let Err(err) = self.validate_block(&block) {
1712 return Err(InsertBlockError::consensus_error(err, block.into_sealed_block()))
1713 }
1714 self.state.buffer.insert_block(block);
1715 Ok(())
1716 }
1717
1718 #[inline]
1723 const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
1724 block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
1725 }
1726
1727 #[inline]
1730 const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
1731 if block > local_tip {
1732 Some(block - local_tip)
1733 } else {
1734 None
1735 }
1736 }
1737
1738 fn backfill_sync_target(
1745 &self,
1746 canonical_tip_num: u64,
1747 target_block_number: u64,
1748 downloaded_block: Option<BlockNumHash>,
1749 ) -> Option<B256> {
1750 let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();
1751
1752 let mut exceeds_backfill_threshold =
1754 self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number);
1755
1756 if let Some(buffered_finalized) = sync_target_state
1758 .as_ref()
1759 .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
1760 {
1761 exceeds_backfill_threshold =
1764 self.exceeds_backfill_run_threshold(canonical_tip_num, buffered_finalized.number());
1765 }
1766
1767 if let (Some(downloaded_block), Some(ref state)) = (downloaded_block, sync_target_state) {
1770 if downloaded_block.hash == state.finalized_block_hash {
1771 exceeds_backfill_threshold =
1773 self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number);
1774 }
1775 }
1776
1777 if exceeds_backfill_threshold {
1779 if let Some(state) = sync_target_state {
1780 match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
1782 Err(err) => {
1783 warn!(target: "engine::tree", %err, "Failed to get finalized block header");
1784 }
1785 Ok(None) => {
1786 if !state.finalized_block_hash.is_zero() {
1788 return Some(state.finalized_block_hash)
1791 }
1792
1793 debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
1806 return Some(state.head_block_hash)
1807 }
1808 Ok(Some(_)) => {
1809 }
1811 }
1812 }
1813 }
1814
1815 None
1816 }
1817
1818 fn find_disk_reorg(&self) -> ProviderResult<Option<u64>> {
1821 let mut canonical = self.state.tree_state.current_canonical_head;
1822 let mut persisted = self.persistence_state.last_persisted_block;
1823
1824 let parent_num_hash = |num_hash: NumHash| -> ProviderResult<NumHash> {
1825 Ok(self
1826 .sealed_header_by_hash(num_hash.hash)?
1827 .ok_or(ProviderError::BlockHashNotFound(num_hash.hash))?
1828 .parent_num_hash())
1829 };
1830
1831 while canonical.number > persisted.number {
1834 canonical = parent_num_hash(canonical)?;
1835 }
1836
1837 if canonical == persisted {
1839 return Ok(None);
1840 }
1841
1842 while persisted.number > canonical.number {
1848 persisted = parent_num_hash(persisted)?;
1849 }
1850
1851 debug_assert_eq!(persisted.number, canonical.number);
1852
1853 while persisted.hash != canonical.hash {
1855 canonical = parent_num_hash(canonical)?;
1856 persisted = parent_num_hash(persisted)?;
1857 }
1858
1859 debug!(target: "engine::tree", remove_above=persisted.number, "on-disk reorg detected");
1860
1861 Ok(Some(persisted.number))
1862 }
1863
1864 fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
1868 trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update");
1869 let start = Instant::now();
1870
1871 self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
1873
1874 let tip = chain_update.tip().clone_sealed_header();
1875 let notification = chain_update.to_chain_notification();
1876
1877 if let NewCanonicalChain::Reorg { new, old } = &chain_update {
1879 let new_first = new.first().map(|first| first.recovered_block().num_hash());
1880 let old_first = old.first().map(|first| first.recovered_block().num_hash());
1881 trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
1882
1883 self.update_reorg_metrics(old.len());
1884 self.reinsert_reorged_blocks(new.clone());
1885 let old = old
1888 .iter()
1889 .filter_map(|block| {
1890 let (_, trie) = self
1891 .state
1892 .tree_state
1893 .persisted_trie_updates
1894 .get(&block.recovered_block.hash())
1895 .cloned()?;
1896 Some(ExecutedBlockWithTrieUpdates { block: block.clone(), trie })
1897 })
1898 .collect::<Vec<_>>();
1899 self.reinsert_reorged_blocks(old);
1900 }
1901
1902 self.canonical_in_memory_state.update_chain(chain_update);
1904 self.canonical_in_memory_state.set_canonical_head(tip.clone());
1905
1906 self.metrics.tree.canonical_chain_height.set(tip.number() as f64);
1908
1909 self.canonical_in_memory_state.notify_canon_state(notification);
1911
1912 self.emit_event(BeaconConsensusEngineEvent::CanonicalChainCommitted(
1914 Box::new(tip),
1915 start.elapsed(),
1916 ));
1917 }
1918
1919 fn update_reorg_metrics(&self, old_chain_length: usize) {
1921 self.metrics.tree.reorgs.increment(1);
1922 self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
1923 }
1924
1925 fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlockWithTrieUpdates<N>>) {
1927 for block in new_chain {
1928 if self
1929 .state
1930 .tree_state
1931 .executed_block_by_hash(block.recovered_block().hash())
1932 .is_none()
1933 {
1934 trace!(target: "engine::tree", num=?block.recovered_block().number(), hash=?block.recovered_block().hash(), "Reinserting block into tree state");
1935 self.state.tree_state.insert_executed(block);
1936 }
1937 }
1938 }
1939
1940 fn on_invalid_block(
1942 &mut self,
1943 parent_header: &SealedHeader<N::BlockHeader>,
1944 block: &RecoveredBlock<N::Block>,
1945 output: &BlockExecutionOutput<N::Receipt>,
1946 trie_updates: Option<(&TrieUpdates, B256)>,
1947 ) {
1948 if self.state.invalid_headers.get(&block.hash()).is_some() {
1949 return;
1951 }
1952 self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
1953 }
1954
1955 fn on_disconnected_downloaded_block(
1960 &self,
1961 downloaded_block: BlockNumHash,
1962 missing_parent: BlockNumHash,
1963 head: BlockNumHash,
1964 ) -> Option<TreeEvent> {
1965 if let Some(target) =
1967 self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
1968 {
1969 trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
1970 return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
1971 }
1972
1973 let request = if let Some(distance) =
1983 self.distance_from_local_tip(head.number, missing_parent.number)
1984 {
1985 trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
1986 DownloadRequest::BlockRange(missing_parent.hash, distance)
1987 } else {
1988 trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
1989 DownloadRequest::single_block(missing_parent.hash)
1992 };
1993
1994 Some(TreeEvent::Download(request))
1995 }
1996
1997 #[instrument(level = "trace", skip_all, fields(block_hash = %block.hash(), block_num = %block.number(),), target = "engine::tree")]
2003 fn on_downloaded_block(
2004 &mut self,
2005 block: RecoveredBlock<N::Block>,
2006 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2007 let block_num_hash = block.num_hash();
2008 let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2009 if self
2010 .check_invalid_ancestor_with_head(lowest_buffered_ancestor, block.sealed_block())?
2011 .is_some()
2012 {
2013 return Ok(None)
2014 }
2015
2016 if !self.backfill_sync_state.is_idle() {
2017 return Ok(None)
2018 }
2019
2020 match self.insert_block(block) {
2022 Ok(InsertPayloadOk::Inserted(BlockStatus::Valid)) => {
2023 if self.is_sync_target_head(block_num_hash.hash) {
2024 trace!(target: "engine::tree", "appended downloaded sync target block");
2025
2026 return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2029 sync_target_head: block_num_hash.hash,
2030 })))
2031 }
2032 trace!(target: "engine::tree", "appended downloaded block");
2033 self.try_connect_buffered_blocks(block_num_hash)?;
2034 }
2035 Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected { head, missing_ancestor })) => {
2036 return Ok(self.on_disconnected_downloaded_block(
2039 block_num_hash,
2040 missing_ancestor,
2041 head,
2042 ))
2043 }
2044 Ok(InsertPayloadOk::AlreadySeen(_)) => {
2045 trace!(target: "engine::tree", "downloaded block already executed");
2046 }
2047 Err(err) => {
2048 debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2049 if let Err(fatal) = self.on_insert_block_error(err) {
2050 warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2051 return Err(fatal)
2052 }
2053 }
2054 }
2055 Ok(None)
2056 }
2057
2058 fn insert_block(
2059 &mut self,
2060 block: RecoveredBlock<N::Block>,
2061 ) -> Result<InsertPayloadOk, InsertBlockError<N::Block>> {
2062 match self.insert_block_inner(block) {
2063 Ok(result) => Ok(result),
2064 Err((kind, block)) => Err(InsertBlockError::new(block.into_sealed_block(), kind)),
2065 }
2066 }
2067
2068 fn insert_block_inner(
2069 &mut self,
2070 block: RecoveredBlock<N::Block>,
2071 ) -> Result<InsertPayloadOk, (InsertBlockErrorKind, RecoveredBlock<N::Block>)> {
2072 macro_rules! ensure_ok {
2074 ($expr:expr) => {
2075 match $expr {
2076 Ok(val) => val,
2077 Err(e) => return Err((e.into(), block)),
2078 }
2079 };
2080 }
2081
2082 let block_num_hash = block.num_hash();
2083 debug!(target: "engine::tree", block=?block_num_hash, parent = ?block.parent_hash(), state_root = ?block.state_root(), "Inserting new block into tree");
2084
2085 if ensure_ok!(self.block_by_hash(block.hash())).is_some() {
2086 return Ok(InsertPayloadOk::AlreadySeen(BlockStatus::Valid))
2087 }
2088
2089 let start = Instant::now();
2090
2091 trace!(target: "engine::tree", block=?block_num_hash, "Validating block consensus");
2092
2093 ensure_ok!(self.validate_block(&block));
2095
2096 trace!(target: "engine::tree", block=?block_num_hash, parent=?block.parent_hash(), "Fetching block state provider");
2097 let Some(provider_builder) = ensure_ok!(self.state_provider_builder(block.parent_hash()))
2098 else {
2099 let missing_ancestor = self
2102 .state
2103 .buffer
2104 .lowest_ancestor(&block.parent_hash())
2105 .map(|block| block.parent_num_hash())
2106 .unwrap_or_else(|| block.parent_num_hash());
2107
2108 self.state.buffer.insert_block(block);
2109
2110 return Ok(InsertPayloadOk::Inserted(BlockStatus::Disconnected {
2111 head: self.state.tree_state.current_canonical_head,
2112 missing_ancestor,
2113 }))
2114 };
2115
2116 let Some(parent_block) = ensure_ok!(self.sealed_header_by_hash(block.parent_hash())) else {
2118 return Err((
2119 InsertBlockErrorKind::Provider(ProviderError::HeaderNotFound(
2120 block.parent_hash().into(),
2121 )),
2122 block,
2123 ))
2124 };
2125
2126 if let Err(e) =
2127 self.consensus.validate_header_against_parent(block.sealed_header(), &parent_block)
2128 {
2129 warn!(target: "engine::tree", ?block, "Failed to validate header {} against parent: {e}", block.hash());
2130 return Err((e.into(), block))
2131 }
2132
2133 let state_provider = ensure_ok!(provider_builder.build());
2134
2135 let persisting_kind = self.persisting_kind_for(block.header());
2145 let run_parallel_state_root = persisting_kind.can_run_parallel_state_root();
2146
2147 let header = block.clone_sealed_header();
2149 let txs = block.clone_transactions_recovered().collect();
2150 let mut handle = if run_parallel_state_root && self.config.use_state_root_task() {
2151 let consistent_view =
2153 ensure_ok!(ConsistentDbView::new_with_latest_tip(self.provider.clone()));
2154
2155 let trie_input_start = Instant::now();
2157 let res = self.compute_trie_input(
2158 persisting_kind,
2159 consistent_view.clone(),
2160 block.header().parent_hash(),
2161 );
2162 let trie_input = match res {
2163 Ok(val) => val,
2164 Err(e) => return Err((InsertBlockErrorKind::Other(Box::new(e)), block)),
2165 };
2166
2167 self.metrics
2168 .block_validation
2169 .trie_input_duration
2170 .record(trie_input_start.elapsed().as_secs_f64());
2171
2172 self.payload_processor.spawn(
2173 header,
2174 txs,
2175 provider_builder,
2176 consistent_view,
2177 trie_input,
2178 &self.config,
2179 )
2180 } else {
2181 self.payload_processor.spawn_cache_exclusive(header, txs, provider_builder)
2182 };
2183
2184 let state_provider = CachedStateProvider::new_with_caches(
2187 state_provider,
2188 handle.caches(),
2189 handle.cache_metrics(),
2190 );
2191
2192 let (output, execution_finish) = if self.config.state_provider_metrics() {
2193 let state_provider = InstrumentedStateProvider::from_state_provider(&state_provider);
2194 let (output, execution_finish) =
2195 ensure_ok!(self.execute_block(&state_provider, &block, &handle));
2196 state_provider.record_total_latency();
2197 (output, execution_finish)
2198 } else {
2199 let (output, execution_finish) =
2200 ensure_ok!(self.execute_block(&state_provider, &block, &handle));
2201 (output, execution_finish)
2202 };
2203
2204 handle.stop_prewarming_execution();
2206
2207 if let Err(err) = self.consensus.validate_block_post_execution(&block, &output) {
2208 self.on_invalid_block(&parent_block, &block, &output, None);
2210 return Err((err.into(), block))
2211 }
2212
2213 let hashed_state = self.provider.hashed_post_state(&output.state);
2214
2215 if let Err(err) = self
2216 .payload_validator
2217 .validate_block_post_execution_with_hashed_state(&hashed_state, &block)
2218 {
2219 self.on_invalid_block(&parent_block, &block, &output, None);
2221 return Err((err.into(), block))
2222 }
2223
2224 debug!(target: "engine::tree", block=?block_num_hash, "Calculating block state root");
2225
2226 let root_time = Instant::now();
2227
2228 let mut maybe_state_root = None;
2229
2230 if run_parallel_state_root {
2231 if self.config.use_state_root_task() {
2234 match handle.state_root() {
2235 Ok(StateRootComputeOutcome { state_root, trie_updates }) => {
2236 let elapsed = execution_finish.elapsed();
2237 info!(target: "engine::tree", ?state_root, ?elapsed, "State root task finished");
2238 if state_root == block.header().state_root() {
2240 maybe_state_root = Some((state_root, trie_updates, elapsed))
2241 } else {
2242 warn!(
2243 target: "engine::tree",
2244 ?state_root,
2245 block_state_root = ?block.header().state_root(),
2246 "State root task returned incorrect state root"
2247 );
2248 }
2249 }
2250 Err(error) => {
2251 debug!(target: "engine::tree", %error, "Background parallel state root computation failed");
2252 }
2253 }
2254 } else {
2255 match self.compute_state_root_parallel(
2256 persisting_kind,
2257 block.header().parent_hash(),
2258 &hashed_state,
2259 ) {
2260 Ok(result) => {
2261 info!(
2262 target: "engine::tree",
2263 block = ?block_num_hash,
2264 regular_state_root = ?result.0,
2265 "Regular root task finished"
2266 );
2267 maybe_state_root = Some((result.0, result.1, root_time.elapsed()));
2268 }
2269 Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(error))) => {
2270 debug!(target: "engine::tree", %error, "Parallel state root computation failed consistency check, falling back");
2271 }
2272 Err(error) => return Err((InsertBlockErrorKind::Other(Box::new(error)), block)),
2273 }
2274 }
2275 }
2276
2277 let (state_root, trie_output, root_elapsed) = if let Some(maybe_state_root) =
2278 maybe_state_root
2279 {
2280 maybe_state_root
2281 } else {
2282 warn!(target: "engine::tree", block=?block_num_hash, ?persisting_kind, "Failed to compute state root in parallel");
2284 self.metrics.block_validation.state_root_parallel_fallback_total.increment(1);
2285 let (root, updates) =
2286 ensure_ok!(state_provider.state_root_with_updates(hashed_state.clone()));
2287 (root, updates, root_time.elapsed())
2288 };
2289
2290 self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
2291 debug!(target: "engine::tree", ?root_elapsed, block=?block_num_hash, "Calculated state root");
2292
2293 if state_root != block.header().state_root() {
2295 self.on_invalid_block(&parent_block, &block, &output, Some((&trie_output, state_root)));
2297 return Err((
2298 ConsensusError::BodyStateRootDiff(
2299 GotExpected { got: state_root, expected: block.header().state_root() }.into(),
2300 )
2301 .into(),
2302 block,
2303 ))
2304 }
2305
2306 handle.terminate_caching(Some(output.state.clone()));
2308
2309 let executed: ExecutedBlockWithTrieUpdates<N> = ExecutedBlockWithTrieUpdates {
2310 block: ExecutedBlock {
2311 recovered_block: Arc::new(block),
2312 execution_output: Arc::new(ExecutionOutcome::from((output, block_num_hash.number))),
2313 hashed_state: Arc::new(hashed_state),
2314 },
2315 trie: Arc::new(trie_output),
2316 };
2317
2318 if self.state.tree_state.canonical_block_hash() == executed.recovered_block().parent_hash()
2320 {
2321 debug!(target: "engine::tree", pending=?block_num_hash, "updating pending block");
2322 self.canonical_in_memory_state.set_pending_block(executed.clone());
2323 }
2324
2325 self.state.tree_state.insert_executed(executed.clone());
2326 self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
2327
2328 let elapsed = start.elapsed();
2330 let is_fork = match self.is_fork(block_num_hash.hash) {
2331 Ok(val) => val,
2332 Err(e) => return Err((e.into(), executed.block.recovered_block().clone())),
2333 };
2334 let engine_event = if is_fork {
2335 BeaconConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
2336 } else {
2337 BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
2338 };
2339 self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
2340
2341 debug!(target: "engine::tree", block=?block_num_hash, "Finished inserting block");
2342 Ok(InsertPayloadOk::Inserted(BlockStatus::Valid))
2343 }
2344
2345 fn execute_block<S: StateProvider>(
2347 &mut self,
2348 state_provider: S,
2349 block: &RecoveredBlock<N::Block>,
2350 handle: &PayloadHandle,
2351 ) -> Result<(BlockExecutionOutput<N::Receipt>, Instant), InsertBlockErrorKind> {
2352 debug!(target: "engine::tree", block=?block.num_hash(), "Executing block");
2353 let mut db = State::builder()
2354 .with_database(StateProviderDatabase::new(&state_provider))
2355 .with_bundle_update()
2356 .without_state_clear()
2357 .build();
2358
2359 let executor = self.evm_config.executor_for_block(&mut db, block);
2362 let execution_start = Instant::now();
2373 let output = self.metrics.executor.execute_metered(
2374 executor,
2375 block,
2376 Box::new(handle.state_hook()),
2377 )?;
2378 let execution_finish = Instant::now();
2379 let execution_time = execution_finish.duration_since(execution_start);
2380 debug!(target: "engine::tree", elapsed = ?execution_time, number=?block.number(), "Executed block");
2381 Ok((output, execution_finish))
2382 }
2383
2384 fn compute_state_root_parallel(
2393 &self,
2394 persisting_kind: PersistingKind,
2395 parent_hash: B256,
2396 hashed_state: &HashedPostState,
2397 ) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
2398 let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
2399
2400 let mut input =
2401 self.compute_trie_input(persisting_kind, consistent_view.clone(), parent_hash)?;
2402 input.append_ref(hashed_state);
2404
2405 ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates()
2406 }
2407
2408 fn compute_trie_input(
2424 &self,
2425 persisting_kind: PersistingKind,
2426 consistent_view: ConsistentDbView<P>,
2427 parent_hash: B256,
2428 ) -> Result<TrieInput, ParallelStateRootError> {
2429 let mut input = TrieInput::default();
2430
2431 let provider = consistent_view.provider_ro()?;
2432 let best_block_number = provider.best_block_number()?;
2433
2434 let (mut historical, mut blocks) = self
2435 .state
2436 .tree_state
2437 .blocks_by_hash(parent_hash)
2438 .map_or_else(|| (parent_hash.into(), vec![]), |(hash, blocks)| (hash.into(), blocks));
2439
2440 if persisting_kind.is_descendant() {
2443 while let Some(block) = blocks.last() {
2445 let recovered_block = block.recovered_block();
2446 if recovered_block.number() <= best_block_number {
2447 blocks.pop();
2450 } else {
2451 break
2454 }
2455 }
2456
2457 historical = if let Some(block) = blocks.last() {
2458 (block.recovered_block().number() - 1).into()
2461 } else {
2462 parent_hash.into()
2464 };
2465 }
2466
2467 if blocks.is_empty() {
2468 debug!(target: "engine::tree", %parent_hash, "Parent found on disk");
2469 } else {
2470 debug!(target: "engine::tree", %parent_hash, %historical, blocks = blocks.len(), "Parent found in memory");
2471 }
2472
2473 let block_number = provider
2475 .convert_hash_or_number(historical)?
2476 .ok_or_else(|| ProviderError::BlockHashNotFound(historical.as_hash().unwrap()))?;
2477
2478 let revert_state = if block_number == best_block_number {
2480 debug!(target: "engine::tree", block_number, best_block_number, "Empty revert state");
2483 HashedPostState::default()
2484 } else {
2485 let revert_state = HashedPostState::from_reverts::<
2486 <P::StateCommitment as StateCommitment>::KeyHasher,
2487 >(provider.tx_ref(), block_number + 1)
2488 .map_err(ProviderError::from)?;
2489 debug!(
2490 target: "engine::tree",
2491 block_number,
2492 best_block_number,
2493 accounts = revert_state.accounts.len(),
2494 storages = revert_state.storages.len(),
2495 "Non-empty revert state"
2496 );
2497 revert_state
2498 };
2499 input.append(revert_state);
2500
2501 for block in blocks.iter().rev() {
2503 input.append_cached_ref(block.trie_updates(), block.hashed_state())
2504 }
2505
2506 Ok(input)
2507 }
2508
2509 fn on_insert_block_error(
2515 &mut self,
2516 error: InsertBlockError<N::Block>,
2517 ) -> Result<PayloadStatus, InsertBlockFatalError> {
2518 let (block, error) = error.split();
2519
2520 let validation_err = error.ensure_validation_error()?;
2523
2524 warn!(
2528 target: "engine::tree",
2529 invalid_hash=%block.hash(),
2530 invalid_number=block.number(),
2531 %validation_err,
2532 "Invalid block error on new payload",
2533 );
2534 let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(block.parent_hash())?;
2535
2536 self.state.invalid_headers.insert(block.block_with_parent());
2538 self.emit_event(EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::InvalidBlock(
2539 Box::new(block),
2540 )));
2541 Ok(PayloadStatus::new(
2542 PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
2543 latest_valid_hash,
2544 ))
2545 }
2546
2547 pub fn find_canonical_header(
2549 &self,
2550 hash: B256,
2551 ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
2552 let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2553
2554 if canonical.is_none() {
2555 canonical = self.provider.header(&hash)?.map(|header| SealedHeader::new(header, hash));
2556 }
2557
2558 Ok(canonical)
2559 }
2560
2561 fn update_finalized_block(
2563 &self,
2564 finalized_block_hash: B256,
2565 ) -> Result<(), OnForkChoiceUpdated> {
2566 if finalized_block_hash.is_zero() {
2567 return Ok(())
2568 }
2569
2570 match self.find_canonical_header(finalized_block_hash) {
2571 Ok(None) => {
2572 debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2573 return Err(OnForkChoiceUpdated::invalid_state())
2575 }
2576 Ok(Some(finalized)) => {
2577 if Some(finalized.num_hash()) !=
2578 self.canonical_in_memory_state.get_finalized_num_hash()
2579 {
2580 let _ = self.persistence.save_finalized_block_number(finalized.number());
2583 self.canonical_in_memory_state.set_finalized(finalized);
2584 }
2585 }
2586 Err(err) => {
2587 error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
2588 }
2589 }
2590
2591 Ok(())
2592 }
2593
2594 fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
2596 if safe_block_hash.is_zero() {
2597 return Ok(())
2598 }
2599
2600 match self.find_canonical_header(safe_block_hash) {
2601 Ok(None) => {
2602 debug!(target: "engine::tree", "Safe block not found in canonical chain");
2603 return Err(OnForkChoiceUpdated::invalid_state())
2605 }
2606 Ok(Some(safe)) => {
2607 if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
2608 let _ = self.persistence.save_safe_block_number(safe.number());
2611 self.canonical_in_memory_state.set_safe(safe);
2612 }
2613 }
2614 Err(err) => {
2615 error!(target: "engine::tree", %err, "Failed to fetch safe block header");
2616 }
2617 }
2618
2619 Ok(())
2620 }
2621
2622 fn ensure_consistent_forkchoice_state(
2631 &self,
2632 state: ForkchoiceState,
2633 ) -> Result<(), OnForkChoiceUpdated> {
2634 self.update_finalized_block(state.finalized_block_hash)?;
2640
2641 self.update_safe_block(state.safe_block_hash)
2647 }
2648
2649 fn pre_validate_forkchoice_update(
2654 &mut self,
2655 state: ForkchoiceState,
2656 ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
2657 if state.head_block_hash.is_zero() {
2658 return Ok(Some(OnForkChoiceUpdated::invalid_state()))
2659 }
2660
2661 let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
2664 if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
2665 return Ok(Some(OnForkChoiceUpdated::with_invalid(status)))
2666 }
2667
2668 if !self.backfill_sync_state.is_idle() {
2669 trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
2672 return Ok(Some(OnForkChoiceUpdated::syncing()))
2673 }
2674
2675 Ok(None)
2676 }
2677
2678 fn process_payload_attributes(
2683 &self,
2684 attrs: T::PayloadAttributes,
2685 head: &N::BlockHeader,
2686 state: ForkchoiceState,
2687 version: EngineApiMessageVersion,
2688 ) -> OnForkChoiceUpdated {
2689 if let Err(err) =
2690 self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
2691 {
2692 warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
2693 return OnForkChoiceUpdated::invalid_payload_attributes()
2694 }
2695
2696 match <T::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
2701 state.head_block_hash,
2702 attrs,
2703 version as u8,
2704 ) {
2705 Ok(attributes) => {
2706 let pending_payload_id = self.payload_builder.send_new_payload(attributes);
2709
2710 OnForkChoiceUpdated::updated_with_pending_payload_id(
2722 PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
2723 pending_payload_id,
2724 )
2725 }
2726 Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
2727 }
2728 }
2729
2730 pub(crate) fn remove_before(
2737 &mut self,
2738 upper_bound: BlockNumHash,
2739 finalized_hash: Option<B256>,
2740 ) -> ProviderResult<()> {
2741 let num = if let Some(hash) = finalized_hash {
2744 self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
2745 } else {
2746 None
2747 };
2748
2749 self.state.tree_state.remove_until(
2750 upper_bound,
2751 self.persistence_state.last_persisted_block.hash,
2752 num,
2753 );
2754 Ok(())
2755 }
2756
2757 pub fn state_provider_builder(
2762 &self,
2763 hash: B256,
2764 ) -> ProviderResult<Option<StateProviderBuilder<N, P>>>
2765 where
2766 P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone,
2767 {
2768 if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
2769 debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
2770 return Ok(Some(StateProviderBuilder::new(
2772 self.provider.clone(),
2773 historical,
2774 Some(blocks),
2775 )))
2776 }
2777
2778 if let Some(header) = self.provider.header(&hash)? {
2780 debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
2781 return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
2784 }
2785
2786 debug!(target: "engine::tree", %hash, "no canonical state found for block");
2787 Ok(None)
2788 }
2789}
2790
2791#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2797pub enum BlockStatus {
2798 Valid,
2800 Disconnected {
2802 head: BlockNumHash,
2804 missing_ancestor: BlockNumHash,
2806 },
2807}
2808
2809#[derive(Clone, Copy, Debug, Eq, PartialEq)]
2814pub enum InsertPayloadOk {
2815 AlreadySeen(BlockStatus),
2817 Inserted(BlockStatus),
2819}
2820
2821#[derive(Debug, Clone, Copy)]
2823pub enum PersistingKind {
2824 NotPersisting,
2826 PersistingNotDescendant,
2828 PersistingDescendant,
2830}
2831
2832impl PersistingKind {
2833 pub const fn can_run_parallel_state_root(&self) -> bool {
2838 matches!(self, Self::NotPersisting | Self::PersistingDescendant)
2839 }
2840
2841 pub const fn is_descendant(&self) -> bool {
2844 matches!(self, Self::PersistingDescendant)
2845 }
2846}
2847#[cfg(test)]
2848mod tests {
2849 use super::*;
2850 use crate::persistence::PersistenceAction;
2851 use alloy_consensus::Header;
2852 use alloy_primitives::{
2853 map::{HashMap, HashSet},
2854 Bytes, B256,
2855 };
2856 use alloy_rlp::Decodable;
2857 use alloy_rpc_types_engine::{
2858 CancunPayloadFields, ExecutionData, ExecutionPayloadSidecar, ExecutionPayloadV1,
2859 ExecutionPayloadV3,
2860 };
2861 use assert_matches::assert_matches;
2862 use reth_chain_state::{test_utils::TestBlockBuilder, BlockState};
2863 use reth_chainspec::{ChainSpec, HOLESKY, MAINNET};
2864 use reth_engine_primitives::ForkchoiceStatus;
2865 use reth_ethereum_consensus::EthBeaconConsensus;
2866 use reth_ethereum_engine_primitives::EthEngineTypes;
2867 use reth_ethereum_primitives::{Block, EthPrimitives};
2868 use reth_evm_ethereum::MockEvmConfig;
2869 use reth_node_ethereum::EthereumEngineValidator;
2870 use reth_primitives_traits::Block as _;
2871 use reth_provider::test_utils::MockEthProvider;
2872 use reth_trie::{updates::TrieUpdates, HashedPostState};
2873 use std::{
2874 collections::BTreeMap,
2875 str::FromStr,
2876 sync::mpsc::{channel, Sender},
2877 };
2878
2879 use reth_node_core::dirs::MaybePlatformPath;
2881
2882 struct TestChannel<T> {
2886 release: Receiver<()>,
2888 tx: Sender<T>,
2890 rx: Receiver<T>,
2892 }
2893
2894 impl<T: Send + 'static> TestChannel<T> {
2895 fn spawn_channel() -> (Sender<T>, Receiver<T>, TestChannelHandle) {
2897 let (original_tx, original_rx) = channel();
2898 let (wrapped_tx, wrapped_rx) = channel();
2899 let (release_tx, release_rx) = channel();
2900 let handle = TestChannelHandle::new(release_tx);
2901 let test_channel = Self { release: release_rx, tx: wrapped_tx, rx: original_rx };
2902 std::thread::spawn(move || test_channel.intercept_loop());
2904 (original_tx, wrapped_rx, handle)
2905 }
2906
2907 fn intercept_loop(&self) {
2909 while self.release.recv() == Ok(()) {
2910 let Ok(value) = self.rx.recv() else { return };
2911
2912 let _ = self.tx.send(value);
2913 }
2914 }
2915 }
2916
2917 struct TestChannelHandle {
2918 release: Sender<()>,
2920 }
2921
2922 impl TestChannelHandle {
2923 const fn new(release: Sender<()>) -> Self {
2925 Self { release }
2926 }
2927
2928 #[expect(dead_code)]
2930 fn release(&self) {
2931 let _ = self.release.send(());
2932 }
2933 }
2934
2935 struct TestHarness {
2936 tree: EngineApiTreeHandler<
2937 EthPrimitives,
2938 MockEthProvider,
2939 EthEngineTypes,
2940 EthereumEngineValidator,
2941 MockEvmConfig,
2942 >,
2943 to_tree_tx: Sender<FromEngine<EngineApiRequest<EthEngineTypes, EthPrimitives>, Block>>,
2944 from_tree_rx: UnboundedReceiver<EngineApiEvent>,
2945 blocks: Vec<ExecutedBlockWithTrieUpdates>,
2946 action_rx: Receiver<PersistenceAction>,
2947 evm_config: MockEvmConfig,
2948 block_builder: TestBlockBuilder,
2949 provider: MockEthProvider,
2950 }
2951
2952 impl TestHarness {
2953 fn new(chain_spec: Arc<ChainSpec>) -> Self {
2954 let (action_tx, action_rx) = channel();
2955 Self::with_persistence_channel(chain_spec, action_tx, action_rx)
2956 }
2957
2958 #[expect(dead_code)]
2959 fn with_test_channel(chain_spec: Arc<ChainSpec>) -> (Self, TestChannelHandle) {
2960 let (action_tx, action_rx, handle) = TestChannel::spawn_channel();
2961 (Self::with_persistence_channel(chain_spec, action_tx, action_rx), handle)
2962 }
2963
2964 fn with_persistence_channel(
2965 chain_spec: Arc<ChainSpec>,
2966 action_tx: Sender<PersistenceAction>,
2967 action_rx: Receiver<PersistenceAction>,
2968 ) -> Self {
2969 let persistence_handle = PersistenceHandle::new(action_tx);
2970
2971 let backup_handle = BackupHandle::spawn_service(MaybePlatformPath::chain_default(
2972 chain_spec.chain.clone(),
2973 ));
2974
2975 let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));
2976
2977 let provider = MockEthProvider::default();
2978
2979 let payload_validator = EthereumEngineValidator::new(chain_spec.clone());
2980
2981 let (from_tree_tx, from_tree_rx) = unbounded_channel();
2982
2983 let header = chain_spec.genesis_header().clone();
2984 let header = SealedHeader::seal_slow(header);
2985 let engine_api_tree_state =
2986 EngineApiTreeState::new(10, 10, header.num_hash(), EngineApiKind::Ethereum);
2987 let canonical_in_memory_state = CanonicalInMemoryState::with_head(header, None, None);
2988
2989 let (to_payload_service, _payload_command_rx) = unbounded_channel();
2990 let payload_builder = PayloadBuilderHandle::new(to_payload_service);
2991
2992 let evm_config = MockEvmConfig::default();
2993
2994 let tree = EngineApiTreeHandler::new(
2995 provider.clone(),
2996 consensus,
2997 payload_validator,
2998 from_tree_tx,
2999 engine_api_tree_state,
3000 canonical_in_memory_state,
3001 persistence_handle,
3002 PersistenceState::default(),
3003 payload_builder,
3004 TreeConfig::default()
3007 .with_legacy_state_root(true)
3008 .with_has_enough_parallelism(true),
3009 EngineApiKind::Ethereum,
3010 evm_config.clone(),
3011 backup_handle,
3012 );
3013
3014 let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
3015 Self {
3016 to_tree_tx: tree.incoming_tx.clone(),
3017 tree,
3018 from_tree_rx,
3019 blocks: vec![],
3020 action_rx,
3021 evm_config,
3022 block_builder,
3023 provider,
3024 }
3025 }
3026
3027 fn with_blocks(mut self, blocks: Vec<ExecutedBlockWithTrieUpdates>) -> Self {
3028 let mut blocks_by_hash = HashMap::default();
3029 let mut blocks_by_number = BTreeMap::new();
3030 let mut state_by_hash = HashMap::default();
3031 let mut hash_by_number = BTreeMap::new();
3032 let mut parent_to_child: HashMap<B256, HashSet<B256>> = HashMap::default();
3033 let mut parent_hash = B256::ZERO;
3034
3035 for block in &blocks {
3036 let sealed_block = block.recovered_block();
3037 let hash = sealed_block.hash();
3038 let number = sealed_block.number;
3039 blocks_by_hash.insert(hash, block.clone());
3040 blocks_by_number.entry(number).or_insert_with(Vec::new).push(block.clone());
3041 state_by_hash.insert(hash, Arc::new(BlockState::new(block.clone())));
3042 hash_by_number.insert(number, hash);
3043 parent_to_child.entry(parent_hash).or_default().insert(hash);
3044 parent_hash = hash;
3045 }
3046
3047 self.tree.state.tree_state = TreeState {
3048 blocks_by_hash,
3049 blocks_by_number,
3050 current_canonical_head: blocks.last().unwrap().recovered_block().num_hash(),
3051 parent_to_child,
3052 persisted_trie_updates: HashMap::default(),
3053 engine_kind: EngineApiKind::Ethereum,
3054 };
3055
3056 let last_executed_block = blocks.last().unwrap().clone();
3057 let pending = Some(BlockState::new(last_executed_block));
3058 self.tree.canonical_in_memory_state =
3059 CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending, None, None);
3060
3061 self.blocks = blocks.clone();
3062
3063 let recovered_blocks =
3064 blocks.iter().map(|b| b.recovered_block().clone()).collect::<Vec<_>>();
3065
3066 self.persist_blocks(recovered_blocks);
3067
3068 self
3069 }
3070
3071 const fn with_backfill_state(mut self, state: BackfillSyncState) -> Self {
3072 self.tree.backfill_sync_state = state;
3073 self
3074 }
3075
3076 fn extend_execution_outcome(
3077 &self,
3078 execution_outcomes: impl IntoIterator<Item = impl Into<ExecutionOutcome>>,
3079 ) {
3080 self.evm_config.extend(execution_outcomes);
3081 }
3082
3083 fn insert_block(
3084 &mut self,
3085 block: RecoveredBlock<reth_ethereum_primitives::Block>,
3086 ) -> Result<InsertPayloadOk, InsertBlockError<Block>> {
3087 let execution_outcome = self.block_builder.get_execution_outcome(block.clone());
3088 self.extend_execution_outcome([execution_outcome]);
3089 self.tree.provider.add_state_root(block.state_root);
3090 self.tree.insert_block(block)
3091 }
3092
3093 async fn fcu_to(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
3094 let fcu_status = fcu_status.into();
3095
3096 self.send_fcu(block_hash, fcu_status).await;
3097
3098 self.check_fcu(block_hash, fcu_status).await;
3099 }
3100
3101 async fn send_fcu(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
3102 let fcu_state = self.fcu_state(block_hash);
3103
3104 let (tx, rx) = oneshot::channel();
3105 self.tree
3106 .on_engine_message(FromEngine::Request(
3107 BeaconEngineMessage::ForkchoiceUpdated {
3108 state: fcu_state,
3109 payload_attrs: None,
3110 tx,
3111 version: EngineApiMessageVersion::default(),
3112 }
3113 .into(),
3114 ))
3115 .unwrap();
3116
3117 let response = rx.await.unwrap().unwrap().await.unwrap();
3118 match fcu_status.into() {
3119 ForkchoiceStatus::Valid => assert!(response.payload_status.is_valid()),
3120 ForkchoiceStatus::Syncing => assert!(response.payload_status.is_syncing()),
3121 ForkchoiceStatus::Invalid => assert!(response.payload_status.is_invalid()),
3122 }
3123 }
3124
3125 async fn check_fcu(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
3126 let fcu_state = self.fcu_state(block_hash);
3127
3128 let event = self.from_tree_rx.recv().await.unwrap();
3130 match event {
3131 EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkchoiceUpdated(
3132 state,
3133 status,
3134 )) => {
3135 assert_eq!(state, fcu_state);
3136 assert_eq!(status, fcu_status.into());
3137 }
3138 _ => panic!("Unexpected event: {event:#?}"),
3139 }
3140 }
3141
3142 const fn fcu_state(&self, block_hash: B256) -> ForkchoiceState {
3143 ForkchoiceState {
3144 head_block_hash: block_hash,
3145 safe_block_hash: block_hash,
3146 finalized_block_hash: block_hash,
3147 }
3148 }
3149
3150 async fn send_new_payload(
3151 &mut self,
3152 block: RecoveredBlock<reth_ethereum_primitives::Block>,
3153 ) {
3154 let payload = ExecutionPayloadV3::from_block_unchecked(
3155 block.hash(),
3156 &block.clone_sealed_block().into_block(),
3157 );
3158 self.tree
3159 .on_new_payload(ExecutionData {
3160 payload: payload.into(),
3161 sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
3162 parent_beacon_block_root: block.parent_beacon_block_root.unwrap(),
3163 versioned_hashes: vec![],
3164 }),
3165 })
3166 .unwrap();
3167 }
3168
3169 async fn insert_chain(
3170 &mut self,
3171 chain: impl IntoIterator<Item = RecoveredBlock<reth_ethereum_primitives::Block>> + Clone,
3172 ) {
3173 for block in chain.clone() {
3174 self.insert_block(block.clone()).unwrap();
3175 }
3176 self.check_canon_chain_insertion(chain).await;
3177 }
3178
3179 async fn check_canon_commit(&mut self, hash: B256) {
3180 let event = self.from_tree_rx.recv().await.unwrap();
3181 match event {
3182 EngineApiEvent::BeaconConsensus(
3183 BeaconConsensusEngineEvent::CanonicalChainCommitted(header, _),
3184 ) => {
3185 assert_eq!(header.hash(), hash);
3186 }
3187 _ => panic!("Unexpected event: {event:#?}"),
3188 }
3189 }
3190
3191 async fn check_fork_chain_insertion(
3192 &mut self,
3193 chain: impl IntoIterator<Item = RecoveredBlock<reth_ethereum_primitives::Block>> + Clone,
3194 ) {
3195 for block in chain {
3196 self.check_fork_block_added(block.hash()).await;
3197 }
3198 }
3199
3200 async fn check_canon_chain_insertion(
3201 &mut self,
3202 chain: impl IntoIterator<Item = RecoveredBlock<reth_ethereum_primitives::Block>> + Clone,
3203 ) {
3204 for block in chain.clone() {
3205 self.check_canon_block_added(block.hash()).await;
3206 }
3207 }
3208
3209 async fn check_canon_block_added(&mut self, expected_hash: B256) {
3210 let event = self.from_tree_rx.recv().await.unwrap();
3211 match event {
3212 EngineApiEvent::BeaconConsensus(
3213 BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, _),
3214 ) => {
3215 assert_eq!(executed.recovered_block.hash(), expected_hash);
3216 }
3217 _ => panic!("Unexpected event: {event:#?}"),
3218 }
3219 }
3220
3221 async fn check_fork_block_added(&mut self, expected_hash: B256) {
3222 let event = self.from_tree_rx.recv().await.unwrap();
3223 match event {
3224 EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkBlockAdded(
3225 executed,
3226 _,
3227 )) => {
3228 assert_eq!(executed.recovered_block.hash(), expected_hash);
3229 }
3230 _ => panic!("Unexpected event: {event:#?}"),
3231 }
3232 }
3233
3234 async fn check_invalid_block(&mut self, expected_hash: B256) {
3235 let event = self.from_tree_rx.recv().await.unwrap();
3236 match event {
3237 EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::InvalidBlock(
3238 block,
3239 )) => {
3240 assert_eq!(block.hash(), expected_hash);
3241 }
3242 _ => panic!("Unexpected event: {event:#?}"),
3243 }
3244 }
3245
3246 fn persist_blocks(&self, blocks: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>) {
3247 let mut block_data: Vec<(B256, Block)> = Vec::with_capacity(blocks.len());
3248 let mut headers_data: Vec<(B256, Header)> = Vec::with_capacity(blocks.len());
3249
3250 for block in &blocks {
3251 block_data.push((block.hash(), block.clone_block()));
3252 headers_data.push((block.hash(), block.header().clone()));
3253 }
3254
3255 self.provider.extend_blocks(block_data);
3256 self.provider.extend_headers(headers_data);
3257 }
3258
3259 fn setup_range_insertion_for_valid_chain(
3260 &mut self,
3261 chain: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>,
3262 ) {
3263 self.setup_range_insertion_for_chain(chain, None)
3264 }
3265
3266 fn setup_range_insertion_for_invalid_chain(
3267 &mut self,
3268 chain: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>,
3269 index: usize,
3270 ) {
3271 self.setup_range_insertion_for_chain(chain, Some(index))
3272 }
3273
3274 fn setup_range_insertion_for_chain(
3275 &mut self,
3276 chain: Vec<RecoveredBlock<reth_ethereum_primitives::Block>>,
3277 invalid_index: Option<usize>,
3278 ) {
3279 let mut chain_rev = chain;
3282 chain_rev.reverse();
3283
3284 let mut execution_outcomes = Vec::with_capacity(chain_rev.len());
3285 for (index, block) in chain_rev.iter().enumerate() {
3286 let execution_outcome = self.block_builder.get_execution_outcome(block.clone());
3287 let state_root = if invalid_index.is_some() && invalid_index.unwrap() == index {
3288 B256::random()
3289 } else {
3290 block.state_root
3291 };
3292 self.tree.provider.add_state_root(state_root);
3293 execution_outcomes.push(execution_outcome);
3294 }
3295 self.extend_execution_outcome(execution_outcomes);
3296 }
3297
3298 fn check_canon_head(&self, head_hash: B256) {
3299 assert_eq!(self.tree.state.tree_state.canonical_head().hash, head_hash);
3300 }
3301 }
3302
3303 #[test]
3304 fn test_tree_persist_block_batch() {
3305 let tree_config = TreeConfig::default();
3306 let chain_spec = MAINNET.clone();
3307 let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone());
3308
3309 let blocks: Vec<_> = test_block_builder
3312 .get_executed_blocks(1..tree_config.persistence_threshold() + 2)
3313 .collect();
3314 let mut test_harness = TestHarness::new(chain_spec).with_blocks(blocks);
3315
3316 let mut blocks = vec![];
3317 for idx in 0..tree_config.max_execute_block_batch_size() * 2 {
3318 blocks.push(test_block_builder.generate_random_block(idx as u64, B256::random()));
3319 }
3320
3321 test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(blocks)).unwrap();
3322
3323 let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
3325 test_harness.tree.on_engine_message(msg).unwrap();
3326
3327 let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
3329 match msg {
3330 FromEngine::DownloadedBlocks(blocks) => {
3331 assert_eq!(blocks.len(), tree_config.max_execute_block_batch_size());
3332 }
3333 _ => panic!("unexpected message: {msg:#?}"),
3334 }
3335 }
3336
3337 #[tokio::test]
3338 async fn test_tree_persist_blocks() {
3339 let tree_config = TreeConfig::default();
3340 let chain_spec = MAINNET.clone();
3341 let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone());
3342
3343 let blocks: Vec<_> = test_block_builder
3346 .get_executed_blocks(1..tree_config.persistence_threshold() + 2)
3347 .collect();
3348 let test_harness = TestHarness::new(chain_spec).with_blocks(blocks.clone());
3349 std::thread::Builder::new()
3350 .name("Tree Task".to_string())
3351 .spawn(|| test_harness.tree.run())
3352 .unwrap();
3353
3354 test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap();
3356
3357 let received_action =
3358 test_harness.action_rx.recv().expect("Failed to receive save blocks action");
3359 if let PersistenceAction::SaveBlocks(saved_blocks, _) = received_action {
3360 let expected_persist_len =
3363 blocks.len() - tree_config.memory_block_buffer_target() as usize;
3364 assert_eq!(saved_blocks.len(), expected_persist_len);
3365 assert_eq!(saved_blocks, blocks[..expected_persist_len]);
3366 } else {
3367 panic!("unexpected action received {received_action:?}");
3368 }
3369 }
3370
3371 #[tokio::test]
3372 async fn test_in_memory_state_trait_impl() {
3373 let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(0..10).collect();
3374 let test_harness = TestHarness::new(MAINNET.clone()).with_blocks(blocks.clone());
3375
3376 for executed_block in blocks {
3377 let sealed_block = executed_block.recovered_block();
3378
3379 let expected_state = BlockState::new(executed_block.clone());
3380
3381 let actual_state_by_hash = test_harness
3382 .tree
3383 .canonical_in_memory_state
3384 .state_by_hash(sealed_block.hash())
3385 .unwrap();
3386 assert_eq!(expected_state, *actual_state_by_hash);
3387
3388 let actual_state_by_number = test_harness
3389 .tree
3390 .canonical_in_memory_state
3391 .state_by_number(sealed_block.number)
3392 .unwrap();
3393 assert_eq!(expected_state, *actual_state_by_number);
3394 }
3395 }
3396
3397 #[tokio::test]
3398 async fn test_engine_request_during_backfill() {
3399 let tree_config = TreeConfig::default();
3400 let blocks: Vec<_> = TestBlockBuilder::eth()
3401 .get_executed_blocks(0..tree_config.persistence_threshold())
3402 .collect();
3403 let mut test_harness = TestHarness::new(MAINNET.clone())
3404 .with_blocks(blocks)
3405 .with_backfill_state(BackfillSyncState::Active);
3406
3407 let (tx, rx) = oneshot::channel();
3408 test_harness
3409 .tree
3410 .on_engine_message(FromEngine::Request(
3411 BeaconEngineMessage::ForkchoiceUpdated {
3412 state: ForkchoiceState {
3413 head_block_hash: B256::random(),
3414 safe_block_hash: B256::random(),
3415 finalized_block_hash: B256::random(),
3416 },
3417 payload_attrs: None,
3418 tx,
3419 version: EngineApiMessageVersion::default(),
3420 }
3421 .into(),
3422 ))
3423 .unwrap();
3424
3425 let resp = rx.await.unwrap().unwrap().await.unwrap();
3426 assert!(resp.payload_status.is_syncing());
3427 }
3428
3429 #[test]
3430 fn test_disconnected_payload() {
3431 let s = include_str!("../../test-data/holesky/2.rlp");
3432 let data = Bytes::from_str(s).unwrap();
3433 let block = Block::decode(&mut data.as_ref()).unwrap();
3434 let sealed = block.seal_slow();
3435 let hash = sealed.hash();
3436 let payload = ExecutionPayloadV1::from_block_unchecked(hash, &sealed.clone().into_block());
3437
3438 let mut test_harness = TestHarness::new(HOLESKY.clone());
3439
3440 let outcome = test_harness
3441 .tree
3442 .on_new_payload(ExecutionData {
3443 payload: payload.into(),
3444 sidecar: ExecutionPayloadSidecar::none(),
3445 })
3446 .unwrap();
3447 assert!(outcome.outcome.is_syncing());
3448
3449 let buffered = test_harness.tree.state.buffer.block(&hash).unwrap();
3451 assert_eq!(buffered.clone_sealed_block(), sealed);
3452 }
3453
3454 #[test]
3455 fn test_disconnected_block() {
3456 let s = include_str!("../../test-data/holesky/2.rlp");
3457 let data = Bytes::from_str(s).unwrap();
3458 let block = Block::decode(&mut data.as_ref()).unwrap();
3459 let sealed = block.seal_slow().try_recover().unwrap();
3460
3461 let mut test_harness = TestHarness::new(HOLESKY.clone());
3462
3463 let outcome = test_harness.tree.insert_block(sealed.clone()).unwrap();
3464 assert_eq!(
3465 outcome,
3466 InsertPayloadOk::Inserted(BlockStatus::Disconnected {
3467 head: test_harness.tree.state.tree_state.current_canonical_head,
3468 missing_ancestor: sealed.parent_num_hash()
3469 })
3470 );
3471 }
3472
3473 #[tokio::test]
3474 async fn test_holesky_payload() {
3475 let s = include_str!("../../test-data/holesky/1.rlp");
3476 let data = Bytes::from_str(s).unwrap();
3477 let block: Block = Block::decode(&mut data.as_ref()).unwrap();
3478 let sealed = block.seal_slow();
3479 let payload =
3480 ExecutionPayloadV1::from_block_unchecked(sealed.hash(), &sealed.clone().into_block());
3481
3482 let mut test_harness =
3483 TestHarness::new(HOLESKY.clone()).with_backfill_state(BackfillSyncState::Active);
3484
3485 let (tx, rx) = oneshot::channel();
3486 test_harness
3487 .tree
3488 .on_engine_message(FromEngine::Request(
3489 BeaconEngineMessage::NewPayload {
3490 payload: ExecutionData {
3491 payload: payload.clone().into(),
3492 sidecar: ExecutionPayloadSidecar::none(),
3493 },
3494 tx,
3495 }
3496 .into(),
3497 ))
3498 .unwrap();
3499
3500 let resp = rx.await.unwrap().unwrap();
3501 assert!(resp.is_syncing());
3502 }
3503
3504 #[tokio::test]
3505 async fn test_tree_state_on_new_head_reorg() {
3506 reth_tracing::init_test_tracing();
3507 let chain_spec = MAINNET.clone();
3508
3509 let mut test_harness = TestHarness::new(chain_spec);
3511 test_harness.tree.config = test_harness
3512 .tree
3513 .config
3514 .with_persistence_threshold(1)
3515 .with_memory_block_buffer_target(1);
3516 let mut test_block_builder = TestBlockBuilder::eth();
3517 let blocks: Vec<_> = test_block_builder.get_executed_blocks(1..6).collect();
3518
3519 for block in &blocks {
3520 test_harness.tree.state.tree_state.insert_executed(block.clone());
3521 }
3522
3523 test_harness
3525 .tree
3526 .state
3527 .tree_state
3528 .set_canonical_head(blocks[2].recovered_block().num_hash());
3529
3530 let fork_block_3 = test_block_builder
3532 .get_executed_block_with_number(3, blocks[1].recovered_block().hash());
3533 let fork_block_4 = test_block_builder
3534 .get_executed_block_with_number(4, fork_block_3.recovered_block().hash());
3535 let fork_block_5 = test_block_builder
3536 .get_executed_block_with_number(5, fork_block_4.recovered_block().hash());
3537
3538 test_harness.tree.state.tree_state.insert_executed(fork_block_3.clone());
3539 test_harness.tree.state.tree_state.insert_executed(fork_block_4.clone());
3540 test_harness.tree.state.tree_state.insert_executed(fork_block_5.clone());
3541
3542 let result = test_harness.tree.on_new_head(blocks[4].recovered_block().hash()).unwrap();
3544 assert!(matches!(result, Some(NewCanonicalChain::Commit { .. })));
3545 if let Some(NewCanonicalChain::Commit { new }) = result {
3546 assert_eq!(new.len(), 2);
3547 assert_eq!(new[0].recovered_block().hash(), blocks[3].recovered_block().hash());
3548 assert_eq!(new[1].recovered_block().hash(), blocks[4].recovered_block().hash());
3549 }
3550
3551 let current_action = test_harness.tree.persistence_state.current_action();
3553 assert_eq!(current_action, None);
3554
3555 test_harness.tree.advance_persistence().unwrap();
3560 let current_action = test_harness.tree.persistence_state.current_action().cloned();
3561 assert_eq!(
3562 current_action,
3563 Some(CurrentPersistenceAction::SavingBlocks {
3564 highest: blocks[1].recovered_block().num_hash()
3565 })
3566 );
3567
3568 let received_action = test_harness.action_rx.recv().unwrap();
3570 let PersistenceAction::SaveBlocks(saved_blocks, sender) = received_action else {
3571 panic!("received wrong action");
3572 };
3573 assert_eq!(saved_blocks, vec![blocks[0].clone(), blocks[1].clone()]);
3574
3575 sender.send(Some(blocks[1].recovered_block().num_hash())).unwrap();
3577
3578 let current_action = test_harness.tree.persistence_state.current_action().cloned();
3580 assert_eq!(
3581 current_action,
3582 Some(CurrentPersistenceAction::SavingBlocks {
3583 highest: blocks[1].recovered_block().num_hash()
3584 })
3585 );
3586
3587 test_harness.tree.advance_persistence().unwrap();
3589 let current_action = test_harness.tree.persistence_state.current_action().cloned();
3590 assert_eq!(current_action, None);
3591
3592 let result = test_harness.tree.on_new_head(fork_block_5.recovered_block().hash()).unwrap();
3594 assert!(matches!(result, Some(NewCanonicalChain::Reorg { .. })));
3595
3596 if let Some(NewCanonicalChain::Reorg { new, old }) = result {
3597 assert_eq!(new.len(), 3);
3598 assert_eq!(new[0].recovered_block().hash(), fork_block_3.recovered_block().hash());
3599 assert_eq!(new[1].recovered_block().hash(), fork_block_4.recovered_block().hash());
3600 assert_eq!(new[2].recovered_block().hash(), fork_block_5.recovered_block().hash());
3601
3602 assert_eq!(old.len(), 1);
3603 assert_eq!(old[0].recovered_block().hash(), blocks[2].recovered_block().hash());
3604 }
3605
3606 test_harness.tree.advance_persistence().unwrap();
3608 let current_action = test_harness.tree.persistence_state.current_action().cloned();
3609 assert_eq!(current_action, None);
3610
3611 test_harness
3613 .tree
3614 .state
3615 .tree_state
3616 .set_canonical_head(fork_block_5.recovered_block().num_hash());
3617
3618 test_harness.tree.advance_persistence().unwrap();
3621 let current_action = test_harness.tree.persistence_state.current_action().cloned();
3622 assert_eq!(
3623 current_action,
3624 Some(CurrentPersistenceAction::SavingBlocks {
3625 highest: fork_block_4.recovered_block().num_hash()
3626 })
3627 );
3628 }
3629
3630 #[test]
3631 fn test_tree_state_on_new_head_deep_fork() {
3632 reth_tracing::init_test_tracing();
3633
3634 let chain_spec = MAINNET.clone();
3635 let mut test_harness = TestHarness::new(chain_spec);
3636 let mut test_block_builder = TestBlockBuilder::eth();
3637
3638 let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..5).collect();
3639
3640 for block in &blocks {
3641 test_harness.tree.state.tree_state.insert_executed(block.clone());
3642 }
3643
3644 let last_block = blocks.last().unwrap().recovered_block().clone();
3646
3647 test_harness.tree.state.tree_state.set_canonical_head(last_block.num_hash());
3648
3649 let chain_a = test_block_builder.create_fork(&last_block, 10);
3651 let chain_b = test_block_builder.create_fork(&last_block, 10);
3652
3653 for block in &chain_a {
3654 test_harness.tree.state.tree_state.insert_executed(ExecutedBlockWithTrieUpdates {
3655 block: ExecutedBlock {
3656 recovered_block: Arc::new(block.clone()),
3657 execution_output: Arc::new(ExecutionOutcome::default()),
3658 hashed_state: Arc::new(HashedPostState::default()),
3659 },
3660 trie: Arc::new(TrieUpdates::default()),
3661 });
3662 }
3663 test_harness.tree.state.tree_state.set_canonical_head(chain_a.last().unwrap().num_hash());
3664
3665 for block in &chain_b {
3666 test_harness.tree.state.tree_state.insert_executed(ExecutedBlockWithTrieUpdates {
3667 block: ExecutedBlock {
3668 recovered_block: Arc::new(block.clone()),
3669 execution_output: Arc::new(ExecutionOutcome::default()),
3670 hashed_state: Arc::new(HashedPostState::default()),
3671 },
3672 trie: Arc::new(TrieUpdates::default()),
3673 });
3674 }
3675
3676 let mut expected_new = Vec::new();
3678 for block in &chain_b {
3679 let result = test_harness.tree.on_new_head(block.hash()).unwrap();
3681 assert_matches!(result, Some(NewCanonicalChain::Reorg { .. }));
3682
3683 expected_new.push(block);
3684 if let Some(NewCanonicalChain::Reorg { new, old }) = result {
3685 assert_eq!(new.len(), expected_new.len());
3686 for (index, block) in expected_new.iter().enumerate() {
3687 assert_eq!(new[index].recovered_block().hash(), block.hash());
3688 }
3689
3690 assert_eq!(old.len(), chain_a.len());
3691 for (index, block) in chain_a.iter().enumerate() {
3692 assert_eq!(old[index].recovered_block().hash(), block.hash());
3693 }
3694 }
3695
3696 test_harness.tree.on_new_head(chain_a.last().unwrap().hash()).unwrap();
3698 }
3699 }
3700
3701 #[tokio::test]
3702 async fn test_get_canonical_blocks_to_persist() {
3703 let chain_spec = MAINNET.clone();
3704 let mut test_harness = TestHarness::new(chain_spec);
3705 let mut test_block_builder = TestBlockBuilder::eth();
3706
3707 let canonical_head_number = 9;
3708 let blocks: Vec<_> =
3709 test_block_builder.get_executed_blocks(0..canonical_head_number + 1).collect();
3710 test_harness = test_harness.with_blocks(blocks.clone());
3711
3712 let last_persisted_block_number = 3;
3713 test_harness.tree.persistence_state.last_persisted_block =
3714 blocks[last_persisted_block_number as usize].recovered_block.num_hash();
3715
3716 let persistence_threshold = 4;
3717 let memory_block_buffer_target = 3;
3718 test_harness.tree.config = TreeConfig::default()
3719 .with_persistence_threshold(persistence_threshold)
3720 .with_memory_block_buffer_target(memory_block_buffer_target);
3721
3722 let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist();
3723
3724 let expected_blocks_to_persist_length: usize =
3725 (canonical_head_number - memory_block_buffer_target - last_persisted_block_number)
3726 .try_into()
3727 .unwrap();
3728
3729 assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);
3730 for (i, item) in
3731 blocks_to_persist.iter().enumerate().take(expected_blocks_to_persist_length)
3732 {
3733 assert_eq!(item.recovered_block().number, last_persisted_block_number + i as u64 + 1);
3734 }
3735
3736 let fork_block = test_block_builder.get_executed_block_with_number(4, B256::random());
3738 let fork_block_hash = fork_block.recovered_block().hash();
3739 test_harness.tree.state.tree_state.insert_executed(fork_block);
3740
3741 assert!(test_harness.tree.state.tree_state.block_by_hash(fork_block_hash).is_some());
3742
3743 let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist();
3744 assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);
3745
3746 assert!(!blocks_to_persist.iter().any(|b| b.recovered_block().hash() == fork_block_hash));
3748
3749 assert!(blocks_to_persist.iter().any(|b| b.recovered_block().number == 4 &&
3751 b.recovered_block().hash() == blocks[4].recovered_block().hash()));
3752
3753 test_harness.tree.advance_persistence().expect("advancing persistence should succeed");
3755 assert_eq!(
3756 test_harness.tree.persistence_state.current_action().cloned(),
3757 Some(CurrentPersistenceAction::SavingBlocks {
3758 highest: blocks_to_persist.last().unwrap().recovered_block().num_hash()
3759 })
3760 );
3761 }
3762
3763 #[tokio::test]
3764 async fn test_engine_tree_fcu_missing_head() {
3765 let chain_spec = MAINNET.clone();
3766 let mut test_harness = TestHarness::new(chain_spec.clone());
3767
3768 let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone());
3769
3770 let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..5).collect();
3771 test_harness = test_harness.with_blocks(blocks);
3772
3773 let missing_block = test_block_builder
3774 .generate_random_block(6, test_harness.blocks.last().unwrap().recovered_block().hash());
3775
3776 test_harness.fcu_to(missing_block.hash(), PayloadStatusEnum::Syncing).await;
3777
3778 let event = test_harness.from_tree_rx.recv().await.unwrap();
3780 match event {
3781 EngineApiEvent::Download(DownloadRequest::BlockSet(actual_block_set)) => {
3782 let expected_block_set = HashSet::from_iter([missing_block.hash()]);
3783 assert_eq!(actual_block_set, expected_block_set);
3784 }
3785 _ => panic!("Unexpected event: {event:#?}"),
3786 }
3787 }
3788
3789 #[tokio::test]
3790 async fn test_engine_tree_fcu_canon_chain_insertion() {
3791 let chain_spec = MAINNET.clone();
3792 let mut test_harness = TestHarness::new(chain_spec.clone());
3793
3794 let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
3795 test_harness = test_harness.with_blocks(base_chain.clone());
3796
3797 test_harness
3798 .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
3799 .await;
3800
3801 let main_chain = test_harness.block_builder.create_fork(base_chain[0].recovered_block(), 3);
3803
3804 test_harness.insert_chain(main_chain).await;
3805 }
3806
3807 #[tokio::test]
3808 async fn test_engine_tree_fcu_reorg_with_all_blocks() {
3809 let chain_spec = MAINNET.clone();
3810 let mut test_harness = TestHarness::new(chain_spec.clone());
3811
3812 let main_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..5).collect();
3813 test_harness = test_harness.with_blocks(main_chain.clone());
3814
3815 let fork_chain = test_harness.block_builder.create_fork(main_chain[2].recovered_block(), 3);
3816 let fork_chain_last_hash = fork_chain.last().unwrap().hash();
3817
3818 for block in &fork_chain {
3820 test_harness.insert_block(block.clone()).unwrap();
3821 }
3822
3823 test_harness.send_fcu(fork_chain_last_hash, ForkchoiceStatus::Valid).await;
3824
3825 test_harness.check_fork_chain_insertion(fork_chain.clone()).await;
3827
3828 test_harness.check_canon_commit(fork_chain_last_hash).await;
3830
3831 test_harness.check_fcu(fork_chain_last_hash, ForkchoiceStatus::Valid).await;
3832
3833 test_harness.check_canon_head(fork_chain_last_hash);
3835 }
3836
3837 #[tokio::test]
3838 async fn test_engine_tree_live_sync_transition_required_blocks_requested() {
3839 reth_tracing::init_test_tracing();
3840
3841 let chain_spec = MAINNET.clone();
3842 let mut test_harness = TestHarness::new(chain_spec.clone());
3843
3844 let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
3845 test_harness = test_harness.with_blocks(base_chain.clone());
3846
3847 test_harness
3848 .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
3849 .await;
3850
3851 let main_chain = test_harness
3853 .block_builder
3854 .create_fork(base_chain[0].recovered_block(), MIN_BLOCKS_FOR_PIPELINE_RUN + 10);
3855
3856 let main_chain_last_hash = main_chain.last().unwrap().hash();
3857 test_harness.send_fcu(main_chain_last_hash, ForkchoiceStatus::Syncing).await;
3858
3859 test_harness.check_fcu(main_chain_last_hash, ForkchoiceStatus::Syncing).await;
3860
3861 let backfill_finished_block_number = MIN_BLOCKS_FOR_PIPELINE_RUN + 1;
3863 let backfill_finished = FromOrchestrator::BackfillSyncFinished(ControlFlow::Continue {
3864 block_number: backfill_finished_block_number,
3865 });
3866
3867 let backfill_tip_block = main_chain[(backfill_finished_block_number - 1) as usize].clone();
3868 test_harness.provider.add_block(backfill_tip_block.hash(), backfill_tip_block.into_block());
3870 test_harness.tree.on_engine_message(FromEngine::Event(backfill_finished)).unwrap();
3871
3872 let event = test_harness.from_tree_rx.recv().await.unwrap();
3873 match event {
3874 EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
3875 assert_eq!(hash_set, HashSet::from_iter([main_chain_last_hash]));
3876 }
3877 _ => panic!("Unexpected event: {event:#?}"),
3878 }
3879
3880 test_harness
3881 .tree
3882 .on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain
3883 .last()
3884 .unwrap()
3885 .clone()]))
3886 .unwrap();
3887
3888 let event = test_harness.from_tree_rx.recv().await.unwrap();
3889 match event {
3890 EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => {
3891 assert_eq!(
3892 total_blocks,
3893 (main_chain.len() - backfill_finished_block_number as usize - 1) as u64
3894 );
3895 assert_eq!(initial_hash, main_chain.last().unwrap().parent_hash);
3896 }
3897 _ => panic!("Unexpected event: {event:#?}"),
3898 }
3899 }
3900
3901 #[tokio::test]
3902 async fn test_engine_tree_live_sync_transition_eventually_canonical() {
3903 reth_tracing::init_test_tracing();
3904
3905 let chain_spec = MAINNET.clone();
3906 let mut test_harness = TestHarness::new(chain_spec.clone());
3907 test_harness.tree.config = test_harness.tree.config.with_max_execute_block_batch_size(100);
3908
3909 let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
3911 test_harness = test_harness.with_blocks(base_chain.clone());
3912
3913 test_harness
3915 .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
3916 .await;
3917
3918 let main_chain = test_harness
3921 .block_builder
3922 .create_fork(base_chain[0].recovered_block(), MIN_BLOCKS_FOR_PIPELINE_RUN + 10);
3923
3924 let main_chain_last = main_chain.last().unwrap();
3925 let main_chain_last_hash = main_chain_last.hash();
3926 let main_chain_backfill_target =
3927 main_chain.get(MIN_BLOCKS_FOR_PIPELINE_RUN as usize).unwrap();
3928 let main_chain_backfill_target_hash = main_chain_backfill_target.hash();
3929
3930 test_harness.send_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await;
3932 test_harness.check_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await;
3933
3934 let event = test_harness.from_tree_rx.recv().await.unwrap();
3936 match event {
3937 EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
3938 assert_eq!(hash_set, HashSet::from_iter([main_chain_backfill_target_hash]));
3939 }
3940 _ => panic!("Unexpected event: {event:#?}"),
3941 }
3942
3943 test_harness
3945 .tree
3946 .on_engine_message(FromEngine::DownloadedBlocks(vec![
3947 main_chain_backfill_target.clone()
3948 ]))
3949 .unwrap();
3950
3951 let event = test_harness.from_tree_rx.recv().await.unwrap();
3953 match event {
3954 EngineApiEvent::BackfillAction(BackfillAction::Start(
3955 reth_stages::PipelineTarget::Sync(target_hash),
3956 )) => {
3957 assert_eq!(target_hash, main_chain_backfill_target_hash);
3958 }
3959 _ => panic!("Unexpected event: {event:#?}"),
3960 }
3961
3962 let backfilled_chain: Vec<_> =
3964 main_chain.clone().drain(0..(MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize).collect();
3965 test_harness.persist_blocks(backfilled_chain.clone());
3966
3967 test_harness.setup_range_insertion_for_valid_chain(backfilled_chain);
3968
3969 test_harness
3971 .tree
3972 .on_engine_message(FromEngine::Event(FromOrchestrator::BackfillSyncFinished(
3973 ControlFlow::Continue { block_number: main_chain_backfill_target.number },
3974 )))
3975 .unwrap();
3976
3977 test_harness.fcu_to(main_chain_last_hash, ForkchoiceStatus::Syncing).await;
3979
3980 let event = test_harness.from_tree_rx.recv().await.unwrap();
3981 match event {
3982 EngineApiEvent::Download(DownloadRequest::BlockSet(target_hash)) => {
3983 assert_eq!(target_hash, HashSet::from_iter([main_chain_last_hash]));
3984 }
3985 _ => panic!("Unexpected event: {event:#?}"),
3986 }
3987
3988 test_harness
3990 .tree
3991 .on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain_last.clone()]))
3992 .unwrap();
3993
3994 let event = test_harness.from_tree_rx.recv().await.unwrap();
3996 match event {
3997 EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => {
3998 assert_eq!(
3999 total_blocks,
4000 (main_chain.len() - MIN_BLOCKS_FOR_PIPELINE_RUN as usize - 2) as u64
4001 );
4002 assert_eq!(initial_hash, main_chain_last.parent_hash);
4003 }
4004 _ => panic!("Unexpected event: {event:#?}"),
4005 }
4006
4007 let remaining: Vec<_> = main_chain
4008 .clone()
4009 .drain((MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize..main_chain.len())
4010 .collect();
4011
4012 test_harness.setup_range_insertion_for_valid_chain(remaining.clone());
4013
4014 test_harness
4016 .tree
4017 .on_engine_message(FromEngine::DownloadedBlocks(remaining.clone()))
4018 .unwrap();
4019
4020 test_harness.check_canon_chain_insertion(remaining).await;
4021
4022 test_harness.check_canon_commit(main_chain_last_hash).await;
4024
4025 test_harness.check_canon_head(main_chain_last_hash);
4027 }
4028
4029 #[tokio::test]
4030 async fn test_engine_tree_live_sync_fcu_extends_canon_chain() {
4031 reth_tracing::init_test_tracing();
4032
4033 let chain_spec = MAINNET.clone();
4034 let mut test_harness = TestHarness::new(chain_spec.clone());
4035
4036 let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
4038 test_harness = test_harness.with_blocks(base_chain.clone());
4039
4040 test_harness
4042 .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid)
4043 .await;
4044
4045 let main_chain =
4047 test_harness.block_builder.create_fork(base_chain[0].recovered_block(), 10);
4048 let target = main_chain.get(5).unwrap();
4050 let target_hash = target.hash();
4051 let main_last = main_chain.last().unwrap();
4052 let main_last_hash = main_last.hash();
4053
4054 test_harness.insert_chain(main_chain).await;
4056
4057 test_harness.send_fcu(target_hash, ForkchoiceStatus::Valid).await;
4059
4060 test_harness.check_canon_commit(target_hash).await;
4061 test_harness.check_fcu(target_hash, ForkchoiceStatus::Valid).await;
4062
4063 test_harness.send_fcu(main_last_hash, ForkchoiceStatus::Valid).await;
4065
4066 test_harness.check_canon_commit(main_last_hash).await;
4067 test_harness.check_fcu(main_last_hash, ForkchoiceStatus::Valid).await;
4068 test_harness.check_canon_head(main_last_hash);
4069 }
4070
4071 #[tokio::test]
4072 async fn test_engine_tree_valid_forks_with_older_canonical_head() {
4073 reth_tracing::init_test_tracing();
4074
4075 let chain_spec = MAINNET.clone();
4076 let mut test_harness = TestHarness::new(chain_spec.clone());
4077
4078 let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
4080 test_harness = test_harness.with_blocks(base_chain.clone());
4081
4082 let old_head = base_chain.first().unwrap().recovered_block();
4083
4084 let extension_chain = test_harness.block_builder.create_fork(old_head, 5);
4086 let fork_block = extension_chain.last().unwrap().clone_sealed_block();
4087
4088 test_harness.setup_range_insertion_for_valid_chain(extension_chain.clone());
4089 test_harness.insert_chain(extension_chain).await;
4090
4091 test_harness.fcu_to(old_head.hash(), ForkchoiceStatus::Valid).await;
4093
4094 let chain_a = test_harness.block_builder.create_fork(&fork_block, 10);
4096 let chain_b = test_harness.block_builder.create_fork(&fork_block, 10);
4097
4098 test_harness.setup_range_insertion_for_valid_chain(chain_a.clone());
4100 for block in &chain_a {
4101 test_harness.send_new_payload(block.clone()).await;
4102 }
4103
4104 test_harness.check_canon_chain_insertion(chain_a.clone()).await;
4105
4106 test_harness.setup_range_insertion_for_valid_chain(chain_b.clone());
4108 for block in &chain_b {
4109 test_harness.send_new_payload(block.clone()).await;
4110 }
4111
4112 test_harness.check_canon_chain_insertion(chain_b.clone()).await;
4113
4114 let chain_b_tip_hash = chain_b.last().unwrap().hash();
4116 test_harness.send_fcu(chain_b_tip_hash, ForkchoiceStatus::Valid).await;
4117
4118 test_harness.check_canon_commit(chain_b_tip_hash).await;
4120
4121 test_harness.check_fcu(chain_b_tip_hash, ForkchoiceStatus::Valid).await;
4123
4124 test_harness.check_canon_head(chain_b_tip_hash);
4126
4127 assert!(test_harness.tree.is_fork(chain_a.last().unwrap().hash()).unwrap());
4129 }
4130
4131 #[tokio::test]
4132 async fn test_engine_tree_buffered_blocks_are_eventually_connected() {
4133 let chain_spec = MAINNET.clone();
4134 let mut test_harness = TestHarness::new(chain_spec.clone());
4135
4136 let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
4137 test_harness = test_harness.with_blocks(base_chain.clone());
4138
4139 let side_chain =
4142 test_harness.block_builder.create_fork(base_chain.last().unwrap().recovered_block(), 2);
4143
4144 let buffered_block = side_chain.last().unwrap();
4146 let buffered_block_hash = buffered_block.hash();
4147
4148 test_harness.setup_range_insertion_for_valid_chain(vec![buffered_block.clone()]);
4149 test_harness.send_new_payload(buffered_block.clone()).await;
4150
4151 assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_some());
4152
4153 let non_buffered_block = side_chain.first().unwrap();
4154 let non_buffered_block_hash = non_buffered_block.hash();
4155
4156 test_harness.setup_range_insertion_for_valid_chain(vec![non_buffered_block.clone()]);
4158 test_harness.send_new_payload(non_buffered_block.clone()).await;
4159 assert!(test_harness.tree.state.buffer.block(&non_buffered_block_hash).is_none());
4160
4161 assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_none());
4163
4164 test_harness.check_canon_block_added(non_buffered_block_hash).await;
4166 test_harness.check_canon_block_added(buffered_block_hash).await;
4167 }
4168
4169 #[tokio::test]
4170 async fn test_engine_tree_valid_and_invalid_forks_with_older_canonical_head() {
4171 reth_tracing::init_test_tracing();
4172
4173 let chain_spec = MAINNET.clone();
4174 let mut test_harness = TestHarness::new(chain_spec.clone());
4175
4176 let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
4178 test_harness = test_harness.with_blocks(base_chain.clone());
4179
4180 let old_head = base_chain.first().unwrap().recovered_block();
4181
4182 let extension_chain = test_harness.block_builder.create_fork(old_head, 5);
4184 let fork_block = extension_chain.last().unwrap().clone_sealed_block();
4185 test_harness.insert_chain(extension_chain).await;
4186
4187 test_harness.fcu_to(old_head.hash(), ForkchoiceStatus::Valid).await;
4189
4190 let total_fork_elements = 10;
4192 let chain_a = test_harness.block_builder.create_fork(&fork_block, total_fork_elements);
4193 let chain_b = test_harness.block_builder.create_fork(&fork_block, total_fork_elements);
4194
4195 test_harness.setup_range_insertion_for_valid_chain(chain_b.clone());
4197 for block in &chain_b {
4198 test_harness.send_new_payload(block.clone()).await;
4199 test_harness.send_fcu(block.hash(), ForkchoiceStatus::Valid).await;
4200 test_harness.check_canon_block_added(block.hash()).await;
4201 test_harness.check_canon_commit(block.hash()).await;
4202 test_harness.check_fcu(block.hash(), ForkchoiceStatus::Valid).await;
4203 }
4204
4205 let invalid_index = 3;
4207 test_harness.setup_range_insertion_for_invalid_chain(chain_a.clone(), invalid_index);
4208 for block in &chain_a {
4209 test_harness.send_new_payload(block.clone()).await;
4210 }
4211
4212 test_harness
4215 .check_fork_chain_insertion(
4216 chain_a[..chain_a.len() - invalid_index - 1].iter().cloned(),
4217 )
4218 .await;
4219 for block in &chain_a[chain_a.len() - invalid_index - 1..] {
4220 test_harness.check_invalid_block(block.hash()).await;
4221 }
4222
4223 let chain_a_tip_hash = chain_a.last().unwrap().hash();
4225 test_harness.fcu_to(chain_a_tip_hash, ForkchoiceStatus::Invalid).await;
4226
4227 let chain_b_tip_hash = chain_b.last().unwrap().hash();
4229
4230 test_harness.check_canon_head(chain_b_tip_hash);
4232
4233 test_harness.check_canon_head(chain_b_tip_hash);
4235 }
4236
4237 #[tokio::test]
4238 async fn test_engine_tree_reorg_with_missing_ancestor_expecting_valid() {
4239 reth_tracing::init_test_tracing();
4240 let chain_spec = MAINNET.clone();
4241 let mut test_harness = TestHarness::new(chain_spec.clone());
4242
4243 let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..6).collect();
4244 test_harness = test_harness.with_blocks(base_chain.clone());
4245
4246 let side_chain = test_harness
4248 .block_builder
4249 .create_fork(base_chain.last().unwrap().recovered_block(), 15);
4250 let invalid_index = 9;
4251
4252 test_harness.setup_range_insertion_for_invalid_chain(side_chain.clone(), invalid_index);
4253
4254 for (index, block) in side_chain.iter().enumerate() {
4255 test_harness.send_new_payload(block.clone()).await;
4256
4257 if index < side_chain.len() - invalid_index - 1 {
4258 test_harness.send_fcu(block.hash(), ForkchoiceStatus::Valid).await;
4259 }
4260 }
4261
4262 let fork_tip_hash = side_chain.last().unwrap().hash();
4264 test_harness.send_fcu(fork_tip_hash, ForkchoiceStatus::Invalid).await;
4265 }
4266}