1use futures::{future::Either, stream, stream_select, StreamExt};
4use reth_beacon_consensus::{
5 hooks::{EngineHooks, StaticFileHook},
6 BeaconConsensusEngineHandle, EngineNodeTypes,
7};
8use reth_chainspec::EthChainSpec;
9use reth_consensus_debug_client::{DebugConsensusClient, EtherscanBlockProvider};
10use reth_engine_local::{LocalEngineService, LocalPayloadAttributesBuilder};
11use reth_engine_service::service::{ChainEvent, EngineService};
12use reth_engine_tree::{
13 engine::{EngineApiRequest, EngineRequestHandler},
14 tree::TreeConfig,
15};
16use reth_engine_util::EngineMessageStreamExt;
17use reth_exex::ExExManagerHandle;
18use reth_network::{NetworkSyncUpdater, SyncState};
19use reth_network_api::BlockDownloaderProvider;
20use reth_node_api::{
21 BlockTy, BuiltPayload, EngineValidator, FullNodeTypes, NodeTypesWithEngine,
22 PayloadAttributesBuilder, PayloadBuilder, PayloadTypes,
23};
24use reth_node_core::{
25 dirs::{ChainPath, DataDirPath},
26 exit::NodeExitFuture,
27 primitives::Head,
28};
29use reth_node_events::{cl::ConsensusLayerHealthEvents, node};
30use reth_primitives::{EthPrimitives, EthereumHardforks};
31use reth_provider::providers::BlockchainProvider2;
32use reth_tasks::TaskExecutor;
33use reth_tokio_util::EventSender;
34use reth_tracing::tracing::{debug, error, info};
35use std::sync::Arc;
36use tokio::sync::{mpsc::unbounded_channel, oneshot};
37use tokio_stream::wrappers::UnboundedReceiverStream;
38
39use crate::{
40 common::{Attached, LaunchContextWith, WithConfigs},
41 hooks::NodeHooks,
42 rpc::{EngineValidatorAddOn, RethRpcAddOns, RpcHandle},
43 setup::build_networked_pipeline,
44 AddOns, AddOnsContext, ExExLauncher, FullNode, LaunchContext, LaunchNode, NodeAdapter,
45 NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
46};
47
/// Node launcher that carries a launch context together with the engine tree configuration.
///
/// Both fields are consumed when the launcher is used through its `LaunchNode` implementation.
#[derive(Debug)]
pub struct EngineNodeLauncher {
    /// Launch context; [`EngineNodeLauncher::new`] builds it from a task executor and a data
    /// directory.
    pub ctx: LaunchContext,

