reth_engine_primitives/
message.rs

1use crate::{BeaconOnNewPayloadError, EngineApiMessageVersion, EngineTypes, ForkchoiceStatus};
2use alloy_rpc_types_engine::{
3    ExecutionPayload, ExecutionPayloadSidecar, ForkChoiceUpdateResult, ForkchoiceState,
4    ForkchoiceUpdateError, ForkchoiceUpdated, PayloadId, PayloadStatus, PayloadStatusEnum,
5};
6use futures::{future::Either, FutureExt};
7use reth_errors::RethResult;
8use reth_payload_builder_primitives::PayloadBuilderError;
9use std::{
10    fmt::Display,
11    future::Future,
12    pin::Pin,
13    task::{ready, Context, Poll},
14};
15use tokio::sync::oneshot;
16
17/// Represents the outcome of forkchoice update.
18///
19/// This is a future that resolves to [`ForkChoiceUpdateResult`]
20#[must_use = "futures do nothing unless you `.await` or poll them"]
21#[derive(Debug)]
22pub struct OnForkChoiceUpdated {
23    /// Represents the status of the forkchoice update.
24    ///
25    /// Note: This is separate from the response `fut`, because we still can return an error
26    /// depending on the payload attributes, even if the forkchoice update itself is valid.
27    forkchoice_status: ForkchoiceStatus,
28    /// Returns the result of the forkchoice update.
29    fut: Either<futures::future::Ready<ForkChoiceUpdateResult>, PendingPayloadId>,
30}
31
32// === impl OnForkChoiceUpdated ===
33
34impl OnForkChoiceUpdated {
35    /// Returns the determined status of the received `ForkchoiceState`.
36    pub const fn forkchoice_status(&self) -> ForkchoiceStatus {
37        self.forkchoice_status
38    }
39
40    /// Creates a new instance of `OnForkChoiceUpdated` for the `SYNCING` state
41    pub fn syncing() -> Self {
42        let status = PayloadStatus::from_status(PayloadStatusEnum::Syncing);
43        Self {
44            forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
45            fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
46        }
47    }
48
49    /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update succeeded and no
50    /// payload attributes were provided.
51    pub fn valid(status: PayloadStatus) -> Self {
52        Self {
53            forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
54            fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
55        }
56    }
57
58    /// Creates a new instance of `OnForkChoiceUpdated` with the given payload status, if the
59    /// forkchoice update failed due to an invalid payload.
60    pub fn with_invalid(status: PayloadStatus) -> Self {
61        Self {
62            forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
63            fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
64        }
65    }
66
67    /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update failed because the
68    /// given state is considered invalid
69    pub fn invalid_state() -> Self {
70        Self {
71            forkchoice_status: ForkchoiceStatus::Invalid,
72            fut: Either::Left(futures::future::ready(Err(ForkchoiceUpdateError::InvalidState))),
73        }
74    }
75
76    /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update was successful but
77    /// payload attributes were invalid.
78    pub fn invalid_payload_attributes() -> Self {
79        Self {
80            // This is valid because this is only reachable if the state and payload is valid
81            forkchoice_status: ForkchoiceStatus::Valid,
82            fut: Either::Left(futures::future::ready(Err(
83                ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes,
84            ))),
85        }
86    }
87
88    /// If the forkchoice update was successful and no payload attributes were provided, this method
89    pub const fn updated_with_pending_payload_id(
90        payload_status: PayloadStatus,
91        pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
92    ) -> Self {
93        Self {
94            forkchoice_status: ForkchoiceStatus::from_payload_status(&payload_status.status),
95            fut: Either::Right(PendingPayloadId {
96                payload_status: Some(payload_status),
97                pending_payload_id,
98            }),
99        }
100    }
101}
102
103impl Future for OnForkChoiceUpdated {
104    type Output = ForkChoiceUpdateResult;
105
106    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
107        self.get_mut().fut.poll_unpin(cx)
108    }
109}
110
111/// A future that returns the payload id of a yet to be initiated payload job after a successful
112/// forkchoice update
113#[derive(Debug)]
114struct PendingPayloadId {
115    payload_status: Option<PayloadStatus>,
116    pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
117}
118
119impl Future for PendingPayloadId {
120    type Output = ForkChoiceUpdateResult;
121
122    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
123        let this = self.get_mut();
124        let res = ready!(this.pending_payload_id.poll_unpin(cx));
125        match res {
126            Ok(Ok(payload_id)) => Poll::Ready(Ok(ForkchoiceUpdated {
127                payload_status: this.payload_status.take().expect("Polled after completion"),
128                payload_id: Some(payload_id),
129            })),
130            Err(_) | Ok(Err(_)) => {
131                // failed to initiate a payload build job
132                Poll::Ready(Err(ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes))
133            }
134        }
135    }
136}
137
138/// A message for the beacon engine from other components of the node (engine RPC API invoked by the
139/// consensus layer).
140#[derive(Debug)]
141pub enum BeaconEngineMessage<Engine: EngineTypes> {
142    /// Message with new payload.
143    NewPayload {
144        /// The execution payload received by Engine API.
145        payload: ExecutionPayload,
146        /// The execution payload sidecar with additional version-specific fields received by
147        /// engine API.
148        sidecar: ExecutionPayloadSidecar,
149        /// The sender for returning payload status result.
150        tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
151    },
152    /// Message with updated forkchoice state.
153    ForkchoiceUpdated {
154        /// The updated forkchoice state.
155        state: ForkchoiceState,
156        /// The payload attributes for block building.
157        payload_attrs: Option<Engine::PayloadAttributes>,
158        /// The Engine API Version.
159        version: EngineApiMessageVersion,
160        /// The sender for returning forkchoice updated result.
161        tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
162    },
163    /// Message with exchanged transition configuration.
164    TransitionConfigurationExchanged,
165}
166
167impl<Engine: EngineTypes> Display for BeaconEngineMessage<Engine> {
168    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169        match self {
170            Self::NewPayload { payload, .. } => {
171                write!(
172                    f,
173                    "NewPayload(parent: {}, number: {}, hash: {})",
174                    payload.parent_hash(),
175                    payload.block_number(),
176                    payload.block_hash()
177                )
178            }
179            Self::ForkchoiceUpdated { state, payload_attrs, .. } => {
180                // we don't want to print the entire payload attributes, because for OP this
181                // includes all txs
182                write!(
183                    f,
184                    "ForkchoiceUpdated {{ state: {state:?}, has_payload_attributes: {} }}",
185                    payload_attrs.is_some()
186                )
187            }
188            Self::TransitionConfigurationExchanged => {
189                write!(f, "TransitionConfigurationExchanged")
190            }
191        }
192    }
193}