reth_beacon_consensus/engine/
handle.rs

1//! `BeaconConsensusEngine` external API
2
3use crate::{BeaconConsensusEngineEvent, BeaconForkChoiceUpdateError};
4use alloy_rpc_types_engine::{
5    ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState, ForkchoiceUpdated, PayloadStatus,
6};
7use futures::TryFutureExt;
8use reth_engine_primitives::{
9    BeaconEngineMessage, BeaconOnNewPayloadError, EngineApiMessageVersion, EngineTypes,
10    OnForkChoiceUpdated,
11};
12use reth_errors::RethResult;
13use reth_tokio_util::{EventSender, EventStream};
14use tokio::sync::{mpsc::UnboundedSender, oneshot};
15
16/// A _shareable_ beacon consensus frontend type. Used to interact with the spawned beacon consensus
17/// engine task.
18///
19/// See also `BeaconConsensusEngine`
20#[derive(Debug, Clone)]
21pub struct BeaconConsensusEngineHandle<Engine>
22where
23    Engine: EngineTypes,
24{
25    pub(crate) to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
26    event_sender: EventSender<BeaconConsensusEngineEvent>,
27}
28
29// === impl BeaconConsensusEngineHandle ===
30
31impl<Engine> BeaconConsensusEngineHandle<Engine>
32where
33    Engine: EngineTypes,
34{
35    /// Creates a new beacon consensus engine handle.
36    pub const fn new(
37        to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
38        event_sender: EventSender<BeaconConsensusEngineEvent>,
39    ) -> Self {
40        Self { to_engine, event_sender }
41    }
42
43    /// Sends a new payload message to the beacon consensus engine and waits for a response.
44    ///
45    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_newpayloadv2>
46    pub async fn new_payload(
47        &self,
48        payload: ExecutionPayload,
49        sidecar: ExecutionPayloadSidecar,
50    ) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
51        let (tx, rx) = oneshot::channel();
52        let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, sidecar, tx });
53        rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
54    }
55
56    /// Sends a forkchoice update message to the beacon consensus engine and waits for a response.
57    ///
58    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
59    pub async fn fork_choice_updated(
60        &self,
61        state: ForkchoiceState,
62        payload_attrs: Option<Engine::PayloadAttributes>,
63        version: EngineApiMessageVersion,
64    ) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
65        Ok(self
66            .send_fork_choice_updated(state, payload_attrs, version)
67            .map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
68            .await??
69            .await?)
70    }
71
72    /// Sends a forkchoice update message to the beacon consensus engine and returns the receiver to
73    /// wait for a response.
74    fn send_fork_choice_updated(
75        &self,
76        state: ForkchoiceState,
77        payload_attrs: Option<Engine::PayloadAttributes>,
78        version: EngineApiMessageVersion,
79    ) -> oneshot::Receiver<RethResult<OnForkChoiceUpdated>> {
80        let (tx, rx) = oneshot::channel();
81        let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
82            state,
83            payload_attrs,
84            tx,
85            version,
86        });
87        rx
88    }
89
90    /// Sends a transition configuration exchange message to the beacon consensus engine.
91    ///
92    /// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_exchangetransitionconfigurationv1>
93    ///
94    /// This only notifies about the exchange. The actual exchange is done by the engine API impl
95    /// itself.
96    pub fn transition_configuration_exchanged(&self) {
97        let _ = self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged);
98    }
99
100    /// Creates a new [`BeaconConsensusEngineEvent`] listener stream.
101    pub fn event_listener(&self) -> EventStream<BeaconConsensusEngineEvent> {
102        self.event_sender.new_listener()
103    }
104}