1use alloy_primitives::B256;
6use alloy_provider::{ext::EngineApi, Network};
7use alloy_rpc_types_engine::{
8 ExecutionPayload, ExecutionPayloadInputV2, ExecutionPayloadV1, ExecutionPayloadV3,
9 ForkchoiceState, ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
10};
11use alloy_transport::{Transport, TransportResult};
12use reth_node_api::EngineApiMessageVersion;
13use tracing::error;
14
15#[async_trait::async_trait]
17pub trait EngineApiValidWaitExt<N, T>: Send + Sync {
18 async fn new_payload_v1_wait(
21 &self,
22 payload: ExecutionPayloadV1,
23 ) -> TransportResult<PayloadStatus>;
24
25 async fn new_payload_v2_wait(
28 &self,
29 payload: ExecutionPayloadInputV2,
30 ) -> TransportResult<PayloadStatus>;
31
32 async fn new_payload_v3_wait(
35 &self,
36 payload: ExecutionPayloadV3,
37 versioned_hashes: Vec<B256>,
38 parent_beacon_block_root: B256,
39 ) -> TransportResult<PayloadStatus>;
40
41 async fn fork_choice_updated_v1_wait(
44 &self,
45 fork_choice_state: ForkchoiceState,
46 payload_attributes: Option<PayloadAttributes>,
47 ) -> TransportResult<ForkchoiceUpdated>;
48
49 async fn fork_choice_updated_v2_wait(
52 &self,
53 fork_choice_state: ForkchoiceState,
54 payload_attributes: Option<PayloadAttributes>,
55 ) -> TransportResult<ForkchoiceUpdated>;
56
57 async fn fork_choice_updated_v3_wait(
60 &self,
61 fork_choice_state: ForkchoiceState,
62 payload_attributes: Option<PayloadAttributes>,
63 ) -> TransportResult<ForkchoiceUpdated>;
64}
65
66#[async_trait::async_trait]
67impl<T, N, P> EngineApiValidWaitExt<N, T> for P
68where
69 N: Network,
70 T: Transport + Clone,
71 P: EngineApi<N, T>,
72{
73 async fn new_payload_v1_wait(
74 &self,
75 payload: ExecutionPayloadV1,
76 ) -> TransportResult<PayloadStatus> {
77 let mut status = self.new_payload_v1(payload.clone()).await?;
78 while !status.is_valid() {
79 if status.is_invalid() {
80 error!(?status, ?payload, "Invalid newPayloadV1",);
81 panic!("Invalid newPayloadV1: {status:?}");
82 }
83 status = self.new_payload_v1(payload.clone()).await?;
84 }
85 Ok(status)
86 }
87
88 async fn new_payload_v2_wait(
89 &self,
90 payload: ExecutionPayloadInputV2,
91 ) -> TransportResult<PayloadStatus> {
92 let mut status = self.new_payload_v2(payload.clone()).await?;
93 while !status.is_valid() {
94 if status.is_invalid() {
95 error!(?status, ?payload, "Invalid newPayloadV2",);
96 panic!("Invalid newPayloadV2: {status:?}");
97 }
98 status = self.new_payload_v2(payload.clone()).await?;
99 }
100 Ok(status)
101 }
102
103 async fn new_payload_v3_wait(
104 &self,
105 payload: ExecutionPayloadV3,
106 versioned_hashes: Vec<B256>,
107 parent_beacon_block_root: B256,
108 ) -> TransportResult<PayloadStatus> {
109 let mut status = self
110 .new_payload_v3(payload.clone(), versioned_hashes.clone(), parent_beacon_block_root)
111 .await?;
112 while !status.is_valid() {
113 if status.is_invalid() {
114 error!(
115 ?status,
116 ?payload,
117 ?versioned_hashes,
118 ?parent_beacon_block_root,
119 "Invalid newPayloadV3",
120 );
121 panic!("Invalid newPayloadV3: {status:?}");
122 }
123 status = self
124 .new_payload_v3(payload.clone(), versioned_hashes.clone(), parent_beacon_block_root)
125 .await?;
126 }
127 Ok(status)
128 }
129
130 async fn fork_choice_updated_v1_wait(
131 &self,
132 fork_choice_state: ForkchoiceState,
133 payload_attributes: Option<PayloadAttributes>,
134 ) -> TransportResult<ForkchoiceUpdated> {
135 let mut status =
136 self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
137
138 while !status.is_valid() {
139 if status.is_invalid() {
140 error!(
141 ?status,
142 ?fork_choice_state,
143 ?payload_attributes,
144 "Invalid forkchoiceUpdatedV1 message",
145 );
146 panic!("Invalid forkchoiceUpdatedV1: {status:?}");
147 }
148 status =
149 self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
150 }
151
152 Ok(status)
153 }
154
155 async fn fork_choice_updated_v2_wait(
156 &self,
157 fork_choice_state: ForkchoiceState,
158 payload_attributes: Option<PayloadAttributes>,
159 ) -> TransportResult<ForkchoiceUpdated> {
160 let mut status =
161 self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
162
163 while !status.is_valid() {
164 if status.is_invalid() {
165 error!(
166 ?status,
167 ?fork_choice_state,
168 ?payload_attributes,
169 "Invalid forkchoiceUpdatedV2 message",
170 );
171 panic!("Invalid forkchoiceUpdatedV2: {status:?}");
172 }
173 status =
174 self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
175 }
176
177 Ok(status)
178 }
179
180 async fn fork_choice_updated_v3_wait(
181 &self,
182 fork_choice_state: ForkchoiceState,
183 payload_attributes: Option<PayloadAttributes>,
184 ) -> TransportResult<ForkchoiceUpdated> {
185 let mut status =
186 self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
187
188 while !status.is_valid() {
189 if status.is_invalid() {
190 error!(
191 ?status,
192 ?fork_choice_state,
193 ?payload_attributes,
194 "Invalid forkchoiceUpdatedV3 message",
195 );
196 panic!("Invalid forkchoiceUpdatedV3: {status:?}");
197 }
198 status =
199 self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
200 }
201
202 Ok(status)
203 }
204}
205
206pub(crate) async fn call_new_payload<N, T, P: EngineApiValidWaitExt<N, T>>(
212 provider: P,
213 payload: ExecutionPayload,
214 parent_beacon_block_root: Option<B256>,
215 versioned_hashes: Vec<B256>,
216) -> TransportResult<EngineApiMessageVersion> {
217 match payload {
218 ExecutionPayload::V3(payload) => {
219 let parent_beacon_block_root = parent_beacon_block_root
221 .expect("parent_beacon_block_root is required for V3 payloads");
222 provider
223 .new_payload_v3_wait(payload, versioned_hashes, parent_beacon_block_root)
224 .await?;
225
226 Ok(EngineApiMessageVersion::V3)
227 }
228 ExecutionPayload::V2(payload) => {
229 let input = ExecutionPayloadInputV2 {
230 execution_payload: payload.payload_inner,
231 withdrawals: Some(payload.withdrawals),
232 };
233
234 provider.new_payload_v2_wait(input).await?;
235
236 Ok(EngineApiMessageVersion::V2)
237 }
238 ExecutionPayload::V1(payload) => {
239 provider.new_payload_v1_wait(payload).await?;
240
241 Ok(EngineApiMessageVersion::V1)
242 }
243 }
244}
245
246pub(crate) async fn call_forkchoice_updated<N, T, P: EngineApiValidWaitExt<N, T>>(
250 provider: P,
251 message_version: EngineApiMessageVersion,
252 forkchoice_state: ForkchoiceState,
253 payload_attributes: Option<PayloadAttributes>,
254) -> TransportResult<ForkchoiceUpdated> {
255 match message_version {
256 EngineApiMessageVersion::V4 => todo!("V4 payloads not supported yet"),
257 EngineApiMessageVersion::V3 => {
258 provider.fork_choice_updated_v3_wait(forkchoice_state, payload_attributes).await
259 }
260 EngineApiMessageVersion::V2 => {
261 provider.fork_choice_updated_v2_wait(forkchoice_state, payload_attributes).await
262 }
263 EngineApiMessageVersion::V1 => {
264 provider.fork_choice_updated_v1_wait(forkchoice_state, payload_attributes).await
265 }
266 }
267}