    /// Engine tree configuration handed to the engine service created during launch.
    pub engine_tree_config: TreeConfig,
}
58
59impl EngineNodeLauncher {
60 pub const fn new(
62 task_executor: TaskExecutor,
63 data_dir: ChainPath<DataDirPath>,
64 engine_tree_config: TreeConfig,
65 ) -> Self {
66 Self { ctx: LaunchContext::new(task_executor, data_dir), engine_tree_config }
67 }
68}
69
// `LaunchNode` implementation for `EngineNodeLauncher`.
//
// The bounds restrict this impl to node types whose primitives are `EthPrimitives`, whose
// provider is `BlockchainProvider2`, and whose add-ons supply both the RPC add-ons and an engine
// validator. The last bound is needed because the dev-mode branch constructs a
// `LocalPayloadAttributesBuilder` from the chain spec.
impl<Types, T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
where
    Types: EngineNodeTypes<Primitives = EthPrimitives>,
    T: FullNodeTypes<Types = Types, Provider = BlockchainProvider2<Types>>,
    CB: NodeComponentsBuilder<T>,
    AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
        + EngineValidatorAddOn<
            NodeAdapter<T, CB::Components>,
            Validator: EngineValidator<Types::Engine, Block = BlockTy<Types>>,
        >,

    LocalPayloadAttributesBuilder<Types::ChainSpec>: PayloadAttributesBuilder<
        <<Types as NodeTypesWithEngine>::Engine as PayloadTypes>::PayloadAttributes,
    >,
{
    /// Handle returned to the caller once the node has been launched.
    type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;

    /// Launches the node described by `target` and returns its handle.
    ///
    /// In order, this function:
    /// 1. builds the launch context (config, peers, database, provider factory, genesis,
    ///    components),
    /// 2. launches the installed ExExes,
    /// 3. creates the engine message stream, the pipeline and the pruner,
    /// 4. creates the engine service (`LocalEngineService` in dev mode, `EngineService`
    ///    otherwise),
    /// 5. spawns the events task, the add-ons, the optional etherscan consensus client and the
    ///    consensus engine task,
    /// 6. fires the `on_node_started` hook and assembles the [`NodeHandle`].
    ///
    /// # Errors
    ///
    /// Returns the first error produced by any of the fallible (`?`) steps below.
    async fn launch_node(
        self,
        target: NodeBuilderWithComponents<T, CB, AO>,
    ) -> eyre::Result<Self::Node> {
        // Take the launcher and the builder apart into their individual pieces.
        let Self { ctx, engine_tree_config } = self;
        let NodeBuilderWithComponents {
            adapter: NodeTypesAdapter { database },
            components_builder,
            add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
            config,
        } = target;
        let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;

        // Build up the launch context step by step; every stage returns a richer context.
        let ctx = ctx
            .with_configured_globals()
            .with_loaded_toml_config(config)?
            .with_resolved_peers().await?
            .attach(database.clone())
            .with_adjusted_configs()
            .with_provider_factory().await?
            .inspect(|_| {
                info!(target: "reth::cli", "Database opened");
            })
            .with_prometheus_server().await?
            .inspect(|this| {
                debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
            })
            .with_genesis()?
            .inspect(|this: &LaunchContextWith<Attached<WithConfigs<Types::ChainSpec>, _>>| {
                info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
            })
            .with_metrics_task()
            // `T` is passed explicitly as the node types parameter for the blockchain db.
            .with_blockchain_db::<T, _>(move |provider_factory| {
                Ok(BlockchainProvider2::new(provider_factory)?)
            })?
            .with_components(components_builder, on_component_initialized).await?;

        // Launch the installed ExExes. The result is optional: it is treated as an `Option`
        // further down (`unwrap_or_else` / `if let Some`).
        let exex_manager_handle = ExExLauncher::new(
            ctx.head(),
            ctx.node_adapter().clone(),
            installed_exex,
            ctx.configs().clone(),
        )
        .launch()
        .await?;

        // Network client used by the pipeline and, outside dev mode, by the engine service.
        let network_client = ctx.components().network().fetch_client().await?;
        // Channel carrying engine messages; the sender goes into the beacon engine handle.
        let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();

        // Wrap the receiver in the debug adapters selected by the `debug` section of the node
        // config. `maybe_store_messages` is the last adapter in the chain, so it is applied on
        // top of the skip/reorg adapters.
        let node_config = ctx.node_config();
        let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
            .maybe_skip_fcu(node_config.debug.skip_fcu)
            .maybe_skip_new_payload(node_config.debug.skip_new_payload)
            .maybe_reorg(
                ctx.blockchain_db().clone(),
                ctx.components().evm_config().clone(),
                reth_payload_validator::ExecutionPayloadValidator::new(ctx.chain_spec()),
                node_config.debug.reorg_frequency,
                node_config.debug.reorg_depth,
            )
            .maybe_store_messages(node_config.debug.engine_api_store.clone());

        let max_block = ctx.max_block(network_client.clone()).await?;
        // NOTE(review): this `hooks` shadows the `NodeHooks` binding above and is not read again
        // in this function after the static file hook is added — confirm it is still needed.
        let mut hooks = EngineHooks::new();

        // Grab the static file producer's event stream before the producer is moved into the
        // pipeline below.
        let static_file_producer = ctx.static_file_producer();
        let static_file_producer_events = static_file_producer.lock().events();
        hooks.add(StaticFileHook::new(
            static_file_producer.clone(),
            Box::new(ctx.task_executor().clone()),
        ));
        info!(target: "reth::cli", "StaticFileProducer initialized");

        // Shared consensus instance for the pipeline and the engine service.
        let consensus = Arc::new(ctx.components().consensus().clone());

        // The pipeline gets the ExEx manager handle, or an empty handle when no manager exists.
        let pipeline_exex_handle =
            exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty);
        let pipeline = build_networked_pipeline(
            &ctx.toml_config().stages,
            network_client.clone(),
            consensus.clone(),
            ctx.provider_factory().clone(),
            ctx.task_executor(),
            ctx.sync_metrics_tx(),
            ctx.prune_config(),
            max_block,
            static_file_producer,
            ctx.components().block_executor().clone(),
            pipeline_exex_handle,
        )?;

        // NOTE(review): presumably brings the static files up to date before the engine starts
        // — confirm against `Pipeline::move_to_static_files`.
        pipeline.move_to_static_files()?;

        let pipeline_events = pipeline.events();

        // Build the pruner; when an ExEx manager exists its finished height is passed to the
        // pruner builder.
        let mut pruner_builder = ctx.pruner_builder();
        if let Some(exex_manager_handle) = &exex_manager_handle {
            pruner_builder =
                pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
        }
        let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
        let pruner_events = pruner.events();
        info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized");

        // The handle shares the event sender that the consensus engine task notifies below.
        let event_sender = EventSender::default();
        let beacon_engine_handle =
            BeaconConsensusEngineHandle::new(consensus_engine_tx.clone(), event_sender.clone());

        let jwt_secret = ctx.auth_jwt_secret()?;

        // Context shared by the engine validator lookup here and the add-on launch further down.
        let add_ons_ctx = AddOnsContext {
            node: ctx.node_adapter().clone(),
            config: ctx.node_config(),
            beacon_engine_handle: beacon_engine_handle.clone(),
            jwt_secret,
        };
        let engine_payload_validator = add_ons.engine_validator(&add_ons_ctx).await?;

        // Dev mode uses the local engine service (`Either::Left`); otherwise the regular engine
        // service (`Either::Right`) is used, which is the only branch that takes the pipeline
        // and the network client.
        let mut engine_service = if ctx.is_dev() {
            let eth_service = LocalEngineService::new(
                consensus.clone(),
                ctx.components().block_executor().clone(),
                ctx.provider_factory().clone(),
                ctx.blockchain_db().clone(),
                pruner,
                ctx.components().payload_builder().clone(),
                engine_payload_validator,
                engine_tree_config,
                ctx.invalid_block_hook()?,
                ctx.sync_metrics_tx(),
                consensus_engine_tx.clone(),
                Box::pin(consensus_engine_stream),
                ctx.dev_mining_mode(ctx.components().pool()),
                LocalPayloadAttributesBuilder::new(ctx.chain_spec()),
                ctx.data_dir().clone(),
            );

            Either::Left(eth_service)
        } else {
            let eth_service = EngineService::new(
                consensus.clone(),
                ctx.components().block_executor().clone(),
                ctx.chain_spec(),
                network_client.clone(),
                Box::pin(consensus_engine_stream),
                pipeline,
                Box::new(ctx.task_executor().clone()),
                ctx.provider_factory().clone(),
                ctx.blockchain_db().clone(),
                pruner,
                ctx.components().payload_builder().clone(),
                engine_payload_validator,
                engine_tree_config,
                ctx.invalid_block_hook()?,
                ctx.sync_metrics_tx(),
                ctx.data_dir().clone(),
            );

            Either::Right(eth_service)
        };

        info!(target: "reth::cli", "Consensus engine initialized");

        // Merge all event sources into one stream for the events task. Consensus layer health
        // events are only included when no debug tip is configured and the node is not in dev
        // mode; otherwise an empty stream takes their place.
        let events = stream_select!(
            beacon_engine_handle.event_listener().map(Into::into),
            pipeline_events.map(Into::into),
            if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() {
                Either::Left(
                    ConsensusLayerHealthEvents::new(Box::new(ctx.blockchain_db().clone()))
                        .map(Into::into),
                )
            } else {
                Either::Right(stream::empty())
            },
            pruner_events.map(Into::into),
            static_file_producer_events.map(Into::into),
        );
        ctx.task_executor().spawn_critical(
            "events task",
            node::handle_events(
                Some(Box::new(ctx.components().network().clone())),
                Some(ctx.head().number),
                events,
            ),
        );

        // Launch the add-ons; the returned RPC handles are stored in the `FullNode` below.
        let RpcHandle { rpc_server_handles, rpc_registry } =
            add_ons.launch_add_ons(add_ons_ctx).await?;

        // Optional debug consensus client backed by etherscan.
        if let Some(maybe_custom_etherscan_url) = ctx.node_config().debug.etherscan.clone() {
            info!(target: "reth::cli", "Using etherscan as consensus client");

            let chain = ctx.node_config().chain.chain();
            // Use the custom URL when one was given, otherwise fall back to the chain's own
            // etherscan URL; fail if the chain has none.
            let etherscan_url = maybe_custom_etherscan_url.map(Ok).unwrap_or_else(|| {
                chain
                    .etherscan_urls()
                    .map(|urls| urls.0.to_string())
                    .ok_or_else(|| eyre::eyre!("failed to get etherscan url for chain: {chain}"))
            })?;

            let block_provider = EtherscanBlockProvider::new(
                etherscan_url,
                chain.etherscan_api_key().ok_or_else(|| {
                    eyre::eyre!(
                        "etherscan api key not found for rpc consensus client for chain: {chain}"
                    )
                })?,
            );
            let rpc_consensus_client = DebugConsensusClient::new(
                rpc_server_handles.auth.clone(),
                Arc::new(block_provider),
            );
            ctx.task_executor().spawn_critical("etherscan consensus client", async move {
                rpc_consensus_client.run::<<Types as NodeTypesWithEngine>::Engine>().await
            });
        }

        // Collect everything the consensus engine task needs before it is moved into the
        // `async move` block.
        let initial_target = ctx.initial_backfill_target()?;
        let network_handle = ctx.components().network().clone();
        let mut built_payloads = ctx
            .components()
            .payload_builder()
            .subscribe()
            .await
            .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
            .into_built_payload_stream()
            .fuse();
        let chainspec = ctx.chain_spec();
        // The task reports its final result through `exit`; `rx` backs the node exit future.
        let (exit, rx) = oneshot::channel();
        let terminate_after_backfill = ctx.terminate_after_initial_backfill();

        info!(target: "reth::cli", "Starting consensus engine");
        ctx.task_executor().spawn_critical("consensus engine", async move {
            // An initial backfill is only started on the regular engine service.
            if let Some(initial_target) = initial_target {
                debug!(target: "reth::cli", %initial_target, "start backfill sync");
                if let Either::Right(eth_service) = &mut engine_service {
                    eth_service.orchestrator_mut().start_backfill_sync(initial_target);
                }
            }

            // Result sent through `exit` when the loop ends; only a fatal error makes it `Err`.
            let mut res = Ok(());

            // Drive the engine service and, concurrently, feed locally built payloads into it.
            loop {
                tokio::select! {
                    payload = built_payloads.select_next_some() => {
                        if let Some(executed_block) = payload.executed_block() {
                            debug!(target: "reth::cli", block=?executed_block.block().num_hash(), "inserting built payload");
                            // Only the regular engine service receives the executed block.
                            if let Either::Right(eth_service) = &mut engine_service {
                                eth_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into());
                            }
                        }
                    }
                    event = engine_service.next() => {
                        // The engine service stream ended: leave the loop with the current result.
                        let Some(event) = event else { break };
                        debug!(target: "reth::cli", "Event: {event}");
                        match event {
                            ChainEvent::BackfillSyncFinished => {
                                // When configured to terminate, exit without updating the
                                // network sync state.
                                if terminate_after_backfill {
                                    debug!(target: "reth::cli", "Terminating after initial backfill");
                                    break
                                }

                                network_handle.update_sync_state(SyncState::Idle);
                            }
                            ChainEvent::BackfillSyncStarted => {
                                network_handle.update_sync_state(SyncState::Syncing);
                            }
                            ChainEvent::FatalError => {
                                error!(target: "reth::cli", "Fatal error in consensus engine");
                                res = Err(eyre::eyre!("Fatal error in consensus engine"));
                                break
                            }
                            ChainEvent::Handler(ev) => {
                                // If the event carries a canonical header, publish it to the
                                // network as the new head before forwarding the event.
                                if let Some(head) = ev.canonical_header() {
                                    let head_block = Head {
                                        number: head.number,
                                        hash: head.hash(),
                                        difficulty: head.difficulty,
                                        timestamp: head.timestamp,
                                        // Falls back to the default value when the chain spec
                                        // returns `None` for this block number.
                                        total_difficulty: chainspec
                                            .final_paris_total_difficulty(head.number)
                                            .unwrap_or_default(),
                                    };
                                    network_handle.update_status(head_block);
                                }
                                event_sender.notify(ev);
                            }
                        }
                    }
                }
            }

            // The send result is deliberately ignored.
            let _ = exit.send(res);
        });

        // Assemble the user-facing node from clones of the launched components.
        let full_node = FullNode {
            evm_config: ctx.components().evm_config().clone(),
            block_executor: ctx.components().block_executor().clone(),
            pool: ctx.components().pool().clone(),
            network: ctx.components().network().clone(),
            provider: ctx.node_adapter().provider.clone(),
            payload_builder: ctx.components().payload_builder().clone(),
            task_executor: ctx.task_executor().clone(),
            config: ctx.node_config().clone(),
            data_dir: ctx.data_dir().clone(),
            add_ons_handle: RpcHandle { rpc_server_handles, rpc_registry },
        };
        // Run the `on_node_started` hook with a clone of the node.
        on_node_started.on_event(FullNode::clone(&full_node))?;

        // The exit future awaits the result sent by the consensus engine task.
        let handle = NodeHandle {
            node_exit_future: NodeExitFuture::new(
                async { rx.await? },
                full_node.config.debug.terminate,
            ),
            node: full_node,
        };

        Ok(handle)
    }
}