1use crate::{
2 capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
3};
4use alloy_eips::{
5 eip1898::BlockHashOrNumber,
6 eip4844::BlobAndProofV1,
7 eip7685::{Requests, RequestsOrHash},
8};
9use alloy_primitives::{BlockHash, BlockNumber, B256, U64};
10use alloy_rpc_types_engine::{
11 CancunPayloadFields, ClientVersionV1, ExecutionPayload, ExecutionPayloadBodiesV1,
12 ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1, ExecutionPayloadV3,
13 ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus, PraguePayloadFields,
14 TransitionConfiguration,
15};
16use async_trait::async_trait;
17use jsonrpsee_core::RpcResult;
18use parking_lot::Mutex;
19use reth_beacon_consensus::BeaconConsensusEngineHandle;
20use reth_chainspec::{EthereumHardforks, Hardforks};
21use reth_engine_primitives::{EngineTypes, EngineValidator};
22use reth_evm::provider::EvmEnvProvider;
23use reth_payload_builder::PayloadStore;
24use reth_payload_primitives::{
25 validate_payload_timestamp, EngineApiMessageVersion, PayloadBuilderAttributes,
26 PayloadOrAttributes,
27};
28use reth_primitives::EthereumHardfork;
29use reth_rpc_api::EngineApiServer;
30use reth_rpc_types_compat::engine::payload::{
31 convert_payload_input_v2_to_payload, convert_to_payload_body_v1,
32};
33use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
34use reth_tasks::TaskSpawner;
35use reth_transaction_pool::TransactionPool;
36use std::{sync::Arc, time::Instant};
37use tokio::sync::oneshot;
38use tracing::{trace, warn};
39
40pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
42
43const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;
45
46const MAX_BLOB_LIMIT: usize = 128;
48
49pub struct EngineApi<Provider, EngineT: EngineTypes, Pool, Validator, ChainSpec> {
52 inner: Arc<EngineApiInner<Provider, EngineT, Pool, Validator, ChainSpec>>,
53}
54
55struct EngineApiInner<Provider, EngineT: EngineTypes, Pool, Validator, ChainSpec> {
56 provider: Provider,
58 chain_spec: Arc<ChainSpec>,
60 beacon_consensus: BeaconConsensusEngineHandle<EngineT>,
62 payload_store: PayloadStore<EngineT>,
64 task_spawner: Box<dyn TaskSpawner>,
66 metrics: EngineApiMetrics,
68 client: ClientVersionV1,
70 capabilities: EngineCapabilities,
72 tx_pool: Pool,
74 validator: Validator,
76 latest_new_payload_response: Mutex<Option<Instant>>,
78}
79
80impl<Provider, EngineT, Pool, Validator, ChainSpec>
81 EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
82where
83 Provider: HeaderProvider + BlockReader + StateProviderFactory + EvmEnvProvider + 'static,
84 EngineT: EngineTypes,
85 Pool: TransactionPool + 'static,
86 Validator: EngineValidator<EngineT>,
87 ChainSpec: EthereumHardforks + Send + Sync + 'static,
88{
89 #[allow(clippy::too_many_arguments)]
91 pub fn new(
92 provider: Provider,
93 chain_spec: Arc<ChainSpec>,
94 beacon_consensus: BeaconConsensusEngineHandle<EngineT>,
95 payload_store: PayloadStore<EngineT>,
96 tx_pool: Pool,
97 task_spawner: Box<dyn TaskSpawner>,
98 client: ClientVersionV1,
99 capabilities: EngineCapabilities,
100 validator: Validator,
101 ) -> Self {
102 let inner = Arc::new(EngineApiInner {
103 provider,
104 chain_spec,
105 beacon_consensus,
106 payload_store,
107 task_spawner,
108 metrics: EngineApiMetrics::default(),
109 client,
110 capabilities,
111 tx_pool,
112 validator,
113 latest_new_payload_response: Mutex::new(None),
114 });
115 Self { inner }
116 }
117
118 fn get_client_version_v1(
120 &self,
121 _client: ClientVersionV1,
122 ) -> EngineApiResult<Vec<ClientVersionV1>> {
123 Ok(vec![self.inner.client.clone()])
124 }
125 async fn get_payload_attributes(
127 &self,
128 payload_id: PayloadId,
129 ) -> EngineApiResult<EngineT::PayloadBuilderAttributes> {
130 Ok(self
131 .inner
132 .payload_store
133 .payload_attributes(payload_id)
134 .await
135 .ok_or(EngineApiError::UnknownPayload)??)
136 }
137
138 pub async fn new_payload_v1(
141 &self,
142 payload: ExecutionPayloadV1,
143 ) -> EngineApiResult<PayloadStatus> {
144 let payload = ExecutionPayload::from(payload);
145 let payload_or_attrs =
146 PayloadOrAttributes::<'_, EngineT::PayloadAttributes>::from_execution_payload(
147 &payload, None,
148 );
149 self.inner
150 .validator
151 .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
152
153 Ok(self
154 .inner
155 .beacon_consensus
156 .new_payload(payload, ExecutionPayloadSidecar::none())
157 .await
158 .inspect(|_| self.inner.on_new_payload_response())?)
159 }
160
161 async fn new_payload_v1_metered(
163 &self,
164 payload: ExecutionPayloadV1,
165 ) -> EngineApiResult<PayloadStatus> {
166 let start = Instant::now();
167 let gas_used = payload.gas_used;
168 let res = Self::new_payload_v1(self, payload).await;
169 let elapsed = start.elapsed();
170 self.inner.metrics.latency.new_payload_v1.record(elapsed);
171 self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
172 res
173 }
174
175 pub async fn new_payload_v2(
177 &self,
178 payload: ExecutionPayloadInputV2,
179 ) -> EngineApiResult<PayloadStatus> {
180 let payload = convert_payload_input_v2_to_payload(payload);
181 let payload_or_attrs =
182 PayloadOrAttributes::<'_, EngineT::PayloadAttributes>::from_execution_payload(
183 &payload, None,
184 );
185 self.inner
186 .validator
187 .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
188 Ok(self
189 .inner
190 .beacon_consensus
191 .new_payload(payload, ExecutionPayloadSidecar::none())
192 .await
193 .inspect(|_| self.inner.on_new_payload_response())?)
194 }
195
196 pub async fn new_payload_v2_metered(
198 &self,
199 payload: ExecutionPayloadInputV2,
200 ) -> EngineApiResult<PayloadStatus> {
201 let start = Instant::now();
202 let gas_used = payload.execution_payload.gas_used;
203 let res = Self::new_payload_v2(self, payload).await;
204 let elapsed = start.elapsed();
205 self.inner.metrics.latency.new_payload_v2.record(elapsed);
206 self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
207 res
208 }
209
210 pub async fn new_payload_v3(
212 &self,
213 payload: ExecutionPayloadV3,
214 versioned_hashes: Vec<B256>,
215 parent_beacon_block_root: B256,
216 ) -> EngineApiResult<PayloadStatus> {
217 let payload = ExecutionPayload::from(payload);
218 let payload_or_attrs =
219 PayloadOrAttributes::<'_, EngineT::PayloadAttributes>::from_execution_payload(
220 &payload,
221 Some(parent_beacon_block_root),
222 );
223 self.inner
224 .validator
225 .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
226
227 Ok(self
228 .inner
229 .beacon_consensus
230 .new_payload(
231 payload,
232 ExecutionPayloadSidecar::v3(CancunPayloadFields {
233 versioned_hashes,
234 parent_beacon_block_root,
235 }),
236 )
237 .await
238 .inspect(|_| self.inner.on_new_payload_response())?)
239 }
240
241 async fn new_payload_v3_metered(
243 &self,
244 payload: ExecutionPayloadV3,
245 versioned_hashes: Vec<B256>,
246 parent_beacon_block_root: B256,
247 ) -> RpcResult<PayloadStatus> {
248 let start = Instant::now();
249 let gas_used = payload.payload_inner.payload_inner.gas_used;
250 let res =
251 Self::new_payload_v3(self, payload, versioned_hashes, parent_beacon_block_root).await;
252 let elapsed = start.elapsed();
253 self.inner.metrics.latency.new_payload_v3.record(elapsed);
254 self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
255 Ok(res?)
256 }
257
258 pub async fn new_payload_v4(
260 &self,
261 payload: ExecutionPayloadV3,
262 versioned_hashes: Vec<B256>,
263 parent_beacon_block_root: B256,
264 execution_requests: Requests,
265 ) -> EngineApiResult<PayloadStatus> {
266 let payload = ExecutionPayload::from(payload);
267 let payload_or_attrs =
268 PayloadOrAttributes::<'_, EngineT::PayloadAttributes>::from_execution_payload(
269 &payload,
270 Some(parent_beacon_block_root),
271 );
272 self.inner
273 .validator
274 .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
275
276 Ok(self
277 .inner
278 .beacon_consensus
279 .new_payload(
280 payload,
281 ExecutionPayloadSidecar::v4(
282 CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
283 PraguePayloadFields {
284 requests: RequestsOrHash::Requests(execution_requests),
285 target_blobs_per_block: 0,
287 },
288 ),
289 )
290 .await
291 .inspect(|_| self.inner.on_new_payload_response())?)
292 }
293
294 async fn new_payload_v4_metered(
296 &self,
297 payload: ExecutionPayloadV3,
298 versioned_hashes: Vec<B256>,
299 parent_beacon_block_root: B256,
300 execution_requests: Requests,
301 ) -> RpcResult<PayloadStatus> {
302 let start = Instant::now();
303 let gas_used = payload.payload_inner.payload_inner.gas_used;
304 let res = Self::new_payload_v4(
305 self,
306 payload,
307 versioned_hashes,
308 parent_beacon_block_root,
309 execution_requests,
310 )
311 .await;
312 let elapsed = start.elapsed();
313 self.inner.metrics.latency.new_payload_v4.record(elapsed);
314 self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
315 Ok(res?)
316 }
317
318 pub async fn fork_choice_updated_v1(
325 &self,
326 state: ForkchoiceState,
327 payload_attrs: Option<EngineT::PayloadAttributes>,
328 ) -> EngineApiResult<ForkchoiceUpdated> {
329 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V1, state, payload_attrs)
330 .await
331 }
332
333 pub async fn fork_choice_updated_v2(
338 &self,
339 state: ForkchoiceState,
340 payload_attrs: Option<EngineT::PayloadAttributes>,
341 ) -> EngineApiResult<ForkchoiceUpdated> {
342 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V2, state, payload_attrs)
343 .await
344 }
345
346 pub async fn fork_choice_updated_v3(
351 &self,
352 state: ForkchoiceState,
353 payload_attrs: Option<EngineT::PayloadAttributes>,
354 ) -> EngineApiResult<ForkchoiceUpdated> {
355 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V3, state, payload_attrs)
356 .await
357 }
358
359 pub async fn get_payload_v1(
369 &self,
370 payload_id: PayloadId,
371 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
372 self.inner
373 .payload_store
374 .resolve(payload_id)
375 .await
376 .ok_or(EngineApiError::UnknownPayload)?
377 .map_err(|_| EngineApiError::UnknownPayload)?
378 .try_into()
379 .map_err(|_| {
380 warn!("could not transform built payload into ExecutionPayloadV1");
381 EngineApiError::UnknownPayload
382 })
383 }
384
385 pub async fn get_payload_v2(
393 &self,
394 payload_id: PayloadId,
395 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
396 let attributes = self.get_payload_attributes(payload_id).await?;
398
399 validate_payload_timestamp(
401 &self.inner.chain_spec,
402 EngineApiMessageVersion::V2,
403 attributes.timestamp(),
404 )?;
405
406 self.inner
408 .payload_store
409 .resolve(payload_id)
410 .await
411 .ok_or(EngineApiError::UnknownPayload)?
412 .map_err(|_| EngineApiError::UnknownPayload)?
413 .try_into()
414 .map_err(|_| {
415 warn!("could not transform built payload into ExecutionPayloadV2");
416 EngineApiError::UnknownPayload
417 })
418 }
419
420 pub async fn get_payload_v3(
428 &self,
429 payload_id: PayloadId,
430 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
431 let attributes = self.get_payload_attributes(payload_id).await?;
433
434 validate_payload_timestamp(
436 &self.inner.chain_spec,
437 EngineApiMessageVersion::V3,
438 attributes.timestamp(),
439 )?;
440
441 self.inner
443 .payload_store
444 .resolve(payload_id)
445 .await
446 .ok_or(EngineApiError::UnknownPayload)?
447 .map_err(|_| EngineApiError::UnknownPayload)?
448 .try_into()
449 .map_err(|_| {
450 warn!("could not transform built payload into ExecutionPayloadV3");
451 EngineApiError::UnknownPayload
452 })
453 }
454
455 pub async fn get_payload_v4(
463 &self,
464 payload_id: PayloadId,
465 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
466 let attributes = self.get_payload_attributes(payload_id).await?;
468
469 validate_payload_timestamp(
471 &self.inner.chain_spec,
472 EngineApiMessageVersion::V4,
473 attributes.timestamp(),
474 )?;
475
476 self.inner
478 .payload_store
479 .resolve(payload_id)
480 .await
481 .ok_or(EngineApiError::UnknownPayload)?
482 .map_err(|_| EngineApiError::UnknownPayload)?
483 .try_into()
484 .map_err(|_| {
485 warn!("could not transform built payload into ExecutionPayloadV3");
486 EngineApiError::UnknownPayload
487 })
488 }
489
490 async fn get_payload_bodies_by_range_with<F, R>(
493 &self,
494 start: BlockNumber,
495 count: u64,
496 f: F,
497 ) -> EngineApiResult<Vec<Option<R>>>
498 where
499 F: Fn(Provider::Block) -> R + Send + 'static,
500 R: Send + 'static,
501 {
502 let (tx, rx) = oneshot::channel();
503 let inner = self.inner.clone();
504
505 self.inner.task_spawner.spawn_blocking(Box::pin(async move {
506 if count > MAX_PAYLOAD_BODIES_LIMIT {
507 tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
508 return;
509 }
510
511 if start == 0 || count == 0 {
512 tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
513 return;
514 }
515
516 let mut result = Vec::with_capacity(count as usize);
517
518 let mut end = start.saturating_add(count - 1);
520
521 if let Ok(best_block) = inner.provider.best_block_number() {
524 if end > best_block {
525 end = best_block;
526 }
527 }
528
529 for num in start..=end {
530 let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
531 match block_result {
532 Ok(block) => {
533 result.push(block.map(&f));
534 }
535 Err(err) => {
536 tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
537 return;
538 }
539 };
540 }
541 tx.send(Ok(result)).ok();
542 }));
543
544 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
545 }
546
547 pub async fn get_payload_bodies_by_range_v1(
558 &self,
559 start: BlockNumber,
560 count: u64,
561 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
562 self.get_payload_bodies_by_range_with(start, count, convert_to_payload_body_v1).await
563 }
564
565 async fn get_payload_bodies_by_hash_with<F, R>(
567 &self,
568 hashes: Vec<BlockHash>,
569 f: F,
570 ) -> EngineApiResult<Vec<Option<R>>>
571 where
572 F: Fn(Provider::Block) -> R + Send + 'static,
573 R: Send + 'static,
574 {
575 let len = hashes.len() as u64;
576 if len > MAX_PAYLOAD_BODIES_LIMIT {
577 return Err(EngineApiError::PayloadRequestTooLarge { len });
578 }
579
580 let (tx, rx) = oneshot::channel();
581 let inner = self.inner.clone();
582
583 self.inner.task_spawner.spawn_blocking(Box::pin(async move {
584 let mut result = Vec::with_capacity(hashes.len());
585 for hash in hashes {
586 let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
587 match block_result {
588 Ok(block) => {
589 result.push(block.map(&f));
590 }
591 Err(err) => {
592 let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
593 return;
594 }
595 }
596 }
597 tx.send(Ok(result)).ok();
598 }));
599
600 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
601 }
602
603 pub async fn get_payload_bodies_by_hash_v1(
605 &self,
606 hashes: Vec<BlockHash>,
607 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
608 self.get_payload_bodies_by_hash_with(hashes, convert_to_payload_body_v1).await
609 }
610
611 pub fn exchange_transition_configuration(
614 &self,
615 config: TransitionConfiguration,
616 ) -> EngineApiResult<TransitionConfiguration> {
617 let TransitionConfiguration {
618 terminal_total_difficulty,
619 terminal_block_hash,
620 terminal_block_number,
621 } = config;
622
623 let merge_terminal_td = self
624 .inner
625 .chain_spec
626 .fork(EthereumHardfork::Paris)
627 .ttd()
628 .expect("the engine API should not be running for chains w/o paris");
629
630 if merge_terminal_td != terminal_total_difficulty {
632 return Err(EngineApiError::TerminalTD {
633 execution: merge_terminal_td,
634 consensus: terminal_total_difficulty,
635 })
636 }
637
638 self.inner.beacon_consensus.transition_configuration_exchanged();
639
640 if terminal_block_hash.is_zero() {
642 return Ok(TransitionConfiguration {
643 terminal_total_difficulty: merge_terminal_td,
644 ..Default::default()
645 })
646 }
647
648 let local_hash = self
650 .inner
651 .provider
652 .block_hash(terminal_block_number)
653 .map_err(|err| EngineApiError::Internal(Box::new(err)))?;
654
655 match local_hash {
657 Some(hash) if hash == terminal_block_hash => Ok(TransitionConfiguration {
658 terminal_total_difficulty: merge_terminal_td,
659 terminal_block_hash,
660 terminal_block_number,
661 }),
662 _ => Err(EngineApiError::TerminalBlockHash {
663 execution: local_hash,
664 consensus: terminal_block_hash,
665 }),
666 }
667 }
668
669 async fn validate_and_execute_forkchoice(
683 &self,
684 version: EngineApiMessageVersion,
685 state: ForkchoiceState,
686 payload_attrs: Option<EngineT::PayloadAttributes>,
687 ) -> EngineApiResult<ForkchoiceUpdated> {
688 self.inner.record_elapsed_time_on_fcu();
689
690 if let Some(ref attrs) = payload_attrs {
691 let attr_validation_res =
692 self.inner.validator.ensure_well_formed_attributes(version, attrs);
693
694 if let Err(err) = attr_validation_res {
708 let fcu_res =
709 self.inner.beacon_consensus.fork_choice_updated(state, None, version).await?;
710 if fcu_res.is_invalid() {
713 return Ok(fcu_res)
714 }
715 return Err(err.into())
716 }
717 }
718
719 Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs, version).await?)
720 }
721}
722
723impl<Provider, EngineT, Pool, Validator, ChainSpec>
724 EngineApiInner<Provider, EngineT, Pool, Validator, ChainSpec>
725where
726 EngineT: EngineTypes,
727{
728 fn record_elapsed_time_on_fcu(&self) {
731 if let Some(start_time) = self.latest_new_payload_response.lock().take() {
732 let elapsed_time = start_time.elapsed();
733 self.metrics.latency.new_payload_forkchoice_updated_time_diff.record(elapsed_time);
734 }
735 }
736
737 fn on_new_payload_response(&self) {
739 self.latest_new_payload_response.lock().replace(Instant::now());
740 }
741}
742
743#[async_trait]
744impl<Provider, EngineT, Pool, Validator, ChainSpec> EngineApiServer<EngineT>
745 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
746where
747 Provider: HeaderProvider + BlockReader + StateProviderFactory + EvmEnvProvider + 'static,
748 EngineT: EngineTypes,
749 Pool: TransactionPool + 'static,
750 Validator: EngineValidator<EngineT>,
751 ChainSpec: EthereumHardforks + Send + Sync + 'static,
752{
753 async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
757 trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
758 Ok(self.new_payload_v1_metered(payload).await?)
759 }
760
761 async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
764 trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
765 Ok(self.new_payload_v2_metered(payload).await?)
766 }
767
768 async fn new_payload_v3(
771 &self,
772 payload: ExecutionPayloadV3,
773 versioned_hashes: Vec<B256>,
774 parent_beacon_block_root: B256,
775 ) -> RpcResult<PayloadStatus> {
776 trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
777 Ok(self.new_payload_v3_metered(payload, versioned_hashes, parent_beacon_block_root).await?)
778 }
779
780 async fn new_payload_v4(
783 &self,
784 payload: ExecutionPayloadV3,
785 versioned_hashes: Vec<B256>,
786 parent_beacon_block_root: B256,
787 execution_requests: Requests,
788 ) -> RpcResult<PayloadStatus> {
789 trace!(target: "rpc::engine", "Serving engine_newPayloadV4");
790 Ok(self
791 .new_payload_v4_metered(
792 payload,
793 versioned_hashes,
794 parent_beacon_block_root,
795 execution_requests,
796 )
797 .await?)
798 }
799
800 async fn fork_choice_updated_v1(
805 &self,
806 fork_choice_state: ForkchoiceState,
807 payload_attributes: Option<EngineT::PayloadAttributes>,
808 ) -> RpcResult<ForkchoiceUpdated> {
809 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
810 let start = Instant::now();
811 let res = Self::fork_choice_updated_v1(self, fork_choice_state, payload_attributes).await;
812 self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
813 self.inner.metrics.fcu_response.update_response_metrics(&res);
814 Ok(res?)
815 }
816
817 async fn fork_choice_updated_v2(
820 &self,
821 fork_choice_state: ForkchoiceState,
822 payload_attributes: Option<EngineT::PayloadAttributes>,
823 ) -> RpcResult<ForkchoiceUpdated> {
824 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
825 let start = Instant::now();
826 let res = Self::fork_choice_updated_v2(self, fork_choice_state, payload_attributes).await;
827 self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
828 self.inner.metrics.fcu_response.update_response_metrics(&res);
829 Ok(res?)
830 }
831
832 async fn fork_choice_updated_v3(
836 &self,
837 fork_choice_state: ForkchoiceState,
838 payload_attributes: Option<EngineT::PayloadAttributes>,
839 ) -> RpcResult<ForkchoiceUpdated> {
840 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
841 let start = Instant::now();
842 let res = Self::fork_choice_updated_v3(self, fork_choice_state, payload_attributes).await;
843 self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
844 self.inner.metrics.fcu_response.update_response_metrics(&res);
845 Ok(res?)
846 }
847
848 async fn get_payload_v1(
860 &self,
861 payload_id: PayloadId,
862 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV1> {
863 trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
864 let start = Instant::now();
865 let res = Self::get_payload_v1(self, payload_id).await;
866 self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
867 Ok(res?)
868 }
869
870 async fn get_payload_v2(
880 &self,
881 payload_id: PayloadId,
882 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV2> {
883 trace!(target: "rpc::engine", "Serving engine_getPayloadV2");
884 let start = Instant::now();
885 let res = Self::get_payload_v2(self, payload_id).await;
886 self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
887 Ok(res?)
888 }
889
890 async fn get_payload_v3(
900 &self,
901 payload_id: PayloadId,
902 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV3> {
903 trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
904 let start = Instant::now();
905 let res = Self::get_payload_v3(self, payload_id).await;
906 self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
907 Ok(res?)
908 }
909
910 async fn get_payload_v4(
920 &self,
921 payload_id: PayloadId,
922 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV4> {
923 trace!(target: "rpc::engine", "Serving engine_getPayloadV4");
924 let start = Instant::now();
925 let res = Self::get_payload_v4(self, payload_id).await;
926 self.inner.metrics.latency.get_payload_v4.record(start.elapsed());
927 Ok(res?)
928 }
929
930 async fn get_payload_bodies_by_hash_v1(
933 &self,
934 block_hashes: Vec<BlockHash>,
935 ) -> RpcResult<ExecutionPayloadBodiesV1> {
936 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
937 let start = Instant::now();
938 let res = Self::get_payload_bodies_by_hash_v1(self, block_hashes);
939 self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
940 Ok(res.await?)
941 }
942
943 async fn get_payload_bodies_by_range_v1(
960 &self,
961 start: U64,
962 count: U64,
963 ) -> RpcResult<ExecutionPayloadBodiesV1> {
964 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
965 let start_time = Instant::now();
966 let res = Self::get_payload_bodies_by_range_v1(self, start.to(), count.to()).await;
967 self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
968 Ok(res?)
969 }
970
971 async fn exchange_transition_configuration(
974 &self,
975 config: TransitionConfiguration,
976 ) -> RpcResult<TransitionConfiguration> {
977 trace!(target: "rpc::engine", "Serving engine_exchangeTransitionConfigurationV1");
978 let start = Instant::now();
979 let res = Self::exchange_transition_configuration(self, config);
980 self.inner.metrics.latency.exchange_transition_configuration.record(start.elapsed());
981 Ok(res?)
982 }
983
984 async fn get_client_version_v1(
988 &self,
989 client: ClientVersionV1,
990 ) -> RpcResult<Vec<ClientVersionV1>> {
991 trace!(target: "rpc::engine", "Serving engine_getClientVersionV1");
992 let res = Self::get_client_version_v1(self, client);
993
994 Ok(res?)
995 }
996
997 async fn exchange_capabilities(&self, _capabilities: Vec<String>) -> RpcResult<Vec<String>> {
1000 Ok(self.inner.capabilities.list())
1001 }
1002
1003 async fn get_blobs_v1(
1004 &self,
1005 versioned_hashes: Vec<B256>,
1006 ) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
1007 trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
1008 if versioned_hashes.len() > MAX_BLOB_LIMIT {
1009 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() }.into())
1010 }
1011
1012 Ok(self
1013 .inner
1014 .tx_pool
1015 .get_blobs_for_versioned_hashes(&versioned_hashes)
1016 .map_err(|err| EngineApiError::Internal(Box::new(err)))?)
1017 }
1018}
1019
1020impl<Provider, EngineT, Pool, Validator, ChainSpec> std::fmt::Debug
1021 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1022where
1023 EngineT: EngineTypes,
1024{
1025 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1026 f.debug_struct("EngineApi").finish_non_exhaustive()
1027 }
1028}
1029
1030#[cfg(test)]
1031mod tests {
1032 use super::*;
1033 use alloy_rpc_types_engine::{ClientCode, ClientVersionV1};
1034 use assert_matches::assert_matches;
1035 use reth_beacon_consensus::BeaconConsensusEngineEvent;
1036 use reth_chainspec::{ChainSpec, MAINNET};
1037 use reth_engine_primitives::BeaconEngineMessage;
1038 use reth_ethereum_engine_primitives::{EthEngineTypes, EthereumEngineValidator};
1039 use reth_payload_builder::test_utils::spawn_test_payload_service;
1040 use reth_primitives::{Block, SealedBlock};
1041 use reth_provider::test_utils::MockEthProvider;
1042 use reth_rpc_types_compat::engine::payload::execution_payload_from_sealed_block;
1043 use reth_tasks::TokioTaskExecutor;
1044 use reth_testing_utils::generators::random_block;
1045 use reth_tokio_util::EventSender;
1046 use reth_transaction_pool::noop::NoopTransactionPool;
1047 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
1048
1049 fn setup_engine_api() -> (
1050 EngineApiTestHandle,
1051 EngineApi<
1052 Arc<MockEthProvider>,
1053 EthEngineTypes,
1054 NoopTransactionPool,
1055 EthereumEngineValidator,
1056 ChainSpec,
1057 >,
1058 ) {
1059 let client = ClientVersionV1 {
1060 code: ClientCode::RH,
1061 name: "Reth".to_string(),
1062 version: "v0.2.0-beta.5".to_string(),
1063 commit: "defa64b2".to_string(),
1064 };
1065
1066 let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1067 let provider = Arc::new(MockEthProvider::default());
1068 let payload_store = spawn_test_payload_service();
1069 let (to_engine, engine_rx) = unbounded_channel();
1070 let event_sender: EventSender<BeaconConsensusEngineEvent> = Default::default();
1071 let task_executor = Box::<TokioTaskExecutor>::default();
1072 let api = EngineApi::new(
1073 provider.clone(),
1074 chain_spec.clone(),
1075 BeaconConsensusEngineHandle::new(to_engine, event_sender),
1076 payload_store.into(),
1077 NoopTransactionPool::default(),
1078 task_executor,
1079 client,
1080 EngineCapabilities::default(),
1081 EthereumEngineValidator::new(chain_spec.clone()),
1082 );
1083 let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
1084 (handle, api)
1085 }
1086
1087 #[tokio::test]
1088 async fn engine_client_version_v1() {
1089 let client = ClientVersionV1 {
1090 code: ClientCode::RH,
1091 name: "Reth".to_string(),
1092 version: "v0.2.0-beta.5".to_string(),
1093 commit: "defa64b2".to_string(),
1094 };
1095 let (_, api) = setup_engine_api();
1096 let res = api.get_client_version_v1(client.clone());
1097 assert_eq!(res.unwrap(), vec![client]);
1098 }
1099
1100 struct EngineApiTestHandle {
1101 chain_spec: Arc<ChainSpec>,
1102 provider: Arc<MockEthProvider>,
1103 from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
1104 }
1105
1106 #[tokio::test]
1107 async fn forwards_responses_to_consensus_engine() {
1108 let (mut handle, api) = setup_engine_api();
1109
1110 tokio::spawn(async move {
1111 api.new_payload_v1(execution_payload_from_sealed_block(SealedBlock::default()))
1112 .await
1113 .unwrap();
1114 });
1115 assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1116 }
1117
1118 mod get_payload_bodies {
1120 use super::*;
1121 use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
1122
1123 #[tokio::test]
1124 async fn invalid_params() {
1125 let (_, api) = setup_engine_api();
1126
1127 let by_range_tests = [
1128 (0, 0),
1130 (0, 1),
1131 (1, 0),
1132 ];
1133
1134 for (start, count) in by_range_tests {
1136 let res = api.get_payload_bodies_by_range_v1(start, count).await;
1137 assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
1138 }
1139 }
1140
1141 #[tokio::test]
1142 async fn request_too_large() {
1143 let (_, api) = setup_engine_api();
1144
1145 let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
1146 let res = api.get_payload_bodies_by_range_v1(0, request_count).await;
1147 assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
1148 }
1149
1150 #[tokio::test]
1151 async fn returns_payload_bodies() {
1152 let mut rng = generators::rng();
1153 let (handle, api) = setup_engine_api();
1154
1155 let (start, count) = (1, 10);
1156 let blocks = random_block_range(
1157 &mut rng,
1158 start..=start + count - 1,
1159 BlockRangeParams { tx_count: 0..2, ..Default::default() },
1160 );
1161 handle.provider.extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.unseal())));
1162
1163 let expected = blocks
1164 .iter()
1165 .cloned()
1166 .map(|b| Some(convert_to_payload_body_v1(b.unseal::<Block>())))
1167 .collect::<Vec<_>>();
1168
1169 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1170 assert_eq!(res, expected);
1171 }
1172
1173 #[tokio::test]
1174 async fn returns_payload_bodies_with_gaps() {
1175 let mut rng = generators::rng();
1176 let (handle, api) = setup_engine_api();
1177
1178 let (start, count) = (1, 100);
1179 let blocks = random_block_range(
1180 &mut rng,
1181 start..=start + count - 1,
1182 BlockRangeParams { tx_count: 0..2, ..Default::default() },
1183 );
1184
1185 let first_missing_range = 26..=50;
1187 let second_missing_range = 76..=100;
1188 handle.provider.extend_blocks(
1189 blocks
1190 .iter()
1191 .filter(|b| {
1192 !first_missing_range.contains(&b.number) &&
1193 !second_missing_range.contains(&b.number)
1194 })
1195 .map(|b| (b.hash(), b.clone().unseal())),
1196 );
1197
1198 let expected = blocks
1199 .iter()
1200 .filter(|b| !second_missing_range.contains(&b.number))
1203 .cloned()
1204 .map(|b| {
1205 if first_missing_range.contains(&b.number) {
1206 None
1207 } else {
1208 Some(convert_to_payload_body_v1(b.unseal::<Block>()))
1209 }
1210 })
1211 .collect::<Vec<_>>();
1212
1213 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1214 assert_eq!(res, expected);
1215
1216 let expected = blocks
1217 .iter()
1218 .cloned()
1219 .map(|b| {
1222 if first_missing_range.contains(&b.number) ||
1223 second_missing_range.contains(&b.number)
1224 {
1225 None
1226 } else {
1227 Some(convert_to_payload_body_v1(b.unseal::<Block>()))
1228 }
1229 })
1230 .collect::<Vec<_>>();
1231
1232 let hashes = blocks.iter().map(|b| b.hash()).collect();
1233 let res = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
1234 assert_eq!(res, expected);
1235 }
1236 }
1237
1238 mod exchange_transition_configuration {
1240 use super::*;
1241 use alloy_primitives::U256;
1242 use reth_testing_utils::generators::{self, BlockParams};
1243
1244 #[tokio::test]
1245 async fn terminal_td_mismatch() {
1246 let (handle, api) = setup_engine_api();
1247
1248 let transition_config = TransitionConfiguration {
1249 terminal_total_difficulty: handle
1250 .chain_spec
1251 .fork(EthereumHardfork::Paris)
1252 .ttd()
1253 .unwrap() +
1254 U256::from(1),
1255 ..Default::default()
1256 };
1257
1258 let res = api.exchange_transition_configuration(transition_config);
1259
1260 assert_matches!(
1261 res,
1262 Err(EngineApiError::TerminalTD { execution, consensus })
1263 if execution == handle.chain_spec.fork(EthereumHardfork::Paris).ttd().unwrap() && consensus == U256::from(transition_config.terminal_total_difficulty)
1264 );
1265 }
1266
1267 #[tokio::test]
1268 async fn terminal_block_hash_mismatch() {
1269 let mut rng = generators::rng();
1270
1271 let (handle, api) = setup_engine_api();
1272
1273 let terminal_block_number = 1000;
1274 let consensus_terminal_block =
1275 random_block(&mut rng, terminal_block_number, BlockParams::default());
1276 let execution_terminal_block =
1277 random_block(&mut rng, terminal_block_number, BlockParams::default());
1278
1279 let transition_config = TransitionConfiguration {
1280 terminal_total_difficulty: handle
1281 .chain_spec
1282 .fork(EthereumHardfork::Paris)
1283 .ttd()
1284 .unwrap(),
1285 terminal_block_hash: consensus_terminal_block.hash(),
1286 terminal_block_number,
1287 };
1288
1289 let res = api.exchange_transition_configuration(transition_config);
1291
1292 assert_matches!(
1293 res,
1294 Err(EngineApiError::TerminalBlockHash { execution, consensus })
1295 if execution.is_none() && consensus == transition_config.terminal_block_hash
1296 );
1297
1298 handle.provider.add_block(
1300 execution_terminal_block.hash(),
1301 execution_terminal_block.clone().unseal(),
1302 );
1303
1304 let res = api.exchange_transition_configuration(transition_config);
1305
1306 assert_matches!(
1307 res,
1308 Err(EngineApiError::TerminalBlockHash { execution, consensus })
1309 if execution == Some(execution_terminal_block.hash()) && consensus == transition_config.terminal_block_hash
1310 );
1311 }
1312
1313 #[tokio::test]
1314 async fn configurations_match() {
1315 let (handle, api) = setup_engine_api();
1316
1317 let terminal_block_number = 1000;
1318 let terminal_block =
1319 random_block(&mut generators::rng(), terminal_block_number, BlockParams::default());
1320
1321 let transition_config = TransitionConfiguration {
1322 terminal_total_difficulty: handle
1323 .chain_spec
1324 .fork(EthereumHardfork::Paris)
1325 .ttd()
1326 .unwrap(),
1327 terminal_block_hash: terminal_block.hash(),
1328 terminal_block_number,
1329 };
1330
1331 handle.provider.add_block(terminal_block.hash(), terminal_block.unseal());
1332
1333 let config = api.exchange_transition_configuration(transition_config).unwrap();
1334 assert_eq!(config, transition_config);
1335 }
1336 }
1337}