reth_node_builder/launch/
engine.rs

1//! Engine node related functionality.
2
3use futures::{future::Either, stream, stream_select, StreamExt};
4use reth_beacon_consensus::{
5    hooks::{EngineHooks, StaticFileHook},
6    BeaconConsensusEngineHandle, EngineNodeTypes,
7};
8use reth_chainspec::EthChainSpec;
9use reth_consensus_debug_client::{DebugConsensusClient, EtherscanBlockProvider};
10use reth_engine_local::{LocalEngineService, LocalPayloadAttributesBuilder};
11use reth_engine_service::service::{ChainEvent, EngineService};
12use reth_engine_tree::{
13    engine::{EngineApiRequest, EngineRequestHandler},
14    tree::TreeConfig,
15};
16use reth_engine_util::EngineMessageStreamExt;
17use reth_exex::ExExManagerHandle;
18use reth_network::{NetworkSyncUpdater, SyncState};
19use reth_network_api::BlockDownloaderProvider;
20use reth_node_api::{
21    BlockTy, BuiltPayload, EngineValidator, FullNodeTypes, NodeTypesWithEngine,
22    PayloadAttributesBuilder, PayloadBuilder, PayloadTypes,
23};
24use reth_node_core::{
25    dirs::{ChainPath, DataDirPath},
26    exit::NodeExitFuture,
27    primitives::Head,
28};
29use reth_node_events::{cl::ConsensusLayerHealthEvents, node};
30use reth_primitives::{EthPrimitives, EthereumHardforks};
31use reth_provider::providers::BlockchainProvider2;
32use reth_tasks::TaskExecutor;
33use reth_tokio_util::EventSender;
34use reth_tracing::tracing::{debug, error, info};
35use std::sync::Arc;
36use tokio::sync::{mpsc::unbounded_channel, oneshot};
37use tokio_stream::wrappers::UnboundedReceiverStream;
38
39use crate::{
40    common::{Attached, LaunchContextWith, WithConfigs},
41    hooks::NodeHooks,
42    rpc::{EngineValidatorAddOn, RethRpcAddOns, RpcHandle},
43    setup::build_networked_pipeline,
44    AddOns, AddOnsContext, ExExLauncher, FullNode, LaunchContext, LaunchNode, NodeAdapter,
45    NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
46};
47
48/// The engine node launcher.
49#[derive(Debug)]
50pub struct EngineNodeLauncher {
51    /// The task executor for the node.
52    pub ctx: LaunchContext,
53
54    /// Temporary configuration for engine tree.
55    /// After engine is stabilized, this should be configured through node builder.
56    pub engine_tree_config: TreeConfig,
57}
58
59impl EngineNodeLauncher {
60    /// Create a new instance of the ethereum node launcher.
61    pub const fn new(
62        task_executor: TaskExecutor,
63        data_dir: ChainPath<DataDirPath>,
64        engine_tree_config: TreeConfig,
65    ) -> Self {
66        Self { ctx: LaunchContext::new(task_executor, data_dir), engine_tree_config }
67    }
68}
69
70impl<Types, T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for EngineNodeLauncher
71where
72    Types: EngineNodeTypes<Primitives = EthPrimitives>,
73    T: FullNodeTypes<Types = Types, Provider = BlockchainProvider2<Types>>,
74    CB: NodeComponentsBuilder<T>,
75    AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
76        + EngineValidatorAddOn<
77            NodeAdapter<T, CB::Components>,
78            Validator: EngineValidator<Types::Engine, Block = BlockTy<Types>>,
79        >,
80
81    LocalPayloadAttributesBuilder<Types::ChainSpec>: PayloadAttributesBuilder<
82        <<Types as NodeTypesWithEngine>::Engine as PayloadTypes>::PayloadAttributes,
83    >,
84{
85    type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
86
87    async fn launch_node(
88        self,
89        target: NodeBuilderWithComponents<T, CB, AO>,
90    ) -> eyre::Result<Self::Node> {
91        let Self { ctx, engine_tree_config } = self;
92        let NodeBuilderWithComponents {
93            adapter: NodeTypesAdapter { database },
94            components_builder,
95            add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
96            config,
97        } = target;
98        let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
99
100        // setup the launch context
101        let ctx = ctx
102            .with_configured_globals()
103            // load the toml config
104            .with_loaded_toml_config(config)?
105            // add resolved peers
106            .with_resolved_peers().await?
107            // attach the database
108            .attach(database.clone())
109            // ensure certain settings take effect
110            .with_adjusted_configs()
111            // Create the provider factory
112            .with_provider_factory().await?
113            .inspect(|_| {
114                info!(target: "reth::cli", "Database opened");
115            })
116            .with_prometheus_server().await?
117            .inspect(|this| {
118                debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
119            })
120            .with_genesis()?
121            .inspect(|this: &LaunchContextWith<Attached<WithConfigs<Types::ChainSpec>, _>>| {
122                info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
123            })
124            .with_metrics_task()
125            // passing FullNodeTypes as type parameter here so that we can build
126            // later the components.
127            .with_blockchain_db::<T, _>(move |provider_factory| {
128                Ok(BlockchainProvider2::new(provider_factory)?)
129            })?
130            .with_components(components_builder, on_component_initialized).await?;
131
132        // spawn exexs
133        let exex_manager_handle = ExExLauncher::new(
134            ctx.head(),
135            ctx.node_adapter().clone(),
136            installed_exex,
137            ctx.configs().clone(),
138        )
139        .launch()
140        .await?;
141
142        // create pipeline
143        let network_client = ctx.components().network().fetch_client().await?;
144        let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
145
146        let node_config = ctx.node_config();
147        let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
148            .maybe_skip_fcu(node_config.debug.skip_fcu)
149            .maybe_skip_new_payload(node_config.debug.skip_new_payload)
150            .maybe_reorg(
151                ctx.blockchain_db().clone(),
152                ctx.components().evm_config().clone(),
153                reth_payload_validator::ExecutionPayloadValidator::new(ctx.chain_spec()),
154                node_config.debug.reorg_frequency,
155                node_config.debug.reorg_depth,
156            )
157            // Store messages _after_ skipping so that `replay-engine` command
158            // would replay only the messages that were observed by the engine
159            // during this run.
160            .maybe_store_messages(node_config.debug.engine_api_store.clone());
161
162        let max_block = ctx.max_block(network_client.clone()).await?;
163        let mut hooks = EngineHooks::new();
164
165        let static_file_producer = ctx.static_file_producer();
166        let static_file_producer_events = static_file_producer.lock().events();
167        hooks.add(StaticFileHook::new(
168            static_file_producer.clone(),
169            Box::new(ctx.task_executor().clone()),
170        ));
171        info!(target: "reth::cli", "StaticFileProducer initialized");
172
173        let consensus = Arc::new(ctx.components().consensus().clone());
174
175        // Configure the pipeline
176        let pipeline_exex_handle =
177            exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty);
178        let pipeline = build_networked_pipeline(
179            &ctx.toml_config().stages,
180            network_client.clone(),
181            consensus.clone(),
182            ctx.provider_factory().clone(),
183            ctx.task_executor(),
184            ctx.sync_metrics_tx(),
185            ctx.prune_config(),
186            max_block,
187            static_file_producer,
188            ctx.components().block_executor().clone(),
189            pipeline_exex_handle,
190        )?;
191
192        // The new engine writes directly to static files. This ensures that they're up to the tip.
193        pipeline.move_to_static_files()?;
194
195        let pipeline_events = pipeline.events();
196
197        let mut pruner_builder = ctx.pruner_builder();
198        if let Some(exex_manager_handle) = &exex_manager_handle {
199            pruner_builder =
200                pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
201        }
202        let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
203        let pruner_events = pruner.events();
204        info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized");
205
206        let event_sender = EventSender::default();
207        let beacon_engine_handle =
208            BeaconConsensusEngineHandle::new(consensus_engine_tx.clone(), event_sender.clone());
209
210        // extract the jwt secret from the args if possible
211        let jwt_secret = ctx.auth_jwt_secret()?;
212
213        let add_ons_ctx = AddOnsContext {
214            node: ctx.node_adapter().clone(),
215            config: ctx.node_config(),
216            beacon_engine_handle: beacon_engine_handle.clone(),
217            jwt_secret,
218        };
219        let engine_payload_validator = add_ons.engine_validator(&add_ons_ctx).await?;
220
221        let mut engine_service = if ctx.is_dev() {
222            let eth_service = LocalEngineService::new(
223                consensus.clone(),
224                ctx.components().block_executor().clone(),
225                ctx.provider_factory().clone(),
226                ctx.blockchain_db().clone(),
227                pruner,
228                ctx.components().payload_builder().clone(),
229                engine_payload_validator,
230                engine_tree_config,
231                ctx.invalid_block_hook()?,
232                ctx.sync_metrics_tx(),
233                consensus_engine_tx.clone(),
234                Box::pin(consensus_engine_stream),
235                ctx.dev_mining_mode(ctx.components().pool()),
236                LocalPayloadAttributesBuilder::new(ctx.chain_spec()),
237                ctx.data_dir().clone(),
238            );
239
240            Either::Left(eth_service)
241        } else {
242            let eth_service = EngineService::new(
243                consensus.clone(),
244                ctx.components().block_executor().clone(),
245                ctx.chain_spec(),
246                network_client.clone(),
247                Box::pin(consensus_engine_stream),
248                pipeline,
249                Box::new(ctx.task_executor().clone()),
250                ctx.provider_factory().clone(),
251                ctx.blockchain_db().clone(),
252                pruner,
253                ctx.components().payload_builder().clone(),
254                engine_payload_validator,
255                engine_tree_config,
256                ctx.invalid_block_hook()?,
257                ctx.sync_metrics_tx(),
258                ctx.data_dir().clone(),
259            );
260
261            Either::Right(eth_service)
262        };
263
264        info!(target: "reth::cli", "Consensus engine initialized");
265
266        let events = stream_select!(
267            beacon_engine_handle.event_listener().map(Into::into),
268            pipeline_events.map(Into::into),
269            if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() {
270                Either::Left(
271                    ConsensusLayerHealthEvents::new(Box::new(ctx.blockchain_db().clone()))
272                        .map(Into::into),
273                )
274            } else {
275                Either::Right(stream::empty())
276            },
277            pruner_events.map(Into::into),
278            static_file_producer_events.map(Into::into),
279        );
280        ctx.task_executor().spawn_critical(
281            "events task",
282            node::handle_events(
283                Some(Box::new(ctx.components().network().clone())),
284                Some(ctx.head().number),
285                events,
286            ),
287        );
288
289        let RpcHandle { rpc_server_handles, rpc_registry } =
290            add_ons.launch_add_ons(add_ons_ctx).await?;
291
292        // TODO: migrate to devmode with https://github.com/paradigmxyz/reth/issues/10104
293        if let Some(maybe_custom_etherscan_url) = ctx.node_config().debug.etherscan.clone() {
294            info!(target: "reth::cli", "Using etherscan as consensus client");
295
296            let chain = ctx.node_config().chain.chain();
297            let etherscan_url = maybe_custom_etherscan_url.map(Ok).unwrap_or_else(|| {
298                // If URL isn't provided, use default Etherscan URL for the chain if it is known
299                chain
300                    .etherscan_urls()
301                    .map(|urls| urls.0.to_string())
302                    .ok_or_else(|| eyre::eyre!("failed to get etherscan url for chain: {chain}"))
303            })?;
304
305            let block_provider = EtherscanBlockProvider::new(
306                etherscan_url,
307                chain.etherscan_api_key().ok_or_else(|| {
308                    eyre::eyre!(
309                        "etherscan api key not found for rpc consensus client for chain: {chain}"
310                    )
311                })?,
312            );
313            let rpc_consensus_client = DebugConsensusClient::new(
314                rpc_server_handles.auth.clone(),
315                Arc::new(block_provider),
316            );
317            ctx.task_executor().spawn_critical("etherscan consensus client", async move {
318                rpc_consensus_client.run::<<Types as NodeTypesWithEngine>::Engine>().await
319            });
320        }
321
322        // Run consensus engine to completion
323        let initial_target = ctx.initial_backfill_target()?;
324        let network_handle = ctx.components().network().clone();
325        let mut built_payloads = ctx
326            .components()
327            .payload_builder()
328            .subscribe()
329            .await
330            .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
331            .into_built_payload_stream()
332            .fuse();
333        let chainspec = ctx.chain_spec();
334        let (exit, rx) = oneshot::channel();
335        let terminate_after_backfill = ctx.terminate_after_initial_backfill();
336
337        info!(target: "reth::cli", "Starting consensus engine");
338        ctx.task_executor().spawn_critical("consensus engine", async move {
339            if let Some(initial_target) = initial_target {
340                debug!(target: "reth::cli", %initial_target,  "start backfill sync");
341                if let Either::Right(eth_service) = &mut engine_service {
342                    eth_service.orchestrator_mut().start_backfill_sync(initial_target);
343                }
344            }
345
346            let mut res = Ok(());
347
348            // advance the chain and await payloads built locally to add into the engine api tree handler to prevent re-execution if that block is received as payload from the CL
349            loop {
350                tokio::select! {
351                    payload = built_payloads.select_next_some() => {
352                        if let Some(executed_block) = payload.executed_block() {
353                            debug!(target: "reth::cli", block=?executed_block.block().num_hash(),  "inserting built payload");
354                            if let Either::Right(eth_service) = &mut engine_service {
355                                eth_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into());
356                            }
357                        }
358                    }
359                    event = engine_service.next() => {
360                        let Some(event) = event else { break };
361                        debug!(target: "reth::cli", "Event: {event}");
362                        match event {
363                            ChainEvent::BackfillSyncFinished => {
364                                if terminate_after_backfill {
365                                    debug!(target: "reth::cli", "Terminating after initial backfill");
366                                    break
367                                }
368
369                                network_handle.update_sync_state(SyncState::Idle);
370                            }
371                            ChainEvent::BackfillSyncStarted => {
372                                network_handle.update_sync_state(SyncState::Syncing);
373                            }
374                            ChainEvent::FatalError => {
375                                error!(target: "reth::cli", "Fatal error in consensus engine");
376                                res = Err(eyre::eyre!("Fatal error in consensus engine"));
377                                break
378                            }
379                            ChainEvent::Handler(ev) => {
380                                if let Some(head) = ev.canonical_header() {
381                                    let head_block = Head {
382                                        number: head.number,
383                                        hash: head.hash(),
384                                        difficulty: head.difficulty,
385                                        timestamp: head.timestamp,
386                                        total_difficulty: chainspec
387                                            .final_paris_total_difficulty(head.number)
388                                            .unwrap_or_default(),
389                                    };
390                                    network_handle.update_status(head_block);
391                                }
392                                event_sender.notify(ev);
393                            }
394                        }
395                    }
396                }
397            }
398
399            let _ = exit.send(res);
400        });
401
402        let full_node = FullNode {
403            evm_config: ctx.components().evm_config().clone(),
404            block_executor: ctx.components().block_executor().clone(),
405            pool: ctx.components().pool().clone(),
406            network: ctx.components().network().clone(),
407            provider: ctx.node_adapter().provider.clone(),
408            payload_builder: ctx.components().payload_builder().clone(),
409            task_executor: ctx.task_executor().clone(),
410            config: ctx.node_config().clone(),
411            data_dir: ctx.data_dir().clone(),
412            add_ons_handle: RpcHandle { rpc_server_handles, rpc_registry },
413        };
414        // Notify on node started
415        on_node_started.on_event(FullNode::clone(&full_node))?;
416
417        let handle = NodeHandle {
418            node_exit_future: NodeExitFuture::new(
419                async { rx.await? },
420                full_node.config.debug.terminate,
421            ),
422            node: full_node,
423        };
424
425        Ok(handle)
426    }
427}