1use crate::{
2 backfill::{BackfillAction, BackfillSyncState},
3 backup::{BackupAction, BackupHandle},
4 chain::FromOrchestrator,
5 engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
6 persistence::PersistenceHandle,
7 tree::metrics::EngineApiMetrics,
8};
9use alloy_consensus::BlockHeader;
10use alloy_eips::BlockNumHash;
11use alloy_primitives::{
12 map::{HashMap, HashSet},
13 BlockNumber, B256, U256,
14};
15use alloy_rpc_types_engine::{
16 ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState, PayloadStatus, PayloadStatusEnum,
17 PayloadValidationError,
18};
19use reth_beacon_consensus::{
20 BeaconConsensusEngineEvent, InvalidHeaderCache, MIN_BLOCKS_FOR_PIPELINE_RUN,
21};
22use reth_blockchain_tree::{
23 error::{InsertBlockErrorKindTwo, InsertBlockErrorTwo, InsertBlockFatalError},
24 BlockBuffer, BlockStatus2, InsertPayloadOk2,
25};
26use reth_chain_state::{
27 CanonicalInMemoryState, ExecutedBlock, MemoryOverlayStateProvider, NewCanonicalChain,
28};
29use reth_consensus::{Consensus, FullConsensus, PostExecutionInput};
30use reth_engine_primitives::{
31 BeaconEngineMessage, BeaconOnNewPayloadError, EngineApiMessageVersion, EngineTypes,
32 EngineValidator, ForkchoiceStateTracker, OnForkChoiceUpdated,
33};
34use reth_errors::{ConsensusError, ProviderResult};
35use reth_evm::execute::BlockExecutorProvider;
36use reth_payload_builder::PayloadBuilderHandle;
37use reth_payload_builder_primitives::PayloadBuilder;
38use reth_payload_primitives::PayloadBuilderAttributes;
39use reth_primitives::{
40 EthPrimitives, GotExpected, NodePrimitives, SealedBlockFor, SealedBlockWithSenders,
41 SealedHeader,
42};
43use reth_primitives_traits::Block;
44use reth_provider::{
45 providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, ExecutionOutcome,
46 HashedPostStateProvider, ProviderError, StateCommitmentProvider, StateProviderBox,
47 StateProviderFactory, StateReader, StateRootProvider, TransactionVariant,
48};
49use reth_revm::database::StateProviderDatabase;
50use reth_stages_api::ControlFlow;
51use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
52use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
53use revm_primitives::EvmState;
54use std::{
55 cmp::Ordering,
56 collections::{btree_map, hash_map, BTreeMap, VecDeque},
57 fmt::Debug,
58 ops::Bound,
59 sync::{
60 mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
61 Arc,
62 },
63 time::Instant,
64};
65use tokio::sync::{
66 mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
67 oneshot::{self, error::TryRecvError},
68};
69use tracing::*;
70
71pub mod config;
72mod invalid_block_hook;
73mod metrics;
74mod persistence_state;
75pub use config::TreeConfig;
76pub use invalid_block_hook::{InvalidBlockHooks, NoopInvalidBlockHook};
77pub use persistence_state::PersistenceState;
78pub use reth_engine_primitives::InvalidBlockHook;
79
80pub mod root;
81
/// In-memory bookkeeping for executed blocks that the engine tree tracks.
///
/// The same set of blocks is indexed by hash, by number and by parent so that
/// lookups, removals and fork pruning can each use the cheapest index.
#[derive(Debug, Default)]
pub struct TreeState<N: NodePrimitives = EthPrimitives> {
    /// Every tracked executed block, keyed by its block hash.
    blocks_by_hash: HashMap<B256, ExecutedBlock<N>>,
    /// Tracked executed blocks grouped by block number. One number can hold
    /// several blocks (competing forks at the same height).
    blocks_by_number: BTreeMap<BlockNumber, Vec<ExecutedBlock<N>>>,
    /// Maps a parent hash to the hashes of its tracked child blocks.
    parent_to_child: HashMap<B256, HashSet<B256>>,
    /// Trie updates of blocks that `remove_canonical_until` took out of the tree,
    /// keyed by block hash and stored together with the block number. Entries at
    /// or below the finalized number are dropped by `prune_finalized_sidechains`.
    persisted_trie_updates: HashMap<B256, (BlockNumber, Arc<TrieUpdates>)>,
    /// Number and hash of the block currently treated as the canonical head.
    current_canonical_head: BlockNumHash,
}
109
110impl<N: NodePrimitives> TreeState<N> {
111 fn new(current_canonical_head: BlockNumHash) -> Self {
113 Self {
114 blocks_by_hash: HashMap::default(),
115 blocks_by_number: BTreeMap::new(),
116 current_canonical_head,
117 parent_to_child: HashMap::default(),
118 persisted_trie_updates: HashMap::default(),
119 }
120 }
121
122 fn block_count(&self) -> usize {
124 self.blocks_by_hash.len()
125 }
126
127 fn executed_block_by_hash(&self, hash: B256) -> Option<&ExecutedBlock<N>> {
129 self.blocks_by_hash.get(&hash)
130 }
131
132 fn block_by_hash(&self, hash: B256) -> Option<Arc<SealedBlockFor<N::Block>>> {
134 self.blocks_by_hash.get(&hash).map(|b| b.block.clone())
135 }
136
137 fn blocks_by_hash(&self, hash: B256) -> Option<(B256, Vec<ExecutedBlock<N>>)> {
142 let block = self.blocks_by_hash.get(&hash).cloned()?;
143 let mut parent_hash = block.block().parent_hash();
144 let mut blocks = vec![block];
145 while let Some(executed) = self.blocks_by_hash.get(&parent_hash) {
146 parent_hash = executed.block.parent_hash();
147 blocks.push(executed.clone());
148 }
149
150 Some((parent_hash, blocks))
151 }
152
153 fn insert_executed(&mut self, executed: ExecutedBlock<N>) {
155 let hash = executed.block.hash();
156 let parent_hash = executed.block.parent_hash();
157 let block_number = executed.block.number();
158
159 if self.blocks_by_hash.contains_key(&hash) {
160 return;
161 }
162
163 self.blocks_by_hash.insert(hash, executed.clone());
164
165 self.blocks_by_number.entry(block_number).or_default().push(executed);
166
167 self.parent_to_child.entry(parent_hash).or_default().insert(hash);
168
169 if let Some(existing_blocks) = self.blocks_by_number.get(&block_number) {
170 if existing_blocks.len() > 1 {
171 self.parent_to_child.entry(parent_hash).or_default().insert(hash);
172 }
173 }
174
175 for children in self.parent_to_child.values_mut() {
176 children.retain(|child| self.blocks_by_hash.contains_key(child));
177 }
178 }
179
180 fn remove_by_hash(&mut self, hash: B256) -> Option<(ExecutedBlock<N>, HashSet<B256>)> {
186 let executed = self.blocks_by_hash.remove(&hash)?;
187
188 let parent_entry = self.parent_to_child.entry(executed.block.parent_hash());
190 if let hash_map::Entry::Occupied(mut entry) = parent_entry {
191 entry.get_mut().remove(&hash);
192
193 if entry.get().is_empty() {
194 entry.remove();
195 }
196 }
197
198 let children = self.parent_to_child.remove(&hash).unwrap_or_default();
200
201 let block_number_entry = self.blocks_by_number.entry(executed.block.number());
203 if let btree_map::Entry::Occupied(mut entry) = block_number_entry {
204 if let Some(index) = entry.get().iter().position(|b| b.block.hash() == hash) {
206 entry.get_mut().swap_remove(index);
207
208 if entry.get().is_empty() {
210 entry.remove();
211 }
212 }
213 }
214
215 Some((executed, children))
216 }
217
218 pub(crate) fn is_canonical(&self, hash: B256) -> bool {
220 let mut current_block = self.current_canonical_head.hash;
221 if current_block == hash {
222 return true
223 }
224
225 while let Some(executed) = self.blocks_by_hash.get(¤t_block) {
226 current_block = executed.block.parent_hash();
227 if current_block == hash {
228 return true
229 }
230 }
231
232 false
233 }
234
235 pub(crate) fn remove_canonical_until(
238 &mut self,
239 upper_bound: BlockNumber,
240 last_persisted_hash: B256,
241 ) {
242 debug!(target: "engine::tree", ?upper_bound, ?last_persisted_hash, "Removing canonical blocks from the tree");
243
244 if !self.is_canonical(last_persisted_hash) {
247 return
248 }
249
250 let mut current_block = self.current_canonical_head.hash;
253 while let Some(executed) = self.blocks_by_hash.get(¤t_block) {
254 current_block = executed.block.parent_hash();
255 if executed.block.number() <= upper_bound {
256 debug!(target: "engine::tree", num_hash=?executed.block.num_hash(), "Attempting to remove block walking back from the head");
257 if let Some((removed, _)) = self.remove_by_hash(executed.block.hash()) {
258 debug!(target: "engine::tree", num_hash=?removed.block.num_hash(), "Removed block walking back from the head");
259 self.persisted_trie_updates
261 .insert(removed.block.hash(), (removed.block.number(), removed.trie));
262 }
263 }
264 }
265 debug!(target: "engine::tree", ?upper_bound, ?last_persisted_hash, "Removed canonical blocks from the tree");
266 }
267
268 pub(crate) fn prune_finalized_sidechains(&mut self, finalized_num_hash: BlockNumHash) {
271 let BlockNumHash { number: finalized_num, hash: finalized_hash } = finalized_num_hash;
272
273 let blocks_to_remove = self
282 .blocks_by_number
283 .range((Bound::Unbounded, Bound::Excluded(finalized_num)))
284 .flat_map(|(_, blocks)| blocks.iter().map(|b| b.block.hash()))
285 .collect::<Vec<_>>();
286 for hash in blocks_to_remove {
287 if let Some((removed, _)) = self.remove_by_hash(hash) {
288 debug!(target: "engine::tree", num_hash=?removed.block.num_hash(), "Removed finalized sidechain block");
289 }
290 }
291
292 self.persisted_trie_updates.retain(|_, (block_num, _)| *block_num > finalized_num);
294
295 let mut blocks_to_remove = self.blocks_by_number.remove(&finalized_num).unwrap_or_default();
302
303 if let Some(position) =
305 blocks_to_remove.iter().position(|b| b.block.hash() == finalized_hash)
306 {
307 let finalized_block = blocks_to_remove.swap_remove(position);
308 self.blocks_by_number.insert(finalized_num, vec![finalized_block]);
309 }
310
311 let mut blocks_to_remove =
312 blocks_to_remove.into_iter().map(|e| e.block.hash()).collect::<VecDeque<_>>();
313 while let Some(block) = blocks_to_remove.pop_front() {
314 if let Some((removed, children)) = self.remove_by_hash(block) {
315 debug!(target: "engine::tree", num_hash=?removed.block.num_hash(), "Removed finalized sidechain child block");
316 blocks_to_remove.extend(children);
317 }
318 }
319 }
320
321 pub(crate) fn remove_until(
332 &mut self,
333 upper_bound: BlockNumHash,
334 last_persisted_hash: B256,
335 finalized_num_hash: Option<BlockNumHash>,
336 ) {
337 debug!(target: "engine::tree", ?upper_bound, ?finalized_num_hash, "Removing blocks from the tree");
338
339 let finalized_num_hash = finalized_num_hash.map(|mut finalized| {
342 if upper_bound.number < finalized.number {
343 finalized = upper_bound;
344 debug!(target: "engine::tree", ?finalized, "Adjusted upper bound");
345 }
346 finalized
347 });
348
349 self.remove_canonical_until(upper_bound.number, last_persisted_hash);
357
358 if let Some(finalized_num_hash) = finalized_num_hash {
361 self.prune_finalized_sidechains(finalized_num_hash);
362 }
363 }
364
365 fn set_canonical_head(&mut self, new_head: BlockNumHash) {
367 self.current_canonical_head = new_head;
368 }
369
370 const fn canonical_head(&self) -> &BlockNumHash {
372 &self.current_canonical_head
373 }
374
375 const fn canonical_block_hash(&self) -> B256 {
377 self.canonical_head().hash
378 }
379
380 const fn canonical_block_number(&self) -> BlockNumber {
382 self.canonical_head().number
383 }
384}
385
/// All tree-related state owned by the engine API handler.
#[derive(Debug)]
pub struct EngineApiTreeState<N: NodePrimitives> {
    /// Executed blocks tracked in memory and the current canonical head.
    tree_state: TreeState<N>,
    /// Tracker for received forkchoice states; updated after each forkchoice update.
    forkchoice_state_tracker: ForkchoiceStateTracker,
    /// Buffer of blocks, created with the configured block buffer limit.
    buffer: BlockBuffer<N::Block>,
    /// Cache of invalid headers, created with the configured maximum length.
    invalid_headers: InvalidHeaderCache,
}
401
402impl<N: NodePrimitives> EngineApiTreeState<N> {
403 fn new(
404 block_buffer_limit: u32,
405 max_invalid_header_cache_length: u32,
406 canonical_block: BlockNumHash,
407 ) -> Self {
408 Self {
409 invalid_headers: InvalidHeaderCache::new(max_invalid_header_cache_length),
410 buffer: BlockBuffer::new(block_buffer_limit),
411 tree_state: TreeState::new(canonical_block),
412 forkchoice_state_tracker: ForkchoiceStateTracker::default(),
413 }
414 }
415}
416
/// Result of a tree operation, optionally paired with a follow-up event.
#[derive(Debug)]
pub struct TreeOutcome<T> {
    /// The result of the operation.
    pub outcome: T,
    /// An event the caller should act on after the operation, if any.
    pub event: Option<TreeEvent>,
}
425
426impl<T> TreeOutcome<T> {
427 pub const fn new(outcome: T) -> Self {
429 Self { outcome, event: None }
430 }
431
432 pub fn with_event(mut self, event: TreeEvent) -> Self {
434 self.event = Some(event);
435 self
436 }
437}
438
/// Events that a tree operation can hand back to its caller.
#[derive(Debug)]
pub enum TreeEvent {
    /// An action for the tree itself to perform.
    TreeAction(TreeAction),
    /// An action concerning backfill sync.
    BackfillAction(BackfillAction),
    /// A request to download blocks.
    Download(DownloadRequest),
}
449
450impl TreeEvent {
451 const fn is_backfill_action(&self) -> bool {
453 matches!(self, Self::BackfillAction(_))
454 }
455}
456
/// Actions the tree can be asked to perform.
#[derive(Debug)]
pub enum TreeAction {
    /// Make the chain ending at the given block canonical.
    MakeCanonical {
        /// Hash of the sync-target head block that should become canonical.
        sync_target_head: B256,
    },
}
466
/// Handler that processes engine API requests against the in-memory tree.
///
/// `run` drives it as a loop: receive a message, handle it, then advance the
/// persistence and backup tasks.
pub struct EngineApiTreeHandler<N, P, E, T, V>
where
    N: NodePrimitives,
    T: EngineTypes,
{
    /// Provider used for database-backed lookups (e.g. best block number, headers).
    provider: P,
    /// Executor provider. NOTE(review): presumably used to execute blocks; the usage is outside this view.
    executor_provider: E,
    /// Consensus implementation. NOTE(review): usage is outside this view.
    consensus: Arc<dyn FullConsensus<N>>,
    /// Validator that turns an incoming payload into a well-formed block.
    payload_validator: V,
    /// Tree state: tracked blocks, forkchoice tracker, block buffer, invalid headers.
    state: EngineApiTreeState<N>,
    /// Sending half of the request channel. Handed out by `sender`/`spawn_new`
    /// and also used by `on_downloaded` to re-queue blocks beyond the batch size.
    incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Receiving half of the request channel, read by `try_recv_engine_message`.
    incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
    /// Channel for events emitted by the handler; `spawn_new` returns its receiver.
    outgoing: UnboundedSender<EngineApiEvent<N>>,
    /// Handle used to start save-blocks and remove-blocks jobs on the persistence task.
    persistence: PersistenceHandle<N>,
    /// Progress of the persistence task and the last persisted block.
    persistence_state: PersistenceState,
    /// Whether backfill sync is running; starts as `Idle`.
    backfill_sync_state: BackfillSyncState,
    /// Canonical in-memory chain state shared with other components.
    canonical_in_memory_state: CanonicalInMemoryState<N>,
    /// Handle to the payload builder. NOTE(review): usage is outside this view.
    payload_builder: PayloadBuilderHandle<T>,
    /// Tree configuration (buffer limits, batch sizes, ...).
    config: TreeConfig,
    /// Metrics recorded by the handler.
    metrics: EngineApiMetrics,
    /// Hook for invalid blocks; defaults to `NoopInvalidBlockHook` in `new`.
    invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
    /// Kind of engine API; `on_forkchoice_updated` branches on `is_opstack()`.
    engine_kind: EngineApiKind,
    /// Handle used to start backups and track an in-flight backup job.
    backup: BackupHandle,
}
518
519impl<N, P: Debug, E: Debug, T: EngineTypes + Debug, V: Debug> std::fmt::Debug
520 for EngineApiTreeHandler<N, P, E, T, V>
521where
522 N: NodePrimitives,
523{
524 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
525 f.debug_struct("EngineApiTreeHandler")
526 .field("provider", &self.provider)
527 .field("executor_provider", &self.executor_provider)
528 .field("consensus", &self.consensus)
529 .field("payload_validator", &self.payload_validator)
530 .field("state", &self.state)
531 .field("incoming_tx", &self.incoming_tx)
532 .field("persistence", &self.persistence)
533 .field("persistence_state", &self.persistence_state)
534 .field("backfill_sync_state", &self.backfill_sync_state)
535 .field("canonical_in_memory_state", &self.canonical_in_memory_state)
536 .field("payload_builder", &self.payload_builder)
537 .field("config", &self.config)
538 .field("metrics", &self.metrics)
539 .field("invalid_block_hook", &format!("{:p}", self.invalid_block_hook))
540 .field("engine_kind", &self.engine_kind)
541 .finish()
542 }
543}
544
545impl<N, P, E, T, V> EngineApiTreeHandler<N, P, E, T, V>
546where
547 N: NodePrimitives,
548 P: DatabaseProviderFactory
549 + BlockReader<Block = N::Block, Header = N::BlockHeader>
550 + StateProviderFactory
551 + StateReader<Receipt = N::Receipt>
552 + StateCommitmentProvider
553 + HashedPostStateProvider
554 + Clone
555 + 'static,
556 <P as DatabaseProviderFactory>::Provider:
557 BlockReader<Block = N::Block, Header = N::BlockHeader>,
558 E: BlockExecutorProvider<Primitives = N>,
559 T: EngineTypes,
560 V: EngineValidator<T, Block = N::Block>,
561{
562 #[expect(clippy::too_many_arguments)]
564 pub fn new(
565 provider: P,
566 executor_provider: E,
567 consensus: Arc<dyn FullConsensus<N>>,
568 payload_validator: V,
569 outgoing: UnboundedSender<EngineApiEvent<N>>,
570 state: EngineApiTreeState<N>,
571 canonical_in_memory_state: CanonicalInMemoryState<N>,
572 persistence: PersistenceHandle<N>,
573 persistence_state: PersistenceState,
574 payload_builder: PayloadBuilderHandle<T>,
575 config: TreeConfig,
576 engine_kind: EngineApiKind,
577 backup: BackupHandle,
578 ) -> Self {
579 let (incoming_tx, incoming) = std::sync::mpsc::channel();
580
581 Self {
582 provider,
583 executor_provider,
584 consensus,
585 payload_validator,
586 incoming,
587 outgoing,
588 persistence,
589 persistence_state,
590 backfill_sync_state: BackfillSyncState::Idle,
591 state,
592 canonical_in_memory_state,
593 payload_builder,
594 config,
595 metrics: Default::default(),
596 incoming_tx,
597 invalid_block_hook: Box::new(NoopInvalidBlockHook),
598 engine_kind,
599 backup,
600 }
601 }
602
603 fn set_invalid_block_hook(&mut self, invalid_block_hook: Box<dyn InvalidBlockHook<N>>) {
605 self.invalid_block_hook = invalid_block_hook;
606 }
607
608 #[expect(clippy::complexity)]
614 pub fn spawn_new(
615 provider: P,
616 executor_provider: E,
617 consensus: Arc<dyn FullConsensus<N>>,
618 payload_validator: V,
619 persistence: PersistenceHandle<N>,
620 payload_builder: PayloadBuilderHandle<T>,
621 canonical_in_memory_state: CanonicalInMemoryState<N>,
622 config: TreeConfig,
623 invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
624 kind: EngineApiKind,
625 backup: BackupHandle,
626 ) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
627 {
628 let best_block_number = provider.best_block_number().unwrap_or(0);
629 let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
630
631 let persistence_state = PersistenceState {
632 last_persisted_block: BlockNumHash::new(best_block_number, header.hash()),
633 rx: None,
634 remove_above_state: VecDeque::new(),
635 };
636
637 let (tx, outgoing) = unbounded_channel();
638 let state = EngineApiTreeState::new(
639 config.block_buffer_limit(),
640 config.max_invalid_header_cache_length(),
641 header.num_hash(),
642 );
643
644 let mut task = Self::new(
645 provider,
646 executor_provider,
647 consensus,
648 payload_validator,
649 tx,
650 state,
651 canonical_in_memory_state,
652 persistence,
653 persistence_state,
654 payload_builder,
655 config,
656 kind,
657 backup,
658 );
659 task.set_invalid_block_hook(invalid_block_hook);
660 let incoming = task.incoming_tx.clone();
661 std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap();
662 (incoming, outgoing)
663 }
664
665 pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
667 self.incoming_tx.clone()
668 }
669
670 pub fn run(mut self) {
674 loop {
675 match self.try_recv_engine_message() {
676 Ok(Some(msg)) => {
677 debug!(target: "engine::tree", %msg, "received new engine message");
678 if let Err(fatal) = self.on_engine_message(msg) {
679 error!(target: "engine::tree", %fatal, "insert block fatal error");
680 return
681 }
682 }
683 Ok(None) => {
684 debug!(target: "engine::tree", "received no engine message for some time, while waiting for persistence task to complete");
685 }
686 Err(_err) => {
687 error!(target: "engine::tree", "Engine channel disconnected");
688 return
689 }
690 }
691
692 if let Err(err) = self.advance_persistence() {
693 error!(target: "engine::tree", %err, "Advancing persistence failed");
694 return
695 }
696 if let Err(err) = self.advance_backup() {
697 error!(target: "engine::tree", %err, "Advancing backup failed");
698 return
699 }
700 }
701 }
702
703 fn on_downloaded(
709 &mut self,
710 mut blocks: Vec<SealedBlockWithSenders<N::Block>>,
711 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
712 if blocks.is_empty() {
713 return Ok(None)
715 }
716
717 trace!(target: "engine::tree", block_count = %blocks.len(), "received downloaded blocks");
718 let batch = self.config.max_execute_block_batch_size().min(blocks.len());
719 for block in blocks.drain(..batch) {
720 if let Some(event) = self.on_downloaded_block(block)? {
721 let needs_backfill = event.is_backfill_action();
722 self.on_tree_event(event)?;
723 if needs_backfill {
724 return Ok(None)
726 }
727 }
728 }
729
730 if !blocks.is_empty() {
732 let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
733 }
734
735 Ok(None)
736 }
737
738 #[instrument(level = "trace", skip_all, fields(block_hash = %payload.block_hash(), block_num = %payload.block_number(),), target = "engine::tree")]
751 fn on_new_payload(
752 &mut self,
753 payload: ExecutionPayload,
754 sidecar: ExecutionPayloadSidecar,
755 ) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
756 trace!(target: "engine::tree", "invoked new payload");
757 self.metrics.engine.new_payload_messages.increment(1);
758
759 let parent_hash = payload.parent_hash();
785 let block = match self.payload_validator.ensure_well_formed_payload(payload, sidecar) {
786 Ok(block) => block,
787 Err(error) => {
788 error!(target: "engine::tree", %error, "Invalid payload");
789 let latest_valid_hash =
792 if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
793 None
797 } else {
798 self.latest_valid_hash_for_invalid_payload(parent_hash)?
799 };
800
801 let status = PayloadStatusEnum::from(error);
802 return Ok(TreeOutcome::new(PayloadStatus::new(status, latest_valid_hash)))
803 }
804 };
805
806 let block_hash = block.hash();
807 let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
808 if lowest_buffered_ancestor == block_hash {
809 lowest_buffered_ancestor = block.parent_hash();
810 }
811
812 if let Some(status) =
814 self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, block_hash)?
815 {
816 return Ok(TreeOutcome::new(status))
817 }
818
819 let status = if self.backfill_sync_state.is_idle() {
820 let mut latest_valid_hash = None;
821 let num_hash = block.num_hash();
822 match self.insert_block_without_senders(block) {
823 Ok(status) => {
824 let status = match status {
825 InsertPayloadOk2::Inserted(BlockStatus2::Valid) => {
826 latest_valid_hash = Some(block_hash);
827 self.try_connect_buffered_blocks(num_hash)?;
828 PayloadStatusEnum::Valid
829 }
830 InsertPayloadOk2::AlreadySeen(BlockStatus2::Valid) => {
831 latest_valid_hash = Some(block_hash);
832 PayloadStatusEnum::Valid
833 }
834 InsertPayloadOk2::Inserted(BlockStatus2::Disconnected { .. }) |
835 InsertPayloadOk2::AlreadySeen(BlockStatus2::Disconnected { .. }) => {
836 PayloadStatusEnum::Syncing
838 }
839 };
840
841 PayloadStatus::new(status, latest_valid_hash)
842 }
843 Err(error) => self.on_insert_block_error(error)?,
844 }
845 } else if let Err(error) = self.buffer_block_without_senders(block) {
846 self.on_insert_block_error(error)?
847 } else {
848 PayloadStatus::from_status(PayloadStatusEnum::Syncing)
849 };
850
851 let mut outcome = TreeOutcome::new(status);
852 if outcome.outcome.is_valid() && self.is_sync_target_head(block_hash) {
853 outcome = outcome.with_event(TreeEvent::TreeAction(TreeAction::MakeCanonical {
855 sync_target_head: block_hash,
856 }));
857 }
858
859 Ok(outcome)
860 }
861
862 fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
869 let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
871 return Ok(None)
872 };
873
874 let new_head_number = new_head_block.block.number();
875 let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;
876
877 let mut new_chain = vec![new_head_block.clone()];
878 let mut current_hash = new_head_block.block.parent_hash();
879 let mut current_number = new_head_number - 1;
880
881 while current_number > current_canonical_number {
886 if let Some(block) = self.executed_block_by_hash(current_hash)? {
887 current_hash = block.block.parent_hash();
888 current_number -= 1;
889 new_chain.push(block);
890 } else {
891 warn!(target: "engine::tree", current_hash=?current_hash, "Sidechain block not found in TreeState");
892 return Ok(None);
895 }
896 }
897
898 if current_hash == self.state.tree_state.current_canonical_head.hash {
901 new_chain.reverse();
902
903 return Ok(Some(NewCanonicalChain::Commit { new: new_chain }));
905 }
906
907 let mut old_chain = Vec::new();
909 let mut old_hash = self.state.tree_state.current_canonical_head.hash;
910
911 while current_canonical_number > current_number {
914 if let Some(block) = self.executed_block_by_hash(old_hash)? {
915 old_chain.push(block.clone());
916 old_hash = block.block.header.parent_hash();
917 current_canonical_number -= 1;
918 } else {
919 warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
921 return Ok(None);
922 }
923 }
924
925 debug_assert_eq!(current_number, current_canonical_number);
927
928 while old_hash != current_hash {
931 if let Some(block) = self.executed_block_by_hash(old_hash)? {
932 old_hash = block.block.header.parent_hash();
933 old_chain.push(block);
934 } else {
935 warn!(target: "engine::tree", current_hash=?old_hash, "Canonical block not found in TreeState");
937 return Ok(None);
938 }
939
940 if let Some(block) = self.executed_block_by_hash(current_hash)? {
941 current_hash = block.block.parent_hash();
942 new_chain.push(block);
943 } else {
944 warn!(target: "engine::tree", invalid_hash=?current_hash, "New chain block not found in TreeState");
946 return Ok(None);
947 }
948 }
949 new_chain.reverse();
950 old_chain.reverse();
951
952 Ok(Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }))
953 }
954
955 fn is_fork(&self, target_hash: B256) -> ProviderResult<bool> {
962 let canonical_head = self.state.tree_state.canonical_head();
964 let mut current_hash = target_hash;
965 while let Some(current_block) = self.sealed_header_by_hash(current_hash)? {
966 if current_block.hash() == canonical_head.hash {
967 return Ok(false)
968 }
969 if current_block.number() <= canonical_head.number {
971 break
972 }
973 current_hash = current_block.parent_hash();
974 }
975
976 if self.canonical_in_memory_state.header_by_hash(target_hash).is_some() {
978 return Ok(false)
979 }
980
981 if self.provider.block_number(target_hash)?.is_some() {
983 return Ok(false)
984 }
985
986 Ok(true)
987 }
988
/// Handles a forkchoice update from the engine API.
///
/// The cases are tried in this order:
/// 1. Pre-validation produced a response: return it.
/// 2. The requested head is already the tracked canonical head.
/// 3. The requested head is tracked in the tree: apply the chain update.
/// 4. The requested head is an already canonical block (behind the head).
/// 5. The head is unknown: respond `Syncing` and request a download.
///
/// In cases 2 and 3 the forkchoice state is checked for consistency and, if
/// payload attributes were supplied, they are processed against the new tip.
///
/// # Errors
///
/// Propagates provider errors from lookups.
#[instrument(level = "trace", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash), target = "engine::tree")]
fn on_forkchoice_updated(
    &mut self,
    state: ForkchoiceState,
    attrs: Option<T::PayloadAttributes>,
    version: EngineApiMessageVersion,
) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
    trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
    self.metrics.engine.forkchoice_updated_messages.increment(1);
    self.canonical_in_memory_state.on_forkchoice_update_received();

    // Case 1: pre-validation already decided the response.
    if let Some(on_updated) = self.pre_validate_forkchoice_update(state)? {
        return Ok(TreeOutcome::new(on_updated))
    }

    // Builds the plain `Valid` response whose latest valid hash is `head`.
    let valid_outcome = |head| {
        TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::new(
            PayloadStatusEnum::Valid,
            Some(head),
        )))
    };

    // Case 2: the requested head is already the tracked canonical head.
    if self.state.tree_state.canonical_block_hash() == state.head_block_hash {
        trace!(target: "engine::tree", "fcu head hash is already canonical");

        if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
            return Ok(TreeOutcome::new(outcome))
        }

        if let Some(attr) = attrs {
            // The canonical head block is needed as the parent for the new payload.
            let tip = self
                .block_by_hash(self.state.tree_state.canonical_block_hash())?
                .ok_or_else(|| {
                    ProviderError::HeaderNotFound(state.head_block_hash.into())
                })?;
            let updated = self.process_payload_attributes(attr, tip.header(), state, version);
            return Ok(TreeOutcome::new(updated))
        }

        return Ok(valid_outcome(state.head_block_hash))
    }

    // Case 3: the head is tracked in the tree; make its chain canonical first.
    if let Some(chain_update) = self.on_new_head(state.head_block_hash)? {
        // Clone the tip header before the update is consumed.
        let tip = chain_update.tip().header.clone();
        self.on_canonical_chain_update(chain_update);

        if let Err(outcome) = self.ensure_consistent_forkchoice_state(state) {
            return Ok(TreeOutcome::new(outcome))
        }

        if let Some(attr) = attrs {
            let updated = self.process_payload_attributes(attr, &tip, state, version);
            return Ok(TreeOutcome::new(updated))
        }

        return Ok(valid_outcome(state.head_block_hash))
    }

    // Case 4: the head is an already canonical block. Lookup errors are
    // treated the same as "not found" here.
    if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
        debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");

        // Only the OP stack engine kind processes attributes for such a head.
        if self.engine_kind.is_opstack() {
            if let Some(attr) = attrs {
                debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
                let updated =
                    self.process_payload_attributes(attr, &canonical_header, state, version);
                return Ok(TreeOutcome::new(updated))
            }
        }

        return Ok(valid_outcome(state.head_block_hash))
    }

    // Case 5: the head is unknown. On the very first forkchoice update with a
    // non-zero safe block that is not canonical, the safe block is the
    // download target; otherwise the head is.
    let target = if self.state.forkchoice_state_tracker.is_empty() &&
        !state.safe_block_hash.is_zero() &&
        self.find_canonical_header(state.safe_block_hash).ok().flatten().is_none()
    {
        debug!(target: "engine::tree", "missing safe block on initial FCU, downloading safe block");
        state.safe_block_hash
    } else {
        state.head_block_hash
    };

    // Start from the lowest buffered ancestor so already buffered blocks are not re-downloaded.
    let target = self.lowest_buffered_ancestor_or(target);
    trace!(target: "engine::tree", %target, "downloading missing block");

    Ok(TreeOutcome::new(OnForkChoiceUpdated::valid(PayloadStatus::from_status(
        PayloadStatusEnum::Syncing,
    )))
    .with_event(TreeEvent::Download(DownloadRequest::single_block(target))))
}
1131
1132 #[expect(clippy::type_complexity)]
1141 fn try_recv_engine_message(
1142 &self,
1143 ) -> Result<Option<FromEngine<EngineApiRequest<T, N>, N::Block>>, RecvError> {
1144 if self.persistence_state.in_progress() || self.backup.in_progress() {
1145 match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
1147 Ok(msg) => Ok(Some(msg)),
1148 Err(err) => match err {
1149 RecvTimeoutError::Timeout => Ok(None),
1150 RecvTimeoutError::Disconnected => Err(RecvError),
1151 },
1152 }
1153 } else {
1154 self.incoming.recv().map(Some)
1155 }
1156 }
1157
1158 fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
1163 if !self.persistence_state.in_progress() {
1164 if let Some(new_tip_num) = self.persistence_state.remove_above_state.pop_front() {
1165 debug!(target: "engine::tree", ?new_tip_num, remove_state=?self.persistence_state.remove_above_state, last_persisted_block_number=?self.persistence_state.last_persisted_block.number, "Removing blocks using persistence task");
1166 if new_tip_num < self.persistence_state.last_persisted_block.number {
1167 debug!(target: "engine::tree", ?new_tip_num, "Starting remove blocks job");
1168 let (tx, rx) = oneshot::channel();
1169 let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
1170 self.persistence_state.start(rx);
1171 }
1172 } else if self.should_persist() {
1173 let blocks_to_persist = self.get_canonical_blocks_to_persist();
1174 if blocks_to_persist.is_empty() {
1175 debug!(target: "engine::tree", "Returned empty set of blocks to persist");
1176 } else {
1177 debug!(target: "engine::tree", blocks = ?blocks_to_persist.iter().map(|block| block.block.num_hash()).collect::<Vec<_>>(), "Persisting blocks");
1178 let (tx, rx) = oneshot::channel();
1179 let _ = self.persistence.save_blocks(blocks_to_persist, tx);
1180 self.persistence_state.start(rx);
1181 }
1182 }
1183 }
1184
1185 if self.persistence_state.in_progress() {
1186 let (mut rx, start_time) = self
1187 .persistence_state
1188 .rx
1189 .take()
1190 .expect("if a persistence task is in progress Receiver must be Some");
1191 match rx.try_recv() {
1193 Ok(last_persisted_hash_num) => {
1194 self.metrics.engine.persistence_duration.record(start_time.elapsed());
1195 let Some(BlockNumHash {
1196 hash: last_persisted_block_hash,
1197 number: last_persisted_block_number,
1198 }) = last_persisted_hash_num
1199 else {
1200 warn!(target: "engine::tree", "Persistence task completed but did not persist any blocks");
1203 return Ok(())
1204 };
1205
1206 debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, "Finished persisting, calling finish");
1207 self.persistence_state
1208 .finish(last_persisted_block_hash, last_persisted_block_number);
1209 self.on_new_persisted_block()?;
1210 }
1211 Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
1212 Err(TryRecvError::Empty) => self.persistence_state.rx = Some((rx, start_time)),
1213 }
1214 }
1215 Ok(())
1216 }
1217
1218 fn advance_backup(&mut self) -> Result<(), AdvancePersistenceError> {
1219 debug!(target: "engine::tree", "advance_backup called");
1220 if !self.backup.in_progress() {
1221 if self.should_backup() {
1222 debug!(target: "engine::tree", "sending backup action");
1223 let (tx, rx) = oneshot::channel();
1224 let _ = self.backup.sender.send(BackupAction::BackupAtBlock(
1225 self.persistence_state.last_persisted_block,
1226 tx,
1227 ));
1228 self.backup.start(rx);
1229 }
1230 }
1231
1232 if self.backup.in_progress() {
1233 let (mut rx, start_time) = self
1234 .backup
1235 .rx
1236 .take()
1237 .expect("if a backup task is in progress Receiver must be Some");
1238 match rx.try_recv() {
1240 Ok(last_backup_hash_num) => {
1241 let Some(BlockNumHash {
1242 hash: last_backup_block_hash,
1243 number: last_backup_block_number,
1244 }) = last_backup_hash_num
1245 else {
1246 warn!(target: "engine::tree", "Backup task completed but did not backup any blocks");
1247 return Ok(())
1248 };
1249
1250 debug!(target: "engine::tree", ?last_backup_hash_num, "Finished backup, calling finish");
1251 self.backup.finish(BlockNumHash::new(
1252 last_backup_block_number,
1253 last_backup_block_hash,
1254 ));
1255 }
1256 Err(TryRecvError::Closed) => return Err(TryRecvError::Closed.into()),
1257 Err(TryRecvError::Empty) => self.backup.rx = Some((rx, start_time)),
1258 }
1259 }
1260 Ok(())
1261 }
1262
1263 fn on_engine_message(
1265 &mut self,
1266 msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
1267 ) -> Result<(), InsertBlockFatalError> {
1268 match msg {
1269 FromEngine::Event(event) => match event {
1270 FromOrchestrator::BackfillSyncStarted => {
1271 debug!(target: "engine::tree", "received backfill sync started event");
1272 self.backfill_sync_state = BackfillSyncState::Active;
1273 }
1274 FromOrchestrator::BackfillSyncFinished(ctrl) => {
1275 self.on_backfill_sync_finished(ctrl)?;
1276 }
1277 },
1278 FromEngine::Request(request) => {
1279 match request {
1280 EngineApiRequest::InsertExecutedBlock(block) => {
1281 debug!(target: "engine::tree", block=?block.block().num_hash(), "inserting already executed block");
1282 let now = Instant::now();
1283 let sealed_block = block.block.clone();
1284 self.state.tree_state.insert_executed(block);
1285 self.metrics.engine.inserted_already_executed_blocks.increment(1);
1286
1287 self.emit_event(EngineApiEvent::BeaconConsensus(
1288 BeaconConsensusEngineEvent::CanonicalBlockAdded(
1289 sealed_block,
1290 now.elapsed(),
1291 ),
1292 ));
1293 }
1294 EngineApiRequest::Beacon(request) => {
1295 match request {
1296 BeaconEngineMessage::ForkchoiceUpdated {
1297 state,
1298 payload_attrs,
1299 tx,
1300 version,
1301 } => {
1302 let mut output =
1303 self.on_forkchoice_updated(state, payload_attrs, version);
1304
1305 if let Ok(res) = &mut output {
1306 self.state
1308 .forkchoice_state_tracker
1309 .set_latest(state, res.outcome.forkchoice_status());
1310
1311 self.emit_event(BeaconConsensusEngineEvent::ForkchoiceUpdated(
1313 state,
1314 res.outcome.forkchoice_status(),
1315 ));
1316
1317 self.on_maybe_tree_event(res.event.take())?;
1319 }
1320
1321 if let Err(err) =
1322 tx.send(output.map(|o| o.outcome).map_err(Into::into))
1323 {
1324 self.metrics
1325 .engine
1326 .failed_forkchoice_updated_response_deliveries
1327 .increment(1);
1328 error!(target: "engine::tree", "Failed to send event: {err:?}");
1329 }
1330 }
1331 BeaconEngineMessage::NewPayload { payload, sidecar, tx } => {
1332 let output = self.on_new_payload(payload, sidecar);
1333 if let Err(err) =
1334 tx.send(output.map(|o| o.outcome).map_err(|e| {
1335 BeaconOnNewPayloadError::Internal(Box::new(e))
1336 }))
1337 {
1338 error!(target: "engine::tree", "Failed to send event: {err:?}");
1339 self.metrics
1340 .engine
1341 .failed_new_payload_response_deliveries
1342 .increment(1);
1343 }
1344 }
1345 BeaconEngineMessage::TransitionConfigurationExchanged => {
1346 self.canonical_in_memory_state
1349 .on_transition_configuration_exchanged();
1350 }
1351 }
1352 }
1353 }
1354 }
1355 FromEngine::DownloadedBlocks(blocks) => {
1356 if let Some(event) = self.on_downloaded(blocks)? {
1357 self.on_tree_event(event)?;
1358 }
1359 }
1360 }
1361 Ok(())
1362 }
1363
1364 fn on_backfill_sync_finished(
1375 &mut self,
1376 ctrl: ControlFlow,
1377 ) -> Result<(), InsertBlockFatalError> {
1378 debug!(target: "engine::tree", "received backfill sync finished event");
1379 self.backfill_sync_state = BackfillSyncState::Idle;
1380
1381 if let ControlFlow::Unwind { bad_block, .. } = ctrl {
1383 warn!(target: "engine::tree", invalid_block=?bad_block, "Bad block detected in unwind");
1384 self.state.invalid_headers.insert(*bad_block);
1386 return Ok(())
1387 }
1388
1389 let Some(backfill_height) = ctrl.block_number() else { return Ok(()) };
1391
1392 let backfill_num_hash = self
1398 .provider
1399 .block_hash(backfill_height)?
1400 .map(|hash| BlockNumHash { hash, number: backfill_height });
1401
1402 self.state.tree_state.remove_until(
1403 backfill_num_hash
1404 .expect("after backfill the block target hash should be present in the db"),
1405 self.persistence_state.last_persisted_block.hash,
1406 backfill_num_hash,
1407 );
1408 self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
1409 self.metrics.tree.canonical_chain_height.set(backfill_height as f64);
1410
1411 self.state.buffer.remove_old_blocks(backfill_height);
1413 self.canonical_in_memory_state.clear_state();
1416
1417 if let Ok(Some(new_head)) = self.provider.sealed_header(backfill_height) {
1418 self.state.tree_state.set_canonical_head(new_head.num_hash());
1421 self.persistence_state.finish(new_head.hash(), new_head.number());
1422
1423 self.canonical_in_memory_state.set_canonical_head(new_head);
1425 }
1426
1427 let Some(sync_target_state) = self.state.forkchoice_state_tracker.sync_target_state()
1430 else {
1431 return Ok(())
1432 };
1433 if sync_target_state.finalized_block_hash.is_zero() {
1434 return Ok(())
1436 }
1437 let newest_finalized = self
1439 .state
1440 .buffer
1441 .block(&sync_target_state.finalized_block_hash)
1442 .map(|block| block.number());
1443
1444 if let Some(backfill_target) =
1450 ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
1451 self.backfill_sync_target(progress, finalized_number, None)
1454 })
1455 {
1456 self.emit_event(EngineApiEvent::BackfillAction(BackfillAction::Start(
1458 backfill_target.into(),
1459 )));
1460 return Ok(())
1461 };
1462
1463 self.try_connect_buffered_blocks(self.state.tree_state.current_canonical_head)
1465 }
1466
1467 fn make_canonical(&mut self, target: B256) -> ProviderResult<()> {
1471 if let Some(chain_update) = self.on_new_head(target)? {
1472 self.on_canonical_chain_update(chain_update);
1473 }
1474
1475 Ok(())
1476 }
1477
1478 fn on_maybe_tree_event(&mut self, event: Option<TreeEvent>) -> ProviderResult<()> {
1480 if let Some(event) = event {
1481 self.on_tree_event(event)?;
1482 }
1483
1484 Ok(())
1485 }
1486
1487 fn on_tree_event(&mut self, event: TreeEvent) -> ProviderResult<()> {
1489 match event {
1490 TreeEvent::TreeAction(action) => match action {
1491 TreeAction::MakeCanonical { sync_target_head } => {
1492 self.make_canonical(sync_target_head)?;
1493 }
1494 },
1495 TreeEvent::BackfillAction(action) => {
1496 self.emit_event(EngineApiEvent::BackfillAction(action));
1497 }
1498 TreeEvent::Download(action) => {
1499 self.emit_event(EngineApiEvent::Download(action));
1500 }
1501 }
1502
1503 Ok(())
1504 }
1505
1506 fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
1508 let event = event.into();
1509
1510 if event.is_backfill_action() {
1511 debug_assert_eq!(
1512 self.backfill_sync_state,
1513 BackfillSyncState::Idle,
1514 "backfill action should only be emitted when backfill is idle"
1515 );
1516
1517 if self.persistence_state.in_progress() {
1518 debug!(target: "engine::tree", "skipping backfill file while persistence task is active");
1521 return
1522 }
1523
1524 self.backfill_sync_state = BackfillSyncState::Pending;
1525 self.metrics.engine.pipeline_runs.increment(1);
1526 debug!(target: "engine::tree", "emitting backfill action event");
1527 }
1528
1529 let _ = self.outgoing.send(event).inspect_err(
1530 |err| error!(target: "engine::tree", "Failed to send internal event: {err:?}"),
1531 );
1532 }
1533
1534 const fn should_persist(&self) -> bool {
1538 if !self.backfill_sync_state.is_idle() {
1539 return false
1541 }
1542
1543 let min_block = self.persistence_state.last_persisted_block.number;
1544 self.state.tree_state.canonical_block_number().saturating_sub(min_block) >
1545 self.config.persistence_threshold()
1546 }
1547
1548 fn should_backup(&self) -> bool {
1552 debug!(target: "engine::tree", "checking if we should backup");
1553 return false;
1554 }
1555
1556 fn get_canonical_blocks_to_persist(&self) -> Vec<ExecutedBlock<N>> {
1560 let mut blocks_to_persist = Vec::new();
1561 let mut current_hash = self.state.tree_state.canonical_block_hash();
1562 let last_persisted_number = self.persistence_state.last_persisted_block.number;
1563
1564 let canonical_head_number = self.state.tree_state.canonical_block_number();
1565
1566 let target_number =
1567 canonical_head_number.saturating_sub(self.config.memory_block_buffer_target());
1568
1569 debug!(target: "engine::tree", ?last_persisted_number, ?canonical_head_number, ?target_number, ?current_hash, "Returning canonical blocks to persist");
1570 while let Some(block) = self.state.tree_state.blocks_by_hash.get(¤t_hash) {
1571 if block.block.number() <= last_persisted_number {
1572 break;
1573 }
1574
1575 if block.block.number() <= target_number {
1576 blocks_to_persist.push(block.clone());
1577 }
1578
1579 current_hash = block.block.parent_hash();
1580 }
1581
1582 blocks_to_persist.reverse();
1584
1585 blocks_to_persist
1586 }
1587
1588 fn on_new_persisted_block(&mut self) -> ProviderResult<()> {
1596 let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
1597 self.remove_before(self.persistence_state.last_persisted_block, finalized)?;
1598 self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
1599 number: self.persistence_state.last_persisted_block.number,
1600 hash: self.persistence_state.last_persisted_block.hash,
1601 });
1602 Ok(())
1603 }
1604
1605 fn executed_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
1613 trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
1614 let block = self.state.tree_state.executed_block_by_hash(hash).cloned();
1616
1617 if block.is_some() {
1618 return Ok(block)
1619 }
1620
1621 let Some((_, updates)) = self.state.tree_state.persisted_trie_updates.get(&hash) else {
1622 return Ok(None)
1623 };
1624
1625 let SealedBlockWithSenders { block, senders } = self
1626 .provider
1627 .sealed_block_with_senders(hash.into(), TransactionVariant::WithHash)?
1628 .ok_or_else(|| ProviderError::HeaderNotFound(hash.into()))?;
1629 let execution_output = self
1630 .provider
1631 .get_state(block.number())?
1632 .ok_or_else(|| ProviderError::StateForNumberNotFound(block.number()))?;
1633 let hashed_state = self.provider.hashed_post_state(execution_output.state());
1634
1635 Ok(Some(ExecutedBlock {
1636 block: Arc::new(block),
1637 senders: Arc::new(senders),
1638 trie: updates.clone(),
1639 execution_output: Arc::new(execution_output),
1640 hashed_state: Arc::new(hashed_state),
1641 }))
1642 }
1643
1644 fn sealed_header_by_hash(
1646 &self,
1647 hash: B256,
1648 ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
1649 let block =
1651 self.state.tree_state.block_by_hash(hash).map(|block| block.as_ref().clone().header);
1652
1653 if block.is_some() {
1654 Ok(block)
1655 } else {
1656 self.provider.sealed_header_by_hash(hash)
1657 }
1658 }
1659
1660 fn block_by_hash(&self, hash: B256) -> ProviderResult<Option<N::Block>> {
1662 let mut block = self.provider.block_by_hash(hash)?;
1664 if block.is_none() {
1665 block = self
1668 .state
1669 .tree_state
1670 .block_by_hash(hash)
1671 .map(|block| block.as_ref().clone().unseal());
1673 }
1674 Ok(block)
1675 }
1676
1677 fn state_provider(&self, hash: B256) -> ProviderResult<Option<StateProviderBox>> {
1688 if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(hash) {
1689 debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory");
1690 let historical = self.provider.state_by_block_hash(historical)?;
1692 return Ok(Some(Box::new(MemoryOverlayStateProvider::new(historical, blocks))))
1693 }
1694
1695 if let Some(header) = self.provider.header(&hash)? {
1697 debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database");
1698 let historical = self.provider.state_by_block_hash(hash)?;
1700 return Ok(Some(historical))
1701 }
1702
1703 debug!(target: "engine::tree", %hash, "no canonical state found for block");
1704
1705 Ok(None)
1706 }
1707
1708 fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
1715 self.state
1716 .buffer
1717 .lowest_ancestor(&hash)
1718 .map(|block| block.parent_hash())
1719 .unwrap_or_else(|| hash)
1720 }
1721
1722 fn latest_valid_hash_for_invalid_payload(
1733 &mut self,
1734 parent_hash: B256,
1735 ) -> ProviderResult<Option<B256>> {
1736 if self.block_by_hash(parent_hash)?.is_some() {
1738 return Ok(Some(parent_hash))
1739 }
1740
1741 let mut current_hash = parent_hash;
1744 let mut current_block = self.state.invalid_headers.get(¤t_hash);
1745 while let Some(block_with_parent) = current_block {
1746 current_hash = block_with_parent.parent;
1747 current_block = self.state.invalid_headers.get(¤t_hash);
1748
1749 if current_block.is_none() && self.block_by_hash(current_hash)?.is_some() {
1752 return Ok(Some(current_hash))
1753 }
1754 }
1755 Ok(None)
1756 }
1757
1758 fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
1762 if let Some(parent) = self.block_by_hash(parent_hash)? {
1765 if !parent.header().difficulty().is_zero() {
1766 parent_hash = B256::ZERO;
1767 }
1768 }
1769
1770 let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
1771 Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
1772 validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
1773 })
1774 .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
1775 }
1776
1777 fn is_sync_target_head(&self, block_hash: B256) -> bool {
1781 if let Some(target) = self.state.forkchoice_state_tracker.sync_target_state() {
1782 return target.head_block_hash == block_hash
1783 }
1784 false
1785 }
1786
1787 fn check_invalid_ancestor_with_head(
1793 &mut self,
1794 check: B256,
1795 head: B256,
1796 ) -> ProviderResult<Option<PayloadStatus>> {
1797 let Some(header) = self.state.invalid_headers.get(&check) else { return Ok(None) };
1799
1800 let status = self.prepare_invalid_response(header.parent)?;
1802
1803 self.state.invalid_headers.insert_with_invalid_ancestor(head, header);
1805
1806 Ok(Some(status))
1807 }
1808
1809 fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
1812 let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) };
1814 Ok(Some(self.prepare_invalid_response(header.parent)?))
1816 }
1817
1818 fn validate_block(
1821 &self,
1822 block: &SealedBlockWithSenders<N::Block>,
1823 ) -> Result<(), ConsensusError> {
1824 if let Err(e) = self.consensus.validate_header_with_total_difficulty(block, U256::MAX) {
1825 error!(
1826 target: "engine::tree",
1827 ?block,
1828 "Failed to validate total difficulty for block {}: {e}",
1829 block.header.hash()
1830 );
1831 return Err(e)
1832 }
1833
1834 if let Err(e) = self.consensus.validate_header(block) {
1835 error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.header.hash());
1836 return Err(e)
1837 }
1838
1839 if let Err(e) = self.consensus.validate_block_pre_execution(block) {
1840 error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.header.hash());
1841 return Err(e)
1842 }
1843
1844 Ok(())
1845 }
1846
1847 #[instrument(level = "trace", skip(self), target = "engine::tree")]
1849 fn try_connect_buffered_blocks(
1850 &mut self,
1851 parent: BlockNumHash,
1852 ) -> Result<(), InsertBlockFatalError> {
1853 let blocks = self.state.buffer.remove_block_with_children(&parent.hash);
1854
1855 if blocks.is_empty() {
1856 return Ok(())
1858 }
1859
1860 let now = Instant::now();
1861 let block_count = blocks.len();
1862 for child in blocks {
1863 let child_num_hash = child.num_hash();
1864 match self.insert_block(child) {
1865 Ok(res) => {
1866 debug!(target: "engine::tree", child =?child_num_hash, ?res, "connected buffered block");
1867 if self.is_sync_target_head(child_num_hash.hash) &&
1868 matches!(res, InsertPayloadOk2::Inserted(BlockStatus2::Valid))
1869 {
1870 self.make_canonical(child_num_hash.hash)?;
1871 }
1872 }
1873 Err(err) => {
1874 debug!(target: "engine::tree", ?err, "failed to connect buffered block to tree");
1875 if let Err(fatal) = self.on_insert_block_error(err) {
1876 warn!(target: "engine::tree", %fatal, "fatal error occurred while connecting buffered blocks");
1877 return Err(fatal)
1878 }
1879 }
1880 }
1881 }
1882
1883 debug!(target: "engine::tree", elapsed = ?now.elapsed(), %block_count, "connected buffered blocks");
1884 Ok(())
1885 }
1886
1887 fn buffer_block_without_senders(
1891 &mut self,
1892 block: SealedBlockFor<N::Block>,
1893 ) -> Result<(), InsertBlockErrorTwo<N::Block>> {
1894 match block.try_seal_with_senders() {
1895 Ok(block) => self.buffer_block(block),
1896 Err(block) => Err(InsertBlockErrorTwo::sender_recovery_error(block)),
1897 }
1898 }
1899
1900 fn buffer_block(
1902 &mut self,
1903 block: SealedBlockWithSenders<N::Block>,
1904 ) -> Result<(), InsertBlockErrorTwo<N::Block>> {
1905 if let Err(err) = self.validate_block(&block) {
1906 return Err(InsertBlockErrorTwo::consensus_error(err, block.block))
1907 }
1908 self.state.buffer.insert_block(block);
1909 Ok(())
1910 }
1911
1912 #[inline]
1917 const fn exceeds_backfill_run_threshold(&self, local_tip: u64, block: u64) -> bool {
1918 block > local_tip && block - local_tip > MIN_BLOCKS_FOR_PIPELINE_RUN
1919 }
1920
1921 #[inline]
1924 const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
1925 if block > local_tip {
1926 Some(block - local_tip)
1927 } else {
1928 None
1929 }
1930 }
1931
1932 fn backfill_sync_target(
1939 &self,
1940 canonical_tip_num: u64,
1941 target_block_number: u64,
1942 downloaded_block: Option<BlockNumHash>,
1943 ) -> Option<B256> {
1944 let sync_target_state = self.state.forkchoice_state_tracker.sync_target_state();
1945
1946 let mut exceeds_backfill_threshold =
1948 self.exceeds_backfill_run_threshold(canonical_tip_num, target_block_number);
1949
1950 if let Some(buffered_finalized) = sync_target_state
1952 .as_ref()
1953 .and_then(|state| self.state.buffer.block(&state.finalized_block_hash))
1954 {
1955 exceeds_backfill_threshold =
1958 self.exceeds_backfill_run_threshold(canonical_tip_num, buffered_finalized.number());
1959 }
1960
1961 if let (Some(downloaded_block), Some(ref state)) = (downloaded_block, sync_target_state) {
1964 if downloaded_block.hash == state.finalized_block_hash {
1965 exceeds_backfill_threshold =
1967 self.exceeds_backfill_run_threshold(canonical_tip_num, downloaded_block.number);
1968 }
1969 }
1970
1971 if exceeds_backfill_threshold {
1973 if let Some(state) = sync_target_state {
1974 match self.provider.header_by_hash_or_number(state.finalized_block_hash.into()) {
1976 Err(err) => {
1977 warn!(target: "engine::tree", %err, "Failed to get finalized block header");
1978 }
1979 Ok(None) => {
1980 if !state.finalized_block_hash.is_zero() {
1982 return Some(state.finalized_block_hash)
1985 }
1986
1987 debug!(target: "engine::tree", hash=?state.head_block_hash, "Setting head hash as an optimistic backfill target.");
2000 return Some(state.head_block_hash)
2001 }
2002 Ok(Some(_)) => {
2003 }
2005 }
2006 }
2007 }
2008
2009 None
2010 }
2011
2012 fn find_disk_reorg(&self, chain_update: &NewCanonicalChain<N>) -> Option<u64> {
2024 let NewCanonicalChain::Reorg { new, old: _ } = chain_update else { return None };
2025
2026 let BlockNumHash { number: new_num, hash: new_hash } =
2027 new.first().map(|block| block.block.num_hash())?;
2028
2029 match new_num.cmp(&self.persistence_state.last_persisted_block.number) {
2030 Ordering::Greater => {
2031 None
2034 }
2035 Ordering::Equal => {
2036 (self.persistence_state.last_persisted_block.hash != new_hash).then_some(new_num)
2039 }
2040 Ordering::Less => {
2041 Some(new_num)
2043 }
2044 }
2045 }
2046
2047 fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
2051 trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update");
2052 let start = Instant::now();
2053
2054 if let Some(height) = self.find_disk_reorg(&chain_update) {
2056 let new_tip_num = height.saturating_sub(1);
2058 self.persistence_state.schedule_removal(new_tip_num);
2059 }
2060
2061 self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
2063
2064 let tip = chain_update.tip().header.clone();
2065 let notification = chain_update.to_chain_notification();
2066
2067 if let NewCanonicalChain::Reorg { new, old } = &chain_update {
2069 let new_first = new.first().map(|first| first.block.num_hash());
2070 let old_first = old.first().map(|first| first.block.num_hash());
2071 trace!(target: "engine::tree", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
2072
2073 self.update_reorg_metrics(old.len());
2074 self.reinsert_reorged_blocks(new.clone());
2075 self.reinsert_reorged_blocks(old.clone());
2076 }
2077
2078 self.canonical_in_memory_state.update_chain(chain_update);
2080 self.canonical_in_memory_state.set_canonical_head(tip.clone());
2081
2082 self.metrics.tree.canonical_chain_height.set(tip.number() as f64);
2084
2085 self.canonical_in_memory_state.notify_canon_state(notification);
2087
2088 self.emit_event(BeaconConsensusEngineEvent::CanonicalChainCommitted(
2090 Box::new(tip),
2091 start.elapsed(),
2092 ));
2093 }
2094
2095 fn update_reorg_metrics(&self, old_chain_length: usize) {
2097 self.metrics.tree.reorgs.increment(1);
2098 self.metrics.tree.latest_reorg_depth.set(old_chain_length as f64);
2099 }
2100
2101 fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
2103 for block in new_chain {
2104 if self.state.tree_state.executed_block_by_hash(block.block.hash()).is_none() {
2105 trace!(target: "engine::tree", num=?block.block.number(), hash=?block.block.hash(), "Reinserting block into tree state");
2106 self.state.tree_state.insert_executed(block);
2107 }
2108 }
2109 }
2110
2111 fn on_disconnected_downloaded_block(
2116 &self,
2117 downloaded_block: BlockNumHash,
2118 missing_parent: BlockNumHash,
2119 head: BlockNumHash,
2120 ) -> Option<TreeEvent> {
2121 if let Some(target) =
2123 self.backfill_sync_target(head.number, missing_parent.number, Some(downloaded_block))
2124 {
2125 trace!(target: "engine::tree", %target, "triggering backfill on downloaded block");
2126 return Some(TreeEvent::BackfillAction(BackfillAction::Start(target.into())));
2127 }
2128
2129 let request = if let Some(distance) =
2139 self.distance_from_local_tip(head.number, missing_parent.number)
2140 {
2141 trace!(target: "engine::tree", %distance, missing=?missing_parent, "downloading missing parent block range");
2142 DownloadRequest::BlockRange(missing_parent.hash, distance)
2143 } else {
2144 trace!(target: "engine::tree", missing=?missing_parent, "downloading missing parent block");
2145 DownloadRequest::single_block(missing_parent.hash)
2148 };
2149
2150 Some(TreeEvent::Download(request))
2151 }
2152
2153 #[instrument(level = "trace", skip_all, fields(block_hash = %block.hash(), block_num = %block.number(),), target = "engine::tree")]
2159 fn on_downloaded_block(
2160 &mut self,
2161 block: SealedBlockWithSenders<N::Block>,
2162 ) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
2163 let block_num_hash = block.num_hash();
2164 let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
2165 if self
2166 .check_invalid_ancestor_with_head(lowest_buffered_ancestor, block_num_hash.hash)?
2167 .is_some()
2168 {
2169 return Ok(None)
2170 }
2171
2172 if !self.backfill_sync_state.is_idle() {
2173 return Ok(None)
2174 }
2175
2176 match self.insert_block(block) {
2178 Ok(InsertPayloadOk2::Inserted(BlockStatus2::Valid)) => {
2179 if self.is_sync_target_head(block_num_hash.hash) {
2180 trace!(target: "engine::tree", "appended downloaded sync target block");
2181
2182 return Ok(Some(TreeEvent::TreeAction(TreeAction::MakeCanonical {
2185 sync_target_head: block_num_hash.hash,
2186 })))
2187 }
2188 trace!(target: "engine::tree", "appended downloaded block");
2189 self.try_connect_buffered_blocks(block_num_hash)?;
2190 }
2191 Ok(InsertPayloadOk2::Inserted(BlockStatus2::Disconnected {
2192 head,
2193 missing_ancestor,
2194 })) => {
2195 return Ok(self.on_disconnected_downloaded_block(
2198 block_num_hash,
2199 missing_ancestor,
2200 head,
2201 ))
2202 }
2203 Ok(InsertPayloadOk2::AlreadySeen(_)) => {
2204 trace!(target: "engine::tree", "downloaded block already executed");
2205 }
2206 Err(err) => {
2207 debug!(target: "engine::tree", err=%err.kind(), "failed to insert downloaded block");
2208 if let Err(fatal) = self.on_insert_block_error(err) {
2209 warn!(target: "engine::tree", %fatal, "fatal error occurred while inserting downloaded block");
2210 return Err(fatal)
2211 }
2212 }
2213 }
2214 Ok(None)
2215 }
2216
2217 fn insert_block_without_senders(
2218 &mut self,
2219 block: SealedBlockFor<N::Block>,
2220 ) -> Result<InsertPayloadOk2, InsertBlockErrorTwo<N::Block>> {
2221 match block.try_seal_with_senders() {
2222 Ok(block) => self.insert_block(block),
2223 Err(block) => Err(InsertBlockErrorTwo::sender_recovery_error(block)),
2224 }
2225 }
2226
2227 fn insert_block(
2228 &mut self,
2229 block: SealedBlockWithSenders<N::Block>,
2230 ) -> Result<InsertPayloadOk2, InsertBlockErrorTwo<N::Block>> {
2231 self.insert_block_inner(block.clone())
2232 .map_err(|kind| InsertBlockErrorTwo::new(block.block, kind))
2233 }
2234
2235 fn insert_block_inner(
2236 &mut self,
2237 block: SealedBlockWithSenders<N::Block>,
2238 ) -> Result<InsertPayloadOk2, InsertBlockErrorKindTwo> {
2239 debug!(target: "engine::tree", block=?block.num_hash(), parent = ?block.parent_hash(), state_root = ?block.state_root(), "Inserting new block into tree");
2240
2241 if self.block_by_hash(block.hash())?.is_some() {
2242 return Ok(InsertPayloadOk2::AlreadySeen(BlockStatus2::Valid))
2243 }
2244
2245 let start = Instant::now();
2246
2247 trace!(target: "engine::tree", block=?block.num_hash(), "Validating block consensus");
2248 self.validate_block(&block)?;
2250
2251 trace!(target: "engine::tree", block=?block.num_hash(), parent=?block.parent_hash(), "Fetching block state provider");
2252 let Some(state_provider) = self.state_provider(block.parent_hash())? else {
2253 let missing_ancestor = self
2256 .state
2257 .buffer
2258 .lowest_ancestor(&block.parent_hash())
2259 .map(|block| block.parent_num_hash())
2260 .unwrap_or_else(|| block.parent_num_hash());
2261
2262 self.state.buffer.insert_block(block);
2263
2264 return Ok(InsertPayloadOk2::Inserted(BlockStatus2::Disconnected {
2265 head: self.state.tree_state.current_canonical_head,
2266 missing_ancestor,
2267 }))
2268 };
2269
2270 let parent_block = self.sealed_header_by_hash(block.parent_hash())?.ok_or_else(|| {
2272 InsertBlockErrorKindTwo::Provider(ProviderError::HeaderNotFound(
2273 block.parent_hash().into(),
2274 ))
2275 })?;
2276 if let Err(e) = self.consensus.validate_header_against_parent(&block, &parent_block) {
2277 warn!(target: "engine::tree", ?block, "Failed to validate header {} against parent: {e}", block.header.hash());
2278 return Err(e.into())
2279 }
2280
2281 trace!(target: "engine::tree", block=?block.num_hash(), "Executing block");
2282 let executor = self.executor_provider.executor(StateProviderDatabase::new(&state_provider));
2283
2284 let block_number = block.number();
2285 let block_hash = block.hash();
2286 let sealed_block = Arc::new(block.block.clone());
2287 let block = block.unseal();
2288
2289 let exec_time = Instant::now();
2290
2291 let noop_state_hook = |_state: &EvmState| {};
2294 let output = self.metrics.executor.execute_metered(
2295 executor,
2296 (&block, U256::MAX).into(),
2297 Box::new(noop_state_hook),
2298 )?;
2299
2300 trace!(target: "engine::tree", elapsed=?exec_time.elapsed(), ?block_number, "Executed block");
2301
2302 if let Err(err) = self.consensus.validate_block_post_execution(
2303 &block,
2304 PostExecutionInput::new(&output.receipts, &output.requests),
2305 ) {
2306 self.invalid_block_hook.on_invalid_block(
2308 &parent_block,
2309 &block.seal_slow(),
2310 &output,
2311 None,
2312 );
2313 return Err(err.into())
2314 }
2315
2316 let hashed_state = self.provider.hashed_post_state(&output.state);
2317
2318 trace!(target: "engine::tree", block=?sealed_block.num_hash(), "Calculating block state root");
2319 let root_time = Instant::now();
2320 let mut state_root_result = None;
2321
2322 let persistence_in_progress = self.persistence_state.in_progress();
2330 if !persistence_in_progress {
2331 state_root_result = match self
2332 .compute_state_root_parallel(block.header().parent_hash(), &hashed_state)
2333 {
2334 Ok((state_root, trie_output)) => Some((state_root, trie_output)),
2335 Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(error))) => {
2336 debug!(target: "engine", %error, "Parallel state root computation failed consistency check, falling back");
2337 None
2338 }
2339 Err(error) => return Err(InsertBlockErrorKindTwo::Other(Box::new(error))),
2340 };
2341 }
2342
2343 let (state_root, trie_output) = if let Some(result) = state_root_result {
2344 result
2345 } else {
2346 debug!(target: "engine::tree", block=?sealed_block.num_hash(), persistence_in_progress, "Failed to compute state root in parallel");
2347 state_provider.state_root_with_updates(hashed_state.clone())?
2348 };
2349
2350 if state_root != block.header().state_root() {
2351 self.invalid_block_hook.on_invalid_block(
2353 &parent_block,
2354 &block.clone().seal_slow(),
2355 &output,
2356 Some((&trie_output, state_root)),
2357 );
2358 return Err(ConsensusError::BodyStateRootDiff(
2359 GotExpected { got: state_root, expected: block.header().state_root() }.into(),
2360 )
2361 .into())
2362 }
2363
2364 let root_elapsed = root_time.elapsed();
2365 self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
2366 debug!(target: "engine::tree", ?root_elapsed, block=?sealed_block.num_hash(), "Calculated state root");
2367
2368 let executed: ExecutedBlock<N> = ExecutedBlock {
2369 block: sealed_block.clone(),
2370 senders: Arc::new(block.senders),
2371 execution_output: Arc::new(ExecutionOutcome::from((output, block_number))),
2372 hashed_state: Arc::new(hashed_state),
2373 trie: Arc::new(trie_output),
2374 };
2375
2376 if self.state.tree_state.canonical_block_hash() == executed.block().parent_hash() {
2377 debug!(target: "engine::tree", pending = ?executed.block().num_hash() ,"updating pending block");
2378 self.canonical_in_memory_state.set_pending_block(executed.clone());
2380 }
2381
2382 self.state.tree_state.insert_executed(executed);
2383 self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
2384
2385 let elapsed = start.elapsed();
2387 let engine_event = if self.is_fork(block_hash)? {
2388 BeaconConsensusEngineEvent::ForkBlockAdded(sealed_block, elapsed)
2389 } else {
2390 BeaconConsensusEngineEvent::CanonicalBlockAdded(sealed_block, elapsed)
2391 };
2392 self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
2393
2394 debug!(target: "engine::tree", block=?BlockNumHash::new(block_number, block_hash), "Finished inserting block");
2395 Ok(InsertPayloadOk2::Inserted(BlockStatus2::Valid))
2396 }
2397
2398 fn compute_state_root_parallel(
2407 &self,
2408 parent_hash: B256,
2409 hashed_state: &HashedPostState,
2410 ) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
2411 let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
2415 let mut input = TrieInput::default();
2416
2417 if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(parent_hash) {
2418 debug!(target: "engine::tree", %parent_hash, %historical, "Calculating state root in parallel, parent found in memory");
2419 let revert_state = consistent_view.revert_state(historical)?;
2421 input.append(revert_state);
2422
2423 for block in blocks.iter().rev() {
2425 input.append_cached_ref(block.trie_updates(), block.hashed_state())
2426 }
2427 } else {
2428 debug!(target: "engine::tree", %parent_hash, "Calculating state root in parallel, parent found in disk");
2430 let revert_state = consistent_view.revert_state(parent_hash)?;
2431 input.append(revert_state);
2432 }
2433
2434 input.append_ref(hashed_state);
2436
2437 ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates()
2438 }
2439
2440 fn on_insert_block_error(
2446 &mut self,
2447 error: InsertBlockErrorTwo<N::Block>,
2448 ) -> Result<PayloadStatus, InsertBlockFatalError> {
2449 let (block, error) = error.split();
2450
2451 let validation_err = error.ensure_validation_error()?;
2454
2455 warn!(target: "engine::tree", invalid_hash=?block.hash(), invalid_number=?block.number(), %validation_err, "Invalid block error on new payload");
2459 let latest_valid_hash = if validation_err.is_block_pre_merge() {
2460 Some(B256::ZERO)
2462 } else {
2463 self.latest_valid_hash_for_invalid_payload(block.parent_hash())?
2464 };
2465
2466 self.state.invalid_headers.insert(block.header.block_with_parent());
2468 Ok(PayloadStatus::new(
2469 PayloadStatusEnum::Invalid { validation_error: validation_err.to_string() },
2470 latest_valid_hash,
2471 ))
2472 }
2473
2474 pub fn find_canonical_header(
2476 &self,
2477 hash: B256,
2478 ) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
2479 let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
2480
2481 if canonical.is_none() {
2482 canonical = self.provider.header(&hash)?.map(|header| SealedHeader::new(header, hash));
2483 }
2484
2485 Ok(canonical)
2486 }
2487
2488 fn update_finalized_block(
2490 &self,
2491 finalized_block_hash: B256,
2492 ) -> Result<(), OnForkChoiceUpdated> {
2493 if finalized_block_hash.is_zero() {
2494 return Ok(())
2495 }
2496
2497 match self.find_canonical_header(finalized_block_hash) {
2498 Ok(None) => {
2499 debug!(target: "engine::tree", "Finalized block not found in canonical chain");
2500 return Err(OnForkChoiceUpdated::invalid_state())
2502 }
2503 Ok(Some(finalized)) => {
2504 if Some(finalized.num_hash()) !=
2505 self.canonical_in_memory_state.get_finalized_num_hash()
2506 {
2507 let _ = self.persistence.save_finalized_block_number(finalized.number());
2510 self.canonical_in_memory_state.set_finalized(finalized);
2511 }
2512 }
2513 Err(err) => {
2514 error!(target: "engine::tree", %err, "Failed to fetch finalized block header");
2515 }
2516 }
2517
2518 Ok(())
2519 }
2520
2521 fn update_safe_block(&self, safe_block_hash: B256) -> Result<(), OnForkChoiceUpdated> {
2523 if safe_block_hash.is_zero() {
2524 return Ok(())
2525 }
2526
2527 match self.find_canonical_header(safe_block_hash) {
2528 Ok(None) => {
2529 debug!(target: "engine::tree", "Safe block not found in canonical chain");
2530 return Err(OnForkChoiceUpdated::invalid_state())
2532 }
2533 Ok(Some(safe)) => {
2534 if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
2535 let _ = self.persistence.save_safe_block_number(safe.number());
2538 self.canonical_in_memory_state.set_safe(safe);
2539 }
2540 }
2541 Err(err) => {
2542 error!(target: "engine::tree", %err, "Failed to fetch safe block header");
2543 }
2544 }
2545
2546 Ok(())
2547 }
2548
2549 fn ensure_consistent_forkchoice_state(
2558 &self,
2559 state: ForkchoiceState,
2560 ) -> Result<(), OnForkChoiceUpdated> {
2561 self.update_finalized_block(state.finalized_block_hash)?;
2567
2568 self.update_safe_block(state.safe_block_hash)
2574 }
2575
2576 fn pre_validate_forkchoice_update(
2581 &mut self,
2582 state: ForkchoiceState,
2583 ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
2584 if state.head_block_hash.is_zero() {
2585 return Ok(Some(OnForkChoiceUpdated::invalid_state()))
2586 }
2587
2588 let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
2591 if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
2592 return Ok(Some(OnForkChoiceUpdated::with_invalid(status)))
2593 }
2594
2595 if !self.backfill_sync_state.is_idle() {
2596 trace!(target: "engine::tree", "Pipeline is syncing, skipping forkchoice update");
2599 return Ok(Some(OnForkChoiceUpdated::syncing()))
2600 }
2601
2602 Ok(None)
2603 }
2604
2605 fn process_payload_attributes(
2610 &self,
2611 attrs: T::PayloadAttributes,
2612 head: &N::BlockHeader,
2613 state: ForkchoiceState,
2614 version: EngineApiMessageVersion,
2615 ) -> OnForkChoiceUpdated {
2616 if let Err(err) =
2617 self.payload_validator.validate_payload_attributes_against_header(&attrs, head)
2618 {
2619 warn!(target: "engine::tree", %err, ?head, "Invalid payload attributes");
2620 return OnForkChoiceUpdated::invalid_payload_attributes()
2621 }
2622
2623 match <T::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
2628 state.head_block_hash,
2629 attrs,
2630 version as u8,
2631 ) {
2632 Ok(attributes) => {
2633 let pending_payload_id = self.payload_builder.send_new_payload(attributes);
2636
2637 OnForkChoiceUpdated::updated_with_pending_payload_id(
2649 PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
2650 pending_payload_id,
2651 )
2652 }
2653 Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
2654 }
2655 }
2656
2657 pub(crate) fn remove_before(
2664 &mut self,
2665 upper_bound: BlockNumHash,
2666 finalized_hash: Option<B256>,
2667 ) -> ProviderResult<()> {
2668 let num = if let Some(hash) = finalized_hash {
2671 self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
2672 } else {
2673 None
2674 };
2675
2676 self.state.tree_state.remove_until(
2677 upper_bound,
2678 self.persistence_state.last_persisted_block.hash,
2679 num,
2680 );
2681 Ok(())
2682 }
2683}
2684
/// Error type combining a channel receive failure and a provider failure.
///
/// Both variants are transparent wrappers with `From` conversions, so either underlying error
/// can be propagated with `?`.
// NOTE(review): judging by the name this is returned while advancing persistence — confirm
// against the callers, which are outside this section.
#[derive(Debug, thiserror::Error)]
pub enum AdvancePersistenceError {
    /// A [`TryRecvError`] from a channel receive attempt.
    #[error(transparent)]
    RecvError(#[from] TryRecvError),
    /// A [`ProviderError`].
    #[error(transparent)]
    Provider(#[from] ProviderError),
}
2696
2697#[cfg(test)]
2698mod tests {
2699 use super::*;
2700 use crate::persistence::PersistenceAction;
2701 use alloy_consensus::Header;
2702 use alloy_primitives::Bytes;
2703 use alloy_rlp::Decodable;
2704 use alloy_rpc_types_engine::{CancunPayloadFields, ExecutionPayloadSidecar};
2705 use assert_matches::assert_matches;
2706 use reth_beacon_consensus::EthBeaconConsensus;
2707 use reth_chain_state::{test_utils::TestBlockBuilder, BlockState};
2708 use reth_chainspec::{ChainSpec, HOLESKY, MAINNET};
2709 use reth_engine_primitives::ForkchoiceStatus;
2710 use reth_ethereum_engine_primitives::{EthEngineTypes, EthereumEngineValidator};
2711 use reth_evm::test_utils::MockExecutorProvider;
2712 use reth_node_core::dirs::MaybePlatformPath;
2713 use reth_primitives::{Block, BlockExt, EthPrimitives};
2714 use reth_provider::test_utils::MockEthProvider;
2715 use reth_rpc_types_compat::engine::{block_to_payload_v1, payload::block_to_payload_v3};
2716 use reth_trie::updates::TrieUpdates;
2717 use std::{
2718 str::FromStr,
2719 sync::mpsc::{channel, Sender},
2720 };
2721
/// A gated relay between two mpsc channels.
///
/// Values sent on the sender returned by [`TestChannel::spawn_channel`] are held back and
/// forwarded one at a time, each time the associated [`TestChannelHandle`] is released.
#[allow(dead_code)]
struct TestChannel<T> {
    /// Receives one unit value per permitted forward.
    release: Receiver<()>,
    /// Where forwarded values are sent.
    tx: Sender<T>,
    /// Where intercepted values arrive.
    rx: Receiver<T>,
}

impl<T: Send + 'static> TestChannel<T> {
    /// Creates the relay and runs it on a new thread.
    ///
    /// Returns the sender feeding the relay, the receiver on which released values come out,
    /// and the handle used to release them.
    #[allow(dead_code)]
    fn spawn_channel() -> (Sender<T>, Receiver<T>, TestChannelHandle) {
        let (inbound_tx, inbound_rx) = channel();
        let (outbound_tx, outbound_rx) = channel();
        let (gate_tx, gate_rx) = channel();

        let relay = Self { release: gate_rx, tx: outbound_tx, rx: inbound_rx };
        std::thread::spawn(move || relay.intercept_loop());

        (inbound_tx, outbound_rx, TestChannelHandle::new(gate_tx))
    }

    /// Forwards exactly one value per release signal.
    ///
    /// Stops once the release channel or the inbound channel is closed. A failed forward is
    /// ignored.
    fn intercept_loop(&self) {
        while let Ok(()) = self.release.recv() {
            match self.rx.recv() {
                Ok(value) => {
                    let _ = self.tx.send(value);
                }
                Err(_) => return,
            }
        }
    }
}

/// Controls when a [`TestChannel`] forwards its next value.
struct TestChannelHandle {
    /// Sender side of the relay's release channel.
    release: Sender<()>,
}

impl TestChannelHandle {
    /// Wraps the sender side of a release channel.
    const fn new(release: Sender<()>) -> Self {
        Self { release }
    }

    /// Permits the relay to forward one value. A closed relay is ignored.
    #[allow(dead_code)]
    fn release(&self) {
        let _ = self.release.send(());
    }
}
2776
/// Test fixture bundling an [`EngineApiTreeHandler`] built on mock components with the channel
/// ends and helpers needed to drive it and observe its output.
struct TestHarness {
    /// The tree handler under test.
    tree: EngineApiTreeHandler<
        EthPrimitives,
        MockEthProvider,
        MockExecutorProvider,
        EthEngineTypes,
        EthereumEngineValidator,
    >,
    /// Clone of the tree's incoming message sender.
    to_tree_tx: Sender<FromEngine<EngineApiRequest<EthEngineTypes, EthPrimitives>, Block>>,
    /// Receiving end of the event channel whose sender was given to the tree.
    from_tree_rx: UnboundedReceiver<EngineApiEvent>,
    /// Blocks the harness was seeded with via `with_blocks`; empty otherwise.
    blocks: Vec<ExecutedBlock>,
    /// Receiving end of the channel behind the tree's persistence handle.
    action_rx: Receiver<PersistenceAction>,
    /// Mock executor provider; a clone of it was given to the tree.
    executor_provider: MockExecutorProvider,
    /// Block builder configured with the harness chain spec.
    block_builder: TestBlockBuilder,
    /// Mock provider; a clone of it was given to the tree.
    provider: MockEthProvider,
}
2793
2794 impl TestHarness {
2795 fn new(chain_spec: Arc<ChainSpec>) -> Self {
2796 let (action_tx, action_rx) = channel();
2797 Self::with_persistence_channel(chain_spec, action_tx, action_rx)
2798 }
2799
2800 #[allow(dead_code)]
2801 fn with_test_channel(chain_spec: Arc<ChainSpec>) -> (Self, TestChannelHandle) {
2802 let (action_tx, action_rx, handle) = TestChannel::spawn_channel();
2803 (Self::with_persistence_channel(chain_spec, action_tx, action_rx), handle)
2804 }
2805
2806 fn with_persistence_channel(
2807 chain_spec: Arc<ChainSpec>,
2808 action_tx: Sender<PersistenceAction>,
2809 action_rx: Receiver<PersistenceAction>,
2810 ) -> Self {
2811 let persistence_handle = PersistenceHandle::new(action_tx);
2812
2813 let backup_handle = BackupHandle::spawn_service(MaybePlatformPath::chain_default(
2814 chain_spec.chain.clone(),
2815 ));
2816
2817 let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));
2818
2819 let provider = MockEthProvider::default();
2820 let executor_provider = MockExecutorProvider::default();
2821
2822 let payload_validator = EthereumEngineValidator::new(chain_spec.clone());
2823
2824 let (from_tree_tx, from_tree_rx) = unbounded_channel();
2825
2826 let header = chain_spec.genesis_header().clone();
2827 let header = SealedHeader::seal(header);
2828 let engine_api_tree_state = EngineApiTreeState::new(10, 10, header.num_hash());
2829 let canonical_in_memory_state = CanonicalInMemoryState::with_head(header, None, None);
2830
2831 let (to_payload_service, _payload_command_rx) = unbounded_channel();
2832 let payload_builder = PayloadBuilderHandle::new(to_payload_service);
2833
2834 let tree = EngineApiTreeHandler::new(
2835 provider.clone(),
2836 executor_provider.clone(),
2837 consensus,
2838 payload_validator,
2839 from_tree_tx,
2840 engine_api_tree_state,
2841 canonical_in_memory_state,
2842 persistence_handle,
2843 PersistenceState::default(),
2844 payload_builder,
2845 TreeConfig::default(),
2846 EngineApiKind::Ethereum,
2847 backup_handle,
2848 );
2849
2850 let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
2851 Self {
2852 to_tree_tx: tree.incoming_tx.clone(),
2853 tree,
2854 from_tree_rx,
2855 blocks: vec![],
2856 action_rx,
2857 executor_provider,
2858 block_builder,
2859 provider,
2860 }
2861 }
2862
2863 fn with_blocks(mut self, blocks: Vec<ExecutedBlock>) -> Self {
2864 let mut blocks_by_hash = HashMap::default();
2865 let mut blocks_by_number = BTreeMap::new();
2866 let mut state_by_hash = HashMap::default();
2867 let mut hash_by_number = BTreeMap::new();
2868 let mut parent_to_child: HashMap<B256, HashSet<B256>> = HashMap::default();
2869 let mut parent_hash = B256::ZERO;
2870
2871 for block in &blocks {
2872 let sealed_block = block.block();
2873 let hash = sealed_block.hash();
2874 let number = sealed_block.number;
2875 blocks_by_hash.insert(hash, block.clone());
2876 blocks_by_number.entry(number).or_insert_with(Vec::new).push(block.clone());
2877 state_by_hash.insert(hash, Arc::new(BlockState::new(block.clone())));
2878 hash_by_number.insert(number, hash);
2879 parent_to_child.entry(parent_hash).or_default().insert(hash);
2880 parent_hash = hash;
2881 }
2882
2883 self.tree.state.tree_state = TreeState {
2884 blocks_by_hash,
2885 blocks_by_number,
2886 current_canonical_head: blocks.last().unwrap().block().num_hash(),
2887 parent_to_child,
2888 persisted_trie_updates: HashMap::default(),
2889 };
2890
2891 let last_executed_block = blocks.last().unwrap().clone();
2892 let pending = Some(BlockState::new(last_executed_block));
2893 self.tree.canonical_in_memory_state =
2894 CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending, None, None);
2895
2896 self.blocks = blocks.clone();
2897 self.persist_blocks(
2898 blocks
2899 .into_iter()
2900 .map(|b| SealedBlockWithSenders {
2901 block: (*b.block).clone(),
2902 senders: b.senders.to_vec(),
2903 })
2904 .collect(),
2905 );
2906
2907 self
2908 }
2909
2910 const fn with_backfill_state(mut self, state: BackfillSyncState) -> Self {
2911 self.tree.backfill_sync_state = state;
2912 self
2913 }
2914
2915 fn extend_execution_outcome(
2916 &self,
2917 execution_outcomes: impl IntoIterator<Item = impl Into<ExecutionOutcome>>,
2918 ) {
2919 self.executor_provider.extend(execution_outcomes);
2920 }
2921
2922 fn insert_block(
2923 &mut self,
2924 block: SealedBlockWithSenders,
2925 ) -> Result<InsertPayloadOk2, InsertBlockErrorTwo<Block>> {
2926 let execution_outcome = self.block_builder.get_execution_outcome(block.clone());
2927 self.extend_execution_outcome([execution_outcome]);
2928 self.tree.provider.add_state_root(block.state_root);
2929 self.tree.insert_block(block)
2930 }
2931
2932 async fn fcu_to(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
2933 let fcu_status = fcu_status.into();
2934
2935 self.send_fcu(block_hash, fcu_status).await;
2936
2937 self.check_fcu(block_hash, fcu_status).await;
2938 }
2939
2940 async fn send_fcu(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
2941 let fcu_state = self.fcu_state(block_hash);
2942
2943 let (tx, rx) = oneshot::channel();
2944 self.tree
2945 .on_engine_message(FromEngine::Request(
2946 BeaconEngineMessage::ForkchoiceUpdated {
2947 state: fcu_state,
2948 payload_attrs: None,
2949 tx,
2950 version: EngineApiMessageVersion::default(),
2951 }
2952 .into(),
2953 ))
2954 .unwrap();
2955
2956 let response = rx.await.unwrap().unwrap().await.unwrap();
2957 match fcu_status.into() {
2958 ForkchoiceStatus::Valid => assert!(response.payload_status.is_valid()),
2959 ForkchoiceStatus::Syncing => assert!(response.payload_status.is_syncing()),
2960 ForkchoiceStatus::Invalid => assert!(response.payload_status.is_invalid()),
2961 }
2962 }
2963
2964 async fn check_fcu(&mut self, block_hash: B256, fcu_status: impl Into<ForkchoiceStatus>) {
2965 let fcu_state = self.fcu_state(block_hash);
2966
2967 let event = self.from_tree_rx.recv().await.unwrap();
2969 match event {
2970 EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkchoiceUpdated(
2971 state,
2972 status,
2973 )) => {
2974 assert_eq!(state, fcu_state);
2975 assert_eq!(status, fcu_status.into());
2976 }
2977 _ => panic!("Unexpected event: {:#?}", event),
2978 }
2979 }
2980
2981 const fn fcu_state(&self, block_hash: B256) -> ForkchoiceState {
2982 ForkchoiceState {
2983 head_block_hash: block_hash,
2984 safe_block_hash: block_hash,
2985 finalized_block_hash: block_hash,
2986 }
2987 }
2988
2989 async fn send_new_payload(&mut self, block: SealedBlockWithSenders) {
2990 let payload = block_to_payload_v3(block.block.clone());
2991 self.tree
2992 .on_new_payload(
2993 payload.into(),
2994 ExecutionPayloadSidecar::v3(CancunPayloadFields {
2995 parent_beacon_block_root: block.parent_beacon_block_root.unwrap(),
2996 versioned_hashes: vec![],
2997 }),
2998 )
2999 .unwrap();
3000 }
3001
3002 async fn insert_chain(
3003 &mut self,
3004 chain: impl IntoIterator<Item = SealedBlockWithSenders> + Clone,
3005 ) {
3006 for block in chain.clone() {
3007 self.insert_block(block.clone()).unwrap();
3008 }
3009 self.check_canon_chain_insertion(chain).await;
3010 }
3011
3012 async fn check_canon_commit(&mut self, hash: B256) {
3013 let event = self.from_tree_rx.recv().await.unwrap();
3014 match event {
3015 EngineApiEvent::BeaconConsensus(
3016 BeaconConsensusEngineEvent::CanonicalChainCommitted(header, _),
3017 ) => {
3018 assert_eq!(header.hash(), hash);
3019 }
3020 _ => panic!("Unexpected event: {:#?}", event),
3021 }
3022 }
3023
3024 async fn check_fork_chain_insertion(
3025 &mut self,
3026 chain: impl IntoIterator<Item = SealedBlockWithSenders> + Clone,
3027 ) {
3028 for block in chain {
3029 self.check_fork_block_added(block.block.hash()).await;
3030 }
3031 }
3032
3033 async fn check_canon_chain_insertion(
3034 &mut self,
3035 chain: impl IntoIterator<Item = SealedBlockWithSenders> + Clone,
3036 ) {
3037 for block in chain.clone() {
3038 self.check_canon_block_added(block.hash()).await;
3039 }
3040 }
3041
3042 async fn check_canon_block_added(&mut self, expected_hash: B256) {
3043 let event = self.from_tree_rx.recv().await.unwrap();
3044 match event {
3045 EngineApiEvent::BeaconConsensus(
3046 BeaconConsensusEngineEvent::CanonicalBlockAdded(block, _),
3047 ) => {
3048 assert_eq!(block.hash(), expected_hash);
3049 }
3050 _ => panic!("Unexpected event: {:#?}", event),
3051 }
3052 }
3053
3054 async fn check_fork_block_added(&mut self, expected_hash: B256) {
3055 let event = self.from_tree_rx.recv().await.unwrap();
3056 match event {
3057 EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkBlockAdded(
3058 block,
3059 _,
3060 )) => {
3061 assert!(block.hash() == expected_hash);
3062 }
3063 _ => panic!("Unexpected event: {:#?}", event),
3064 }
3065 }
3066
3067 fn persist_blocks(&self, blocks: Vec<SealedBlockWithSenders>) {
3068 let mut block_data: Vec<(B256, Block)> = Vec::with_capacity(blocks.len());
3069 let mut headers_data: Vec<(B256, Header)> = Vec::with_capacity(blocks.len());
3070
3071 for block in &blocks {
3072 let unsealed_block = block.clone().unseal();
3073 block_data.push((block.hash(), unsealed_block.clone().block));
3074 headers_data.push((block.hash(), unsealed_block.header.clone()));
3075 }
3076
3077 self.provider.extend_blocks(block_data);
3078 self.provider.extend_headers(headers_data);
3079 }
3080
3081 fn setup_range_insertion_for_valid_chain(&mut self, chain: Vec<SealedBlockWithSenders>) {
3082 self.setup_range_insertion_for_chain(chain, None)
3083 }
3084
3085 fn setup_range_insertion_for_invalid_chain(
3086 &mut self,
3087 chain: Vec<SealedBlockWithSenders>,
3088 index: usize,
3089 ) {
3090 self.setup_range_insertion_for_chain(chain, Some(index))
3091 }
3092
3093 fn setup_range_insertion_for_chain(
3094 &mut self,
3095 chain: Vec<SealedBlockWithSenders>,
3096 invalid_index: Option<usize>,
3097 ) {
3098 let mut chain_rev = chain;
3101 chain_rev.reverse();
3102
3103 let mut execution_outcomes = Vec::with_capacity(chain_rev.len());
3104 for (index, block) in chain_rev.iter().enumerate() {
3105 let execution_outcome = self.block_builder.get_execution_outcome(block.clone());
3106 let state_root = if invalid_index.is_some() && invalid_index.unwrap() == index {
3107 B256::random()
3108 } else {
3109 block.state_root
3110 };
3111 self.tree.provider.add_state_root(state_root);
3112 execution_outcomes.push(execution_outcome);
3113 }
3114 self.extend_execution_outcome(execution_outcomes);
3115 }
3116
3117 fn check_canon_head(&self, head_hash: B256) {
3118 assert_eq!(self.tree.state.tree_state.canonical_head().hash, head_hash);
3119 }
3120 }
3121
#[test]
fn test_tree_persist_block_batch() {
    let tree_config = TreeConfig::default();
    let chain_spec = MAINNET.clone();
    let mut test_block_builder =
        TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());

    // Seed the harness with `persistence_threshold + 1` executed blocks.
    let seed_range = 1..tree_config.persistence_threshold() + 2;
    let seed_blocks: Vec<_> = test_block_builder.get_executed_blocks(seed_range).collect();
    let mut test_harness = TestHarness::new(chain_spec).with_blocks(seed_blocks);

    // Download twice as many blocks as fit into a single execution batch.
    let batch_size = tree_config.max_execute_block_batch_size();
    let downloaded: Vec<_> = (0..batch_size * 2)
        .map(|idx| test_block_builder.generate_random_block(idx as u64, B256::random()))
        .collect();
    test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(downloaded)).unwrap();

    // Handle the first message.
    let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
    test_harness.tree.on_engine_message(msg).unwrap();

    // The next message must be a batch of exactly `batch_size` downloaded blocks.
    let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
    let FromEngine::DownloadedBlocks(blocks) = msg else {
        panic!("unexpected message: {:#?}", msg)
    };
    assert_eq!(blocks.len(), tree_config.max_execute_block_batch_size());
}
3156
3157 #[tokio::test]
3158 async fn test_tree_persist_blocks() {
3159 let tree_config = TreeConfig::default();
3160 let chain_spec = MAINNET.clone();
3161 let mut test_block_builder =
3162 TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
3163
3164 let blocks: Vec<_> = test_block_builder
3167 .get_executed_blocks(1..tree_config.persistence_threshold() + 2)
3168 .collect();
3169 let test_harness = TestHarness::new(chain_spec).with_blocks(blocks.clone());
3170 std::thread::Builder::new()
3171 .name("Tree Task".to_string())
3172 .spawn(|| test_harness.tree.run())
3173 .unwrap();
3174
3175 test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap();
3177
3178 let received_action =
3179 test_harness.action_rx.recv().expect("Failed to receive save blocks action");
3180 if let PersistenceAction::SaveBlocks(saved_blocks, _) = received_action {
3181 let expected_persist_len =
3184 blocks.len() - tree_config.memory_block_buffer_target() as usize;
3185 assert_eq!(saved_blocks.len(), expected_persist_len);
3186 assert_eq!(saved_blocks, blocks[..expected_persist_len]);
3187 } else {
3188 panic!("unexpected action received {received_action:?}");
3189 }
3190 }
3191
3192 #[tokio::test]
3193 async fn test_in_memory_state_trait_impl() {
3194 let blocks: Vec<_> = TestBlockBuilder::default().get_executed_blocks(0..10).collect();
3195 let test_harness = TestHarness::new(MAINNET.clone()).with_blocks(blocks.clone());
3196
3197 for executed_block in blocks {
3198 let sealed_block = executed_block.block();
3199
3200 let expected_state = BlockState::new(executed_block.clone());
3201
3202 let actual_state_by_hash = test_harness
3203 .tree
3204 .canonical_in_memory_state
3205 .state_by_hash(sealed_block.hash())
3206 .unwrap();
3207 assert_eq!(expected_state, *actual_state_by_hash);
3208
3209 let actual_state_by_number = test_harness
3210 .tree
3211 .canonical_in_memory_state
3212 .state_by_number(sealed_block.number)
3213 .unwrap();
3214 assert_eq!(expected_state, *actual_state_by_number);
3215 }
3216 }
3217
3218 #[tokio::test]
3219 async fn test_engine_request_during_backfill() {
3220 let tree_config = TreeConfig::default();
3221 let blocks: Vec<_> = TestBlockBuilder::default()
3222 .get_executed_blocks(0..tree_config.persistence_threshold())
3223 .collect();
3224 let mut test_harness = TestHarness::new(MAINNET.clone())
3225 .with_blocks(blocks)
3226 .with_backfill_state(BackfillSyncState::Active);
3227
3228 let (tx, rx) = oneshot::channel();
3229 test_harness
3230 .tree
3231 .on_engine_message(FromEngine::Request(
3232 BeaconEngineMessage::ForkchoiceUpdated {
3233 state: ForkchoiceState {
3234 head_block_hash: B256::random(),
3235 safe_block_hash: B256::random(),
3236 finalized_block_hash: B256::random(),
3237 },
3238 payload_attrs: None,
3239 tx,
3240 version: EngineApiMessageVersion::default(),
3241 }
3242 .into(),
3243 ))
3244 .unwrap();
3245
3246 let resp = rx.await.unwrap().unwrap().await.unwrap();
3247 assert!(resp.payload_status.is_syncing());
3248 }
3249
#[test]
fn test_disconnected_payload() {
    let raw = include_str!("../../test-data/holesky/2.rlp");
    let bytes = Bytes::from_str(raw).unwrap();
    let sealed = Block::decode(&mut bytes.as_ref()).unwrap().seal_slow();
    let hash = sealed.hash();
    let payload = block_to_payload_v1(sealed.clone());

    let mut test_harness = TestHarness::new(HOLESKY.clone());

    // A fresh harness only knows genesis, so the payload is answered with "syncing".
    let outcome = test_harness
        .tree
        .on_new_payload(payload.into(), ExecutionPayloadSidecar::none())
        .unwrap();
    assert!(outcome.outcome.is_syncing());

    // The block itself must have ended up in the buffer.
    let buffered = test_harness.tree.state.buffer.block(&hash).unwrap();
    assert_eq!(buffered.block, sealed);
}
3271
#[test]
fn test_disconnected_block() {
    let raw = include_str!("../../test-data/holesky/2.rlp");
    let bytes = Bytes::from_str(raw).unwrap();
    let sealed = Block::decode(&mut bytes.as_ref()).unwrap().seal_slow();

    let mut test_harness = TestHarness::new(HOLESKY.clone());

    let outcome = test_harness.tree.insert_block_without_senders(sealed.clone()).unwrap();

    // The block is reported as disconnected, with its parent as the missing ancestor.
    let expected = InsertPayloadOk2::Inserted(BlockStatus2::Disconnected {
        head: test_harness.tree.state.tree_state.current_canonical_head,
        missing_ancestor: sealed.parent_num_hash(),
    });
    assert_eq!(outcome, expected);
}
3290
3291 #[tokio::test]
3292 async fn test_holesky_payload() {
3293 let s = include_str!("../../test-data/holesky/1.rlp");
3294 let data = Bytes::from_str(s).unwrap();
3295 let block = Block::decode(&mut data.as_ref()).unwrap();
3296 let sealed = block.seal_slow();
3297 let payload = block_to_payload_v1(sealed);
3298
3299 let mut test_harness =
3300 TestHarness::new(HOLESKY.clone()).with_backfill_state(BackfillSyncState::Active);
3301
3302 let (tx, rx) = oneshot::channel();
3303 test_harness
3304 .tree
3305 .on_engine_message(FromEngine::Request(
3306 BeaconEngineMessage::NewPayload {
3307 payload: payload.clone().into(),
3308 sidecar: ExecutionPayloadSidecar::none(),
3309 tx,
3310 }
3311 .into(),
3312 ))
3313 .unwrap();
3314
3315 let resp = rx.await.unwrap().unwrap();
3316 assert!(resp.is_syncing());
3317 }
3318
3319 #[tokio::test]
3320 async fn test_tree_state_insert_executed() {
3321 let mut tree_state = TreeState::new(BlockNumHash::default());
3322 let blocks: Vec<_> = TestBlockBuilder::default().get_executed_blocks(1..4).collect();
3323
3324 tree_state.insert_executed(blocks[0].clone());
3325 tree_state.insert_executed(blocks[1].clone());
3326
3327 assert_eq!(
3328 tree_state.parent_to_child.get(&blocks[0].block.hash()),
3329 Some(&HashSet::from_iter([blocks[1].block.hash()]))
3330 );
3331
3332 assert!(!tree_state.parent_to_child.contains_key(&blocks[1].block.hash()));
3333
3334 tree_state.insert_executed(blocks[2].clone());
3335
3336 assert_eq!(
3337 tree_state.parent_to_child.get(&blocks[1].block.hash()),
3338 Some(&HashSet::from_iter([blocks[2].block.hash()]))
3339 );
3340 assert!(tree_state.parent_to_child.contains_key(&blocks[1].block.hash()));
3341
3342 assert!(!tree_state.parent_to_child.contains_key(&blocks[2].block.hash()));
3343 }
3344
3345 #[tokio::test]
3346 async fn test_tree_state_insert_executed_with_reorg() {
3347 let mut tree_state = TreeState::new(BlockNumHash::default());
3348 let mut test_block_builder = TestBlockBuilder::default();
3349 let blocks: Vec<_> = test_block_builder.get_executed_blocks(1..6).collect();
3350
3351 for block in &blocks {
3352 tree_state.insert_executed(block.clone());
3353 }
3354 assert_eq!(tree_state.blocks_by_hash.len(), 5);
3355
3356 let fork_block_3 =
3357 test_block_builder.get_executed_block_with_number(3, blocks[1].block.hash());
3358 let fork_block_4 =
3359 test_block_builder.get_executed_block_with_number(4, fork_block_3.block.hash());
3360 let fork_block_5 =
3361 test_block_builder.get_executed_block_with_number(5, fork_block_4.block.hash());
3362
3363 tree_state.insert_executed(fork_block_3.clone());
3364 tree_state.insert_executed(fork_block_4.clone());
3365 tree_state.insert_executed(fork_block_5.clone());
3366
3367 assert_eq!(tree_state.blocks_by_hash.len(), 8);
3368 assert_eq!(tree_state.blocks_by_number[&3].len(), 2); assert_eq!(tree_state.parent_to_child[&blocks[1].block.hash()].len(), 2); tree_state.insert_executed(fork_block_4.clone());
3373 assert_eq!(tree_state.blocks_by_hash.len(), 8);
3374
3375 assert!(tree_state.parent_to_child[&fork_block_3.block.hash()]
3376 .contains(&fork_block_4.block.hash()));
3377 assert!(tree_state.parent_to_child[&fork_block_4.block.hash()]
3378 .contains(&fork_block_5.block.hash()));
3379
3380 assert_eq!(tree_state.blocks_by_number[&4].len(), 2);
3381 assert_eq!(tree_state.blocks_by_number[&5].len(), 2);
3382 }
3383
3384 #[tokio::test]
3385 async fn test_tree_state_remove_before() {
3386 let start_num_hash = BlockNumHash::default();
3387 let mut tree_state = TreeState::new(start_num_hash);
3388 let blocks: Vec<_> = TestBlockBuilder::default().get_executed_blocks(1..6).collect();
3389
3390 for block in &blocks {
3391 tree_state.insert_executed(block.clone());
3392 }
3393
3394 let last = blocks.last().unwrap();
3395
3396 tree_state.set_canonical_head(last.block.num_hash());
3398
3399 tree_state.remove_until(
3401 BlockNumHash::new(2, blocks[1].block.hash()),
3402 start_num_hash.hash,
3403 Some(blocks[1].block.num_hash()),
3404 );
3405
3406 assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].block.hash()));
3407 assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].block.hash()));
3408 assert!(!tree_state.blocks_by_number.contains_key(&1));
3409 assert!(!tree_state.blocks_by_number.contains_key(&2));
3410
3411 assert!(tree_state.blocks_by_hash.contains_key(&blocks[2].block.hash()));
3412 assert!(tree_state.blocks_by_hash.contains_key(&blocks[3].block.hash()));
3413 assert!(tree_state.blocks_by_hash.contains_key(&blocks[4].block.hash()));
3414 assert!(tree_state.blocks_by_number.contains_key(&3));
3415 assert!(tree_state.blocks_by_number.contains_key(&4));
3416 assert!(tree_state.blocks_by_number.contains_key(&5));
3417
3418 assert!(!tree_state.parent_to_child.contains_key(&blocks[0].block.hash()));
3419 assert!(!tree_state.parent_to_child.contains_key(&blocks[1].block.hash()));
3420 assert!(tree_state.parent_to_child.contains_key(&blocks[2].block.hash()));
3421 assert!(tree_state.parent_to_child.contains_key(&blocks[3].block.hash()));
3422 assert!(!tree_state.parent_to_child.contains_key(&blocks[4].block.hash()));
3423
3424 assert_eq!(
3425 tree_state.parent_to_child.get(&blocks[2].block.hash()),
3426 Some(&HashSet::from_iter([blocks[3].block.hash()]))
3427 );
3428 assert_eq!(
3429 tree_state.parent_to_child.get(&blocks[3].block.hash()),
3430 Some(&HashSet::from_iter([blocks[4].block.hash()]))
3431 );
3432 }
3433
3434 #[tokio::test]
3435 async fn test_tree_state_remove_before_finalized() {
3436 let start_num_hash = BlockNumHash::default();
3437 let mut tree_state = TreeState::new(start_num_hash);
3438 let blocks: Vec<_> = TestBlockBuilder::default().get_executed_blocks(1..6).collect();
3439
3440 for block in &blocks {
3441 tree_state.insert_executed(block.clone());
3442 }
3443
3444 let last = blocks.last().unwrap();
3445
3446 tree_state.set_canonical_head(last.block.num_hash());
3448
3449 tree_state.remove_until(
3451 BlockNumHash::new(2, blocks[1].block.hash()),
3452 start_num_hash.hash,
3453 None,
3454 );
3455
3456 assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].block.hash()));
3457 assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].block.hash()));
3458 assert!(!tree_state.blocks_by_number.contains_key(&1));
3459 assert!(!tree_state.blocks_by_number.contains_key(&2));
3460
3461 assert!(tree_state.blocks_by_hash.contains_key(&blocks[2].block.hash()));
3462 assert!(tree_state.blocks_by_hash.contains_key(&blocks[3].block.hash()));
3463 assert!(tree_state.blocks_by_hash.contains_key(&blocks[4].block.hash()));
3464 assert!(tree_state.blocks_by_number.contains_key(&3));
3465 assert!(tree_state.blocks_by_number.contains_key(&4));
3466 assert!(tree_state.blocks_by_number.contains_key(&5));
3467
3468 assert!(!tree_state.parent_to_child.contains_key(&blocks[0].block.hash()));
3469 assert!(!tree_state.parent_to_child.contains_key(&blocks[1].block.hash()));
3470 assert!(tree_state.parent_to_child.contains_key(&blocks[2].block.hash()));
3471 assert!(tree_state.parent_to_child.contains_key(&blocks[3].block.hash()));
3472 assert!(!tree_state.parent_to_child.contains_key(&blocks[4].block.hash()));
3473
3474 assert_eq!(
3475 tree_state.parent_to_child.get(&blocks[2].block.hash()),
3476 Some(&HashSet::from_iter([blocks[3].block.hash()]))
3477 );
3478 assert_eq!(
3479 tree_state.parent_to_child.get(&blocks[3].block.hash()),
3480 Some(&HashSet::from_iter([blocks[4].block.hash()]))
3481 );
3482 }
3483
3484 #[tokio::test]
3485 async fn test_tree_state_remove_before_lower_finalized() {
3486 let start_num_hash = BlockNumHash::default();
3487 let mut tree_state = TreeState::new(start_num_hash);
3488 let blocks: Vec<_> = TestBlockBuilder::default().get_executed_blocks(1..6).collect();
3489
3490 for block in &blocks {
3491 tree_state.insert_executed(block.clone());
3492 }
3493
3494 let last = blocks.last().unwrap();
3495
3496 tree_state.set_canonical_head(last.block.num_hash());
3498
3499 tree_state.remove_until(
3501 BlockNumHash::new(2, blocks[1].block.hash()),
3502 start_num_hash.hash,
3503 Some(blocks[0].block.num_hash()),
3504 );
3505
3506 assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].block.hash()));
3507 assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].block.hash()));
3508 assert!(!tree_state.blocks_by_number.contains_key(&1));
3509 assert!(!tree_state.blocks_by_number.contains_key(&2));
3510
3511 assert!(tree_state.blocks_by_hash.contains_key(&blocks[2].block.hash()));
3512 assert!(tree_state.blocks_by_hash.contains_key(&blocks[3].block.hash()));
3513 assert!(tree_state.blocks_by_hash.contains_key(&blocks[4].block.hash()));
3514 assert!(tree_state.blocks_by_number.contains_key(&3));
3515 assert!(tree_state.blocks_by_number.contains_key(&4));
3516 assert!(tree_state.blocks_by_number.contains_key(&5));
3517
3518 assert!(!tree_state.parent_to_child.contains_key(&blocks[0].block.hash()));
3519 assert!(!tree_state.parent_to_child.contains_key(&blocks[1].block.hash()));
3520 assert!(tree_state.parent_to_child.contains_key(&blocks[2].block.hash()));
3521 assert!(tree_state.parent_to_child.contains_key(&blocks[3].block.hash()));
3522 assert!(!tree_state.parent_to_child.contains_key(&blocks[4].block.hash()));
3523
3524 assert_eq!(
3525 tree_state.parent_to_child.get(&blocks[2].block.hash()),
3526 Some(&HashSet::from_iter([blocks[3].block.hash()]))
3527 );
3528 assert_eq!(
3529 tree_state.parent_to_child.get(&blocks[3].block.hash()),
3530 Some(&HashSet::from_iter([blocks[4].block.hash()]))
3531 );
3532 }
3533
3534 #[tokio::test]
3535 async fn test_tree_state_on_new_head() {
3536 let chain_spec = MAINNET.clone();
3537 let mut test_harness = TestHarness::new(chain_spec);
3538 let mut test_block_builder = TestBlockBuilder::default();
3539
3540 let blocks: Vec<_> = test_block_builder.get_executed_blocks(1..6).collect();
3541
3542 for block in &blocks {
3543 test_harness.tree.state.tree_state.insert_executed(block.clone());
3544 }
3545
3546 test_harness.tree.state.tree_state.set_canonical_head(blocks[2].block.num_hash());
3548
3549 let fork_block_3 =
3551 test_block_builder.get_executed_block_with_number(3, blocks[1].block.hash());
3552 let fork_block_4 =
3553 test_block_builder.get_executed_block_with_number(4, fork_block_3.block.hash());
3554 let fork_block_5 =
3555 test_block_builder.get_executed_block_with_number(5, fork_block_4.block.hash());
3556
3557 test_harness.tree.state.tree_state.insert_executed(fork_block_3.clone());
3558 test_harness.tree.state.tree_state.insert_executed(fork_block_4.clone());
3559 test_harness.tree.state.tree_state.insert_executed(fork_block_5.clone());
3560
3561 let result = test_harness.tree.on_new_head(blocks[4].block.hash()).unwrap();
3563 assert!(matches!(result, Some(NewCanonicalChain::Commit { .. })));
3564 if let Some(NewCanonicalChain::Commit { new }) = result {
3565 assert_eq!(new.len(), 2);
3566 assert_eq!(new[0].block.hash(), blocks[3].block.hash());
3567 assert_eq!(new[1].block.hash(), blocks[4].block.hash());
3568 }
3569
3570 let result = test_harness.tree.on_new_head(fork_block_5.block.hash()).unwrap();
3572 assert!(matches!(result, Some(NewCanonicalChain::Reorg { .. })));
3573 if let Some(NewCanonicalChain::Reorg { new, old }) = result {
3574 assert_eq!(new.len(), 3);
3575 assert_eq!(new[0].block.hash(), fork_block_3.block.hash());
3576 assert_eq!(new[1].block.hash(), fork_block_4.block.hash());
3577 assert_eq!(new[2].block.hash(), fork_block_5.block.hash());
3578
3579 assert_eq!(old.len(), 1);
3580 assert_eq!(old[0].block.hash(), blocks[2].block.hash());
3581 }
3582 }
3583
3584 #[tokio::test]
3585 async fn test_tree_state_on_new_head_deep_fork() {
3586 reth_tracing::init_test_tracing();
3587
3588 let chain_spec = MAINNET.clone();
3589 let mut test_harness = TestHarness::new(chain_spec);
3590 let mut test_block_builder = TestBlockBuilder::default();
3591
3592 let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..5).collect();
3593
3594 for block in &blocks {
3595 test_harness.tree.state.tree_state.insert_executed(block.clone());
3596 }
3597
3598 let last_block = blocks.last().unwrap().block.clone();
3600
3601 test_harness.tree.state.tree_state.set_canonical_head(last_block.num_hash());
3602
3603 let chain_a = test_block_builder.create_fork(&last_block, 10);
3605 let chain_b = test_block_builder.create_fork(&last_block, 10);
3606
3607 for block in &chain_a {
3608 test_harness.tree.state.tree_state.insert_executed(ExecutedBlock {
3609 block: Arc::new(block.block.clone()),
3610 senders: Arc::new(block.senders.clone()),
3611 execution_output: Arc::new(ExecutionOutcome::default()),
3612 hashed_state: Arc::new(HashedPostState::default()),
3613 trie: Arc::new(TrieUpdates::default()),
3614 });
3615 }
3616 test_harness.tree.state.tree_state.set_canonical_head(chain_a.last().unwrap().num_hash());
3617
3618 for block in &chain_b {
3619 test_harness.tree.state.tree_state.insert_executed(ExecutedBlock {
3620 block: Arc::new(block.block.clone()),
3621 senders: Arc::new(block.senders.clone()),
3622 execution_output: Arc::new(ExecutionOutcome::default()),
3623 hashed_state: Arc::new(HashedPostState::default()),
3624 trie: Arc::new(TrieUpdates::default()),
3625 });
3626 }
3627
3628 let mut expected_new = Vec::new();
3630 for block in &chain_b {
3631 let result = test_harness.tree.on_new_head(block.block.hash()).unwrap();
3633 assert_matches!(result, Some(NewCanonicalChain::Reorg { .. }));
3634
3635 expected_new.push(block);
3636 if let Some(NewCanonicalChain::Reorg { new, old }) = result {
3637 assert_eq!(new.len(), expected_new.len());
3638 for (index, block) in expected_new.iter().enumerate() {
3639 assert_eq!(new[index].block.hash(), block.block.hash());
3640 }
3641
3642 assert_eq!(old.len(), chain_a.len());
3643 for (index, block) in chain_a.iter().enumerate() {
3644 assert_eq!(old[index].block.hash(), block.block.hash());
3645 }
3646 }
3647
3648 test_harness.tree.on_new_head(chain_a.last().unwrap().hash()).unwrap();
3650 }
3651 }
3652
3653 #[tokio::test]
3654 async fn test_get_canonical_blocks_to_persist() {
3655 let chain_spec = MAINNET.clone();
3656 let mut test_harness = TestHarness::new(chain_spec);
3657 let mut test_block_builder = TestBlockBuilder::default();
3658
3659 let canonical_head_number = 9;
3660 let blocks: Vec<_> =
3661 test_block_builder.get_executed_blocks(0..canonical_head_number + 1).collect();
3662 test_harness = test_harness.with_blocks(blocks.clone());
3663
3664 let last_persisted_block_number = 3;
3665 test_harness.tree.persistence_state.last_persisted_block.number =
3666 last_persisted_block_number;
3667
3668 let persistence_threshold = 4;
3669 let memory_block_buffer_target = 3;
3670 test_harness.tree.config = TreeConfig::default()
3671 .with_persistence_threshold(persistence_threshold)
3672 .with_memory_block_buffer_target(memory_block_buffer_target);
3673
3674 let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist();
3675
3676 let expected_blocks_to_persist_length: usize =
3677 (canonical_head_number - memory_block_buffer_target - last_persisted_block_number)
3678 .try_into()
3679 .unwrap();
3680
3681 assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);
3682 for (i, item) in
3683 blocks_to_persist.iter().enumerate().take(expected_blocks_to_persist_length)
3684 {
3685 assert_eq!(item.block.number, last_persisted_block_number + i as u64 + 1);
3686 }
3687
3688 let fork_block = test_block_builder.get_executed_block_with_number(4, B256::random());
3690 let fork_block_hash = fork_block.block.hash();
3691 test_harness.tree.state.tree_state.insert_executed(fork_block);
3692
3693 assert!(test_harness.tree.state.tree_state.block_by_hash(fork_block_hash).is_some());
3694
3695 let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist();
3696 assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length);
3697
3698 assert!(!blocks_to_persist.iter().any(|b| b.block.hash() == fork_block_hash));
3700
3701 assert!(blocks_to_persist
3703 .iter()
3704 .any(|b| b.block.number == 4 && b.block.hash() == blocks[4].block.hash()));
3705 }
3706
3707 #[tokio::test]
3708 async fn test_engine_tree_fcu_missing_head() {
3709 let chain_spec = MAINNET.clone();
3710 let mut test_harness = TestHarness::new(chain_spec.clone());
3711
3712 let mut test_block_builder =
3713 TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
3714
3715 let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..5).collect();
3716 test_harness = test_harness.with_blocks(blocks);
3717
3718 let missing_block = test_block_builder
3719 .generate_random_block(6, test_harness.blocks.last().unwrap().block().hash());
3720
3721 test_harness.fcu_to(missing_block.hash(), PayloadStatusEnum::Syncing).await;
3722
3723 let event = test_harness.from_tree_rx.recv().await.unwrap();
3725 match event {
3726 EngineApiEvent::Download(DownloadRequest::BlockSet(actual_block_set)) => {
3727 let expected_block_set = HashSet::from_iter([missing_block.hash()]);
3728 assert_eq!(actual_block_set, expected_block_set);
3729 }
3730 _ => panic!("Unexpected event: {:#?}", event),
3731 }
3732 }
3733
3734 #[tokio::test]
3735 async fn test_engine_tree_fcu_canon_chain_insertion() {
3736 let chain_spec = MAINNET.clone();
3737 let mut test_harness = TestHarness::new(chain_spec.clone());
3738
3739 let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
3740 test_harness = test_harness.with_blocks(base_chain.clone());
3741
3742 test_harness
3743 .fcu_to(base_chain.last().unwrap().block().hash(), ForkchoiceStatus::Valid)
3744 .await;
3745
3746 let main_chain = test_harness.block_builder.create_fork(base_chain[0].block(), 3);
3748
3749 test_harness.insert_chain(main_chain).await;
3750 }
3751
3752 #[tokio::test]
3753 async fn test_engine_tree_fcu_reorg_with_all_blocks() {
3754 let chain_spec = MAINNET.clone();
3755 let mut test_harness = TestHarness::new(chain_spec.clone());
3756
3757 let main_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..5).collect();
3758 test_harness = test_harness.with_blocks(main_chain.clone());
3759
3760 let fork_chain = test_harness.block_builder.create_fork(main_chain[2].block(), 3);
3761 let fork_chain_last_hash = fork_chain.last().unwrap().hash();
3762
3763 for block in &fork_chain {
3765 test_harness.insert_block(block.clone()).unwrap();
3766 }
3767
3768 test_harness.send_fcu(fork_chain_last_hash, ForkchoiceStatus::Valid).await;
3769
3770 test_harness.check_fork_chain_insertion(fork_chain.clone()).await;
3772
3773 test_harness.check_canon_commit(fork_chain_last_hash).await;
3775
3776 test_harness.check_fcu(fork_chain_last_hash, ForkchoiceStatus::Valid).await;
3777
3778 test_harness.check_canon_head(fork_chain_last_hash);
3780 }
3781
3782 #[tokio::test]
3783 async fn test_engine_tree_live_sync_transition_required_blocks_requested() {
3784 reth_tracing::init_test_tracing();
3785
3786 let chain_spec = MAINNET.clone();
3787 let mut test_harness = TestHarness::new(chain_spec.clone());
3788
3789 let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
3790 test_harness = test_harness.with_blocks(base_chain.clone());
3791
3792 test_harness
3793 .fcu_to(base_chain.last().unwrap().block().hash(), ForkchoiceStatus::Valid)
3794 .await;
3795
3796 let main_chain = test_harness
3798 .block_builder
3799 .create_fork(base_chain[0].block(), MIN_BLOCKS_FOR_PIPELINE_RUN + 10);
3800
3801 let main_chain_last_hash = main_chain.last().unwrap().hash();
3802 test_harness.send_fcu(main_chain_last_hash, ForkchoiceStatus::Syncing).await;
3803
3804 test_harness.check_fcu(main_chain_last_hash, ForkchoiceStatus::Syncing).await;
3805
3806 let backfill_finished_block_number = MIN_BLOCKS_FOR_PIPELINE_RUN + 1;
3808 let backfill_finished = FromOrchestrator::BackfillSyncFinished(ControlFlow::Continue {
3809 block_number: backfill_finished_block_number,
3810 });
3811
3812 let backfill_tip_block = main_chain[(backfill_finished_block_number - 1) as usize].clone();
3813 test_harness
3815 .provider
3816 .add_block(backfill_tip_block.hash(), backfill_tip_block.block.unseal());
3817 test_harness.tree.on_engine_message(FromEngine::Event(backfill_finished)).unwrap();
3818
3819 let event = test_harness.from_tree_rx.recv().await.unwrap();
3820 match event {
3821 EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
3822 assert_eq!(hash_set, HashSet::from_iter([main_chain_last_hash]));
3823 }
3824 _ => panic!("Unexpected event: {:#?}", event),
3825 }
3826
3827 test_harness
3828 .tree
3829 .on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain
3830 .last()
3831 .unwrap()
3832 .clone()]))
3833 .unwrap();
3834
3835 let event = test_harness.from_tree_rx.recv().await.unwrap();
3836 match event {
3837 EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => {
3838 assert_eq!(
3839 total_blocks,
3840 (main_chain.len() - backfill_finished_block_number as usize - 1) as u64
3841 );
3842 assert_eq!(initial_hash, main_chain.last().unwrap().parent_hash);
3843 }
3844 _ => panic!("Unexpected event: {:#?}", event),
3845 }
3846 }
3847
3848 #[tokio::test]
3849 async fn test_engine_tree_live_sync_transition_eventually_canonical() {
3850 reth_tracing::init_test_tracing();
3851
3852 let chain_spec = MAINNET.clone();
3853 let mut test_harness = TestHarness::new(chain_spec.clone());
3854 test_harness.tree.config = test_harness.tree.config.with_max_execute_block_batch_size(100);
3855
3856 let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
3858 test_harness = test_harness.with_blocks(base_chain.clone());
3859
3860 test_harness
3862 .fcu_to(base_chain.last().unwrap().block().hash(), ForkchoiceStatus::Valid)
3863 .await;
3864
3865 let main_chain = test_harness
3868 .block_builder
3869 .create_fork(base_chain[0].block(), MIN_BLOCKS_FOR_PIPELINE_RUN + 10);
3870
3871 let main_chain_last = main_chain.last().unwrap();
3872 let main_chain_last_hash = main_chain_last.hash();
3873 let main_chain_backfill_target =
3874 main_chain.get(MIN_BLOCKS_FOR_PIPELINE_RUN as usize).unwrap();
3875 let main_chain_backfill_target_hash = main_chain_backfill_target.hash();
3876
3877 test_harness.send_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await;
3879 test_harness.check_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await;
3880
3881 let event = test_harness.from_tree_rx.recv().await.unwrap();
3883 match event {
3884 EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => {
3885 assert_eq!(hash_set, HashSet::from_iter([main_chain_backfill_target_hash]));
3886 }
3887 _ => panic!("Unexpected event: {:#?}", event),
3888 }
3889
3890 test_harness
3892 .tree
3893 .on_engine_message(FromEngine::DownloadedBlocks(vec![
3894 main_chain_backfill_target.clone()
3895 ]))
3896 .unwrap();
3897
3898 let event = test_harness.from_tree_rx.recv().await.unwrap();
3900 match event {
3901 EngineApiEvent::BackfillAction(BackfillAction::Start(
3902 reth_stages::PipelineTarget::Sync(target_hash),
3903 )) => {
3904 assert_eq!(target_hash, main_chain_backfill_target_hash);
3905 }
3906 _ => panic!("Unexpected event: {:#?}", event),
3907 }
3908
3909 let backfilled_chain: Vec<_> =
3911 main_chain.clone().drain(0..(MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize).collect();
3912 test_harness.persist_blocks(backfilled_chain.clone());
3913
3914 test_harness.setup_range_insertion_for_valid_chain(backfilled_chain);
3915
3916 test_harness
3918 .tree
3919 .on_engine_message(FromEngine::Event(FromOrchestrator::BackfillSyncFinished(
3920 ControlFlow::Continue { block_number: main_chain_backfill_target.number },
3921 )))
3922 .unwrap();
3923
3924 test_harness.fcu_to(main_chain_last_hash, ForkchoiceStatus::Syncing).await;
3926
3927 let event = test_harness.from_tree_rx.recv().await.unwrap();
3928 match event {
3929 EngineApiEvent::Download(DownloadRequest::BlockSet(target_hash)) => {
3930 assert_eq!(target_hash, HashSet::from_iter([main_chain_last_hash]));
3931 }
3932 _ => panic!("Unexpected event: {:#?}", event),
3933 }
3934
3935 test_harness
3937 .tree
3938 .on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain_last.clone()]))
3939 .unwrap();
3940
3941 let event = test_harness.from_tree_rx.recv().await.unwrap();
3943 match event {
3944 EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => {
3945 assert_eq!(
3946 total_blocks,
3947 (main_chain.len() - MIN_BLOCKS_FOR_PIPELINE_RUN as usize - 2) as u64
3948 );
3949 assert_eq!(initial_hash, main_chain_last.parent_hash);
3950 }
3951 _ => panic!("Unexpected event: {:#?}", event),
3952 }
3953
3954 let remaining: Vec<_> = main_chain
3955 .clone()
3956 .drain((MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize..main_chain.len())
3957 .collect();
3958
3959 test_harness.setup_range_insertion_for_valid_chain(remaining.clone());
3960
3961 test_harness
3963 .tree
3964 .on_engine_message(FromEngine::DownloadedBlocks(remaining.clone()))
3965 .unwrap();
3966
3967 test_harness.check_canon_chain_insertion(remaining).await;
3968
3969 test_harness.check_canon_commit(main_chain_last_hash).await;
3971
3972 test_harness.check_canon_head(main_chain_last_hash);
3974 }
3975
3976 #[tokio::test]
3977 async fn test_engine_tree_live_sync_fcu_extends_canon_chain() {
3978 reth_tracing::init_test_tracing();
3979
3980 let chain_spec = MAINNET.clone();
3981 let mut test_harness = TestHarness::new(chain_spec.clone());
3982
3983 let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
3985 test_harness = test_harness.with_blocks(base_chain.clone());
3986
3987 test_harness
3989 .fcu_to(base_chain.last().unwrap().block().hash(), ForkchoiceStatus::Valid)
3990 .await;
3991
3992 let main_chain = test_harness.block_builder.create_fork(base_chain[0].block(), 10);
3994 let target = main_chain.get(5).unwrap();
3996 let target_hash = target.hash();
3997 let main_last = main_chain.last().unwrap();
3998 let main_last_hash = main_last.hash();
3999
4000 test_harness.insert_chain(main_chain).await;
4002
4003 test_harness.send_fcu(target_hash, ForkchoiceStatus::Valid).await;
4005
4006 test_harness.check_canon_commit(target_hash).await;
4007 test_harness.check_fcu(target_hash, ForkchoiceStatus::Valid).await;
4008
4009 test_harness.send_fcu(main_last_hash, ForkchoiceStatus::Valid).await;
4011
4012 test_harness.check_canon_commit(main_last_hash).await;
4013 test_harness.check_fcu(main_last_hash, ForkchoiceStatus::Valid).await;
4014 test_harness.check_canon_head(main_last_hash);
4015 }
4016
4017 #[tokio::test]
4018 async fn test_engine_tree_valid_forks_with_older_canonical_head() {
4019 reth_tracing::init_test_tracing();
4020
4021 let chain_spec = MAINNET.clone();
4022 let mut test_harness = TestHarness::new(chain_spec.clone());
4023
4024 let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
4026 test_harness = test_harness.with_blocks(base_chain.clone());
4027
4028 let old_head = base_chain.first().unwrap().block();
4029
4030 let extension_chain = test_harness.block_builder.create_fork(old_head, 5);
4032 let fork_block = extension_chain.last().unwrap().block.clone();
4033
4034 test_harness.setup_range_insertion_for_valid_chain(extension_chain.clone());
4035 test_harness.insert_chain(extension_chain).await;
4036
4037 test_harness.fcu_to(old_head.hash(), ForkchoiceStatus::Valid).await;
4039
4040 let chain_a = test_harness.block_builder.create_fork(&fork_block, 10);
4042 let chain_b = test_harness.block_builder.create_fork(&fork_block, 10);
4043
4044 test_harness.setup_range_insertion_for_valid_chain(chain_a.clone());
4046 for block in &chain_a {
4047 test_harness.send_new_payload(block.clone()).await;
4048 }
4049
4050 test_harness.check_canon_chain_insertion(chain_a.clone()).await;
4051
4052 test_harness.setup_range_insertion_for_valid_chain(chain_b.clone());
4054 for block in &chain_b {
4055 test_harness.send_new_payload(block.clone()).await;
4056 }
4057
4058 test_harness.check_canon_chain_insertion(chain_b.clone()).await;
4059
4060 let chain_b_tip_hash = chain_b.last().unwrap().hash();
4062 test_harness.send_fcu(chain_b_tip_hash, ForkchoiceStatus::Valid).await;
4063
4064 test_harness.check_canon_commit(chain_b_tip_hash).await;
4066
4067 test_harness.check_fcu(chain_b_tip_hash, ForkchoiceStatus::Valid).await;
4069
4070 test_harness.check_canon_head(chain_b_tip_hash);
4072
4073 assert!(test_harness.tree.is_fork(chain_a.last().unwrap().hash()).unwrap());
4075 }
4076
4077 #[tokio::test]
4078 async fn test_engine_tree_buffered_blocks_are_eventually_connected() {
4079 let chain_spec = MAINNET.clone();
4080 let mut test_harness = TestHarness::new(chain_spec.clone());
4081
4082 let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
4083 test_harness = test_harness.with_blocks(base_chain.clone());
4084
4085 let side_chain =
4088 test_harness.block_builder.create_fork(base_chain.last().unwrap().block(), 2);
4089
4090 let buffered_block = side_chain.last().unwrap();
4092 let buffered_block_hash = buffered_block.hash();
4093
4094 test_harness.setup_range_insertion_for_valid_chain(vec![buffered_block.clone()]);
4095 test_harness.send_new_payload(buffered_block.clone()).await;
4096
4097 assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_some());
4098
4099 let non_buffered_block = side_chain.first().unwrap();
4100 let non_buffered_block_hash = non_buffered_block.hash();
4101
4102 test_harness.setup_range_insertion_for_valid_chain(vec![non_buffered_block.clone()]);
4104 test_harness.send_new_payload(non_buffered_block.clone()).await;
4105 assert!(test_harness.tree.state.buffer.block(&non_buffered_block_hash).is_none());
4106
4107 assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_none());
4109
4110 test_harness.check_canon_block_added(non_buffered_block_hash).await;
4112 test_harness.check_canon_block_added(buffered_block_hash).await;
4113 }
4114
4115 #[tokio::test]
4116 async fn test_engine_tree_valid_and_invalid_forks_with_older_canonical_head() {
4117 reth_tracing::init_test_tracing();
4118
4119 let chain_spec = MAINNET.clone();
4120 let mut test_harness = TestHarness::new(chain_spec.clone());
4121
4122 let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
4124 test_harness = test_harness.with_blocks(base_chain.clone());
4125
4126 let old_head = base_chain.first().unwrap().block();
4127
4128 let extension_chain = test_harness.block_builder.create_fork(old_head, 5);
4130 let fork_block = extension_chain.last().unwrap().block.clone();
4131 test_harness.insert_chain(extension_chain).await;
4132
4133 test_harness.fcu_to(old_head.hash(), ForkchoiceStatus::Valid).await;
4135
4136 let total_fork_elements = 10;
4138 let chain_a = test_harness.block_builder.create_fork(&fork_block, total_fork_elements);
4139 let chain_b = test_harness.block_builder.create_fork(&fork_block, total_fork_elements);
4140
4141 test_harness.setup_range_insertion_for_valid_chain(chain_b.clone());
4143 for block in &chain_b {
4144 test_harness.send_new_payload(block.clone()).await;
4145 test_harness.send_fcu(block.hash(), ForkchoiceStatus::Valid).await;
4146 test_harness.check_canon_block_added(block.hash()).await;
4147 test_harness.check_canon_commit(block.hash()).await;
4148 test_harness.check_fcu(block.hash(), ForkchoiceStatus::Valid).await;
4149 }
4150
4151 let invalid_index = 3;
4153 test_harness.setup_range_insertion_for_invalid_chain(chain_a.clone(), invalid_index);
4154 for block in &chain_a {
4155 test_harness.send_new_payload(block.clone()).await;
4156 }
4157
4158 test_harness
4161 .check_fork_chain_insertion(
4162 chain_a[..chain_a.len() - invalid_index - 1].iter().cloned(),
4163 )
4164 .await;
4165
4166 let chain_a_tip_hash = chain_a.last().unwrap().hash();
4168 test_harness.fcu_to(chain_a_tip_hash, ForkchoiceStatus::Invalid).await;
4169
4170 let chain_b_tip_hash = chain_b.last().unwrap().hash();
4172
4173 test_harness.check_canon_head(chain_b_tip_hash);
4175
4176 test_harness.check_canon_head(chain_b_tip_hash);
4178 }
4179
4180 #[tokio::test]
4181 async fn test_engine_tree_reorg_with_missing_ancestor_expecting_valid() {
4182 reth_tracing::init_test_tracing();
4183 let chain_spec = MAINNET.clone();
4184 let mut test_harness = TestHarness::new(chain_spec.clone());
4185
4186 let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..6).collect();
4187 test_harness = test_harness.with_blocks(base_chain.clone());
4188
4189 let side_chain =
4191 test_harness.block_builder.create_fork(base_chain.last().unwrap().block(), 15);
4192 let invalid_index = 9;
4193
4194 test_harness.setup_range_insertion_for_invalid_chain(side_chain.clone(), invalid_index);
4195
4196 for (index, block) in side_chain.iter().enumerate() {
4197 test_harness.send_new_payload(block.clone()).await;
4198
4199 if index < side_chain.len() - invalid_index - 1 {
4200 test_harness.send_fcu(block.block.hash(), ForkchoiceStatus::Valid).await;
4201 }
4202 }
4203
4204 let fork_tip_hash = side_chain.last().unwrap().hash();
4206 test_harness.send_fcu(fork_tip_hash, ForkchoiceStatus::Invalid).await;
4207 }
4208}