1use crate::{
2 capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
3};
4use alloy_eips::{
5 eip1898::BlockHashOrNumber,
6 eip4844::{BlobAndProofV1, BlobAndProofV2},
7 eip4895::Withdrawals,
8 eip7685::RequestsOrHash,
9};
10use alloy_primitives::{BlockHash, BlockNumber, B256, U64};
11use alloy_rpc_types_engine::{
12 CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
13 ExecutionPayloadBodyV1, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1,
14 ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
15 PraguePayloadFields,
16};
17use async_trait::async_trait;
18use jsonrpsee_core::{server::RpcModule, RpcResult};
19use parking_lot::Mutex;
20use reth_chainspec::EthereumHardforks;
21use reth_engine_primitives::{BeaconConsensusEngineHandle, EngineTypes, EngineValidator};
22use reth_payload_builder::PayloadStore;
23use reth_payload_primitives::{
24 validate_payload_timestamp, EngineApiMessageVersion, ExecutionPayload,
25 PayloadBuilderAttributes, PayloadOrAttributes, PayloadTypes,
26};
27use reth_primitives_traits::{Block, BlockBody};
28use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
29use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
30use reth_tasks::TaskSpawner;
31use reth_transaction_pool::TransactionPool;
32use std::{sync::Arc, time::Instant};
33use tokio::sync::oneshot;
34use tracing::{debug, trace, warn};
35
/// Sending half of a oneshot channel that carries an [`EngineApiResult`].
pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;

/// Largest number of payload bodies a single by-range or by-hash request may ask for.
const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;

