1use core::fmt;
10use std::{
11 fmt::{Debug, Formatter},
12 pin::Pin,
13 sync::Arc,
14 task::{Context, Poll},
15};
16
17use crate::miner::{LocalMiner, MiningMode};
18use futures_util::{Stream, StreamExt};
19use reth_beacon_consensus::{BeaconConsensusEngineEvent, EngineNodeTypes};
20use reth_chainspec::EthChainSpec;
21use reth_consensus::FullConsensus;
22use reth_engine_primitives::{BeaconEngineMessage, EngineValidator};
23use reth_engine_service::service::EngineMessageStream;
24use reth_engine_tree::{
25 backup::BackupHandle,
26 chain::{ChainEvent, HandlerEvent},
27 engine::{
28 EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineRequestHandler, FromEngine,
29 RequestHandlerEvent,
30 },
31 persistence::PersistenceHandle,
32 tree::{EngineApiTreeHandler, InvalidBlockHook, TreeConfig},
33};
34use reth_evm::execute::BlockExecutorProvider;
35use reth_node_core::dirs::{ChainPath, DataDirPath};
36use reth_node_types::BlockTy;
37use reth_payload_builder::PayloadBuilderHandle;
38use reth_payload_primitives::{PayloadAttributesBuilder, PayloadTypes};
39use reth_provider::{providers::BlockchainProvider2, ChainSpecProvider, ProviderFactory};
40use reth_prune::PrunerWithFactory;
41use reth_stages_api::MetricEventsSender;
42use tokio::sync::mpsc::UnboundedSender;
43use tracing::error;
44
45pub struct LocalEngineService<N>
51where
52 N: EngineNodeTypes,
53{
54 handler: EngineApiRequestHandler<EngineApiRequest<N::Engine, N::Primitives>, N::Primitives>,
58 incoming_requests: EngineMessageStream<N::Engine>,
60}
61
62impl<N> LocalEngineService<N>
63where
64 N: EngineNodeTypes,
65{
66 #[allow(clippy::too_many_arguments)]
68 pub fn new<B, V>(
69 consensus: Arc<dyn FullConsensus<N::Primitives>>,
70 executor_factory: impl BlockExecutorProvider<Primitives = N::Primitives>,
71 provider: ProviderFactory<N>,
72 blockchain_db: BlockchainProvider2<N>,
73 pruner: PrunerWithFactory<ProviderFactory<N>>,
74 payload_builder: PayloadBuilderHandle<N::Engine>,
75 payload_validator: V,
76 tree_config: TreeConfig,
77 invalid_block_hook: Box<dyn InvalidBlockHook<N::Primitives>>,
78 sync_metrics_tx: MetricEventsSender,
79 to_engine: UnboundedSender<BeaconEngineMessage<N::Engine>>,
80 from_engine: EngineMessageStream<N::Engine>,
81 mode: MiningMode,
82 payload_attributes_builder: B,
83 data_dir: ChainPath<DataDirPath>,
84 ) -> Self
85 where
86 B: PayloadAttributesBuilder<<N::Engine as PayloadTypes>::PayloadAttributes>,
87 V: EngineValidator<N::Engine, Block = BlockTy<N>>,
88 {
89 let chain_spec = provider.chain_spec();
90 let engine_kind =
91 if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
92
93 let persistence_handle =
94 PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
95 let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
96 let backup_handle = BackupHandle::spawn_service(data_dir);
97
98 let (to_tree_tx, from_tree) = EngineApiTreeHandler::<N::Primitives, _, _, _, _>::spawn_new(
99 blockchain_db.clone(),
100 executor_factory,
101 consensus,
102 payload_validator,
103 persistence_handle,
104 payload_builder.clone(),
105 canonical_in_memory_state,
106 tree_config,
107 invalid_block_hook,
108 engine_kind,
109 backup_handle,
110 );
111
112 let handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
113
114 LocalMiner::spawn_new(
115 blockchain_db,
116 payload_attributes_builder,
117 to_engine,
118 mode,
119 payload_builder,
120 );
121
122 Self { handler, incoming_requests: from_engine }
123 }
124}
125
126impl<N> Stream for LocalEngineService<N>
127where
128 N: EngineNodeTypes,
129{
130 type Item = ChainEvent<BeaconConsensusEngineEvent<N::Primitives>>;
131
132 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
133 let this = self.get_mut();
134
135 if let Poll::Ready(ev) = this.handler.poll(cx) {
136 return match ev {
137 RequestHandlerEvent::HandlerEvent(ev) => match ev {
138 HandlerEvent::BackfillAction(_) => {
139 error!(target: "engine::local", "received backfill request in local engine");
140 Poll::Ready(Some(ChainEvent::FatalError))
141 }
142 HandlerEvent::Event(ev) => Poll::Ready(Some(ChainEvent::Handler(ev))),
143 HandlerEvent::FatalError => Poll::Ready(Some(ChainEvent::FatalError)),
144 },
145 RequestHandlerEvent::Download(_) => {
146 error!(target: "engine::local", "received download request in local engine");
147 Poll::Ready(Some(ChainEvent::FatalError))
148 }
149 }
150 }
151
152 while let Poll::Ready(Some(req)) = this.incoming_requests.poll_next_unpin(cx) {
154 this.handler.on_event(FromEngine::Request(req.into()));
155 }
156
157 Poll::Pending
158 }
159}
160
161impl<N: EngineNodeTypes> Debug for LocalEngineService<N> {
162 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
163 f.debug_struct("LocalEngineService").finish_non_exhaustive()
164 }
165}