reth_rpc_engine_api/
engine_api.rs

1use crate::{
2    capabilities::EngineCapabilities, metrics::EngineApiMetrics, EngineApiError, EngineApiResult,
3};
4use alloy_eips::{
5    eip1898::BlockHashOrNumber,
6    eip4844::{BlobAndProofV1, BlobAndProofV2},
7    eip4895::Withdrawals,
8    eip7685::RequestsOrHash,
9};
10use alloy_primitives::{BlockHash, BlockNumber, B256, U64};
11use alloy_rpc_types_engine::{
12    CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1,
13    ExecutionPayloadBodyV1, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1,
14    ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus,
15    PraguePayloadFields,
16};
17use async_trait::async_trait;
18use jsonrpsee_core::{server::RpcModule, RpcResult};
19use parking_lot::Mutex;
20use reth_chainspec::EthereumHardforks;
21use reth_engine_primitives::{BeaconConsensusEngineHandle, EngineTypes, EngineValidator};
22use reth_payload_builder::PayloadStore;
23use reth_payload_primitives::{
24    validate_payload_timestamp, EngineApiMessageVersion, ExecutionPayload,
25    PayloadBuilderAttributes, PayloadOrAttributes, PayloadTypes,
26};
27use reth_primitives_traits::{Block, BlockBody};
28use reth_rpc_api::{EngineApiServer, IntoEngineApiRpcModule};
29use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory};
30use reth_tasks::TaskSpawner;
31use reth_transaction_pool::TransactionPool;
32use std::{sync::Arc, time::Instant};
33use tokio::sync::oneshot;
34use tracing::{debug, trace, warn};
35
36/// The Engine API response sender.
37pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
38
39/// The upper limit for payload bodies request.
40const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;
41
42/// The upper limit for blobs in `engine_getBlobsVx`.
43const MAX_BLOB_LIMIT: usize = 128;
44
45/// The Engine API implementation that grants the Consensus layer access to data and
46/// functions in the Execution layer that are crucial for the consensus process.
47///
48/// This type is generic over [`EngineTypes`] and intended to be used as the entrypoint for engine
49/// API processing. It can be reused by other non L1 engine APIs that deviate from the L1 spec but
50/// are still follow the engine API model.
51///
52/// ## Implementers
53///
54/// Implementing support for an engine API jsonrpsee RPC handler is done by defining the engine API
55/// server trait and implementing it on a type that can either wrap this [`EngineApi`] type or
56/// use a custom [`EngineTypes`] implementation if it mirrors ethereum's versioned engine API
57/// endpoints (e.g. opstack).
58/// See also [`EngineApiServer`] implementation for this type which is the
59/// L1 implementation.
60pub struct EngineApi<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
61    inner: Arc<EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>>,
62}
63
64struct EngineApiInner<Provider, PayloadT: PayloadTypes, Pool, Validator, ChainSpec> {
65    /// The provider to interact with the chain.
66    provider: Provider,
67    /// Consensus configuration
68    chain_spec: Arc<ChainSpec>,
69    /// The channel to send messages to the beacon consensus engine.
70    beacon_consensus: BeaconConsensusEngineHandle<PayloadT>,
71    /// The type that can communicate with the payload service to retrieve payloads.
72    payload_store: PayloadStore<PayloadT>,
73    /// For spawning and executing async tasks
74    task_spawner: Box<dyn TaskSpawner>,
75    /// The latency and response type metrics for engine api calls
76    metrics: EngineApiMetrics,
77    /// Identification of the execution client used by the consensus client
78    client: ClientVersionV1,
79    /// The list of all supported Engine capabilities available over the engine endpoint.
80    capabilities: EngineCapabilities,
81    /// Transaction pool.
82    tx_pool: Pool,
83    /// Engine validator.
84    validator: Validator,
85    /// Start time of the latest payload request
86    latest_new_payload_response: Mutex<Option<Instant>>,
87    accept_execution_requests_hash: bool,
88}
89
90impl<Provider, PayloadT, Pool, Validator, ChainSpec>
91    EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
92where
93    Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
94    PayloadT: PayloadTypes,
95    Pool: TransactionPool + 'static,
96    Validator: EngineValidator<PayloadT>,
97    ChainSpec: EthereumHardforks + Send + Sync + 'static,
98{
99    /// Create new instance of [`EngineApi`].
100    #[expect(clippy::too_many_arguments)]
101    pub fn new(
102        provider: Provider,
103        chain_spec: Arc<ChainSpec>,
104        beacon_consensus: BeaconConsensusEngineHandle<PayloadT>,
105        payload_store: PayloadStore<PayloadT>,
106        tx_pool: Pool,
107        task_spawner: Box<dyn TaskSpawner>,
108        client: ClientVersionV1,
109        capabilities: EngineCapabilities,
110        validator: Validator,
111        accept_execution_requests_hash: bool,
112    ) -> Self {
113        let inner = Arc::new(EngineApiInner {
114            provider,
115            chain_spec,
116            beacon_consensus,
117            payload_store,
118            task_spawner,
119            metrics: EngineApiMetrics::default(),
120            client,
121            capabilities,
122            tx_pool,
123            validator,
124            latest_new_payload_response: Mutex::new(None),
125            accept_execution_requests_hash,
126        });
127        Self { inner }
128    }
129
130    /// Fetches the client version.
131    pub fn get_client_version_v1(
132        &self,
133        _client: ClientVersionV1,
134    ) -> EngineApiResult<Vec<ClientVersionV1>> {
135        Ok(vec![self.inner.client.clone()])
136    }
137
138    /// Fetches the attributes for the payload with the given id.
139    async fn get_payload_attributes(
140        &self,
141        payload_id: PayloadId,
142    ) -> EngineApiResult<PayloadT::PayloadBuilderAttributes> {
143        Ok(self
144            .inner
145            .payload_store
146            .payload_attributes(payload_id)
147            .await
148            .ok_or(EngineApiError::UnknownPayload)??)
149    }
150
151    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_newpayloadv1>
152    /// Caution: This should not accept the `withdrawals` field
153    pub async fn new_payload_v1(
154        &self,
155        payload: PayloadT::ExecutionData,
156    ) -> EngineApiResult<PayloadStatus> {
157        let payload_or_attrs = PayloadOrAttributes::<
158            '_,
159            PayloadT::ExecutionData,
160            PayloadT::PayloadAttributes,
161        >::from_execution_payload(&payload);
162
163        self.inner
164            .validator
165            .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?;
166
167        Ok(self
168            .inner
169            .beacon_consensus
170            .new_payload(payload)
171            .await
172            .inspect(|_| self.inner.on_new_payload_response())?)
173    }
174
175    /// Metered version of `new_payload_v1`.
176    async fn new_payload_v1_metered(
177        &self,
178        payload: PayloadT::ExecutionData,
179    ) -> EngineApiResult<PayloadStatus> {
180        let start = Instant::now();
181        let gas_used = payload.gas_used();
182
183        let res = Self::new_payload_v1(self, payload).await;
184        let elapsed = start.elapsed();
185        self.inner.metrics.latency.new_payload_v1.record(elapsed);
186        self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
187        res
188    }
189
190    /// See also <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/shanghai.md#engine_newpayloadv2>
191    pub async fn new_payload_v2(
192        &self,
193        payload: PayloadT::ExecutionData,
194    ) -> EngineApiResult<PayloadStatus> {
195        let payload_or_attrs = PayloadOrAttributes::<
196            '_,
197            PayloadT::ExecutionData,
198            PayloadT::PayloadAttributes,
199        >::from_execution_payload(&payload);
200        self.inner
201            .validator
202            .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?;
203        Ok(self
204            .inner
205            .beacon_consensus
206            .new_payload(payload)
207            .await
208            .inspect(|_| self.inner.on_new_payload_response())?)
209    }
210
211    /// Metered version of `new_payload_v2`.
212    pub async fn new_payload_v2_metered(
213        &self,
214        payload: PayloadT::ExecutionData,
215    ) -> EngineApiResult<PayloadStatus> {
216        let start = Instant::now();
217        let gas_used = payload.gas_used();
218
219        let res = Self::new_payload_v2(self, payload).await;
220        let elapsed = start.elapsed();
221        self.inner.metrics.latency.new_payload_v2.record(elapsed);
222        self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
223        res
224    }
225
226    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_newpayloadv3>
227    pub async fn new_payload_v3(
228        &self,
229        payload: PayloadT::ExecutionData,
230    ) -> EngineApiResult<PayloadStatus> {
231        let payload_or_attrs = PayloadOrAttributes::<
232            '_,
233            PayloadT::ExecutionData,
234            PayloadT::PayloadAttributes,
235        >::from_execution_payload(&payload);
236        self.inner
237            .validator
238            .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?;
239
240        Ok(self
241            .inner
242            .beacon_consensus
243            .new_payload(payload)
244            .await
245            .inspect(|_| self.inner.on_new_payload_response())?)
246    }
247
248    /// Metrics version of `new_payload_v3`
249    pub async fn new_payload_v3_metered(
250        &self,
251        payload: PayloadT::ExecutionData,
252    ) -> RpcResult<PayloadStatus> {
253        let start = Instant::now();
254        let gas_used = payload.gas_used();
255
256        let res = Self::new_payload_v3(self, payload).await;
257        let elapsed = start.elapsed();
258        self.inner.metrics.latency.new_payload_v3.record(elapsed);
259        self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
260        Ok(res?)
261    }
262
263    /// See also <https://github.com/ethereum/execution-apis/blob/7907424db935b93c2fe6a3c0faab943adebe8557/src/engine/prague.md#engine_newpayloadv4>
264    pub async fn new_payload_v4(
265        &self,
266        payload: PayloadT::ExecutionData,
267    ) -> EngineApiResult<PayloadStatus> {
268        let payload_or_attrs = PayloadOrAttributes::<
269            '_,
270            PayloadT::ExecutionData,
271            PayloadT::PayloadAttributes,
272        >::from_execution_payload(&payload);
273        self.inner
274            .validator
275            .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?;
276
277        Ok(self
278            .inner
279            .beacon_consensus
280            .new_payload(payload)
281            .await
282            .inspect(|_| self.inner.on_new_payload_response())?)
283    }
284
285    /// Metrics version of `new_payload_v4`
286    pub async fn new_payload_v4_metered(
287        &self,
288        payload: PayloadT::ExecutionData,
289    ) -> RpcResult<PayloadStatus> {
290        let start = Instant::now();
291        let gas_used = payload.gas_used();
292
293        let res = Self::new_payload_v4(self, payload).await;
294
295        let elapsed = start.elapsed();
296        self.inner.metrics.latency.new_payload_v4.record(elapsed);
297        self.inner.metrics.new_payload_response.update_response_metrics(&res, gas_used, elapsed);
298        Ok(res?)
299    }
300}
301
302impl<Provider, EngineT, Pool, Validator, ChainSpec>
303    EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
304where
305    Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
306    EngineT: EngineTypes,
307    Pool: TransactionPool + 'static,
308    Validator: EngineValidator<EngineT>,
309    ChainSpec: EthereumHardforks + Send + Sync + 'static,
310{
311    /// Sends a message to the beacon consensus engine to update the fork choice _without_
312    /// withdrawals.
313    ///
314    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_forkchoiceUpdatedV1>
315    ///
316    /// Caution: This should not accept the `withdrawals` field
317    pub async fn fork_choice_updated_v1(
318        &self,
319        state: ForkchoiceState,
320        payload_attrs: Option<EngineT::PayloadAttributes>,
321    ) -> EngineApiResult<ForkchoiceUpdated> {
322        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V1, state, payload_attrs)
323            .await
324    }
325
326    /// Metrics version of `fork_choice_updated_v1`
327    pub async fn fork_choice_updated_v1_metered(
328        &self,
329        state: ForkchoiceState,
330        payload_attrs: Option<EngineT::PayloadAttributes>,
331    ) -> EngineApiResult<ForkchoiceUpdated> {
332        let start = Instant::now();
333        let res = Self::fork_choice_updated_v1(self, state, payload_attrs).await;
334        self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
335        self.inner.metrics.fcu_response.update_response_metrics(&res);
336        res
337    }
338
339    /// Sends a message to the beacon consensus engine to update the fork choice _with_ withdrawals,
340    /// but only _after_ shanghai.
341    ///
342    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
343    pub async fn fork_choice_updated_v2(
344        &self,
345        state: ForkchoiceState,
346        payload_attrs: Option<EngineT::PayloadAttributes>,
347    ) -> EngineApiResult<ForkchoiceUpdated> {
348        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V2, state, payload_attrs)
349            .await
350    }
351
352    /// Metrics version of `fork_choice_updated_v2`
353    pub async fn fork_choice_updated_v2_metered(
354        &self,
355        state: ForkchoiceState,
356        payload_attrs: Option<EngineT::PayloadAttributes>,
357    ) -> EngineApiResult<ForkchoiceUpdated> {
358        let start = Instant::now();
359        let res = Self::fork_choice_updated_v2(self, state, payload_attrs).await;
360        self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
361        self.inner.metrics.fcu_response.update_response_metrics(&res);
362        res
363    }
364
365    /// Sends a message to the beacon consensus engine to update the fork choice _with_ withdrawals,
366    /// but only _after_ cancun.
367    ///
368    /// See also  <https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#engine_forkchoiceupdatedv3>
369    pub async fn fork_choice_updated_v3(
370        &self,
371        state: ForkchoiceState,
372        payload_attrs: Option<EngineT::PayloadAttributes>,
373    ) -> EngineApiResult<ForkchoiceUpdated> {
374        self.validate_and_execute_forkchoice(EngineApiMessageVersion::V3, state, payload_attrs)
375            .await
376    }
377
378    /// Metrics version of `fork_choice_updated_v3`
379    pub async fn fork_choice_updated_v3_metered(
380        &self,
381        state: ForkchoiceState,
382        payload_attrs: Option<EngineT::PayloadAttributes>,
383    ) -> EngineApiResult<ForkchoiceUpdated> {
384        let start = Instant::now();
385        let res = Self::fork_choice_updated_v3(self, state, payload_attrs).await;
386        self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
387        self.inner.metrics.fcu_response.update_response_metrics(&res);
388        res
389    }
390
391    /// Helper function for retrieving the build payload by id.
392    async fn get_built_payload(
393        &self,
394        payload_id: PayloadId,
395    ) -> EngineApiResult<EngineT::BuiltPayload> {
396        self.inner
397            .payload_store
398            .resolve(payload_id)
399            .await
400            .ok_or(EngineApiError::UnknownPayload)?
401            .map_err(|_| EngineApiError::UnknownPayload)
402    }
403
404    /// Helper function for validating the payload timestamp and retrieving & converting the payload
405    /// into desired envelope.
406    async fn get_payload_inner<R>(
407        &self,
408        payload_id: PayloadId,
409        version: EngineApiMessageVersion,
410    ) -> EngineApiResult<R>
411    where
412        EngineT::BuiltPayload: TryInto<R>,
413    {
414        // First we fetch the payload attributes to check the timestamp
415        let attributes = self.get_payload_attributes(payload_id).await?;
416
417        // validate timestamp according to engine rules
418        validate_payload_timestamp(&self.inner.chain_spec, version, attributes.timestamp())?;
419
420        // Now resolve the payload
421        self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
422            warn!(?version, "could not transform built payload");
423            EngineApiError::UnknownPayload
424        })
425    }
426
427    /// Returns the most recent version of the payload that is available in the corresponding
428    /// payload build process at the time of receiving this call.
429    ///
430    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_getPayloadV1>
431    ///
432    /// Caution: This should not return the `withdrawals` field
433    ///
434    /// Note:
435    /// > Provider software MAY stop the corresponding build process after serving this call.
436    pub async fn get_payload_v1(
437        &self,
438        payload_id: PayloadId,
439    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
440        self.get_built_payload(payload_id).await?.try_into().map_err(|_| {
441            warn!(version = ?EngineApiMessageVersion::V1, "could not transform built payload");
442            EngineApiError::UnknownPayload
443        })
444    }
445
446    /// Metrics version of `get_payload_v1`
447    pub async fn get_payload_v1_metered(
448        &self,
449        payload_id: PayloadId,
450    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV1> {
451        let start = Instant::now();
452        let res = Self::get_payload_v1(self, payload_id).await;
453        self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
454        res
455    }
456
457    /// Returns the most recent version of the payload that is available in the corresponding
458    /// payload build process at the time of receiving this call.
459    ///
460    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_getpayloadv2>
461    ///
462    /// Note:
463    /// > Provider software MAY stop the corresponding build process after serving this call.
464    pub async fn get_payload_v2(
465        &self,
466        payload_id: PayloadId,
467    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
468        self.get_payload_inner(payload_id, EngineApiMessageVersion::V2).await
469    }
470
471    /// Metrics version of `get_payload_v2`
472    pub async fn get_payload_v2_metered(
473        &self,
474        payload_id: PayloadId,
475    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV2> {
476        let start = Instant::now();
477        let res = Self::get_payload_v2(self, payload_id).await;
478        self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
479        res
480    }
481
482    /// Returns the most recent version of the payload that is available in the corresponding
483    /// payload build process at the time of receiving this call.
484    ///
485    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_getpayloadv3>
486    ///
487    /// Note:
488    /// > Provider software MAY stop the corresponding build process after serving this call.
489    pub async fn get_payload_v3(
490        &self,
491        payload_id: PayloadId,
492    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
493        self.get_payload_inner(payload_id, EngineApiMessageVersion::V3).await
494    }
495
496    /// Metrics version of `get_payload_v3`
497    pub async fn get_payload_v3_metered(
498        &self,
499        payload_id: PayloadId,
500    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV3> {
501        let start = Instant::now();
502        let res = Self::get_payload_v3(self, payload_id).await;
503        self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
504        res
505    }
506
507    /// Returns the most recent version of the payload that is available in the corresponding
508    /// payload build process at the time of receiving this call.
509    ///
510    /// See also <https://github.com/ethereum/execution-apis/blob/7907424db935b93c2fe6a3c0faab943adebe8557/src/engine/prague.md#engine_newpayloadv4>
511    ///
512    /// Note:
513    /// > Provider software MAY stop the corresponding build process after serving this call.
514    pub async fn get_payload_v4(
515        &self,
516        payload_id: PayloadId,
517    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
518        self.get_payload_inner(payload_id, EngineApiMessageVersion::V4).await
519    }
520
521    /// Metrics version of `get_payload_v4`
522    pub async fn get_payload_v4_metered(
523        &self,
524        payload_id: PayloadId,
525    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV4> {
526        let start = Instant::now();
527        let res = Self::get_payload_v4(self, payload_id).await;
528        self.inner.metrics.latency.get_payload_v4.record(start.elapsed());
529        res
530    }
531
532    /// Handler for `engine_getPayloadV5`
533    ///
534    /// Returns the most recent version of the payload that is available in the corresponding
535    /// payload build process at the time of receiving this call.
536    ///
537    /// See also <https://github.com/ethereum/execution-apis/blob/15399c2e2f16a5f800bf3f285640357e2c245ad9/src/engine/osaka.md#engine_getpayloadv5>
538    ///
539    /// Note:
540    /// > Provider software MAY stop the corresponding build process after serving this call.
541    pub async fn get_payload_v5(
542        &self,
543        payload_id: PayloadId,
544    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
545        self.get_payload_inner(payload_id, EngineApiMessageVersion::V5).await
546    }
547
548    /// Metrics version of `get_payload_v5`
549    pub async fn get_payload_v5_metered(
550        &self,
551        payload_id: PayloadId,
552    ) -> EngineApiResult<EngineT::ExecutionPayloadEnvelopeV5> {
553        let start = Instant::now();
554        let res = Self::get_payload_v5(self, payload_id).await;
555        self.inner.metrics.latency.get_payload_v5.record(start.elapsed());
556        res
557    }
558
559    /// Fetches all the blocks for the provided range starting at `start`, containing `count`
560    /// blocks and returns the mapped payload bodies.
561    pub async fn get_payload_bodies_by_range_with<F, R>(
562        &self,
563        start: BlockNumber,
564        count: u64,
565        f: F,
566    ) -> EngineApiResult<Vec<Option<R>>>
567    where
568        F: Fn(Provider::Block) -> R + Send + 'static,
569        R: Send + 'static,
570    {
571        let (tx, rx) = oneshot::channel();
572        let inner = self.inner.clone();
573
574        self.inner.task_spawner.spawn_blocking(Box::pin(async move {
575            if count > MAX_PAYLOAD_BODIES_LIMIT {
576                tx.send(Err(EngineApiError::PayloadRequestTooLarge { len: count })).ok();
577                return;
578            }
579
580            if start == 0 || count == 0 {
581                tx.send(Err(EngineApiError::InvalidBodiesRange { start, count })).ok();
582                return;
583            }
584
585            let mut result = Vec::with_capacity(count as usize);
586
587            // -1 so range is inclusive
588            let mut end = start.saturating_add(count - 1);
589
590            // > Client software MUST NOT return trailing null values if the request extends past the current latest known block.
591            // truncate the end if it's greater than the last block
592            if let Ok(best_block) = inner.provider.best_block_number() {
593                if end > best_block {
594                    end = best_block;
595                }
596            }
597
598            for num in start..=end {
599                let block_result = inner.provider.block(BlockHashOrNumber::Number(num));
600                match block_result {
601                    Ok(block) => {
602                        result.push(block.map(&f));
603                    }
604                    Err(err) => {
605                        tx.send(Err(EngineApiError::Internal(Box::new(err)))).ok();
606                        return;
607                    }
608                };
609            }
610            tx.send(Ok(result)).ok();
611        }));
612
613        rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
614    }
615
616    /// Returns the execution payload bodies by the range starting at `start`, containing `count`
617    /// blocks.
618    ///
619    /// WARNING: This method is associated with the `BeaconBlocksByRange` message in the consensus
620    /// layer p2p specification, meaning the input should be treated as untrusted or potentially
621    /// adversarial.
622    ///
623    /// Implementers should take care when acting on the input to this method, specifically
624    /// ensuring that the range is limited properly, and that the range boundaries are computed
625    /// correctly and without panics.
626    pub async fn get_payload_bodies_by_range_v1(
627        &self,
628        start: BlockNumber,
629        count: u64,
630    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
631        self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV1 {
632            transactions: block.body().encoded_2718_transactions(),
633            withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
634        })
635        .await
636    }
637
638    /// Metrics version of `get_payload_bodies_by_range_v1`
639    pub async fn get_payload_bodies_by_range_v1_metered(
640        &self,
641        start: BlockNumber,
642        count: u64,
643    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
644        let start_time = Instant::now();
645        let res = Self::get_payload_bodies_by_range_v1(self, start, count).await;
646        self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
647        res
648    }
649
650    /// Called to retrieve execution payload bodies by hashes.
651    pub async fn get_payload_bodies_by_hash_with<F, R>(
652        &self,
653        hashes: Vec<BlockHash>,
654        f: F,
655    ) -> EngineApiResult<Vec<Option<R>>>
656    where
657        F: Fn(Provider::Block) -> R + Send + 'static,
658        R: Send + 'static,
659    {
660        let len = hashes.len() as u64;
661        if len > MAX_PAYLOAD_BODIES_LIMIT {
662            return Err(EngineApiError::PayloadRequestTooLarge { len });
663        }
664
665        let (tx, rx) = oneshot::channel();
666        let inner = self.inner.clone();
667
668        self.inner.task_spawner.spawn_blocking(Box::pin(async move {
669            let mut result = Vec::with_capacity(hashes.len());
670            for hash in hashes {
671                let block_result = inner.provider.block(BlockHashOrNumber::Hash(hash));
672                match block_result {
673                    Ok(block) => {
674                        result.push(block.map(&f));
675                    }
676                    Err(err) => {
677                        let _ = tx.send(Err(EngineApiError::Internal(Box::new(err))));
678                        return;
679                    }
680                }
681            }
682            tx.send(Ok(result)).ok();
683        }));
684
685        rx.await.map_err(|err| EngineApiError::Internal(Box::new(err)))?
686    }
687
688    /// Called to retrieve execution payload bodies by hashes.
689    pub async fn get_payload_bodies_by_hash_v1(
690        &self,
691        hashes: Vec<BlockHash>,
692    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
693        self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV1 {
694            transactions: block.body().encoded_2718_transactions(),
695            withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner),
696        })
697        .await
698    }
699
700    /// Metrics version of `get_payload_bodies_by_hash_v1`
701    pub async fn get_payload_bodies_by_hash_v1_metered(
702        &self,
703        hashes: Vec<BlockHash>,
704    ) -> EngineApiResult<ExecutionPayloadBodiesV1> {
705        let start = Instant::now();
706        let res = Self::get_payload_bodies_by_hash_v1(self, hashes);
707        self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
708        res.await
709    }
710
711    /// Validates the `engine_forkchoiceUpdated` payload attributes and executes the forkchoice
712    /// update.
713    ///
714    /// The payload attributes will be validated according to the engine API rules for the given
715    /// message version:
716    /// * If the version is [`EngineApiMessageVersion::V1`], then the payload attributes will be
717    ///   validated according to the Paris rules.
718    /// * If the version is [`EngineApiMessageVersion::V2`], then the payload attributes will be
719    ///   validated according to the Shanghai rules, as well as the validity changes from cancun:
720    ///   <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/cancun.md#update-the-methods-of-previous-forks>
721    ///
722    /// * If the version above [`EngineApiMessageVersion::V3`], then the payload attributes will be
723    ///   validated according to the Cancun rules.
724    async fn validate_and_execute_forkchoice(
725        &self,
726        version: EngineApiMessageVersion,
727        state: ForkchoiceState,
728        payload_attrs: Option<EngineT::PayloadAttributes>,
729    ) -> EngineApiResult<ForkchoiceUpdated> {
730        self.inner.record_elapsed_time_on_fcu();
731
732        if let Some(ref attrs) = payload_attrs {
733            let attr_validation_res =
734                self.inner.validator.ensure_well_formed_attributes(version, attrs);
735
736            // From the engine API spec:
737            //
738            // Client software MUST ensure that payloadAttributes.timestamp is greater than
739            // timestamp of a block referenced by forkchoiceState.headBlockHash. If this condition
740            // isn't held client software MUST respond with -38003: Invalid payload attributes and
741            // MUST NOT begin a payload build process. In such an event, the forkchoiceState
742            // update MUST NOT be rolled back.
743            //
744            // NOTE: This will also apply to the validation result for the cancun or
745            // shanghai-specific fields provided in the payload attributes.
746            //
747            // To do this, we set the payload attrs to `None` if attribute validation failed, but
748            // we still apply the forkchoice update.
749            if let Err(err) = attr_validation_res {
750                let fcu_res =
751                    self.inner.beacon_consensus.fork_choice_updated(state, None, version).await?;
752                // TODO: decide if we want this branch - the FCU INVALID response might be more
753                // useful than the payload attributes INVALID response
754                if fcu_res.is_invalid() {
755                    return Ok(fcu_res)
756                }
757                return Err(err.into())
758            }
759        }
760
761        Ok(self.inner.beacon_consensus.fork_choice_updated(state, payload_attrs, version).await?)
762    }
763
764    /// Returns reference to supported capabilities.
765    pub fn capabilities(&self) -> &EngineCapabilities {
766        &self.inner.capabilities
767    }
768
769    fn get_blobs_v1(
770        &self,
771        versioned_hashes: Vec<B256>,
772    ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
773        if versioned_hashes.len() > MAX_BLOB_LIMIT {
774            return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
775        }
776
777        self.inner
778            .tx_pool
779            .get_blobs_for_versioned_hashes_v1(&versioned_hashes)
780            .map_err(|err| EngineApiError::Internal(Box::new(err)))
781    }
782
783    fn get_blobs_v1_metered(
784        &self,
785        versioned_hashes: Vec<B256>,
786    ) -> EngineApiResult<Vec<Option<BlobAndProofV1>>> {
787        let hashes_len = versioned_hashes.len();
788        let start = Instant::now();
789        let res = Self::get_blobs_v1(self, versioned_hashes);
790        self.inner.metrics.latency.get_blobs_v1.record(start.elapsed());
791
792        if let Ok(blobs) = &res {
793            let blobs_found = blobs.iter().flatten().count();
794            let blobs_missed = hashes_len - blobs_found;
795
796            self.inner.metrics.blob_metrics.blob_count.increment(blobs_found as u64);
797            self.inner.metrics.blob_metrics.blob_misses.increment(blobs_missed as u64);
798        }
799
800        res
801    }
802
803    fn get_blobs_v2(
804        &self,
805        versioned_hashes: Vec<B256>,
806    ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
807        if versioned_hashes.len() > MAX_BLOB_LIMIT {
808            return Err(EngineApiError::BlobRequestTooLarge { len: versioned_hashes.len() })
809        }
810
811        self.inner
812            .tx_pool
813            .get_blobs_for_versioned_hashes_v2(&versioned_hashes)
814            .map_err(|err| EngineApiError::Internal(Box::new(err)))
815    }
816
817    fn get_blobs_v2_metered(
818        &self,
819        versioned_hashes: Vec<B256>,
820    ) -> EngineApiResult<Option<Vec<BlobAndProofV2>>> {
821        let hashes_len = versioned_hashes.len();
822        let start = Instant::now();
823        let res = Self::get_blobs_v2(self, versioned_hashes);
824        self.inner.metrics.latency.get_blobs_v2.record(start.elapsed());
825
826        if let Ok(blobs) = &res {
827            let blobs_found = blobs.iter().flatten().count();
828
829            self.inner
830                .metrics
831                .blob_metrics
832                .get_blobs_requests_blobs_total
833                .increment(hashes_len as u64);
834            self.inner
835                .metrics
836                .blob_metrics
837                .get_blobs_requests_blobs_in_blobpool_total
838                .increment(blobs_found as u64);
839
840            if blobs_found == hashes_len {
841                self.inner.metrics.blob_metrics.get_blobs_requests_success_total.increment(1);
842            } else {
843                self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
844            }
845        } else {
846            self.inner.metrics.blob_metrics.get_blobs_requests_failure_total.increment(1);
847        }
848
849        res
850    }
851}
852
853impl<Provider, PayloadT, Pool, Validator, ChainSpec>
854    EngineApiInner<Provider, PayloadT, Pool, Validator, ChainSpec>
855where
856    PayloadT: PayloadTypes,
857{
858    /// Tracks the elapsed time between the new payload response and the received forkchoice update
859    /// request.
860    fn record_elapsed_time_on_fcu(&self) {
861        if let Some(start_time) = self.latest_new_payload_response.lock().take() {
862            let elapsed_time = start_time.elapsed();
863            self.metrics.latency.new_payload_forkchoice_updated_time_diff.record(elapsed_time);
864        }
865    }
866
867    /// Updates the timestamp for the latest new payload response.
868    fn on_new_payload_response(&self) {
869        self.latest_new_payload_response.lock().replace(Instant::now());
870    }
871}
872
873// This is the concrete ethereum engine API implementation.
874#[async_trait]
875impl<Provider, EngineT, Pool, Validator, ChainSpec> EngineApiServer<EngineT>
876    for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
877where
878    Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static,
879    EngineT: EngineTypes<ExecutionData = ExecutionData>,
880    Pool: TransactionPool + 'static,
881    Validator: EngineValidator<EngineT>,
882    ChainSpec: EthereumHardforks + Send + Sync + 'static,
883{
884    /// Handler for `engine_newPayloadV1`
885    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_newpayloadv1>
886    /// Caution: This should not accept the `withdrawals` field
887    async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult<PayloadStatus> {
888        trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
889        let payload =
890            ExecutionData { payload: payload.into(), sidecar: ExecutionPayloadSidecar::none() };
891        Ok(self.new_payload_v1_metered(payload).await?)
892    }
893
894    /// Handler for `engine_newPayloadV2`
895    /// See also <https://github.com/ethereum/execution-apis/blob/584905270d8ad665718058060267061ecfd79ca5/src/engine/shanghai.md#engine_newpayloadv2>
896    async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult<PayloadStatus> {
897        trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
898        let payload = ExecutionData {
899            payload: payload.into_payload(),
900            sidecar: ExecutionPayloadSidecar::none(),
901        };
902
903        Ok(self.new_payload_v2_metered(payload).await?)
904    }
905
906    /// Handler for `engine_newPayloadV3`
907    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_newpayloadv3>
908    async fn new_payload_v3(
909        &self,
910        payload: ExecutionPayloadV3,
911        versioned_hashes: Vec<B256>,
912        parent_beacon_block_root: B256,
913    ) -> RpcResult<PayloadStatus> {
914        trace!(target: "rpc::engine", "Serving engine_newPayloadV3");
915        let payload = ExecutionData {
916            payload: payload.into(),
917            sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields {
918                versioned_hashes,
919                parent_beacon_block_root,
920            }),
921        };
922
923        Ok(self.new_payload_v3_metered(payload).await?)
924    }
925
926    /// Handler for `engine_newPayloadV4`
927    /// See also <https://github.com/ethereum/execution-apis/blob/03911ffc053b8b806123f1fc237184b0092a485a/src/engine/prague.md#engine_newpayloadv4>
928    async fn new_payload_v4(
929        &self,
930        payload: ExecutionPayloadV3,
931        versioned_hashes: Vec<B256>,
932        parent_beacon_block_root: B256,
933        requests: RequestsOrHash,
934    ) -> RpcResult<PayloadStatus> {
935        trace!(target: "rpc::engine", "Serving engine_newPayloadV4");
936
937        // Accept requests as a hash only if it is explicitly allowed
938        if requests.is_hash() && !self.inner.accept_execution_requests_hash {
939            return Err(EngineApiError::UnexpectedRequestsHash.into());
940        }
941
942        let payload = ExecutionData {
943            payload: payload.into(),
944            sidecar: ExecutionPayloadSidecar::v4(
945                CancunPayloadFields { versioned_hashes, parent_beacon_block_root },
946                PraguePayloadFields { requests },
947            ),
948        };
949
950        Ok(self.new_payload_v4_metered(payload).await?)
951    }
952
953    /// Handler for `engine_forkchoiceUpdatedV1`
954    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_forkchoiceupdatedv1>
955    ///
956    /// Caution: This should not accept the `withdrawals` field
957    async fn fork_choice_updated_v1(
958        &self,
959        fork_choice_state: ForkchoiceState,
960        payload_attributes: Option<EngineT::PayloadAttributes>,
961    ) -> RpcResult<ForkchoiceUpdated> {
962        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1");
963        Ok(self.fork_choice_updated_v1_metered(fork_choice_state, payload_attributes).await?)
964    }
965
966    /// Handler for `engine_forkchoiceUpdatedV2`
967    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
968    async fn fork_choice_updated_v2(
969        &self,
970        fork_choice_state: ForkchoiceState,
971        payload_attributes: Option<EngineT::PayloadAttributes>,
972    ) -> RpcResult<ForkchoiceUpdated> {
973        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2");
974        Ok(self.fork_choice_updated_v2_metered(fork_choice_state, payload_attributes).await?)
975    }
976
977    /// Handler for `engine_forkchoiceUpdatedV2`
978    ///
979    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#engine_forkchoiceupdatedv3>
980    async fn fork_choice_updated_v3(
981        &self,
982        fork_choice_state: ForkchoiceState,
983        payload_attributes: Option<EngineT::PayloadAttributes>,
984    ) -> RpcResult<ForkchoiceUpdated> {
985        trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3");
986        Ok(self.fork_choice_updated_v3_metered(fork_choice_state, payload_attributes).await?)
987    }
988
989    /// Handler for `engine_getPayloadV1`
990    ///
991    /// Returns the most recent version of the payload that is available in the corresponding
992    /// payload build process at the time of receiving this call.
993    ///
994    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_getPayloadV1>
995    ///
996    /// Caution: This should not return the `withdrawals` field
997    ///
998    /// Note:
999    /// > Provider software MAY stop the corresponding build process after serving this call.
1000    async fn get_payload_v1(
1001        &self,
1002        payload_id: PayloadId,
1003    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV1> {
1004        trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
1005        Ok(self.get_payload_v1_metered(payload_id).await?)
1006    }
1007
1008    /// Handler for `engine_getPayloadV2`
1009    ///
1010    /// Returns the most recent version of the payload that is available in the corresponding
1011    /// payload build process at the time of receiving this call.
1012    ///
1013    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_getpayloadv2>
1014    ///
1015    /// Note:
1016    /// > Provider software MAY stop the corresponding build process after serving this call.
1017    async fn get_payload_v2(
1018        &self,
1019        payload_id: PayloadId,
1020    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV2> {
1021        debug!(target: "rpc::engine", id = %payload_id, "Serving engine_getPayloadV2");
1022        Ok(self.get_payload_v2_metered(payload_id).await?)
1023    }
1024
1025    /// Handler for `engine_getPayloadV3`
1026    ///
1027    /// Returns the most recent version of the payload that is available in the corresponding
1028    /// payload build process at the time of receiving this call.
1029    ///
1030    /// See also <https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md#engine_getpayloadv3>
1031    ///
1032    /// Note:
1033    /// > Provider software MAY stop the corresponding build process after serving this call.
1034    async fn get_payload_v3(
1035        &self,
1036        payload_id: PayloadId,
1037    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV3> {
1038        trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
1039        Ok(self.get_payload_v3_metered(payload_id).await?)
1040    }
1041
1042    /// Handler for `engine_getPayloadV4`
1043    ///
1044    /// Returns the most recent version of the payload that is available in the corresponding
1045    /// payload build process at the time of receiving this call.
1046    ///
1047    /// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/prague.md#engine_getpayloadv4>
1048    ///
1049    /// Note:
1050    /// > Provider software MAY stop the corresponding build process after serving this call.
1051    async fn get_payload_v4(
1052        &self,
1053        payload_id: PayloadId,
1054    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV4> {
1055        trace!(target: "rpc::engine", "Serving engine_getPayloadV4");
1056        Ok(self.get_payload_v4_metered(payload_id).await?)
1057    }
1058
1059    /// Handler for `engine_getPayloadV5`
1060    ///
1061    /// Returns the most recent version of the payload that is available in the corresponding
1062    /// payload build process at the time of receiving this call.
1063    ///
1064    /// See also <https://github.com/ethereum/execution-apis/blob/15399c2e2f16a5f800bf3f285640357e2c245ad9/src/engine/osaka.md#engine_getpayloadv5>
1065    ///
1066    /// Note:
1067    /// > Provider software MAY stop the corresponding build process after serving this call.
1068    async fn get_payload_v5(
1069        &self,
1070        payload_id: PayloadId,
1071    ) -> RpcResult<EngineT::ExecutionPayloadEnvelopeV5> {
1072        trace!(target: "rpc::engine", "Serving engine_getPayloadV5");
1073        Ok(self.get_payload_v5_metered(payload_id).await?)
1074    }
1075
1076    /// Handler for `engine_getPayloadBodiesByHashV1`
1077    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/shanghai.md#engine_getpayloadbodiesbyhashv1>
1078    async fn get_payload_bodies_by_hash_v1(
1079        &self,
1080        block_hashes: Vec<BlockHash>,
1081    ) -> RpcResult<ExecutionPayloadBodiesV1> {
1082        trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
1083        Ok(self.get_payload_bodies_by_hash_v1_metered(block_hashes).await?)
1084    }
1085
1086    /// Handler for `engine_getPayloadBodiesByRangeV1`
1087    ///
1088    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/shanghai.md#engine_getpayloadbodiesbyrangev1>
1089    ///
1090    /// Returns the execution payload bodies by the range starting at `start`, containing `count`
1091    /// blocks.
1092    ///
1093    /// WARNING: This method is associated with the BeaconBlocksByRange message in the consensus
1094    /// layer p2p specification, meaning the input should be treated as untrusted or potentially
1095    /// adversarial.
1096    ///
1097    /// Implementers should take care when acting on the input to this method, specifically
1098    /// ensuring that the range is limited properly, and that the range boundaries are computed
1099    /// correctly and without panics.
1100    ///
1101    /// Note: If a block is pre shanghai, `withdrawals` field will be `null`.
1102    async fn get_payload_bodies_by_range_v1(
1103        &self,
1104        start: U64,
1105        count: U64,
1106    ) -> RpcResult<ExecutionPayloadBodiesV1> {
1107        trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
1108        Ok(self.get_payload_bodies_by_range_v1_metered(start.to(), count.to()).await?)
1109    }
1110
1111    /// Handler for `engine_getClientVersionV1`
1112    ///
1113    /// See also <https://github.com/ethereum/execution-apis/blob/03911ffc053b8b806123f1fc237184b0092a485a/src/engine/identification.md>
1114    async fn get_client_version_v1(
1115        &self,
1116        client: ClientVersionV1,
1117    ) -> RpcResult<Vec<ClientVersionV1>> {
1118        trace!(target: "rpc::engine", "Serving engine_getClientVersionV1");
1119        Ok(Self::get_client_version_v1(self, client)?)
1120    }
1121
1122    /// Handler for `engine_exchangeCapabilitiesV1`
1123    /// See also <https://github.com/ethereum/execution-apis/blob/6452a6b194d7db269bf1dbd087a267251d3cc7f8/src/engine/common.md#capabilities>
1124    async fn exchange_capabilities(&self, _capabilities: Vec<String>) -> RpcResult<Vec<String>> {
1125        Ok(self.capabilities().list())
1126    }
1127
1128    async fn get_blobs_v1(
1129        &self,
1130        versioned_hashes: Vec<B256>,
1131    ) -> RpcResult<Vec<Option<BlobAndProofV1>>> {
1132        trace!(target: "rpc::engine", "Serving engine_getBlobsV1");
1133        Ok(self.get_blobs_v1_metered(versioned_hashes)?)
1134    }
1135
1136    async fn get_blobs_v2(
1137        &self,
1138        versioned_hashes: Vec<B256>,
1139    ) -> RpcResult<Option<Vec<BlobAndProofV2>>> {
1140        trace!(target: "rpc::engine", "Serving engine_getBlobsV2");
1141        Ok(self.get_blobs_v2_metered(versioned_hashes)?)
1142    }
1143}
1144
1145impl<Provider, EngineT, Pool, Validator, ChainSpec> IntoEngineApiRpcModule
1146    for EngineApi<Provider, EngineT, Pool, Validator, ChainSpec>
1147where
1148    EngineT: EngineTypes,
1149    Self: EngineApiServer<EngineT>,
1150{
1151    fn into_rpc_module(self) -> RpcModule<()> {
1152        self.into_rpc().remove_context()
1153    }
1154}
1155
1156impl<Provider, PayloadT, Pool, Validator, ChainSpec> std::fmt::Debug
1157    for EngineApi<Provider, PayloadT, Pool, Validator, ChainSpec>
1158where
1159    PayloadT: PayloadTypes,
1160{
1161    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1162        f.debug_struct("EngineApi").finish_non_exhaustive()
1163    }
1164}
1165
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_rpc_types_engine::{ClientCode, ClientVersionV1};
    use assert_matches::assert_matches;
    use reth_chainspec::{ChainSpec, MAINNET};
    use reth_engine_primitives::BeaconEngineMessage;
    use reth_ethereum_engine_primitives::EthEngineTypes;
    use reth_ethereum_primitives::Block;
    use reth_node_ethereum::EthereumEngineValidator;
    use reth_payload_builder::test_utils::spawn_test_payload_service;
    use reth_provider::test_utils::MockEthProvider;
    use reth_tasks::TokioTaskExecutor;
    use reth_transaction_pool::noop::NoopTransactionPool;
    use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};

    /// Test-side view of the fixtures wired into the API under test: the provider that blocks
    /// can be inserted into and the receiving end of the channel the API sends engine messages
    /// on.
    struct EngineApiTestHandle {
        #[allow(dead_code)]
        chain_spec: Arc<ChainSpec>,
        provider: Arc<MockEthProvider>,
        from_api: UnboundedReceiver<BeaconEngineMessage<EthEngineTypes>>,
    }

    /// The client identity the API under test is constructed with.
    fn test_client_version() -> ClientVersionV1 {
        ClientVersionV1 {
            code: ClientCode::RH,
            name: "Reth".to_string(),
            version: "v0.2.0-beta.5".to_string(),
            commit: "defa64b2".to_string(),
        }
    }

    /// Builds an [`EngineApi`] on top of mock components and returns it together with the handle
    /// used to drive and observe it.
    fn setup_engine_api() -> (
        EngineApiTestHandle,
        EngineApi<
            Arc<MockEthProvider>,
            EthEngineTypes,
            NoopTransactionPool,
            EthereumEngineValidator,
            ChainSpec,
        >,
    ) {
        let client = test_client_version();

        let chain_spec: Arc<ChainSpec> = MAINNET.clone();
        let provider = Arc::new(MockEthProvider::default());
        let payload_store = spawn_test_payload_service();
        let (to_engine, engine_rx) = unbounded_channel();
        let task_executor = Box::<TokioTaskExecutor>::default();

        let engine_handle = BeaconConsensusEngineHandle::new(to_engine);
        let validator = EthereumEngineValidator::new(Arc::clone(&chain_spec));
        let api = EngineApi::new(
            Arc::clone(&provider),
            Arc::clone(&chain_spec),
            engine_handle,
            payload_store.into(),
            NoopTransactionPool::default(),
            task_executor,
            client,
            EngineCapabilities::default(),
            validator,
            false,
        );

        (EngineApiTestHandle { chain_spec, provider, from_api: engine_rx }, api)
    }

    #[tokio::test]
    async fn engine_client_version_v1() {
        let expected = test_client_version();
        let (_, api) = setup_engine_api();

        let versions = api.get_client_version_v1(expected.clone()).unwrap();
        assert_eq!(versions, vec![expected]);
    }

    #[tokio::test]
    async fn forwards_responses_to_consensus_engine() {
        let (mut handle, api) = setup_engine_api();

        // Submit a payload from a background task; this test only checks that the request
        // shows up on the engine channel.
        tokio::spawn(async move {
            let default_block = Block::default();
            let payload_v1 = ExecutionPayloadV1::from_block_slow(&default_block);
            let sidecar = ExecutionPayloadSidecar::none();
            let execution_data = ExecutionData { payload: payload_v1.into(), sidecar };

            api.new_payload_v1(execution_data).await.unwrap();
        });

        let received = handle.from_api.recv().await;
        assert_matches!(received, Some(BeaconEngineMessage::NewPayload { .. }));
    }

    // tests covering `engine_getPayloadBodiesByRange` and `engine_getPayloadBodiesByHash`
    mod get_payload_bodies {
        use super::*;
        use alloy_rpc_types_engine::ExecutionPayloadBodyV1;
        use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};

        #[tokio::test]
        async fn invalid_params() {
            let (_, api) = setup_engine_api();

            // test [EngineApiMessage::GetPayloadBodiesByRange]
            //
            // Each entry is a `(start, count)` pair that must be rejected.
            for (start, count) in [(0, 0), (0, 1), (1, 0)] {
                let res = api.get_payload_bodies_by_range_v1(start, count).await;
                assert_matches!(res, Err(EngineApiError::InvalidBodiesRange { .. }));
            }
        }

        #[tokio::test]
        async fn request_too_large() {
            let (_, api) = setup_engine_api();

            // One more than the limit must be refused.
            let res = api.get_payload_bodies_by_range_v1(0, MAX_PAYLOAD_BODIES_LIMIT + 1).await;
            assert_matches!(res, Err(EngineApiError::PayloadRequestTooLarge { .. }));
        }

        #[tokio::test]
        async fn returns_payload_bodies() {
            let mut rng = generators::rng();
            let (handle, api) = setup_engine_api();

            let (start, count) = (1, 10);
            let block_params = BlockRangeParams { tx_count: 0..2, ..Default::default() };
            let blocks = random_block_range(&mut rng, start..=start + count - 1, block_params);

            // Every generated block is made available through the provider.
            let stored = blocks.iter().cloned().map(|b| (b.hash(), b.into_block()));
            handle.provider.extend_blocks(stored);

            let expected: Vec<_> = blocks
                .iter()
                .cloned()
                .map(|b| Some(ExecutionPayloadBodyV1::from_block(b.into_block())))
                .collect();

            let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
            assert_eq!(res, expected);
        }

        #[tokio::test]
        async fn returns_payload_bodies_with_gaps() {
            let mut rng = generators::rng();
            let (handle, api) = setup_engine_api();

            let (start, count) = (1, 100);
            let block_params = BlockRangeParams { tx_count: 0..2, ..Default::default() };
            let blocks = random_block_range(&mut rng, start..=start + count - 1, block_params);

            // Insert only blocks in ranges 1-25 and 51-75
            let first_missing_range = 26..=50;
            let second_missing_range = 76..=100;
            let stored = blocks
                .iter()
                .filter(|b| {
                    !(first_missing_range.contains(&b.number) ||
                        second_missing_range.contains(&b.number))
                })
                .map(|b| (b.hash(), b.clone().into_block()));
            handle.provider.extend_blocks(stored);

            // By range: everything after the second gap is dropped from the expectation so
            // that no trailing `None`s are expected.
            let expected: Vec<_> = blocks
                .iter()
                .filter(|b| !second_missing_range.contains(&b.number))
                .cloned()
                .map(|b| {
                    let present = !first_missing_range.contains(&b.number);
                    present.then(|| ExecutionPayloadBodyV1::from_block(b.into_block()))
                })
                .collect();

            let res = api.get_payload_bodies_by_range_v1(start, count).await.unwrap();
            assert_eq!(res, expected);

            // By hash: trailing `None`s are still expected here because by-hash will not be
            // aware of the missing block's number, and cannot compare it to the current best
            // block.
            let expected: Vec<_> = blocks
                .iter()
                .cloned()
                .map(|b| {
                    let missing = first_missing_range.contains(&b.number) ||
                        second_missing_range.contains(&b.number);
                    (!missing).then(|| ExecutionPayloadBodyV1::from_block(b.into_block()))
                })
                .collect();

            let hashes = blocks.iter().map(|b| b.hash()).collect();
            let res = api.get_payload_bodies_by_hash_v1(hashes).await.unwrap();
            assert_eq!(res, expected);
        }
    }
}