/// Largest number of versioned hashes accepted by a single blobs request.
const MAX_BLOB_LIMIT: usize = 128;
44
/// Engine API handler.
///
/// All state lives in a reference-counted `EngineApiInner`, which is cloned into the blocking
/// tasks spawned for payload-body lookups.
pub struct EngineApi<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
    /// Shared state of the handler.
    inner: Arc<EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>>,
}
63
/// State shared by all users of an `EngineApi`.
struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
    /// Storage access; used to read blocks and the best block number.
    provider: Provider,
    /// Chain spec; used when validating payload timestamps.
    chain_spec: Arc<ChainSpec>,
    /// Handle through which new payloads and forkchoice updates are sent to the engine.
    beacon_consensus: BeaconConsensusEngineHandle<PayloadT>,
    /// Store used to look up payload attributes and resolve built payloads by id.
    payload_store: PayloadStore<PayloadT>,
    /// Spawner for the blocking tasks that read payload bodies.
    task_spawner: Box<dyn TaskSpawner>,
    /// Latency and response metrics of the engine API.
    metrics: EngineApiMetrics,
    /// Client version returned by `get_client_version_v1`.
    client: ClientVersionV1,
    /// Capabilities reported by `capabilities`.
    capabilities: EngineCapabilities,
    /// Transaction pool; queried for blobs by versioned hash.
    tx_pool: Pool,
    /// Validator for version-specific payload fields and payload attributes.
    validator: Validator,
    /// Instant of the latest successful new-payload response, consumed by the next forkchoice
    /// update to record the time between the two.
    latest_new_payload_response: Mutex<Option<Instant>>,
    /// When `false`, `engine_newPayloadV4` rejects execution requests given as a hash.
    accept_execution_requests_hash: bool,
}
89
90impl<Provider, PayloadT, Pool, Validator, ChainSpec>
91 EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
92where
93 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
94 PayloadT: PayloadTypes,
95 Pool: TransactionPool + 'static,
96 Validator: EngineValidator<PayloadT>,
97 ChainSpec: EthereumHardforks + Send + Sync + 'static,
98{
99 #[expect(clippy::too_many_arguments)]
101 pub fn new(
102 provider: Provider,
103 chain_spec: Arc<ChainSpec>,
104 beacon_consensus: BeaconConsensusEngineHandle<PayloadT>,
105 payload_store: PayloadStore<PayloadT>,
106 tx_pool: Pool,
107 task_spawner: Box<dyn TaskSpawner>,
108 client: ClientVersionV1,
109 capabilities: EngineCapabilities,
110 validator: Validator,
111 accept_execution_requests_hash: bool,
112 ) -> Self {
113 let inner = Arc::new(EngineApiInner {
114 provider,
115 chain_spec,
116 beacon_consensus,
117 payload_store,
118 task_spawner,
119 metrics: EngineApiMetrics::default(),
120 client,
121 capabilities,
122 tx_pool,
123 validator,
124 latest_new_payload_response: Mutex::new(None),
125 accept_execution_requests_hash,
126 });
127 Self { inner }
128 }
129
130 pub fn get_client_version_v1(
132 &self,
133 _client: ClientVersionV1,
134 ) -> EngineApiResult<Vec<ClientVersionV1>> {
135 Ok(vec![self.inner.client.clone()])
136 }
137
138 async fn get_payload_attributes(
140 &self,
141 payload_id: PayloadId,
142 ) -> EngineApiResult<PayloadT::PayloadBuilderAttributes> {
143 Ok(self
144 .inner
145 .payload_store
146 .payload_attributes(payload_id)
147 .await
148 .ok_or(EngineApiError::UnknownPayload)??)
149 }
150
151 pub async fn new_payload_v1(
154 &self,
155 payload: PayloadT::ExecutionData,
156 ) -> EngineApiResult<PayloadStatus> {
157 let payload_or_attrs = PayloadOrAttributes::<
158 '_,
159 PayloadT::ExecutionData,
160 PayloadT::PayloadAttributes,
161 >::from_execution_payload(&payload);
162
163 self.inner
164 .validator
165 .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
166
167 Ok(self
168 .inner
169 .beacon_consensus
170 .new_payload(payload)
171 .await
172 .inspect(|_| self.inner.on_new_payload_response())?)
173 }
174
175 async fn new_payload_v1_metered(
177 &self,
178 payload: PayloadT::ExecutionData,
179 ) -> EngineApiResult<PayloadStatus> {
180 let start = Instant::now();
181 let gas_used = payload.gas_used();
182
183 let res = Self::new_payload_v1(self, payload).await;
184 let elapsed = start.elapsed();
185 self.inner.metrics.latency.new_payload_v1.record(elapsed);
186 self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
187 res
188 }
189
190 pub async fn new_payload_v2(
192 &self,
193 payload: PayloadT::ExecutionData,
194 ) -> EngineApiResult<PayloadStatus> {
195 let payload_or_attrs = PayloadOrAttributes::<
196 '_,
197 PayloadT::ExecutionData,
198 PayloadT::PayloadAttributes,
199 >::from_execution_payload(&payload);
200 self.inner
201 .validator
202 .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
203 Ok(self
204 .inner
205 .beacon_consensus
206 .new_payload(payload)
207 .await
208 .inspect(|_| self.inner.on_new_payload_response())?)
209 }
210
211 pub async fn new_payload_v2_metered(
213 &self,
214 payload: PayloadT::ExecutionData,
215 ) -> EngineApiResult<PayloadStatus> {
216 let start = Instant::now();
217 let gas_used = payload.gas_used();
218
219 let res = Self::new_payload_v2(self, payload).await;
220 let elapsed = start.elapsed();
221 self.inner.metrics.latency.new_payload_v2.record(elapsed);
222 self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
223 res
224 }
225
226 pub async fn new_payload_v3(
228 &self,
229 payload: PayloadT::ExecutionData,
230 ) -> EngineApiResult<PayloadStatus> {
231 let payload_or_attrs = PayloadOrAttributes::<
232 '_,
233 PayloadT::ExecutionData,
234 PayloadT::PayloadAttributes,
235 >::from_execution_payload(&payload);
236 self.inner
237 .validator
238 .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
239
240 Ok(self
241 .inner
242 .beacon_consensus
243 .new_payload(payload)
244 .await
245 .inspect(|_| self.inner.on_new_payload_response())?)
246 }
247
248 pub async fn new_payload_v3_metered(
250 &self,
251 payload: PayloadT::ExecutionData,
252 ) -> RpcResult<PayloadStatus> {
253 let start = Instant::now();
254 let gas_used = payload.gas_used();
255
256 let res = Self::new_payload_v3(self, payload).await;
257 let elapsed = start.elapsed();
258 self.inner.metrics.latency.new_payload_v3.record(elapsed);
259 self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
260 Ok(res?)
261 }
262
263 pub async fn new_payload_v4(
265 &self,
266 payload: PayloadT::ExecutionData,
267 ) -> EngineApiResult<PayloadStatus> {
268 let payload_or_attrs = PayloadOrAttributes::<
269 '_,
270 PayloadT::ExecutionData,
271 PayloadT::PayloadAttributes,
272 >::from_execution_payload(&payload);
273 self.inner
274 .validator
275 .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
276
277 Ok(self
278 .inner
279 .beacon_consensus
280 .new_payload(payload)
281 .await
282 .inspect(|_| self.inner.on_new_payload_response())?)
283 }
284
285 pub async fn new_payload_v4_metered(
287 &self,
288 payload: PayloadT::ExecutionData,
289 ) -> RpcResult<PayloadStatus> {
290 let start = Instant::now();
291 let gas_used = payload.gas_used();
292
293 let res = Self::new_payload_v4(self, payload).await;
294
295 let elapsed = start.elapsed();
296 self.inner.metrics.latency.new_payload_v4.record(elapsed);
297 self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
298 Ok(res?)
299 }
300}
301
302impl<Provider, EngineT, Pool, Validator, ChainSpec>
303 EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
304where
305 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
306 EngineT: EngineTypes,
307 Pool: TransactionPool + 'static,
308 Validator: EngineValidator<EngineT>,
309 ChainSpec: EthereumHardforks + Send + Sync + 'static,
310{
311 pub async fn fork_choice_updated_v1(
318 &self,
319 state: ForkchoiceState,
320 payload_attrs: Option<EngineT::PayloadAttributes>,
321 ) -> EngineApiResult<ForkchoiceUpdated> {
322 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V1, state, payload_attrs)
323 .await
324 }
325
326 pub async fn fork_choice_updated_v1_metered(
328 &self,
329 state: ForkchoiceState,
330 payload_attrs: Option<EngineT::PayloadAttributes>,
331 ) -> EngineApiResult<ForkchoiceUpdated> {
332 let start = Instant::now();
333 let res = Self::fork_choice_updated_v1(self, state, payload_attrs).await;
334 self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
335 self.inner.metrics.fcu_response.update_response_metrics(&res);
336 res
337 }
338
339 pub async fn fork_choice_updated_v2(
344 &self,
345 state: ForkchoiceState,
346 payload_attrs: Option<EngineT::PayloadAttributes>,
347 ) -> EngineApiResult<ForkchoiceUpdated> {
348 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V2, state, payload_attrs)
349 .await
350 }
351
352 pub async fn fork_choice_updated_v2_metered(
354 &self,
355 state: ForkchoiceState,
356 payload_attrs: Option<EngineT::PayloadAttributes>,
357 ) -> EngineApiResult<ForkchoiceUpdated> {
358 let start = Instant::now();
359 let res = Self::fork_choice_updated_v2(self, state, payload_attrs).await;
360 self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
361 self.inner.metrics.fcu_response.update_response_metrics(&res);
362 res
363 }
364
365 pub async fn fork_choice_updated_v3(
370 &self,
371 state: ForkchoiceState,
372 payload_attrs: Option<EngineT::PayloadAttributes>,
373 ) -> EngineApiResult<ForkchoiceUpdated> {
374 self.validate_and_execute_forkchoice(EngineApiMessageVersion::V3, state, payload_attrs)
375 .await
376 }
377
378 pub async fn fork_choice_updated_v3_metered(
380 &self,
381 state: ForkchoiceState,
382 payload_attrs: Option<EngineT::PayloadAttributes>,
383 ) -> EngineApiResult<ForkchoiceUpdated> {
384 let start = Instant::now();
385 let res = Self::fork_choice_updated_v3(self, state, payload_attrs).await;
386 self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
387 self.inner.metrics.fcu_response.update_response_metrics(&res);
388 res
389 }
390
391 async fn get_built_payload(
393 &self,
394 payload_id: PayloadId,
395 ) -> EngineApiResult<EngineT::BuiltPayload> {
396 self.inner
397 .payload_store
398 .resolve(payload_id)
399 .await
400 .ok_or(EngineApiError::UnknownPayload)?
401 .map_err(|_| EngineApiError::UnknownPayload)
402 }
403
404 async fn get_payload_inner<R>(
407 &self,
408 payload_id: PayloadId,
409 version: EngineApiMessageVersion,
410 ) -> EngineApiResult<R>
411 where
412 EngineT::BuiltPayload: TryInto<R>,
413 {
414 let attributes = self.get_payload_attributes(payload_id).await?;
416
417 validate_payload_timestamp(&self.inner.chain_spec, version, attributes.timestamp())?;
419
420 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
422 warn!(?version, "could not transform built payload");
423 EngineApiError::UnknownPayload
424 })
425 }
426
427 pub async fn get_payload_v1(
437 &self,
438 payload_id: PayloadId,
439 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
440 self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
441 warn!(version = ?EngineApiMessageVersion::V1, "could not transform built payload");
442 EngineApiError::UnknownPayload
443 })
444 }
445
446 pub async fn get_payload_v1_metered(
448 &self,
449 payload_id: PayloadId,
450 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
451 let start = Instant::now();
452 let res = Self::get_payload_v1(self, payload_id).await;
453 self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
454 res
455 }
456
457 pub async fn get_payload_v2(
465 &self,
466 payload_id: PayloadId,
467 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
468 self.get_payload_inner(payload_id, EngineApiMessageVersion::V2).await
469 }
470
471 pub async fn get_payload_v2_metered(
473 &self,
474 payload_id: PayloadId,
475 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
476 let start = Instant::now();
477 let res = Self::get_payload_v2(self, payload_id).await;
478 self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
479 res
480 }
481
482 pub async fn get_payload_v3(
490 &self,
491 payload_id: PayloadId,
492 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
493 self.get_payload_inner(payload_id, EngineApiMessageVersion::V3).await
494 }
495
496 pub async fn get_payload_v3_metered(
498 &self,
499 payload_id: PayloadId,
500 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
501 let start = Instant::now();
502 let res = Self::get_payload_v3(self, payload_id).await;
503 self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
504 res
505 }
506
507 pub async fn get_payload_v4(
515 &self,
516 payload_id: PayloadId,
517 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
518 self.get_payload_inner(payload_id, EngineApiMessageVersion::V4).await
519 }
520
521 pub async fn get_payload_v4_metered(
523 &self,
524 payload_id: PayloadId,
525 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
526 let start = Instant::now();
527 let res = Self::get_payload_v4(self, payload_id).await;
528 self.inner.metrics.latency.get_payload_v4.record(start.elapsed());
529 res
530 }
531
532 pub async fn get_payload_v5(
542 &self,
543 payload_id: PayloadId,
544 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
545 self.get_payload_inner(payload_id, EngineApiMessageVersion::V5).await
546 }
547
548 pub async fn get_payload_v5_metered(
550 &self,
551 payload_id: PayloadId,
552 ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
553 let start = Instant::now();
554 let res = Self::get_payload_v5(self, payload_id).await;
555 self.inner.metrics.latency.get_payload_v5.record(start.elapsed());
556 res
557 }
558
559 pub async fn get_payload_bodies_by_range_with<F, R>(
562 &self,
563 start: BlockNumber,
564 count: u64,
565 f: F,
566 ) -> EngineApiResult<Vec<Option<R>>>
567 where
568 F: Fn(Provider::Block) -> R + Send + 'static,
569 R: Send + 'static,
570 {
571 let (tx, rx) = oneshot::channel();
572 let inner = self.inner.clone();
573
574 self.inner.task_spawner.spawn_blocking(Box::pin(async move {
575 if count > MAX_PAYLOAD_BODIES_LIMIT {
576 tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
577 return;
578 }
579
580 if start == 0 || count == 0 {
581 tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
582 return;
583 }
584
585 let mut result = Vec::with_capacity(count as usize);
586
587 let mut end = start.saturating_add(count - 1);
589
590 if let Ok(best_block) = inner.provider.best_block_number() {
593 if end > best_block {
594 end = best_block;
595 }
596 }
597
598 for num in start..=end {
599 let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
600 match block_result {
601 Ok(block) => {
602 result.push(block.map(&f));
603 }
604 Err(err) => {
605 tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
606 return;
607 }
608 };
609 }
610 tx.send(Ok(result)).ok();
611 }));
612
613 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
614 }
615
616 pub async fn get_payload_bodies_by_range_v1(
627 &self,
628 start: BlockNumber,
629 count: u64,
630 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
631 self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV1 {
632 transactions: block.body().encoded_2718_transactions(),
633 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
634 })
635 .await
636 }
637
638 pub async fn get_payload_bodies_by_range_v1_metered(
640 &self,
641 start: BlockNumber,
642 count: u64,
643 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
644 let start_time = Instant::now();
645 let res = Self::get_payload_bodies_by_range_v1(self, start, count).await;
646 self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
647 res
648 }
649
650 pub async fn get_payload_bodies_by_hash_with<F, R>(
652 &self,
653 hashes: Vec<BlockHash>,
654 f: F,
655 ) -> EngineApiResult<Vec<Option<R>>>
656 where
657 F: Fn(Provider::Block) -> R + Send + 'static,
658 R: Send + 'static,
659 {
660 let len = hashes.len() as u64;
661 if len > MAX_PAYLOAD_BODIES_LIMIT {
662 return Err(EngineApiError::PayloadRequestTooLarge { len });
663 }
664
665 let (tx, rx) = oneshot::channel();
666 let inner = self.inner.clone();
667
668 self.inner.task_spawner.spawn_blocking(Box::pin(async move {
669 let mut result = Vec::with_capacity(hashes.len());
670 for hash in hashes {
671 let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
672 match block_result {
673 Ok(block) => {
674 result.push(block.map(&f));
675 }
676 Err(err) => {
677 let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
678 return;
679 }
680 }
681 }
682 tx.send(Ok(result)).ok();
683 }));
684
685 rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
686 }
687
688 pub async fn get_payload_bodies_by_hash_v1(
690 &self,
691 hashes: Vec<BlockHash>,
692 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
693 self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV1 {
694 transactions: block.body().encoded_2718_transactions(),
695 withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
696 })
697 .await
698 }
699
700 pub async fn get_payload_bodies_by_hash_v1_metered(
702 &self,
703 hashes: Vec<BlockHash>,
704 ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
705 let start = Instant::now();
706 let res = Self::get_payload_bodies_by_hash_v1(self, hashes);
707 self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
708 res.await
709 }
710
711 async fn validate_and_execute_forkchoice(
725 &self,
726 version: EngineApiMessageVersion,
727 state: ForkchoiceState,
728 payload_attrs: Option<EngineT::PayloadAttributes>,
729 ) -> EngineApiResult<ForkchoiceUpdated> {
730 self.inner.record_elapsed_time_on_fcu();
731
732 if let Some(ref attrs) = payload_attrs {
733 let attr_validation_res =
734 self.inner.validator.ensure_well_formed_attributes(version, attrs);
735
736 if let Err(err) = attr_validation_res {
750 let fcu_res =
751 self.inner.beacon_consensus.fork_choice_updated(state, None, version).await?;
752 if fcu_res.is_invalid() {
755 return Ok(fcu_res)
756 }
757 return Err(err.into())
758 }
759 }
760
761 Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs, version).await?)
762 }
763
764 pub fn capabilities(&self) -> &EngineCapabilities {
766 &self.inner.capabilities
767 }
768
769 fn get_blobs_v1(
770 &self,
771 versioned_hashes: Vec<B256>,
772 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
773 if versioned_hashes.len() > MAX_BLOB_LIMIT {
774 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
775 }
776
777 self.inner
778 .tx_pool
779 .get_blobs_for_versioned_hashes_v1(&versioned_hashes)
780 .map_err(|err| EngineApiError::Internal(Box::new(err)))
781 }
782
783 fn get_blobs_v1_metered(
784 &self,
785 versioned_hashes: Vec<B256>,
786 ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
787 let hashes_len = versioned_hashes.len();
788 let start = Instant::now();
789 let res = Self::get_blobs_v1(self, versioned_hashes);
790 self.inner.metrics.latency.get_blobs_v1.record(start.elapsed());
791
792 if let Ok(blobs) = &res {
793 let blobs_found = blobs.iter().flatten().count();
794 let blobs_missed = hashes_len - blobs_found;
795
796 self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
797 self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
798 }
799
800 res
801 }
802
803 fn get_blobs_v2(
804 &self,
805 versioned_hashes: Vec<B256>,
806 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
807 if versioned_hashes.len() > MAX_BLOB_LIMIT {
808 return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
809 }
810
811 self.inner
812 .tx_pool
813 .get_blobs_for_versioned_hashes_v2(&versioned_hashes)
814 .map_err(|err| EngineApiError::Internal(Box::new(err)))
815 }
816
817 fn get_blobs_v2_metered(
818 &self,
819 versioned_hashes: Vec<B256>,
820 ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
821 let hashes_len = versioned_hashes.len();
822 let start = Instant::now();
823 let res = Self::get_blobs_v2(self, versioned_hashes);
824 self.inner.metrics.latency.get_blobs_v2.record(start.elapsed());
825
826 if let Ok(blobs) = &res {
827 let blobs_found = blobs.iter().flatten().count();
828
829 self.inner
830 .metrics
831 .blob_metrics
832 .get_blobs_requests_blobs_total
833 .increment(hashes_len as u64);
834 self.inner
835 .metrics
836 .blob_metrics
837 .get_blobs_requests_blobs_in_blobpool_total
838 .increment(blobs_found as u64);
839
840 if blobs_found == hashes_len {
841 self.inner.metrics.blob_metrics.get_blobs_requests_success_total.increment(1);
842 } else {
843 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
844 }
845 } else {
846 self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
847 }
848
849 res
850 }
851}
852
853impl<Provider, PayloadT, Pool, Validator, ChainSpec>
854 EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>
855where
856 PayloadT: PayloadTypes,
857{
858 fn record_elapsed_time_on_fcu(&self) {
861 if let Some(start_time) = self.latest_new_payload_response.lock().take() {
862 let elapsed_time = start_time.elapsed();
863 self.metrics.latency.new_payload_forkchoice_updated_time_diff.record(elapsed_time);
864 }
865 }
866
867 fn on_new_payload_response(&self) {
869 self.latest_new_payload_response.lock().replace(Instant::now());
870 }
871}
872
873#[async_trait]
875impl<Provider, EngineT, Pool, Validator, ChainSpec> EngineApiServer<EngineT>
876 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
877where
878 Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
879 EngineT: EngineTypes<ExecutionData = ExecutionData>,
880 Pool: TransactionPool + 'static,
881 Validator: EngineValidator<EngineT>,
882 ChainSpec: EthereumHardforks + Send + Sync + 'static,
883{
884 async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
888 trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
889 let payload =
890 ExecutionData { payload: payload.into(), sidecar: ExecutionPayloadSidecar::none() };
891 Ok(self.new_payload_v1_metered(payload).await?)
892 }
893
894 async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
897 trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
898 let payload = ExecutionData {
899 payload: payload.into_payload(),
900 sidecar: ExecutionPayloadSidecar::none(),
901 };
902
903 Ok(self.new_payload_v2_metered(payload).await?)
904 }
905
906 async fn new_payload_v3(
909 &self,
910 payload: ExecutionPayloadV3,
911 versioned_hashes: Vec<B256>,
912 parent_beacon_block_root: B256,
913 ) -> RpcResult<PayloadStatus> {
914 trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
915 let payload = ExecutionData {
916 payload: payload.into(),
917 sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
918 versioned_hashes,
919 parent_beacon_block_root,
920 }),
921 };
922
923 Ok(self.new_payload_v3_metered(payload).await?)
924 }
925
926 async fn new_payload_v4(
929 &self,
930 payload: ExecutionPayloadV3,
931 versioned_hashes: Vec<B256>,
932 parent_beacon_block_root: B256,
933 requests: RequestsOrHash,
934 ) -> RpcResult<PayloadStatus> {
935 trace!(target: "rpc::engine", "Serving engine_newPayloadV4");
936
937 if requests.is_hash() && !self.inner.accept_execution_requests_hash {
939 return Err(EngineApiError::UnexpectedRequestsHash.into());
940 }
941
942 let payload = ExecutionData {
943 payload: payload.into(),
944 sidecar: ExecutionPayloadSidecar::v4(
945 CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
946 PraguePayloadFields { requests },
947 ),
948 };
949
950 Ok(self.new_payload_v4_metered(payload).await?)
951 }
952
953 async fn fork_choice_updated_v1(
958 &self,
959 fork_choice_state: ForkchoiceState,
960 payload_attributes: Option<EngineT::PayloadAttributes>,
961 ) -> RpcResult<ForkchoiceUpdated> {
962 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
963 Ok(self.fork_choice_updated_v1_metered(fork_choice_state, payload_attributes).await?)
964 }
965
966 async fn fork_choice_updated_v2(
969 &self,
970 fork_choice_state: ForkchoiceState,
971 payload_attributes: Option<EngineT::PayloadAttributes>,
972 ) -> RpcResult<ForkchoiceUpdated> {
973 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
974 Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
975 }
976
977 async fn fork_choice_updated_v3(
981 &self,
982 fork_choice_state: ForkchoiceState,
983 payload_attributes: Option<EngineT::PayloadAttributes>,
984 ) -> RpcResult<ForkchoiceUpdated> {
985 trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
986 Ok(self.fork_choice_updated_v3_metered(fork_choice_state, payload_attributes).await?)
987 }
988
989 async fn get_payload_v1(
1001 &self,
1002 payload_id: PayloadId,
1003 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV1> {
1004 trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
1005 Ok(self.get_payload_v1_metered(payload_id).await?)
1006 }
1007
1008 async fn get_payload_v2(
1018 &self,
1019 payload_id: PayloadId,
1020 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV2> {
1021 debug!(target: "rpc::engine", id = %payload_id, "Serving engine_getPayloadV2");
1022 Ok(self.get_payload_v2_metered(payload_id).await?)
1023 }
1024
1025 async fn get_payload_v3(
1035 &self,
1036 payload_id: PayloadId,
1037 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV3> {
1038 trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
1039 Ok(self.get_payload_v3_metered(payload_id).await?)
1040 }
1041
1042 async fn get_payload_v4(
1052 &self,
1053 payload_id: PayloadId,
1054 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV4> {
1055 trace!(target: "rpc::engine", "Serving engine_getPayloadV4");
1056 Ok(self.get_payload_v4_metered(payload_id).await?)
1057 }
1058
1059 async fn get_payload_v5(
1069 &self,
1070 payload_id: PayloadId,
1071 ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV5> {
1072 trace!(target: "rpc::engine", "Serving engine_getPayloadV5");
1073 Ok(self.get_payload_v5_metered(payload_id).await?)
1074 }
1075
1076 async fn get_payload_bodies_by_hash_v1(
1079 &self,
1080 block_hashes: Vec<BlockHash>,
1081 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1082 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
1083 Ok(self.get_payload_bodies_by_hash_v1_metered(block_hashes).await?)
1084 }
1085
1086 async fn get_payload_bodies_by_range_v1(
1103 &self,
1104 start: U64,
1105 count: U64,
1106 ) -> RpcResult<ExecutionPayloadBodiesV1> {
1107 trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
1108 Ok(self.get_payload_bodies_by_range_v1_metered(start.to(), count.to()).await?)
1109 }
1110
1111 async fn get_client_version_v1(
1115 &self,
1116 client: ClientVersionV1,
1117 ) -> RpcResult<Vec<ClientVersionV1>> {
1118 trace!(target: "rpc::engine", "Serving engine_getClientVersionV1");
1119 Ok(Self::get_client_version_v1(self, client)?)
1120 }
1121
1122 async fn exchange_capabilities(&self, _capabilities: Vec<String>) -> RpcResult<Vec<String>> {
1125 Ok(self.capabilities().list())
1126 }
1127
1128 async fn get_blobs_v1(
1129 &self,
1130 versioned_hashes: Vec<B256>,
1131 ) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
1132 trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
1133 Ok(self.get_blobs_v1_metered(versioned_hashes)?)
1134 }
1135
1136 async fn get_blobs_v2(
1137 &self,
1138 versioned_hashes: Vec<B256>,
1139 ) -> RpcResult<Option<Vec<BlobAndProofV2>>> {
1140 trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
1141 Ok(self.get_blobs_v2_metered(versioned_hashes)?)
1142 }
1143}
1144
1145impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
1146 for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1147where
1148 EngineT: EngineTypes,
1149 Self: EngineApiServer<EngineT>,
1150{
1151 fn into_rpc_module(self) -> RpcModule<()> {
1152 self.into_rpc().remove_context()
1153 }
1154}
1155
1156impl<Provider, PayloadT, Pool, Validator, ChainSpec> std::fmt::Debug
1157 for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1158where
1159 PayloadT: PayloadTypes,
1160{
1161 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1162 f.debug_struct("EngineApi").finish_non_exhaustive()
1163 }
1164}
1165
1166#[cfg(test)]
1167mod tests {
1168 use super::*;
1169 use alloy_rpc_types_engine::{ClientCode, ClientVersionV1};
1170 use assert_matches::assert_matches;
1171 use reth_chainspec::{ChainSpec, MAINNET};
1172 use reth_engine_primitives::BeaconEngineMessage;
1173 use reth_ethereum_engine_primitives::EthEngineTypes;
1174 use reth_ethereum_primitives::Block;
1175 use reth_node_ethereum::EthereumEngineValidator;
1176 use reth_payload_builder::test_utils::spawn_test_payload_service;
1177 use reth_provider::test_utils::MockEthProvider;
1178 use reth_tasks::TokioTaskExecutor;
1179 use reth_transaction_pool::noop::NoopTransactionPool;
1180 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
1181
1182 fn setup_engine_api() -> (
1183 EngineApiTestHandle,
1184 EngineApi<
1185 Arc<MockEthProvider>,
1186 EthEngineTypes,
1187 NoopTransactionPool,
1188 EthereumEngineValidator,
1189 ChainSpec,
1190 >,
1191 ) {
1192 let client = ClientVersionV1 {
1193 code: ClientCode::RH,
1194 name: "Reth".to_string(),
1195 version: "v0.2.0-beta.5".to_string(),
1196 commit: "defa64b2".to_string(),
1197 };
1198
1199 let chain_spec: Arc<ChainSpec> = MAINNET.clone();
1200 let provider = Arc::new(MockEthProvider::default());
1201 let payload_store = spawn_test_payload_service();
1202 let (to_engine, engine_rx) = unbounded_channel();
1203 let task_executor = Box::<TokioTaskExecutor>::default();
1204 let api = EngineApi::new(
1205 provider.clone(),
1206 chain_spec.clone(),
1207 BeaconConsensusEngineHandle::new(to_engine),
1208 payload_store.into(),
1209 NoopTransactionPool::default(),
1210 task_executor,
1211 client,
1212 EngineCapabilities::default(),
1213 EthereumEngineValidator::new(chain_spec.clone()),
1214 false,
1215 );
1216 let handle = EngineApiTestHandle { chain_spec, provider, from_api: engine_rx };
1217 (handle, api)
1218 }
1219
1220 #[tokio::test]
1221 async fn engine_client_version_v1() {
1222 let client = ClientVersionV1 {
1223 code: ClientCode::RH,
1224 name: "Reth".to_string(),
1225 version: "v0.2.0-beta.5".to_string(),
1226 commit: "defa64b2".to_string(),
1227 };
1228 let (_, api) = setup_engine_api();
1229 let res = api.get_client_version_v1(client.clone());
1230 assert_eq!(res.unwrap(), vec![client]);
1231 }
1232
/// Test-side handle to the components an [`EngineApi`] under test was built from.
struct EngineApiTestHandle {
    /// Chain spec the API was created with; currently not read by any test.
    #[allow(dead_code)]
    chain_spec: Arc<ChainSpec>,
    /// Mock provider shared with the API; tests insert blocks through it.
    provider: Arc<MockEthProvider>,
    /// Receives the messages the API sends to the consensus engine.
    from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
}
1239
1240 #[tokio::test]
1241 async fn forwards_responses_to_consensus_engine() {
1242 let (mut handle, api) = setup_engine_api();
1243
1244 tokio::spawn(async move {
1245 let payload_v1 = ExecutionPayloadV1::from_block_slow(&Block::default());
1246 let execution_data = ExecutionData {
1247 payload: payload_v1.into(),
1248 sidecar: ExecutionPayloadSidecar::none(),
1249 };
1250
1251 api.new_payload_v1(execution_data).await.unwrap();
1252 });
1253 assert_matches!(handle.from_api.recv().await, Some(BeaconEngineMessage::NewPayload { .. }));
1254 }
1255
1256 mod get_payload_bodies {
1258 use super::*;
1259 use alloy_rpc_types_engine::ExecutionPayloadBodyV1;
1260 use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
1261
1262 #[tokio::test]
1263 async fn invalid_params() {
1264 let (_, api) = setup_engine_api();
1265
1266 let by_range_tests = [
1267 (0, 0),
1269 (0, 1),
1270 (1, 0),
1271 ];
1272
1273 for (start, count) in by_range_tests {
1275 let res = api.get_payload_bodies_by_range_v1(start, count).await;
1276 assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
1277 }
1278 }
1279
1280 #[tokio::test]
1281 async fn request_too_large() {
1282 let (_, api) = setup_engine_api();
1283
1284 let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1;
1285 let res = api.get_payload_bodies_by_range_v1(0, request_count).await;
1286 assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
1287 }
1288
1289 #[tokio::test]
1290 async fn returns_payload_bodies() {
1291 let mut rng = generators::rng();
1292 let (handle, api) = setup_engine_api();
1293
1294 let (start, count) = (1, 10);
1295 let blocks = random_block_range(
1296 &mut rng,
1297 start..=start + count - 1,
1298 BlockRangeParams { tx_count: 0..2, ..Default::default() },
1299 );
1300 handle
1301 .provider
1302 .extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.into_block())));
1303
1304 let expected = blocks
1305 .iter()
1306 .cloned()
1307 .map(|b| Some(ExecutionPayloadBodyV1::from_block(b.into_block())))
1308 .collect::<Vec<_>>();
1309
1310 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1311 assert_eq!(res, expected);
1312 }
1313
1314 #[tokio::test]
1315 async fn returns_payload_bodies_with_gaps() {
1316 let mut rng = generators::rng();
1317 let (handle, api) = setup_engine_api();
1318
1319 let (start, count) = (1, 100);
1320 let blocks = random_block_range(
1321 &mut rng,
1322 start..=start + count - 1,
1323 BlockRangeParams { tx_count: 0..2, ..Default::default() },
1324 );
1325
1326 let first_missing_range = 26..=50;
1328 let second_missing_range = 76..=100;
1329 handle.provider.extend_blocks(
1330 blocks
1331 .iter()
1332 .filter(|b| {
1333 !first_missing_range.contains(&b.number) &&
1334 !second_missing_range.contains(&b.number)
1335 })
1336 .map(|b| (b.hash(), b.clone().into_block())),
1337 );
1338
1339 let expected = blocks
1340 .iter()
1341 .filter(|b| !second_missing_range.contains(&b.number))
1344 .cloned()
1345 .map(|b| {
1346 if first_missing_range.contains(&b.number) {
1347 None
1348 } else {
1349 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1350 }
1351 })
1352 .collect::<Vec<_>>();
1353
1354 let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
1355 assert_eq!(res, expected);
1356
1357 let expected = blocks
1358 .iter()
1359 .cloned()
1360 .map(|b| {
1363 if first_missing_range.contains(&b.number) ||
1364 second_missing_range.contains(&b.number)
1365 {
1366 None
1367 } else {
1368 Some(ExecutionPayloadBodyV1::from_block(b.into_block()))
1369 }
1370 })
1371 .collect::<Vec<_>>();
1372
1373 let hashes = blocks.iter().map(|b| b.hash()).collect();
1374 let res = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
1375 assert_eq!(res, expected);
1376 }
1377 }
1378}