reth_beacon_consensus/engine/
handle.rs1use crate::{BeaconConsensusEngineEvent, BeaconForkChoiceUpdateError};
4use alloy_rpc_types_engine::{
5 ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState, ForkchoiceUpdated, PayloadStatus,
6};
7use futures::TryFutureExt;
8use reth_engine_primitives::{
9 BeaconEngineMessage, BeaconOnNewPayloadError, EngineApiMessageVersion, EngineTypes,
10 OnForkChoiceUpdated,
11};
12use reth_errors::RethResult;
13use reth_tokio_util::{EventSender, EventStream};
14use tokio::sync::{mpsc::UnboundedSender, oneshot};
15
16#[derive(Debug, Clone)]
21pub struct BeaconConsensusEngineHandle<Engine>
22where
23 Engine: EngineTypes,
24{
25 pub(crate) to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
26 event_sender: EventSender<BeaconConsensusEngineEvent>,
27}
28
29impl<Engine> BeaconConsensusEngineHandle<Engine>
32where
33 Engine: EngineTypes,
34{
35 pub const fn new(
37 to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
38 event_sender: EventSender<BeaconConsensusEngineEvent>,
39 ) -> Self {
40 Self { to_engine, event_sender }
41 }
42
43 pub async fn new_payload(
47 &self,
48 payload: ExecutionPayload,
49 sidecar: ExecutionPayloadSidecar,
50 ) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
51 let (tx, rx) = oneshot::channel();
52 let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, sidecar, tx });
53 rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
54 }
55
56 pub async fn fork_choice_updated(
60 &self,
61 state: ForkchoiceState,
62 payload_attrs: Option<Engine::PayloadAttributes>,
63 version: EngineApiMessageVersion,
64 ) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
65 Ok(self
66 .send_fork_choice_updated(state, payload_attrs, version)
67 .map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
68 .await??
69 .await?)
70 }
71
72 fn send_fork_choice_updated(
75 &self,
76 state: ForkchoiceState,
77 payload_attrs: Option<Engine::PayloadAttributes>,
78 version: EngineApiMessageVersion,
79 ) -> oneshot::Receiver<RethResult<OnForkChoiceUpdated>> {
80 let (tx, rx) = oneshot::channel();
81 let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
82 state,
83 payload_attrs,
84 tx,
85 version,
86 });
87 rx
88 }
89
90 pub fn transition_configuration_exchanged(&self) {
97 let _ = self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged);
98 }
99
100 pub fn event_listener(&self) -> EventStream<BeaconConsensusEngineEvent> {
102 self.event_sender.new_listener()
103 }
104}