reth_engine_primitives/
message.rs1use crate::{BeaconOnNewPayloadError, EngineApiMessageVersion, EngineTypes, ForkchoiceStatus};
2use alloy_rpc_types_engine::{
3 ExecutionPayload, ExecutionPayloadSidecar, ForkChoiceUpdateResult, ForkchoiceState,
4 ForkchoiceUpdateError, ForkchoiceUpdated, PayloadId, PayloadStatus, PayloadStatusEnum,
5};
6use futures::{future::Either, FutureExt};
7use reth_errors::RethResult;
8use reth_payload_builder_primitives::PayloadBuilderError;
9use std::{
10 fmt::Display,
11 future::Future,
12 pin::Pin,
13 task::{ready, Context, Poll},
14};
15use tokio::sync::oneshot;
16
17#[must_use = "futures do nothing unless you `.await` or poll them"]
21#[derive(Debug)]
22pub struct OnForkChoiceUpdated {
23 forkchoice_status: ForkchoiceStatus,
28 fut: Either<futures::future::Ready<ForkChoiceUpdateResult>, PendingPayloadId>,
30}
31
32impl OnForkChoiceUpdated {
35 pub const fn forkchoice_status(&self) -> ForkchoiceStatus {
37 self.forkchoice_status
38 }
39
40 pub fn syncing() -> Self {
42 let status = PayloadStatus::from_status(PayloadStatusEnum::Syncing);
43 Self {
44 forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
45 fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
46 }
47 }
48
49 pub fn valid(status: PayloadStatus) -> Self {
52 Self {
53 forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
54 fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
55 }
56 }
57
58 pub fn with_invalid(status: PayloadStatus) -> Self {
61 Self {
62 forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status),
63 fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
64 }
65 }
66
67 pub fn invalid_state() -> Self {
70 Self {
71 forkchoice_status: ForkchoiceStatus::Invalid,
72 fut: Either::Left(futures::future::ready(Err(ForkchoiceUpdateError::InvalidState))),
73 }
74 }
75
76 pub fn invalid_payload_attributes() -> Self {
79 Self {
80 forkchoice_status: ForkchoiceStatus::Valid,
82 fut: Either::Left(futures::future::ready(Err(
83 ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes,
84 ))),
85 }
86 }
87
88 pub const fn updated_with_pending_payload_id(
90 payload_status: PayloadStatus,
91 pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
92 ) -> Self {
93 Self {
94 forkchoice_status: ForkchoiceStatus::from_payload_status(&payload_status.status),
95 fut: Either::Right(PendingPayloadId {
96 payload_status: Some(payload_status),
97 pending_payload_id,
98 }),
99 }
100 }
101}
102
103impl Future for OnForkChoiceUpdated {
104 type Output = ForkChoiceUpdateResult;
105
106 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
107 self.get_mut().fut.poll_unpin(cx)
108 }
109}
110
111#[derive(Debug)]
114struct PendingPayloadId {
115 payload_status: Option<PayloadStatus>,
116 pending_payload_id: oneshot::Receiver<Result<PayloadId, PayloadBuilderError>>,
117}
118
119impl Future for PendingPayloadId {
120 type Output = ForkChoiceUpdateResult;
121
122 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
123 let this = self.get_mut();
124 let res = ready!(this.pending_payload_id.poll_unpin(cx));
125 match res {
126 Ok(Ok(payload_id)) => Poll::Ready(Ok(ForkchoiceUpdated {
127 payload_status: this.payload_status.take().expect("Polled after completion"),
128 payload_id: Some(payload_id),
129 })),
130 Err(_) | Ok(Err(_)) => {
131 Poll::Ready(Err(ForkchoiceUpdateError::UpdatedInvalidPayloadAttributes))
133 }
134 }
135 }
136}
137
138#[derive(Debug)]
141pub enum BeaconEngineMessage<Engine: EngineTypes> {
142 NewPayload {
144 payload: ExecutionPayload,
146 sidecar: ExecutionPayloadSidecar,
149 tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
151 },
152 ForkchoiceUpdated {
154 state: ForkchoiceState,
156 payload_attrs: Option<Engine::PayloadAttributes>,
158 version: EngineApiMessageVersion,
160 tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
162 },
163 TransitionConfigurationExchanged,
165}
166
167impl<Engine: EngineTypes> Display for BeaconEngineMessage<Engine> {
168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 match self {
170 Self::NewPayload { payload, .. } => {
171 write!(
172 f,
173 "NewPayload(parent: {}, number: {}, hash: {})",
174 payload.parent_hash(),
175 payload.block_number(),
176 payload.block_hash()
177 )
178 }
179 Self::ForkchoiceUpdated { state, payload_attrs, .. } => {
180 write!(
183 f,
184 "ForkchoiceUpdated {{ state: {state:?}, has_payload_attributes: {} }}",
185 payload_attrs.is_some()
186 )
187 }
188 Self::TransitionConfigurationExchanged => {
189 write!(f, "TransitionConfigurationExchanged")
190 }
191 }
192 }
193}