1use alloy_consensus::{BlockHeader, Header};
2use alloy_eips::{merge::EPOCH_SLOTS, BlockNumHash};
3use alloy_primitives::{BlockNumber, B256};
4use alloy_rpc_types_engine::{
5 ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState, PayloadStatus, PayloadStatusEnum,
6 PayloadValidationError,
7};
8use futures::{stream::BoxStream, Future, StreamExt};
9use itertools::Either;
10use reth_blockchain_tree_api::{
11 error::{BlockchainTreeError, CanonicalError, InsertBlockError, InsertBlockErrorKind},
12 BlockStatus, BlockValidationKind, BlockchainTreeEngine, CanonicalOutcome, InsertPayloadOk,
13};
14use reth_engine_primitives::{
15 BeaconEngineMessage, BeaconOnNewPayloadError, EngineApiMessageVersion, EngineTypes,
16 ForkchoiceStateHash, ForkchoiceStateTracker, ForkchoiceStatus, OnForkChoiceUpdated,
17 PayloadTypes,
18};
19use reth_errors::{BlockValidationError, ProviderResult, RethError, RethResult};
20use reth_network_p2p::{
21 sync::{NetworkSyncUpdater, SyncState},
22 EthBlockClient,
23};
24use reth_node_types::{Block, BlockTy, HeaderTy, NodeTypesWithEngine};
25use reth_payload_builder::PayloadBuilderHandle;
26use reth_payload_builder_primitives::PayloadBuilder;
27use reth_payload_primitives::{PayloadAttributes, PayloadBuilderAttributes};
28use reth_payload_validator::ExecutionPayloadValidator;
29use reth_primitives::{Head, SealedBlock, SealedHeader};
30use reth_provider::{
31 providers::{ProviderNodeTypes, TreeNodeTypes},
32 BlockIdReader, BlockReader, BlockSource, CanonChainTracker, ChainSpecProvider, ProviderError,
33 StageCheckpointReader,
34};
35use reth_stages_api::{ControlFlow, Pipeline, PipelineTarget, StageId};
36use reth_tasks::TaskSpawner;
37use reth_tokio_util::EventSender;
38use std::{
39 pin::Pin,
40 sync::Arc,
41 task::{Context, Poll},
42 time::{Duration, Instant},
43};
44use tokio::sync::{
45 mpsc::{self, UnboundedSender},
46 oneshot,
47};
48use tokio_stream::wrappers::UnboundedReceiverStream;
49use tracing::*;
50
51mod error;
52pub use error::{BeaconConsensusEngineError, BeaconEngineResult, BeaconForkChoiceUpdateError};
53
54mod invalid_headers;
55pub use invalid_headers::InvalidHeaderCache;
56
57mod event;
58pub use event::{BeaconConsensusEngineEvent, ConsensusEngineLiveSyncProgress};
59
60mod handle;
61pub use handle::BeaconConsensusEngineHandle;
62
63mod metrics;
64use metrics::EngineMetrics;
65
66pub mod sync;
67use sync::{EngineSyncController, EngineSyncEvent};
68
69pub mod hooks;
72use hooks::{EngineHookContext, EngineHookEvent, EngineHooks, EngineHooksController, PolledHook};
73
74#[cfg(test)]
75pub mod test_utils;
76
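/// The maximum number of invalid headers kept in the [`InvalidHeaderCache`].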
77const MAX_INVALID_HEADERS: u32 = 512u32;
79
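/// The minimum distance between the local tip and a target block above which the engine runs the
/// pipeline to catch up instead of downloading blocks individually; set to one epoch worth of
/// slots ([`EPOCH_SLOTS`]).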
80pub const MIN_BLOCKS_FOR_PIPELINE_RUN: u64 = EPOCH_SLOTS;
86
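/// Helper trait combining the provider and engine API node type bounds required by the
/// [`BeaconConsensusEngine`].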
87pub trait EngineNodeTypes: ProviderNodeTypes + NodeTypesWithEngine {}
89
90impl<T> EngineNodeTypes for T where T: ProviderNodeTypes + NodeTypesWithEngine {}
91
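/// A pending forkchoice update: the requested state, optional payload attributes and the channel
/// used to send back the [`OnForkChoiceUpdated`] result.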
92type PendingForkchoiceUpdate<PayloadAttributes> =
102 (ForkchoiceState, Option<PayloadAttributes>, oneshot::Sender<RethResult<OnForkChoiceUpdated>>);
103
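/// The beacon consensus engine drives the chain forward based on messages received via the engine
/// API ([`BeaconEngineMessage`]), i.e. new payloads and forkchoice updates.
///
/// Depending on how far the requested target is from the local tip, missing blocks are either
/// downloaded individually and inserted into the blockchain tree, or recovered by running the
/// pipeline. The engine is a [`Future`] that must be polled to make progress and resolves once the
/// configured max block (if any) is reached or a fatal error occurs.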
104#[must_use = "Future does nothing unless polled"]
171#[allow(missing_debug_implementations)]
172pub struct BeaconConsensusEngine<N, BT, Client>
173where
174 N: EngineNodeTypes,
175 Client: EthBlockClient,
176 BT: BlockchainTreeEngine
177 + BlockReader
178 + BlockIdReader
179 + CanonChainTracker
180 + StageCheckpointReader,
181{
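    /// Controls pipeline runs and full-block downloads.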
182 sync: EngineSyncController<N, Client>,
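    /// Access to the blockchain tree and the database.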
184 blockchain: BT,
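    /// Used to notify the network about the node's sync state.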
186 sync_state_updater: Box<dyn NetworkSyncUpdater>,
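    /// Incoming engine API messages to process.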
188 engine_message_stream: BoxStream<'static, BeaconEngineMessage<N::Engine>>,
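    /// A clonable handle for sending messages to this engine.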
190 handle: BeaconConsensusEngineHandle<N::Engine>,
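    /// Tracks received forkchoice states and their status.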
192 forkchoice_state_tracker: ForkchoiceStateTracker,
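    /// Handle to the payload builder service.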
194 payload_builder: PayloadBuilderHandle<N::Engine>,
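    /// Validates incoming execution payloads.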
196 payload_validator: ExecutionPayloadValidator<N::ChainSpec>,
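    /// The next blockchain tree action to execute, if any.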
198 blockchain_tree_action: Option<BlockchainTreeAction<N::Engine>>,
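    /// A forkchoice update that was delayed because a database write hook was in progress.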
200 pending_forkchoice_update:
205 Option<PendingForkchoiceUpdate<<N::Engine as PayloadTypes>::PayloadAttributes>>,
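    /// Cache of headers that were found to be invalid.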
206 invalid_headers: InvalidHeaderCache,
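    /// The minimum distance from the local tip at which the pipeline, rather than individual
    /// block downloads, is used to sync a target block.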
209 pipeline_run_threshold: u64,
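    /// Controller for the registered engine hooks.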
221 hooks: EngineHooksController,
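    /// Sender for events emitted by the engine.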
222 event_sender: EventSender<BeaconConsensusEngineEvent>,
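    /// Engine metrics.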
224 metrics: EngineMetrics,
226}
227
228impl<N, BT, Client> BeaconConsensusEngine<N, BT, Client>
229where
230 N: TreeNodeTypes,
231 BT: BlockchainTreeEngine
232 + BlockReader<Block = BlockTy<N>, Header = HeaderTy<N>>
233 + BlockIdReader
234 + CanonChainTracker<Header = HeaderTy<N>>
235 + StageCheckpointReader
236 + ChainSpecProvider<ChainSpec = N::ChainSpec>
237 + 'static,
238 Client: EthBlockClient + 'static,
239{
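    /// Creates a new [`BeaconConsensusEngine`] together with a connected
    /// [`BeaconConsensusEngineHandle`], wiring up the internal engine message channel.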
240 #[allow(clippy::too_many_arguments)]
242 pub fn new(
243 client: Client,
244 pipeline: Pipeline<N>,
245 blockchain: BT,
246 task_spawner: Box<dyn TaskSpawner>,
247 sync_state_updater: Box<dyn NetworkSyncUpdater>,
248 max_block: Option<BlockNumber>,
249 payload_builder: PayloadBuilderHandle<N::Engine>,
250 target: Option<B256>,
251 pipeline_run_threshold: u64,
252 hooks: EngineHooks,
253 ) -> RethResult<(Self, BeaconConsensusEngineHandle<N::Engine>)> {
254 let (to_engine, rx) = mpsc::unbounded_channel();
255 Self::with_channel(
256 client,
257 pipeline,
258 blockchain,
259 task_spawner,
260 sync_state_updater,
261 max_block,
262 payload_builder,
263 target,
264 pipeline_run_threshold,
265 to_engine,
266 Box::pin(UnboundedReceiverStream::from(rx)),
267 hooks,
268 )
269 }
270
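    /// Same as [`Self::new`] but takes an already created engine channel and message stream.
    ///
    /// If an initial `target` is provided, or the pipeline stage checkpoints are found to be
    /// inconsistent, a pipeline sync towards that target is scheduled immediately.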
271 #[allow(clippy::too_many_arguments)]
285 pub fn with_channel(
286 client: Client,
287 pipeline: Pipeline<N>,
288 blockchain: BT,
289 task_spawner: Box<dyn TaskSpawner>,
290 sync_state_updater: Box<dyn NetworkSyncUpdater>,
291 max_block: Option<BlockNumber>,
292 payload_builder: PayloadBuilderHandle<N::Engine>,
293 target: Option<B256>,
294 pipeline_run_threshold: u64,
295 to_engine: UnboundedSender<BeaconEngineMessage<N::Engine>>,
296 engine_message_stream: BoxStream<'static, BeaconEngineMessage<N::Engine>>,
297 hooks: EngineHooks,
298 ) -> RethResult<(Self, BeaconConsensusEngineHandle<N::Engine>)> {
299 let event_sender = EventSender::default();
300 let handle = BeaconConsensusEngineHandle::new(to_engine, event_sender.clone());
301 let sync = EngineSyncController::new(
302 pipeline,
303 client,
304 task_spawner.clone(),
305 max_block,
306 blockchain.chain_spec(),
307 event_sender.clone(),
308 );
309 let mut this = Self {
310 sync,
311 payload_validator: ExecutionPayloadValidator::new(blockchain.chain_spec()),
312 blockchain,
313 sync_state_updater,
314 engine_message_stream,
315 handle: handle.clone(),
316 forkchoice_state_tracker: Default::default(),
317 payload_builder,
318 invalid_headers: InvalidHeaderCache::new(MAX_INVALID_HEADERS),
319 blockchain_tree_action: None,
320 pending_forkchoice_update: None,
321 pipeline_run_threshold,
322 hooks: EngineHooksController::new(hooks),
323 event_sender,
324 metrics: EngineMetrics::default(),
325 };
326
327 let maybe_pipeline_target = match target {
328 target @ Some(_) => target,
330 None => this.check_pipeline_consistency()?,
331 };
332
333 if let Some(target) = maybe_pipeline_target {
334 this.sync.set_pipeline_sync_target(target.into());
335 }
336
337 Ok((this, handle))
338 }
339
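    /// Returns the current [`EngineHookContext`], i.e. the canonical tip and the latest finalized
    /// block number.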
340 fn current_engine_hook_context(&self) -> RethResult<EngineHookContext> {
342 Ok(EngineHookContext {
343 tip_block_number: self.blockchain.canonical_tip().number,
344 finalized_block_number: self
345 .blockchain
346 .finalized_block_number()
347 .map_err(RethError::Provider)?,
348 })
349 }
350
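    /// Queues the given blockchain tree action to be executed on the next poll. In debug builds
    /// this asserts that no other action is already pending.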
351 fn set_blockchain_tree_action(&mut self, action: BlockchainTreeAction<N::Engine>) {
353 let previous_action = self.blockchain_tree_action.replace(action);
354 debug_assert!(previous_action.is_none(), "Pre-existing action found");
355 }
356
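    /// Pre-validates a forkchoice state before it is made canonical.
    ///
    /// Returns an early response if the head hash is zero, if the head descends from a known
    /// invalid ancestor, or if the pipeline is currently syncing; returns `None` if the update
    /// should be processed further.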
357 fn pre_validate_forkchoice_update(
362 &mut self,
363 state: ForkchoiceState,
364 ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
365 if state.head_block_hash.is_zero() {
366 return Ok(Some(OnForkChoiceUpdated::invalid_state()))
367 }
368
369 let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
372 if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
373 return Ok(Some(OnForkChoiceUpdated::with_invalid(status)))
374 }
375
376 if self.sync.is_pipeline_active() {
377 trace!(target: "consensus::engine", "Pipeline is syncing, skipping forkchoice update");
380 return Ok(Some(OnForkChoiceUpdated::syncing()))
381 }
382
383 Ok(None)
384 }
385
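    /// Processes the result of making the forkchoice head canonical: updates the canonical head
    /// and emits an event on success, ensures the safe and finalized blocks are consistent,
    /// optionally starts payload building, and converts non-fatal errors into a payload status
    /// response.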
386 fn on_forkchoice_updated_make_canonical_result(
392 &mut self,
393 state: ForkchoiceState,
394 mut attrs: Option<<N::Engine as PayloadTypes>::PayloadAttributes>,
395 make_canonical_result: Result<CanonicalOutcome, CanonicalError>,
396 elapsed: Duration,
397 ) -> Result<OnForkChoiceUpdated, CanonicalError> {
398 match make_canonical_result {
399 Ok(outcome) => {
400 let should_update_head = match &outcome {
401 CanonicalOutcome::AlreadyCanonical { head, header } => {
402 self.on_head_already_canonical(head, header, &mut attrs)
403 }
404 CanonicalOutcome::Committed { head } => {
405 debug!(target: "consensus::engine", hash=?state.head_block_hash, number=head.number, "Canonicalized new head");
407 true
408 }
409 };
410
411 if should_update_head {
412 let head = outcome.header();
413 let _ = self.update_head(head.clone());
414 self.event_sender.notify(BeaconConsensusEngineEvent::CanonicalChainCommitted(
415 Box::new(head.clone()),
416 elapsed,
417 ));
418 }
419
420 let on_updated = if let Some(invalid_fcu_response) =
422 self.ensure_consistent_forkchoice_state(state)?
423 {
424 trace!(target: "consensus::engine", ?state, "Forkchoice state is inconsistent");
425 invalid_fcu_response
426 } else if let Some(attrs) = attrs {
427 let head = outcome.into_header().unseal();
429 self.process_payload_attributes(
430 attrs,
431 head,
432 state,
433 EngineApiMessageVersion::default(),
434 )
435 } else {
436 OnForkChoiceUpdated::valid(PayloadStatus::new(
437 PayloadStatusEnum::Valid,
438 Some(state.head_block_hash),
439 ))
440 };
441 Ok(on_updated)
442 }
443 Err(err) => {
444 if err.is_fatal() {
445 error!(target: "consensus::engine", %err, "Encountered fatal error");
446 Err(err)
447 } else {
448 Ok(OnForkChoiceUpdated::valid(
449 self.on_failed_canonical_forkchoice_update(&state, err)?,
450 ))
451 }
452 }
453 }
454 }
455
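    /// Invoked when the forkchoice head is already canonical. Returns `true` if the head should
    /// still be updated (on Optimism, reorgs to an older head are allowed); otherwise drops any
    /// payload attributes when the head is not the chain tip and returns `false`.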
456 fn on_head_already_canonical(
460 &self,
461 head: &BlockNumHash,
462 header: &SealedHeader,
463 attrs: &mut Option<<N::Engine as PayloadTypes>::PayloadAttributes>,
464 ) -> bool {
465 #[cfg(feature = "optimism")]
467 if reth_chainspec::EthChainSpec::is_optimism(&self.blockchain.chain_spec()) {
468 debug!(
469 target: "consensus::engine",
470 fcu_head_num=?header.number,
471 current_head_num=?head.number,
472 "[Optimism] Allowing beacon reorg to old head"
473 );
474 return true
475 }
476
477 if head != &header.num_hash() {
484 attrs.take();
485 }
486
487 debug!(
488 target: "consensus::engine",
489 fcu_head_num=?header.number,
490 current_head_num=?head.number,
491 "Ignoring beacon update to old head"
492 );
493 false
494 }
495
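    /// Handles an `engine_forkchoiceUpdated` message: pre-validates the state, delays the update
    /// if a hook with database write access is running, and otherwise queues making the new head
    /// canonical.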
496 fn on_forkchoice_updated(
505 &mut self,
506 state: ForkchoiceState,
507 attrs: Option<<N::Engine as PayloadTypes>::PayloadAttributes>,
508 tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
509 ) {
510 self.metrics.forkchoice_updated_messages.increment(1);
511 self.blockchain.on_forkchoice_update_received(&state);
512 trace!(target: "consensus::engine", ?state, "Received new forkchoice state update");
513
514 match self.pre_validate_forkchoice_update(state) {
515 Ok(on_updated_result) => {
516 if let Some(on_updated) = on_updated_result {
517 self.on_forkchoice_updated_status(state, on_updated, tx);
520 } else if let Some(hook) = self.hooks.active_db_write_hook() {
521 let replaced_pending =
525 self.pending_forkchoice_update.replace((state, attrs, tx));
526 warn!(
527 target: "consensus::engine",
528 hook = %hook.name(),
529 head_block_hash = ?state.head_block_hash,
530 safe_block_hash = ?state.safe_block_hash,
531 finalized_block_hash = ?state.finalized_block_hash,
532 replaced_pending = ?replaced_pending.map(|(state, _, _)| state),
533 "Hook is in progress, delaying forkchoice update. \
534 This may affect the performance of your node as a validator."
535 );
536 } else {
537 self.set_blockchain_tree_action(
538 BlockchainTreeAction::MakeForkchoiceHeadCanonical { state, attrs, tx },
539 );
540 }
541 }
542 Err(error) => {
543 let _ = tx.send(Err(error.into()));
544 }
545 }
546 }
547
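    /// Sends the forkchoice update response back to the caller, records the state and status in
    /// the tracker, updates the network sync state and emits a
    /// [`BeaconConsensusEngineEvent::ForkchoiceUpdated`] event.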
548 fn on_forkchoice_updated_status(
552 &mut self,
553 state: ForkchoiceState,
554 on_updated: OnForkChoiceUpdated,
555 tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
556 ) {
557 let status = on_updated.forkchoice_status();
559 let _ = tx.send(Ok(on_updated));
560
561 self.forkchoice_state_tracker.set_latest(state, status);
563
564 match status {
565 ForkchoiceStatus::Invalid => {}
566 ForkchoiceStatus::Valid => {
567 self.sync_state_updater.update_sync_state(SyncState::Idle);
569 self.sync.clear_block_download_requests();
571 }
572 ForkchoiceStatus::Syncing => {
573 self.sync_state_updater.update_sync_state(SyncState::Syncing);
575 }
576 }
577
578 self.event_sender.notify(BeaconConsensusEngineEvent::ForkchoiceUpdated(state, status));
580 }
581
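    /// Checks that all pipeline stages have progressed at least as far as the first stage.
    ///
    /// If any later stage lags behind, returns the block hash at the first stage's checkpoint so
    /// the pipeline can be re-run towards it; otherwise returns `None`.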
582 fn check_pipeline_consistency(&self) -> RethResult<Option<B256>> {
594 let first_stage_checkpoint = self
597 .blockchain
598 .get_stage_checkpoint(*StageId::ALL.first().unwrap())?
599 .unwrap_or_default()
600 .block_number;
601
602 for stage_id in StageId::ALL.iter().skip(1) {
605 let stage_checkpoint =
606 self.blockchain.get_stage_checkpoint(*stage_id)?.unwrap_or_default().block_number;
607
608 if stage_checkpoint < first_stage_checkpoint {
611 debug!(
612 target: "consensus::engine",
613 first_stage_checkpoint,
614 inconsistent_stage_id = %stage_id,
615 inconsistent_stage_checkpoint = stage_checkpoint,
616 "Pipeline sync progress is inconsistent"
617 );
618 return Ok(self.blockchain.block_hash(first_stage_checkpoint)?)
619 }
620 }
621
622 Ok(None)
623 }
624
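    /// Returns a handle that can be used to send messages to this engine.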
625 pub fn handle(&self) -> BeaconConsensusEngineHandle<N::Engine> {
630 self.handle.clone()
631 }
632
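    /// Returns `true` if the given block is further ahead of the local tip than the configured
    /// pipeline run threshold.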
633 #[inline]
638 const fn exceeds_pipeline_run_threshold(&self, local_tip: u64, block: u64) -> bool {
639 block > local_tip && block - local_tip > self.pipeline_run_threshold
640 }
641
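    /// Decides whether the pipeline should be used to reach the given target.
    ///
    /// Returns the pipeline target hash (the finalized block, or the head hash as an optimistic
    /// target if the finalized block is unknown locally) when the target is far enough ahead of
    /// the canonical tip, otherwise `None`.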
642 fn can_pipeline_sync_to_finalized(
649 &self,
650 canonical_tip_num: u64,
651 target_block_number: u64,
652 downloaded_block: Option<BlockNumHash>,
653 ) -> Option<B256> {
654 let sync_target_state = self.forkchoice_state_tracker.sync_target_state();
655
656 let mut exceeds_pipeline_run_threshold =
658 self.exceeds_pipeline_run_threshold(canonical_tip_num, target_block_number);
659
660 if let Some(ref buffered_finalized) = sync_target_state
662 .as_ref()
663 .and_then(|state| self.blockchain.buffered_header_by_hash(state.finalized_block_hash))
664 {
665 exceeds_pipeline_run_threshold =
668 self.exceeds_pipeline_run_threshold(canonical_tip_num, buffered_finalized.number);
669 }
670
671 if let (Some(downloaded_block), Some(ref state)) = (downloaded_block, sync_target_state) {
674 if downloaded_block.hash == state.finalized_block_hash {
675 exceeds_pipeline_run_threshold =
677 self.exceeds_pipeline_run_threshold(canonical_tip_num, downloaded_block.number);
678 }
679 }
680
681 if exceeds_pipeline_run_threshold {
684 if let Some(state) = sync_target_state {
685 match self.blockchain.header_by_hash_or_number(state.finalized_block_hash.into()) {
688 Err(err) => {
689 warn!(target: "consensus::engine", %err, "Failed to get finalized block header");
690 }
691 Ok(None) => {
692 if !state.finalized_block_hash.is_zero() {
694 return Some(state.finalized_block_hash)
697 }
698
699 debug!(target: "consensus::engine", hash=?state.head_block_hash, "Setting head hash as an optimistic pipeline target.");
712 return Some(state.head_block_hash)
713 }
714 Ok(Some(_)) => {
715 }
718 }
719 }
720 }
721
722 None
723 }
724
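    /// Returns how far the given block is ahead of the local tip, or `None` if it is not ahead.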
725 #[inline]
728 const fn distance_from_local_tip(&self, local_tip: u64, block: u64) -> Option<u64> {
729 if block > local_tip {
730 Some(block - local_tip)
731 } else {
732 None
733 }
734 }
735
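    /// Returns the latest valid hash to report for an invalid payload with the given parent:
    /// the parent itself if it is known locally, otherwise the closest ancestor that is not in
    /// the invalid header cache but is known locally, or `None` if no such ancestor is found.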
736 fn latest_valid_hash_for_invalid_payload(
747 &mut self,
748 parent_hash: B256,
749 ) -> ProviderResult<Option<B256>> {
750 if self.blockchain.find_block_by_hash(parent_hash, BlockSource::Any)?.is_some() {
752 return Ok(Some(parent_hash))
753 }
754
755 let mut current_hash = parent_hash;
        let mut current_block = self.invalid_headers.get(&current_hash);
        while let Some(block) = current_block {
            current_hash = block.parent;
            current_block = self.invalid_headers.get(&current_hash);
762
763 if current_block.is_none() &&
766 self.blockchain.find_block_by_hash(current_hash, BlockSource::Any)?.is_some()
767 {
768 return Ok(Some(current_hash))
769 }
770 }
771 Ok(None)
772 }
773
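    /// Builds an `INVALID` payload status for a block that links to a rejected payload. The
    /// latest valid hash is zeroed out if the parent is a proof-of-work block.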
774 fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
778 if let Ok(Some(parent)) = self.blockchain.header_by_hash_or_number(parent_hash.into()) {
781 if !parent.is_zero_difficulty() {
782 parent_hash = B256::ZERO;
783 }
784 }
785
786 let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
787 Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
788 validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
789 })
790 .with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
791 }
792
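    /// Checks whether `check` is a known invalid header. If so, `head` is also recorded as
    /// invalid (with the same invalid ancestor) and the corresponding `INVALID` payload status is
    /// returned.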
793 fn check_invalid_ancestor_with_head(
799 &mut self,
800 check: B256,
801 head: B256,
802 ) -> ProviderResult<Option<PayloadStatus>> {
803 let Some(block) = self.invalid_headers.get(&check) else { return Ok(None) };
805
806 let status = self.prepare_invalid_response(block.parent)?;
808
809 self.invalid_headers.insert_with_invalid_ancestor(head, block);
811
812 Ok(Some(status))
813 }
814
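    /// Checks whether `head` itself is a known invalid header and, if so, returns the `INVALID`
    /// payload status to respond with.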
815 fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
818 let Some(block) = self.invalid_headers.get(&head) else { return Ok(None) };
820
821 Ok(Some(self.prepare_invalid_response(block.parent)?))
823 }
824
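    /// Records `make_canonical` latency metrics for the given outcome and returns the elapsed
    /// time.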
825 fn record_make_canonical_latency(
830 &self,
831 start: Instant,
832 outcome: &Result<CanonicalOutcome, CanonicalError>,
833 ) -> Duration {
834 let elapsed = start.elapsed();
835 self.metrics.make_canonical_latency.record(elapsed);
836 match outcome {
837 Ok(CanonicalOutcome::AlreadyCanonical { .. }) => {
838 self.metrics.make_canonical_already_canonical_latency.record(elapsed)
839 }
840 Ok(CanonicalOutcome::Committed { .. }) => {
841 self.metrics.make_canonical_committed_latency.record(elapsed)
842 }
843 Err(_) => self.metrics.make_canonical_error_latency.record(elapsed),
844 }
845 elapsed
846 }
847
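    /// Ensures that the finalized and safe blocks of the forkchoice state are canonical and
    /// updates the corresponding trackers. Returns an invalid-state response if either hash is
    /// non-zero but not part of the canonical chain.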
848 fn ensure_consistent_forkchoice_state(
857 &self,
858 state: ForkchoiceState,
859 ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
860 if !state.finalized_block_hash.is_zero() &&
866 !self.blockchain.is_canonical(state.finalized_block_hash)?
867 {
868 return Ok(Some(OnForkChoiceUpdated::invalid_state()))
869 }
870
871 self.update_finalized_block(state.finalized_block_hash)?;
873
874 if !state.safe_block_hash.is_zero() &&
880 !self.blockchain.is_canonical(state.safe_block_hash)?
881 {
882 return Ok(Some(OnForkChoiceUpdated::invalid_state()))
883 }
884
885 self.update_safe_block(state.safe_block_hash)?;
887
888 Ok(None)
889 }
890
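    /// Updates the canonical chain trackers with the new head as well as the finalized and safe
    /// blocks from the forkchoice state.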
891 fn update_canon_chain(&self, head: SealedHeader, update: &ForkchoiceState) -> RethResult<()> {
900 self.update_head(head)?;
901 self.update_finalized_block(update.finalized_block_hash)?;
902 self.update_safe_block(update.safe_block_hash)?;
903 Ok(())
904 }
905
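    /// Sets the given header as the new canonical head and notifies the network of the updated
    /// status, including the head's total difficulty.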
906 #[inline]
913 fn update_head(&self, head: SealedHeader) -> RethResult<()> {
914 let mut head_block = Head {
915 number: head.number,
916 hash: head.hash(),
917 difficulty: head.difficulty,
918 timestamp: head.timestamp,
919 total_difficulty: Default::default(),
921 };
922
923 self.blockchain.set_canonical_head(head);
925
926 head_block.total_difficulty =
927 self.blockchain.header_td_by_number(head_block.number)?.ok_or_else(|| {
928 RethError::Provider(ProviderError::TotalDifficultyNotFound(head_block.number))
929 })?;
930 self.sync_state_updater.update_status(head_block);
931
932 Ok(())
933 }
934
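    /// Updates the tracked safe block if the hash is non-zero and differs from the currently
    /// tracked one.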
935 #[inline]
939 fn update_safe_block(&self, safe_block_hash: B256) -> ProviderResult<()> {
940 if !safe_block_hash.is_zero() {
941 if self.blockchain.safe_block_hash()? == Some(safe_block_hash) {
942 return Ok(())
944 }
945
946 let safe = self
947 .blockchain
948 .find_block_by_hash(safe_block_hash, BlockSource::Any)?
949 .ok_or(ProviderError::UnknownBlockHash(safe_block_hash))?;
950 self.blockchain.set_safe(SealedHeader::new(safe.split().0, safe_block_hash));
951 }
952 Ok(())
953 }
954
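    /// Finalizes the given block and updates the tracked finalized block if the hash is non-zero
    /// and differs from the currently tracked one.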
955 #[inline]
959 fn update_finalized_block(&self, finalized_block_hash: B256) -> ProviderResult<()> {
960 if !finalized_block_hash.is_zero() {
961 if self.blockchain.finalized_block_hash()? == Some(finalized_block_hash) {
962 return Ok(())
964 }
965
966 let finalized = self
967 .blockchain
968 .find_block_by_hash(finalized_block_hash, BlockSource::Any)?
969 .ok_or(ProviderError::UnknownBlockHash(finalized_block_hash))?;
970 self.blockchain.finalize_block(finalized.header().number())?;
971 self.blockchain
972 .set_finalized(SealedHeader::new(finalized.split().0, finalized_block_hash));
973 }
974 Ok(())
975 }
976
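    /// Handles a failed attempt to make the forkchoice head canonical.
    ///
    /// Depending on the error this returns an `INVALID` status (e.g. for pre-merge blocks),
    /// schedules a pipeline unwind for optimistic reverts, or starts syncing towards the missing
    /// target via the pipeline or block downloads and responds with `SYNCING`.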
977 fn on_failed_canonical_forkchoice_update(
985 &mut self,
986 state: &ForkchoiceState,
987 error: CanonicalError,
988 ) -> ProviderResult<PayloadStatus> {
989 debug_assert!(self.sync.is_pipeline_idle(), "pipeline must be idle");
990
991 if let Some(invalid_ancestor) = self.check_invalid_ancestor(state.head_block_hash)? {
994 warn!(target: "consensus::engine", %error, ?state, ?invalid_ancestor, head=?state.head_block_hash, "Failed to canonicalize the head hash, head is also considered invalid");
995 debug!(target: "consensus::engine", head=?state.head_block_hash, current_error=%error, "Head was previously marked as invalid");
996 return Ok(invalid_ancestor)
997 }
998
999 match &error {
1000 CanonicalError::Validation(BlockValidationError::BlockPreMerge { .. }) => {
1001 warn!(target: "consensus::engine", %error, ?state, "Failed to canonicalize the head hash");
1002 return Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
1003 validation_error: error.to_string(),
1004 })
1005 .with_latest_valid_hash(B256::ZERO))
1006 }
1007 CanonicalError::BlockchainTree(BlockchainTreeError::BlockHashNotFoundInChain {
1008 ..
1009 }) => {
1010 }
1014 CanonicalError::OptimisticTargetRevert(block_number) => {
1015 self.sync.set_pipeline_sync_target(PipelineTarget::Unwind(*block_number));
1016 return Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
1017 }
1018 _ => {
1019 warn!(target: "consensus::engine", %error, ?state, "Failed to canonicalize the head hash");
1020 }
1023 }
1024
1025 let target = if self.forkchoice_state_tracker.is_empty() &&
1031 !state.safe_block_hash.is_zero() &&
1033 self.blockchain.block_number(state.safe_block_hash).ok().flatten().is_none()
1034 {
1035 state.safe_block_hash
1036 } else {
1037 state.head_block_hash
1038 };
1039
1040 let target = self.lowest_buffered_ancestor_or(target);
1042
1043 if self.pipeline_run_threshold == 0 {
1046 trace!(target: "consensus::engine", %target, "Triggering pipeline run to sync missing ancestors of the new head");
1048 self.sync.set_pipeline_sync_target(target.into());
1049 } else {
1050 trace!(target: "consensus::engine", request=%target, "Triggering full block download for missing ancestors of the new head");
1053 self.sync.download_full_block(target);
1054 }
1055
1056 debug!(target: "consensus::engine", %target, "Syncing to new target");
1057 Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
1058 }
1059
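    /// Returns the parent hash of the lowest buffered ancestor of the given block, or the hash
    /// itself if no ancestor is buffered.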
1060 fn lowest_buffered_ancestor_or(&self, hash: B256) -> B256 {
1067 self.blockchain
1068 .lowest_buffered_ancestor(hash)
1069 .map(|block| block.parent_hash)
            .unwrap_or(hash)
1071 }
1072
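    /// Handles an `engine_newPayload` message: ensures the payload is well formed and returns
    /// either an immediate payload status (for invalid payloads or known invalid ancestors) or
    /// the sealed block to be inserted into the blockchain tree.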
1073 #[instrument(level = "trace", skip(self, payload, sidecar), fields(block_hash = ?payload.block_hash(), block_number = %payload.block_number(), is_pipeline_idle = %self.sync.is_pipeline_idle()), target = "consensus::engine")]
1086 fn on_new_payload(
1087 &mut self,
1088 payload: ExecutionPayload,
1089 sidecar: ExecutionPayloadSidecar,
1090 ) -> Result<Either<PayloadStatus, SealedBlock>, BeaconOnNewPayloadError> {
1091 self.metrics.new_payload_messages.increment(1);
1092
1093 let parent_hash = payload.parent_hash();
1119 let block = match self.payload_validator.ensure_well_formed_payload(payload, sidecar) {
1120 Ok(block) => block,
1121 Err(error) => {
1122 error!(target: "consensus::engine", %error, "Invalid payload");
1123 let latest_valid_hash =
1126 if error.is_block_hash_mismatch() || error.is_invalid_versioned_hashes() {
1127 None
1131 } else {
1132 self.latest_valid_hash_for_invalid_payload(parent_hash)
1133 .map_err(BeaconOnNewPayloadError::internal)?
1134 };
1135
1136 let status = PayloadStatusEnum::from(error);
1137 return Ok(Either::Left(PayloadStatus::new(status, latest_valid_hash)))
1138 }
1139 };
1140
1141 let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block.hash());
1142 if lowest_buffered_ancestor == block.hash() {
1143 lowest_buffered_ancestor = block.parent_hash;
1144 }
1145
1146 if let Some(status) = self
1148 .check_invalid_ancestor_with_head(lowest_buffered_ancestor, block.hash())
1149 .map_err(BeaconOnNewPayloadError::internal)?
1150 {
1151 Ok(Either::Left(status))
1152 } else {
1153 Ok(Either::Right(block))
1154 }
1155 }
1156
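    /// Validates the payload attributes against the new head and, if they are valid, starts a
    /// payload build job, returning a `VALID` forkchoice response carrying the pending payload
    /// id.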
1157 fn process_payload_attributes(
1162 &self,
1163 attrs: <N::Engine as PayloadTypes>::PayloadAttributes,
1164 head: Header,
1165 state: ForkchoiceState,
1166 version: EngineApiMessageVersion,
1167 ) -> OnForkChoiceUpdated {
1168 if attrs.timestamp() <= head.timestamp {
1174 return OnForkChoiceUpdated::invalid_payload_attributes()
1175 }
1176
        match <<N::Engine as PayloadTypes>::PayloadBuilderAttributes as PayloadBuilderAttributes>::try_new(
            state.head_block_hash,
            attrs,
            version as u8
        ) {
1186 Ok(attributes) => {
1187 let pending_payload_id = self.payload_builder.send_new_payload(attributes);
1190
1191 OnForkChoiceUpdated::updated_with_pending_payload_id(
1203 PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)),
1204 pending_payload_id,
1205 )
1206 }
1207 Err(_) => OnForkChoiceUpdated::invalid_payload_attributes(),
1208 }
1209 }
1210
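    /// Buffers the block for later processing while the pipeline is running and responds with
    /// `SYNCING`.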
1211 #[instrument(level = "trace", skip_all, target = "consensus::engine", ret)]
1221 fn try_buffer_payload(
1222 &mut self,
1223 block: SealedBlock,
1224 ) -> Result<PayloadStatus, InsertBlockError> {
1225 self.blockchain.buffer_block_without_senders(block)?;
1226 Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
1227 }
1228
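    /// Attempts to insert the new payload into the blockchain tree (the pipeline must be idle)
    /// and maps the insertion result to a payload status, emitting canonical or fork block events
    /// for valid blocks.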
1229 #[instrument(level = "trace", skip_all, target = "consensus::engine", ret)]
1233 fn try_insert_new_payload(
1234 &mut self,
1235 block: SealedBlock,
1236 ) -> Result<PayloadStatus, InsertBlockError> {
1237 debug_assert!(self.sync.is_pipeline_idle(), "pipeline must be idle");
1238
1239 let block_hash = block.hash();
1240 let start = Instant::now();
1241 let status = self
1242 .blockchain
1243 .insert_block_without_senders(block.clone(), BlockValidationKind::Exhaustive)?;
1244
1245 let elapsed = start.elapsed();
1246 let mut latest_valid_hash = None;
1247 let status = match status {
1248 InsertPayloadOk::Inserted(BlockStatus::Valid(attachment)) => {
1249 latest_valid_hash = Some(block_hash);
1250 let block = Arc::new(block);
1251 let event = if attachment.is_canonical() {
1252 BeaconConsensusEngineEvent::CanonicalBlockAdded(block, elapsed)
1253 } else {
1254 BeaconConsensusEngineEvent::ForkBlockAdded(block, elapsed)
1255 };
1256 self.event_sender.notify(event);
1257 PayloadStatusEnum::Valid
1258 }
1259 InsertPayloadOk::AlreadySeen(BlockStatus::Valid(_)) => {
1260 latest_valid_hash = Some(block_hash);
1261 PayloadStatusEnum::Valid
1262 }
1263 InsertPayloadOk::Inserted(BlockStatus::Disconnected { .. }) |
1264 InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
1265 if let Some(status) =
1267 self.check_invalid_ancestor_with_head(block.parent_hash, block.hash()).map_err(
1268 |error| InsertBlockError::new(block, InsertBlockErrorKind::Provider(error)),
1269 )?
1270 {
1271 return Ok(status)
1272 }
1273
1274 PayloadStatusEnum::Syncing
1276 }
1277 };
1278 Ok(PayloadStatus::new(status, latest_valid_hash))
1279 }
1280
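    /// Handles a downloaded block whose parent is missing: either schedules a pipeline run (if
    /// the gap towards the finalized target is large enough) or downloads the missing block range
    /// or single block.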
1281 fn on_disconnected_block(
1292 &mut self,
1293 downloaded_block: BlockNumHash,
1294 missing_parent: BlockNumHash,
1295 head: BlockNumHash,
1296 ) {
1297 if let Some(target) = self.can_pipeline_sync_to_finalized(
1299 head.number,
1300 missing_parent.number,
1301 Some(downloaded_block),
1302 ) {
1303 self.sync.set_pipeline_sync_target(target.into());
1306 return
1308 }
1309
1310 if let Some(distance) = self.distance_from_local_tip(head.number, missing_parent.number) {
1320 self.sync.download_block_range(missing_parent.hash, distance)
1321 } else {
1322 self.sync.download_full_block(missing_parent.hash);
1325 }
1326 }
1327
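    /// Attempts to make the current sync target head canonical after the given block was
    /// inserted. On success the chain trackers are updated and outstanding download requests are
    /// cleared; on failure the error is returned together with the target head hash.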
1328 fn try_make_sync_target_canonical(
1337 &mut self,
1338 inserted: BlockNumHash,
1339 ) -> Result<(), (B256, CanonicalError)> {
1340 let Some(target) = self.forkchoice_state_tracker.sync_target_state() else { return Ok(()) };
1341
1342 let start = Instant::now();
1346 let make_canonical_result = self.blockchain.make_canonical(target.head_block_hash);
1347 let elapsed = self.record_make_canonical_latency(start, &make_canonical_result);
1348 match make_canonical_result {
1349 Ok(outcome) => {
1350 if let CanonicalOutcome::Committed { head } = &outcome {
1351 self.event_sender.notify(BeaconConsensusEngineEvent::CanonicalChainCommitted(
1352 Box::new(head.clone()),
1353 elapsed,
1354 ));
1355 }
1356
1357 let new_head = outcome.into_header();
1358 debug!(target: "consensus::engine", hash=?new_head.hash(), number=new_head.number, "Canonicalized new head");
1359
1360 if let Err(err) = self.update_canon_chain(new_head, &target) {
1362 debug!(target: "consensus::engine", ?err, ?target, "Failed to update the canonical chain tracker");
1363 }
1364
1365 self.sync_state_updater.update_sync_state(SyncState::Idle);
1367
1368 self.sync.clear_block_download_requests();
1370 Ok(())
1371 }
1372 Err(err) => {
1373 if err.is_block_hash_not_found() {
1377 if let Some(target_hash) =
1382 ForkchoiceStateHash::find(&target, inserted.hash).filter(|h| !h.is_head())
1383 {
1384 let _ = self.blockchain.make_canonical(*target_hash.as_ref());
1386 }
1387 } else if let Some(block_number) = err.optimistic_revert_block_number() {
1388 self.sync.set_pipeline_sync_target(PipelineTarget::Unwind(block_number));
1389 }
1390
1391 Err((target.head_block_hash, err))
1392 }
1393 }
1394 }
1395
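    /// Handles an event emitted by the sync controller (downloaded full block, pipeline started
    /// or finished) and returns whether the engine should keep processing or has reached the
    /// configured max block.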
1396 fn on_sync_event(
1400 &mut self,
1401 event: EngineSyncEvent,
1402 ) -> Result<EngineEventOutcome, BeaconConsensusEngineError> {
1403 let outcome = match event {
1404 EngineSyncEvent::FetchedFullBlock(block) => {
1405 trace!(target: "consensus::engine", hash=?block.hash(), number=%block.number, "Downloaded full block");
1406 if self
1408 .check_invalid_ancestor_with_head(block.parent_hash, block.hash())
1409 .map_err(|error| BeaconConsensusEngineError::Common(error.into()))?
1410 .is_none()
1411 {
1412 self.set_blockchain_tree_action(
1413 BlockchainTreeAction::InsertDownloadedPayload { block },
1414 );
1415 }
1416 EngineEventOutcome::Processed
1417 }
1418 EngineSyncEvent::PipelineStarted(target) => {
1419 trace!(target: "consensus::engine", ?target, continuous = target.is_none(), "Started the pipeline");
1420 self.metrics.pipeline_runs.increment(1);
1421 self.sync_state_updater.update_sync_state(SyncState::Syncing);
1422 EngineEventOutcome::Processed
1423 }
1424 EngineSyncEvent::PipelineFinished { result, reached_max_block } => {
1425 trace!(target: "consensus::engine", ?result, ?reached_max_block, "Pipeline finished");
1426 let ctrl = result?;
1428 if reached_max_block {
1429 EngineEventOutcome::ReachedMaxBlock
1431 } else {
1432 self.on_pipeline_outcome(ctrl)?;
1433 EngineEventOutcome::Processed
1434 }
1435 }
1436 EngineSyncEvent::PipelineTaskDropped => {
1437 error!(target: "consensus::engine", "Failed to receive spawned pipeline");
1438 return Err(BeaconConsensusEngineError::PipelineChannelClosed)
1439 }
1440 };
1441
1442 Ok(outcome)
1443 }
1444
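    /// Handles the outcome of a finished pipeline run: records bad blocks from unwinds, restores
    /// the blockchain tree state, and decides whether another pipeline run towards the finalized
    /// block is required.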
1445 fn on_pipeline_outcome(&mut self, ctrl: ControlFlow) -> RethResult<()> {
1450 if let ControlFlow::Unwind { bad_block, .. } = ctrl {
1452 warn!(target: "consensus::engine", invalid_num_hash=?bad_block.block, "Bad block detected in unwind");
1453 self.invalid_headers.insert(*bad_block);
1455 return Ok(())
1456 }
1457
1458 let sync_target_state = match self.forkchoice_state_tracker.sync_target_state() {
1459 Some(current_state) => current_state,
1460 None => {
1461 warn!(target: "consensus::engine", "No fork choice state available");
1464 return Ok(())
1465 }
1466 };
1467
1468 if sync_target_state.finalized_block_hash.is_zero() {
1469 self.set_canonical_head(ctrl.block_number().unwrap_or_default())?;
1470 self.blockchain.update_block_hashes_and_clear_buffered()?;
1471 self.blockchain.connect_buffered_blocks_to_canonical_hashes()?;
1472 return Ok(())
1474 }
1475
1476 let lowest_buffered_ancestor =
1488 self.lowest_buffered_ancestor_or(sync_target_state.head_block_hash);
1489
1490 if self
1493 .check_invalid_ancestor_with_head(
1494 lowest_buffered_ancestor,
1495 sync_target_state.head_block_hash,
1496 )?
1497 .is_some()
1498 {
1499 warn!(
1500 target: "consensus::engine",
1501 invalid_ancestor = %lowest_buffered_ancestor,
1502 head = %sync_target_state.head_block_hash,
1503 "Current head has an invalid ancestor"
1504 );
1505 return Ok(())
1506 }
1507
1508 let newest_finalized = self
1510 .blockchain
1511 .buffered_header_by_hash(sync_target_state.finalized_block_hash)
1512 .map(|header| header.number);
1513
1514 let pipeline_target =
1520 ctrl.block_number().zip(newest_finalized).and_then(|(progress, finalized_number)| {
1521 self.can_pipeline_sync_to_finalized(progress, finalized_number, None)
1525 });
1526
1527 if let Some(target) = pipeline_target {
1530 self.sync.set_pipeline_sync_target(target.into());
1532 } else if let Some(number) =
1533 self.blockchain.block_number(sync_target_state.finalized_block_hash)?
1534 {
1535 self.blockchain.connect_buffered_blocks_to_canonical_hashes_and_finalize(number).inspect_err(|error| {
1538 error!(target: "consensus::engine", %error, "Error restoring blockchain tree state");
1539 })?;
1540 } else {
1541 self.sync.set_pipeline_sync_target(sync_target_state.finalized_block_hash.into());
1544 }
1545
1546 Ok(())
1547 }
1548
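    /// Sets the canonical head to the header at the given block number, used when syncing without
    /// a known finalized block.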
1549 fn set_canonical_head(&self, max_block: BlockNumber) -> RethResult<()> {
1550 let max_header = self.blockchain.sealed_header(max_block)
1551 .inspect_err(|error| {
1552 error!(target: "consensus::engine", %error, "Error getting canonical header for continuous sync");
1553 })?
1554 .ok_or_else(|| ProviderError::HeaderNotFound(max_block.into()))?;
1555 self.blockchain.set_canonical_head(max_header);
1556
1557 Ok(())
1558 }
1559
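    /// Processes the result of a polled engine hook: logs failures and, for hooks with database
    /// write access, mirrors the hook lifecycle in the network sync state and reconnects buffered
    /// blocks once the hook finishes.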
1560 fn on_hook_result(&self, polled_hook: PolledHook) -> Result<(), BeaconConsensusEngineError> {
1561 if let EngineHookEvent::Finished(Err(error)) = &polled_hook.event {
1562 error!(
1563 target: "consensus::engine",
1564 name = %polled_hook.name,
1565 ?error,
1566 "Hook finished with error"
1567 )
1568 }
1569
1570 if polled_hook.db_access_level.is_read_write() {
1571 match polled_hook.event {
1572 EngineHookEvent::NotReady => {}
1573 EngineHookEvent::Started => {
1574 self.sync_state_updater.update_sync_state(SyncState::Syncing)
1578 }
1579 EngineHookEvent::Finished(_) => {
1580 self.sync_state_updater.update_sync_state(SyncState::Idle);
1584 if let Err(error) =
1587 self.blockchain.connect_buffered_blocks_to_canonical_hashes()
1588 {
1589 error!(target: "consensus::engine", %error, "Error connecting buffered blocks to canonical hashes on hook result");
1590 return Err(RethError::Canonical(error).into())
1591 }
1592 }
1593 }
1594 }
1595
1596 Ok(())
1597 }
1598
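    /// Executes the queued blockchain tree action (making a forkchoice head canonical, inserting
    /// a new or downloaded payload, or canonicalizing a new payload) and reports whether the
    /// configured max block was reached.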
1599 fn on_blockchain_tree_action(
1603 &mut self,
1604 action: BlockchainTreeAction<N::Engine>,
1605 ) -> RethResult<EngineEventOutcome> {
1606 match action {
1607 BlockchainTreeAction::MakeForkchoiceHeadCanonical { state, attrs, tx } => {
1608 let start = Instant::now();
1609 let result = self.blockchain.make_canonical(state.head_block_hash);
1610 let elapsed = self.record_make_canonical_latency(start, &result);
1611 match self
1612 .on_forkchoice_updated_make_canonical_result(state, attrs, result, elapsed)
1613 {
1614 Ok(on_updated) => {
1615 trace!(target: "consensus::engine", status = ?on_updated, ?state, "Returning forkchoice status");
1616 let fcu_status = on_updated.forkchoice_status();
1617 self.on_forkchoice_updated_status(state, on_updated, tx);
1618
1619 if fcu_status.is_valid() {
1620 let tip_number = self.blockchain.canonical_tip().number;
1621 if self.sync.has_reached_max_block(tip_number) {
1622 return Ok(EngineEventOutcome::ReachedMaxBlock)
1625 }
1626 }
1627 }
1628 Err(error) => {
1629 let _ = tx.send(Err(RethError::Canonical(error.clone())));
1630 if error.is_fatal() {
1631 return Err(RethError::Canonical(error))
1632 }
1633 }
1634 };
1635 }
1636 BlockchainTreeAction::InsertNewPayload { block, tx } => {
1637 let block_hash = block.hash();
1638 let block_num_hash = block.num_hash();
1639 let result = if self.sync.is_pipeline_idle() {
1640 self.try_insert_new_payload(block)
1643 } else {
1644 self.try_buffer_payload(block)
1645 };
1646
1647 let status = match result {
1648 Ok(status) => status,
1649 Err(error) => {
1650 warn!(target: "consensus::engine", %error, "Error while processing payload");
1651
1652 let (block, error) = error.split();
1653 if !error.is_invalid_block() {
1654 let _ =
1656 tx.send(Err(BeaconOnNewPayloadError::Internal(Box::new(error))));
1657 return Ok(EngineEventOutcome::Processed)
1658 }
1659
1660 warn!(target: "consensus::engine", invalid_hash=?block.hash(), invalid_number=?block.number, %error, "Invalid block error on new payload");
1664 let latest_valid_hash = if error.is_block_pre_merge() {
1665 Some(B256::ZERO)
1667 } else {
1668 self.latest_valid_hash_for_invalid_payload(block.parent_hash)?
1669 };
1670 self.invalid_headers.insert(block.header.block_with_parent());
1672 PayloadStatus::new(
1673 PayloadStatusEnum::Invalid { validation_error: error.to_string() },
1674 latest_valid_hash,
1675 )
1676 }
1677 };
1678
1679 if status.is_valid() {
1680 if let Some(target) = self.forkchoice_state_tracker.sync_target_state() {
1681 if block_hash == target.head_block_hash {
1684 self.set_blockchain_tree_action(
1685 BlockchainTreeAction::MakeNewPayloadCanonical {
1686 payload_num_hash: block_num_hash,
1687 status,
1688 tx,
1689 },
1690 );
1691 return Ok(EngineEventOutcome::Processed)
1692 }
1693 }
1694 self.sync.cancel_full_block_request(block_hash);
1697 }
1698
1699 trace!(target: "consensus::engine", ?status, "Returning payload status");
1700 let _ = tx.send(Ok(status));
1701 }
1702 BlockchainTreeAction::MakeNewPayloadCanonical { payload_num_hash, status, tx } => {
1703 let status = match self.try_make_sync_target_canonical(payload_num_hash) {
1704 Ok(()) => status,
1705 Err((_hash, error)) => {
1706 if error.is_fatal() {
1707 let response =
1708 Err(BeaconOnNewPayloadError::Internal(Box::new(error.clone())));
1709 let _ = tx.send(response);
1710 return Err(RethError::Canonical(error))
1711 } else if error.optimistic_revert_block_number().is_some() {
1712 PayloadStatus::from_status(PayloadStatusEnum::Syncing)
1715 } else {
1716 PayloadStatus::new(
1719 PayloadStatusEnum::Invalid { validation_error: error.to_string() },
1720 self.forkchoice_state_tracker.last_valid_head(),
1723 )
1724 }
1725 }
1726 };
1727
1728 trace!(target: "consensus::engine", ?status, "Returning payload status");
1729 let _ = tx.send(Ok(status));
1730 }
1731
1732 BlockchainTreeAction::InsertDownloadedPayload { block } => {
1733 let downloaded_num_hash = block.num_hash();
1734 match self.blockchain.insert_block_without_senders(
1735 block,
1736 BlockValidationKind::SkipStateRootValidation,
1737 ) {
1738 Ok(status) => {
1739 match status {
1740 InsertPayloadOk::Inserted(BlockStatus::Valid(_)) => {
1741 if let Err((hash, error)) =
1745 self.try_make_sync_target_canonical(downloaded_num_hash)
1746 {
1747 if error.is_fatal() {
1748 error!(target: "consensus::engine", %error, "Encountered fatal error while making sync target canonical: {:?}, {:?}", error, hash);
1749 } else if !error.is_block_hash_not_found() {
1750 debug!(
1751 target: "consensus::engine",
1752 "Unexpected error while making sync target canonical: {:?}, {:?}",
1753 error,
1754 hash
1755 )
1756 }
1757 }
1758 }
1759 InsertPayloadOk::Inserted(BlockStatus::Disconnected {
1760 head,
1761 missing_ancestor: missing_parent,
1762 }) => {
1763 self.on_disconnected_block(
1766 downloaded_num_hash,
1767 missing_parent,
1768 head,
1769 );
1770 }
1771 _ => (),
1772 }
1773 }
1774 Err(err) => {
1775 warn!(target: "consensus::engine", %err, "Failed to insert downloaded block");
1776 if err.kind().is_invalid_block() {
1777 let (block, err) = err.split();
1778 warn!(target: "consensus::engine", invalid_number=?block.number, invalid_hash=?block.hash(), %err, "Marking block as invalid");
1779
1780 self.invalid_headers.insert(block.header.block_with_parent());
1781 }
1782 }
1783 }
1784 }
1785 };
1786 Ok(EngineEventOutcome::Processed)
1787 }
1788}
1789
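/// The engine future processes, in order of priority: the active database write hook, queued
/// blockchain tree actions, delayed forkchoice updates, incoming engine API messages, sync events
/// and, finally, other engine hooks. It resolves once the configured max block is reached or a
/// fatal error occurs.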
1790impl<N, BT, Client> Future for BeaconConsensusEngine<N, BT, Client>
1798where
1799 N: TreeNodeTypes,
1800 Client: EthBlockClient + 'static,
1801 BT: BlockchainTreeEngine
1802 + BlockReader<Block = BlockTy<N>, Header = HeaderTy<N>>
1803 + BlockIdReader
1804 + CanonChainTracker<Header = HeaderTy<N>>
1805 + StageCheckpointReader
1806 + ChainSpecProvider<ChainSpec = N::ChainSpec>
1807 + Unpin
1808 + 'static,
1809{
1810 type Output = Result<(), BeaconConsensusEngineError>;
1811
1812 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1813 let this = self.get_mut();
1814
1815 'main: loop {
1817 loop {
1820 if let Poll::Ready(result) =
1823 this.hooks.poll_active_db_write_hook(cx, this.current_engine_hook_context()?)?
1824 {
1825 this.on_hook_result(result)?;
1826 continue
1827 }
1828
1829 if let Some(action) = this.blockchain_tree_action.take() {
1832 match this.on_blockchain_tree_action(action) {
1833 Ok(EngineEventOutcome::Processed) => {}
1834 Ok(EngineEventOutcome::ReachedMaxBlock) => return Poll::Ready(Ok(())),
1835 Err(error) => {
1836 error!(target: "consensus::engine", %error, "Encountered fatal error");
1837 return Poll::Ready(Err(error.into()))
1838 }
1839 };
1840
1841 continue
1843 }
1844
1845 if this.hooks.active_db_write_hook().is_none() {
1848 if let Some((state, attrs, tx)) = this.pending_forkchoice_update.take() {
1849 this.set_blockchain_tree_action(
1850 BlockchainTreeAction::MakeForkchoiceHeadCanonical { state, attrs, tx },
1851 );
1852 continue
1853 }
1854 }
1855
1856 if let Poll::Ready(Some(msg)) = this.engine_message_stream.poll_next_unpin(cx) {
1862 match msg {
1863 BeaconEngineMessage::ForkchoiceUpdated {
1864 state,
1865 payload_attrs,
1866 tx,
1867 version: _version,
1868 } => {
1869 this.on_forkchoice_updated(state, payload_attrs, tx);
1870 }
1871 BeaconEngineMessage::NewPayload { payload, sidecar, tx } => {
1872 match this.on_new_payload(payload, sidecar) {
1873 Ok(Either::Right(block)) => {
1874 this.set_blockchain_tree_action(
1875 BlockchainTreeAction::InsertNewPayload { block, tx },
1876 );
1877 }
1878 Ok(Either::Left(status)) => {
1879 let _ = tx.send(Ok(status));
1880 }
1881 Err(error) => {
1882 let _ = tx.send(Err(error));
1883 }
1884 }
1885 }
1886 BeaconEngineMessage::TransitionConfigurationExchanged => {
1887 this.blockchain.on_transition_configuration_exchanged();
1888 }
1889 }
1890 continue
1891 }
1892
1893 break
1896 }
1897
1898 if let Poll::Ready(sync_event) = this.sync.poll(cx) {
1900 match this.on_sync_event(sync_event)? {
1901 EngineEventOutcome::Processed => (),
1903 EngineEventOutcome::ReachedMaxBlock => return Poll::Ready(Ok(())),
1905 }
1906
1907 continue 'main
1910 }
1911
1912 if !this.forkchoice_state_tracker.is_latest_invalid() {
1918 if let Poll::Ready(result) = this.hooks.poll_next_hook(
1919 cx,
1920 this.current_engine_hook_context()?,
1921 this.sync.is_pipeline_active(),
1922 )? {
1923 this.on_hook_result(result)?;
1924
1925 continue 'main
1928 }
1929 }
1930
1931 return Poll::Pending
1934 }
1935 }
1936}
1937
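/// Actions that the consensus engine performs on the blockchain tree, queued by the engine and
/// executed on the next poll.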
1938enum BlockchainTreeAction<EngineT: EngineTypes> {
1939 MakeForkchoiceHeadCanonical {
1940 state: ForkchoiceState,
1941 attrs: Option<EngineT::PayloadAttributes>,
1942 tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
1943 },
1944 InsertNewPayload {
1945 block: SealedBlock,
1946 tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
1947 },
1948 MakeNewPayloadCanonical {
1949 payload_num_hash: BlockNumHash,
1950 status: PayloadStatus,
1951 tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
1952 },
1953 InsertDownloadedPayload { block: SealedBlock },
1974}
1975
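/// Outcome of processing an engine-related event in the poll loop.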
1976#[derive(Debug)]
1978enum EngineEventOutcome {
1979 Processed,
1981 ReachedMaxBlock,
1983}
1984
1985#[cfg(test)]
1986mod tests {
1987 use super::*;
1988 use crate::{
1989 test_utils::{spawn_consensus_engine, TestConsensusEngineBuilder},
1990 BeaconForkChoiceUpdateError,
1991 };
1992 use alloy_rpc_types_engine::{ForkchoiceState, ForkchoiceUpdated, PayloadStatus};
1993 use assert_matches::assert_matches;
1994 use reth_chainspec::{ChainSpecBuilder, MAINNET};
1995 use reth_node_types::FullNodePrimitives;
1996 use reth_primitives::BlockExt;
1997 use reth_provider::{BlockWriter, ProviderFactory, StorageLocation};
1998 use reth_rpc_types_compat::engine::payload::block_to_payload_v1;
1999 use reth_stages::{ExecOutput, PipelineError, StageError};
2000 use reth_stages_api::StageCheckpoint;
2001 use reth_testing_utils::generators::{self, Rng};
2002 use std::{collections::VecDeque, sync::Arc};
2003 use tokio::sync::oneshot::error::TryRecvError;
2004
2005 #[tokio::test]
2007 async fn pipeline_error_is_propagated() {
2008 let mut rng = generators::rng();
2009 let chain_spec = Arc::new(
2010 ChainSpecBuilder::default()
2011 .chain(MAINNET.chain)
2012 .genesis(MAINNET.genesis.clone())
2013 .paris_activated()
2014 .build(),
2015 );
2016
2017 let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
2018 .with_pipeline_exec_outputs(VecDeque::from([Err(StageError::ChannelClosed)]))
2019 .disable_blockchain_tree_sync()
2020 .with_max_block(1)
2021 .build();
2022
2023 let res = spawn_consensus_engine(consensus_engine);
2024
2025 let _ = env
2026 .send_forkchoice_updated(ForkchoiceState {
2027 head_block_hash: rng.gen(),
2028 ..Default::default()
2029 })
2030 .await;
2031 assert_matches!(
2032 res.await,
2033 Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(), PipelineError::Stage(StageError::ChannelClosed))
2034 );
2035 }
2036
2037 #[tokio::test]
2039 async fn is_idle_until_forkchoice_is_set() {
2040 let mut rng = generators::rng();
2041 let chain_spec = Arc::new(
2042 ChainSpecBuilder::default()
2043 .chain(MAINNET.chain)
2044 .genesis(MAINNET.genesis.clone())
2045 .paris_activated()
2046 .build(),
2047 );
2048
2049 let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
2050 .with_pipeline_exec_outputs(VecDeque::from([Err(StageError::ChannelClosed)]))
2051 .disable_blockchain_tree_sync()
2052 .with_max_block(1)
2053 .build();
2054
2055 let mut rx = spawn_consensus_engine(consensus_engine);
2056
2057 tokio::time::sleep(Duration::from_millis(100)).await;
2059 assert_matches!(rx.try_recv(), Err(TryRecvError::Empty));
2060
2061 let _ = env
2063 .send_new_payload(
2064 block_to_payload_v1(SealedBlock::default()),
2065 ExecutionPayloadSidecar::none(),
2066 )
2067 .await;
2068
2069 assert_matches!(rx.try_recv(), Err(TryRecvError::Empty));
2070
2071 let _ = env
2073 .send_forkchoice_updated(ForkchoiceState {
2074 head_block_hash: rng.gen(),
2075 ..Default::default()
2076 })
2077 .await;
2078 assert_matches!(rx.try_recv(), Err(TryRecvError::Empty));
2079
2080 loop {
2083 match rx.try_recv() {
2084 Ok(result) => {
2085 assert_matches!(
2086 result,
2087 Err(BeaconConsensusEngineError::Pipeline(n)) if matches!(*n.as_ref(), PipelineError::Stage(StageError::ChannelClosed))
2088 );
2089 break
2090 }
2091 Err(TryRecvError::Empty) => {
2092 let _ = env
2093 .send_forkchoice_updated(ForkchoiceState {
2094 head_block_hash: rng.gen(),
2095 ..Default::default()
2096 })
2097 .await;
2098 }
2099 Err(err) => panic!("receive error: {err}"),
2100 }
2101 }
2102 }
2103
2104 #[tokio::test]
2108 async fn runs_pipeline_again_if_tree_not_restored() {
2109 let mut rng = generators::rng();
2110 let chain_spec = Arc::new(
2111 ChainSpecBuilder::default()
2112 .chain(MAINNET.chain)
2113 .genesis(MAINNET.genesis.clone())
2114 .paris_activated()
2115 .build(),
2116 );
2117
2118 let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
2119 .with_pipeline_exec_outputs(VecDeque::from([
2120 Ok(ExecOutput { checkpoint: StageCheckpoint::new(1), done: true }),
2121 Err(StageError::ChannelClosed),
2122 ]))
2123 .disable_blockchain_tree_sync()
2124 .with_max_block(2)
2125 .build();
2126
2127 let rx = spawn_consensus_engine(consensus_engine);
2128
2129 let _ = env
2130 .send_forkchoice_updated(ForkchoiceState {
2131 head_block_hash: rng.gen(),
2132 finalized_block_hash: rng.gen(),
2133 ..Default::default()
2134 })
2135 .await;
2136
2137 assert_matches!(
2138 rx.await,
2139 Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(), PipelineError::Stage(StageError::ChannelClosed))
2140 );
2141 }
2142
2143 #[tokio::test]
2144 async fn terminates_upon_reaching_max_block() {
2145 let mut rng = generators::rng();
2146 let max_block = 1000;
2147 let chain_spec = Arc::new(
2148 ChainSpecBuilder::default()
2149 .chain(MAINNET.chain)
2150 .genesis(MAINNET.genesis.clone())
2151 .paris_activated()
2152 .build(),
2153 );
2154
2155 let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
2156 .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
2157 checkpoint: StageCheckpoint::new(max_block),
2158 done: true,
2159 })]))
2160 .with_max_block(max_block)
2161 .disable_blockchain_tree_sync()
2162 .build();
2163
2164 let rx = spawn_consensus_engine(consensus_engine);
2165
2166 let _ = env
2167 .send_forkchoice_updated(ForkchoiceState {
2168 head_block_hash: rng.gen(),
2169 ..Default::default()
2170 })
2171 .await;
2172 assert_matches!(rx.await, Ok(Ok(())));
2173 }
2174
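    /// Inserts the given blocks into the provider's database and commits them.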
2175 fn insert_blocks<
2176 'a,
2177 N: ProviderNodeTypes<
2178 Primitives: FullNodePrimitives<
2179 BlockBody = reth_primitives::BlockBody,
2180 BlockHeader = reth_primitives::Header,
2181 >,
2182 >,
2183 >(
2184 provider_factory: ProviderFactory<N>,
2185 mut blocks: impl Iterator<Item = &'a SealedBlock>,
2186 ) {
2187 let provider = provider_factory.provider_rw().unwrap();
2188 blocks
2189 .try_for_each(|b| {
2190 provider
2191 .insert_block(
2192 b.clone().try_seal_with_senders().expect("invalid tx signature in block"),
2193 StorageLocation::Database,
2194 )
2195 .map(drop)
2196 })
2197 .expect("failed to insert");
2198 provider.commit().unwrap();
2199 }
2200
2201 mod fork_choice_updated {
2202 use super::*;
2203 use alloy_primitives::U256;
2204 use alloy_rpc_types_engine::ForkchoiceUpdateError;
2205 use generators::BlockParams;
2206 use reth_db::{tables, test_utils::create_test_static_files_dir, Database};
2207 use reth_db_api::transaction::DbTxMut;
2208 use reth_provider::{providers::StaticFileProvider, test_utils::MockNodeTypesWithDB};
2209 use reth_testing_utils::generators::random_block;
2210
2211 #[tokio::test]
2212 async fn empty_head() {
2213 let chain_spec = Arc::new(
2214 ChainSpecBuilder::default()
2215 .chain(MAINNET.chain)
2216 .genesis(MAINNET.genesis.clone())
2217 .paris_activated()
2218 .build(),
2219 );
2220
2221 let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
2222 .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
2223 checkpoint: StageCheckpoint::new(0),
2224 done: true,
2225 })]))
2226 .build();
2227
2228 let mut engine_rx = spawn_consensus_engine(consensus_engine);
2229
2230 let res = env.send_forkchoice_updated(ForkchoiceState::default()).await;
2231 assert_matches!(
2232 res,
2233 Err(BeaconForkChoiceUpdateError::ForkchoiceUpdateError(
2234 ForkchoiceUpdateError::InvalidState
2235 ))
2236 );
2237
2238 assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
2239 }
2240
2241 #[tokio::test]
2242 async fn valid_forkchoice() {
2243 let mut rng = generators::rng();
2244 let chain_spec = Arc::new(
2245 ChainSpecBuilder::default()
2246 .chain(MAINNET.chain)
2247 .genesis(MAINNET.genesis.clone())
2248 .paris_activated()
2249 .build(),
2250 );
2251
2252 let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
2253 .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
2254 checkpoint: StageCheckpoint::new(0),
2255 done: true,
2256 })]))
2257 .build();
2258
2259 let genesis = random_block(
2260 &mut rng,
2261 0,
2262 BlockParams { ommers_count: Some(0), ..Default::default() },
2263 );
2264 let block1 = random_block(
2265 &mut rng,
2266 1,
2267 BlockParams {
2268 parent: Some(genesis.hash()),
2269 ommers_count: Some(0),
2270 ..Default::default()
2271 },
2272 );
2273 let (_static_dir, static_dir_path) = create_test_static_files_dir();
2274
2275 insert_blocks(
2276 ProviderFactory::<MockNodeTypesWithDB>::new(
2277 env.db.clone(),
2278 chain_spec.clone(),
2279 StaticFileProvider::read_write(static_dir_path).unwrap(),
2280 ),
2281 [&genesis, &block1].into_iter(),
2282 );
2283 env.db
2284 .update(|tx| {
2285 tx.put::<tables::StageCheckpoints>(
2286 StageId::Finish.to_string(),
2287 StageCheckpoint::new(block1.number),
2288 )
2289 })
2290 .unwrap()
2291 .unwrap();
2292
2293 let mut engine_rx = spawn_consensus_engine(consensus_engine);
2294
2295 let forkchoice = ForkchoiceState {
2296 head_block_hash: block1.hash(),
2297 finalized_block_hash: block1.hash(),
2298 ..Default::default()
2299 };
2300
2301 let result = env.send_forkchoice_updated(forkchoice).await.unwrap();
2302 let expected_result = ForkchoiceUpdated::new(PayloadStatus::new(
2303 PayloadStatusEnum::Valid,
2304 Some(block1.hash()),
2305 ));
2306 assert_eq!(result, expected_result);
2307 assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
2308 }
2309
        #[tokio::test]
        async fn unknown_head_hash() {
            let mut rng = generators::rng();

            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(MAINNET.genesis.clone())
                    .paris_activated()
                    .build(),
            );

            let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
                .with_pipeline_exec_outputs(VecDeque::from([
                    Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
                    Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
                ]))
                .disable_blockchain_tree_sync()
                .build();

            let genesis = random_block(
                &mut rng,
                0,
                BlockParams { ommers_count: Some(0), ..Default::default() },
            );
            let block1 = random_block(
                &mut rng,
                1,
                BlockParams { parent: Some(genesis.hash()), ..Default::default() },
            );

            let (_static_dir, static_dir_path) = create_test_static_files_dir();

            insert_blocks(
                ProviderFactory::<MockNodeTypesWithDB>::new(
                    env.db.clone(),
                    chain_spec.clone(),
                    StaticFileProvider::read_write(static_dir_path).unwrap(),
                ),
                [&genesis, &block1].into_iter(),
            );

            let mut engine_rx = spawn_consensus_engine(consensus_engine);
            let next_head = random_block(
                &mut rng,
                2,
                BlockParams {
                    parent: Some(block1.hash()),
                    ommers_count: Some(0),
                    ..Default::default()
                },
            );
            let next_forkchoice_state = ForkchoiceState {
                head_block_hash: next_head.hash(),
                finalized_block_hash: block1.hash(),
                ..Default::default()
            };

            let invalid_rx = env.send_forkchoice_updated(next_forkchoice_state).await;

            let (_static_dir, static_dir_path) = create_test_static_files_dir();

            insert_blocks(
                ProviderFactory::<MockNodeTypesWithDB>::new(
                    env.db.clone(),
                    chain_spec.clone(),
                    StaticFileProvider::read_write(static_dir_path).unwrap(),
                ),
                std::iter::once(&next_head),
            );

            let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing);
            assert_matches!(invalid_rx, Ok(result) => assert_eq!(result, expected_result));

            let result =
                env.send_forkchoice_retry_on_syncing(next_forkchoice_state).await.unwrap();
            let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Valid)
                .with_latest_valid_hash(next_head.hash());
            assert_eq!(result, expected_result);

            assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
        }

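        // A random (unknown) head hash combined with a known finalized hash should leave
        // the engine in SYNCING rather than producing an error.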
        #[tokio::test]
        async fn unknown_finalized_hash() {
            let mut rng = generators::rng();
            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(MAINNET.genesis.clone())
                    .paris_activated()
                    .build(),
            );

            let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
                .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
                    checkpoint: StageCheckpoint::new(0),
                    done: true,
                })]))
                .disable_blockchain_tree_sync()
                .build();

            let genesis = random_block(
                &mut rng,
                0,
                BlockParams { ommers_count: Some(0), ..Default::default() },
            );
            let block1 = random_block(
                &mut rng,
                1,
                BlockParams {
                    parent: Some(genesis.hash()),
                    ommers_count: Some(0),
                    ..Default::default()
                },
            );

            let (_static_dir, static_dir_path) = create_test_static_files_dir();

            insert_blocks(
                ProviderFactory::<MockNodeTypesWithDB>::new(
                    env.db.clone(),
                    chain_spec.clone(),
                    StaticFileProvider::read_write(static_dir_path).unwrap(),
                ),
                [&genesis, &block1].into_iter(),
            );

            let engine = spawn_consensus_engine(consensus_engine);

            let res = env
                .send_forkchoice_updated(ForkchoiceState {
                    head_block_hash: rng.gen(),
                    finalized_block_hash: block1.hash(),
                    ..Default::default()
                })
                .await;
            let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing);
            assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));
            drop(engine);
        }

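        // The forkchoice target here is a pre-merge block, so the engine is expected to
        // answer INVALID with a zero latest-valid-hash.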
        #[tokio::test]
        async fn forkchoice_updated_pre_merge() {
            let mut rng = generators::rng();
            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(MAINNET.genesis.clone())
                    .london_activated()
                    .paris_at_ttd(U256::from(3))
                    .build(),
            );

            let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
                .with_pipeline_exec_outputs(VecDeque::from([
                    Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
                    Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
                ]))
                .build();

            let genesis = random_block(
                &mut rng,
                0,
                BlockParams { ommers_count: Some(0), ..Default::default() },
            );
            let mut block1 = random_block(
                &mut rng,
                1,
                BlockParams {
                    parent: Some(genesis.hash()),
                    ommers_count: Some(0),
                    ..Default::default()
                },
            );
            block1.header.set_difficulty(U256::from(1));

            let mut block2 = random_block(
                &mut rng,
                1,
                BlockParams {
                    parent: Some(genesis.hash()),
                    ommers_count: Some(0),
                    ..Default::default()
                },
            );
            block2.header.set_difficulty(U256::from(1));

            let mut block3 = random_block(
                &mut rng,
                1,
                BlockParams {
                    parent: Some(genesis.hash()),
                    ommers_count: Some(0),
                    ..Default::default()
                },
            );
            block3.header.set_difficulty(U256::from(1));

            let (_static_dir, static_dir_path) = create_test_static_files_dir();
            insert_blocks(
                ProviderFactory::<MockNodeTypesWithDB>::new(
                    env.db.clone(),
                    chain_spec.clone(),
                    StaticFileProvider::read_write(static_dir_path).unwrap(),
                ),
                [&genesis, &block1, &block2, &block3].into_iter(),
            );

            let _engine = spawn_consensus_engine(consensus_engine);

            let res = env
                .send_forkchoice_updated(ForkchoiceState {
                    head_block_hash: block1.hash(),
                    finalized_block_hash: block1.hash(),
                    ..Default::default()
                })
                .await;

            assert_matches!(res, Ok(result) => {
                let ForkchoiceUpdated { payload_status, .. } = result;
                assert_matches!(payload_status.status, PayloadStatusEnum::Invalid { .. });
                assert_eq!(payload_status.latest_valid_hash, Some(B256::ZERO));
            });
        }

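        // Paris is not activated in this chain spec, so the targeted proof-of-work block
        // is rejected as INVALID with a `BlockPreMerge` validation error and a zero
        // latest-valid-hash.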
        #[tokio::test]
        async fn forkchoice_updated_invalid_pow() {
            let mut rng = generators::rng();
            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(MAINNET.genesis.clone())
                    .london_activated()
                    .build(),
            );

            let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
                .with_pipeline_exec_outputs(VecDeque::from([
                    Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
                    Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }),
                ]))
                .build();

            let genesis = random_block(
                &mut rng,
                0,
                BlockParams { ommers_count: Some(0), ..Default::default() },
            );
            let block1 = random_block(
                &mut rng,
                1,
                BlockParams {
                    parent: Some(genesis.hash()),
                    ommers_count: Some(0),
                    ..Default::default()
                },
            );

            let (_temp_dir, temp_dir_path) = create_test_static_files_dir();

            insert_blocks(
                ProviderFactory::<MockNodeTypesWithDB>::new(
                    env.db.clone(),
                    chain_spec.clone(),
                    StaticFileProvider::read_write(temp_dir_path).unwrap(),
                ),
                [&genesis, &block1].into_iter(),
            );

            let _engine = spawn_consensus_engine(consensus_engine);

            let res = env
                .send_forkchoice_updated(ForkchoiceState {
                    head_block_hash: block1.hash(),
                    finalized_block_hash: block1.hash(),
                    ..Default::default()
                })
                .await;
            let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Invalid {
                validation_error: BlockValidationError::BlockPreMerge { hash: block1.hash() }
                    .to_string(),
            })
            .with_latest_valid_hash(B256::ZERO);
            assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));
        }
    }

    mod new_payload {
        use super::*;
        use alloy_genesis::Genesis;
        use alloy_primitives::U256;
        use generators::BlockParams;
        use reth_db::test_utils::create_test_static_files_dir;
        use reth_primitives::EthereumHardfork;
        use reth_provider::{
            providers::StaticFileProvider,
            test_utils::{blocks::BlockchainTestData, MockNodeTypesWithDB},
        };
        use reth_testing_utils::{generators::random_block, GenesisAllocator};

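        // Payloads received before any forkchoice update: a payload at the genesis height
        // is rejected as INVALID, while a payload for block 1 is answered with SYNCING.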
        #[tokio::test]
        async fn new_payload_before_forkchoice() {
            let mut rng = generators::rng();
            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(MAINNET.genesis.clone())
                    .paris_activated()
                    .build(),
            );

            let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
                .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
                    checkpoint: StageCheckpoint::new(0),
                    done: true,
                })]))
                .build();

            let mut engine_rx = spawn_consensus_engine(consensus_engine);

            let res = env
                .send_new_payload(
                    block_to_payload_v1(random_block(
                        &mut rng,
                        0,
                        BlockParams { ommers_count: Some(0), ..Default::default() },
                    )),
                    ExecutionPayloadSidecar::none(),
                )
                .await;

            assert_matches!(res, Ok(result) => assert_matches!(result.status, PayloadStatusEnum::Invalid { .. }));

            let res = env
                .send_new_payload(
                    block_to_payload_v1(random_block(
                        &mut rng,
                        1,
                        BlockParams { ommers_count: Some(0), ..Default::default() },
                    )),
                    ExecutionPayloadSidecar::none(),
                )
                .await;

            let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Syncing);
            assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));

            assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
        }

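        // Once block1 is canonical, a newPayload for block2 (which extends it and is
        // already stored) should return VALID with block2 as the latest valid hash.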
        #[tokio::test]
        async fn payload_known() {
            let mut rng = generators::rng();
            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(MAINNET.genesis.clone())
                    .paris_activated()
                    .build(),
            );

            let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
                .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
                    checkpoint: StageCheckpoint::new(0),
                    done: true,
                })]))
                .build();

            let genesis = random_block(
                &mut rng,
                0,
                BlockParams { ommers_count: Some(0), ..Default::default() },
            );
            let block1 = random_block(
                &mut rng,
                1,
                BlockParams {
                    parent: Some(genesis.hash()),
                    ommers_count: Some(0),
                    ..Default::default()
                },
            );
            let block2 = random_block(
                &mut rng,
                2,
                BlockParams {
                    parent: Some(block1.hash()),
                    ommers_count: Some(0),
                    ..Default::default()
                },
            );

            let (_static_dir, static_dir_path) = create_test_static_files_dir();
            insert_blocks(
                ProviderFactory::<MockNodeTypesWithDB>::new(
                    env.db.clone(),
                    chain_spec.clone(),
                    StaticFileProvider::read_write(static_dir_path).unwrap(),
                ),
                [&genesis, &block1, &block2].into_iter(),
            );

            let mut engine_rx = spawn_consensus_engine(consensus_engine);

            let res = env
                .send_forkchoice_updated(ForkchoiceState {
                    head_block_hash: block1.hash(),
                    finalized_block_hash: block1.hash(),
                    ..Default::default()
                })
                .await;
            let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Valid)
                .with_latest_valid_hash(block1.hash());
            assert_matches!(res, Ok(ForkchoiceUpdated { payload_status, .. }) => assert_eq!(payload_status, expected_result));

            let result = env
                .send_new_payload_retry_on_syncing(
                    block_to_payload_v1(block2.clone()),
                    ExecutionPayloadSidecar::none(),
                )
                .await
                .unwrap();

            let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Valid)
                .with_latest_valid_hash(block2.hash());
            assert_eq!(result, expected_result);
            assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
        }

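        // End-to-end run using the real pipeline, executor and consensus: a forkchoice
        // update targeting block1 on top of the funded genesis should come back VALID.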
        #[tokio::test]
        async fn simple_validate_block() {
            let mut rng = generators::rng();
            let amount = U256::from(1000000000000000000u64);
            let mut allocator = GenesisAllocator::default().with_rng(&mut rng);
            for _ in 0..16 {
                allocator.new_funded_account(amount);
            }

            let alloc = allocator.build();

            let genesis = Genesis::default().extend_accounts(alloc);

            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(genesis)
                    .shanghai_activated()
                    .build(),
            );

            let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
                .with_real_pipeline()
                .with_real_executor()
                .with_real_consensus()
                .build();

            let genesis =
                SealedBlock { header: chain_spec.sealed_genesis_header(), ..Default::default() };
            let block1 = random_block(
                &mut rng,
                1,
                BlockParams {
                    parent: Some(chain_spec.genesis_hash()),
                    ommers_count: Some(0),
                    ..Default::default()
                },
            );

            let (_static_dir, static_dir_path) = create_test_static_files_dir();

            insert_blocks(
                ProviderFactory::<MockNodeTypesWithDB>::new(
                    env.db.clone(),
                    chain_spec.clone(),
                    StaticFileProvider::read_write(static_dir_path).unwrap(),
                ),
                [&genesis, &block1].into_iter(),
            );

            let mut engine_rx = spawn_consensus_engine(consensus_engine);

            let res = env
                .send_forkchoice_updated(ForkchoiceState {
                    head_block_hash: block1.hash(),
                    finalized_block_hash: block1.hash(),
                    ..Default::default()
                })
                .await;
            let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Valid)
                .with_latest_valid_hash(block1.hash());
            assert_matches!(res, Ok(ForkchoiceUpdated { payload_status, .. }) => assert_eq!(payload_status, expected_result));
            assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
        }

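        // A payload whose parent hash does not resolve to any known block cannot be
        // validated yet, so the engine responds with SYNCING.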
        #[tokio::test]
        async fn payload_parent_unknown() {
            let mut rng = generators::rng();
            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(MAINNET.genesis.clone())
                    .paris_activated()
                    .build(),
            );

            let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
                .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
                    checkpoint: StageCheckpoint::new(0),
                    done: true,
                })]))
                .build();
            let genesis = random_block(
                &mut rng,
                0,
                BlockParams { ommers_count: Some(0), ..Default::default() },
            );

            let (_static_dir, static_dir_path) = create_test_static_files_dir();

            insert_blocks(
                ProviderFactory::<MockNodeTypesWithDB>::new(
                    env.db.clone(),
                    chain_spec.clone(),
                    StaticFileProvider::read_write(static_dir_path).unwrap(),
                ),
                std::iter::once(&genesis),
            );

            let mut engine_rx = spawn_consensus_engine(consensus_engine);

            let res = env
                .send_forkchoice_updated(ForkchoiceState {
                    head_block_hash: genesis.hash(),
                    finalized_block_hash: genesis.hash(),
                    ..Default::default()
                })
                .await;
            let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Valid)
                .with_latest_valid_hash(genesis.hash());
            assert_matches!(res, Ok(ForkchoiceUpdated { payload_status, .. }) => assert_eq!(payload_status, expected_result));

            let parent = rng.gen();
            let block = random_block(
                &mut rng,
                2,
                BlockParams { parent: Some(parent), ommers_count: Some(0), ..Default::default() },
            );
            let res = env
                .send_new_payload(block_to_payload_v1(block), ExecutionPayloadSidecar::none())
                .await;
            let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Syncing);
            assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));

            assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
        }

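        // Both the forkchoice update for the pre-merge block1 and the newPayload for its
        // child block2 are rejected as INVALID with a `BlockPreMerge` error and a zero
        // latest-valid-hash.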
        #[tokio::test]
        async fn payload_pre_merge() {
            let data = BlockchainTestData::default();
            let mut block1 = data.blocks[0].0.block.clone();
            block1.header.set_difficulty(
                MAINNET.fork(EthereumHardfork::Paris).ttd().unwrap() - U256::from(1),
            );
            block1 = block1.unseal::<reth_primitives::Block>().seal_slow();
            let (block2, exec_result2) = data.blocks[1].clone();
            let mut block2 = block2.unseal().block;
            block2.body.withdrawals = None;
            block2.header.parent_hash = block1.hash();
            block2.header.base_fee_per_gas = Some(100);
            block2.header.difficulty = U256::ZERO;
            let block2 = block2.clone().seal_slow();

            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(MAINNET.genesis.clone())
                    .london_activated()
                    .build(),
            );

            let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone())
                .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
                    checkpoint: StageCheckpoint::new(0),
                    done: true,
                })]))
                .with_executor_results(Vec::from([exec_result2]))
                .build();

            let (_static_dir, static_dir_path) = create_test_static_files_dir();

            insert_blocks(
                ProviderFactory::<MockNodeTypesWithDB>::new(
                    env.db.clone(),
                    chain_spec.clone(),
                    StaticFileProvider::read_write(static_dir_path).unwrap(),
                ),
                [&data.genesis, &block1].into_iter(),
            );

            let mut engine_rx = spawn_consensus_engine(consensus_engine);

            let res = env
                .send_forkchoice_updated(ForkchoiceState {
                    head_block_hash: block1.hash(),
                    finalized_block_hash: block1.hash(),
                    ..Default::default()
                })
                .await;

            let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Invalid {
                validation_error: BlockValidationError::BlockPreMerge { hash: block1.hash() }
                    .to_string(),
            })
            .with_latest_valid_hash(B256::ZERO);
            assert_matches!(res, Ok(ForkchoiceUpdated { payload_status, .. }) => assert_eq!(payload_status, expected_result));

            let result = env
                .send_new_payload_retry_on_syncing(
                    block_to_payload_v1(block2.clone()),
                    ExecutionPayloadSidecar::none(),
                )
                .await
                .unwrap();

            let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Invalid {
                validation_error: BlockValidationError::BlockPreMerge { hash: block2.hash() }
                    .to_string(),
            })
            .with_latest_valid_hash(B256::ZERO);
            assert_eq!(result, expected_result);

            assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
        }
    }
}