reth_node_builder/launch/
mod.rs

1//! Abstraction for launching a node.
2
3pub mod common;
4mod exex;
5
6pub(crate) mod engine;
7
8pub use common::LaunchContext;
9use common::{Attached, LaunchContextWith, WithConfigs};
10pub use exex::ExExLauncher;
11
12use std::{future::Future, sync::Arc};
13
14use futures::{future::Either, stream, stream_select, StreamExt};
15use reth_beacon_consensus::{
16    hooks::{EngineHooks, PruneHook, StaticFileHook},
17    BeaconConsensusEngine,
18};
19use reth_blockchain_tree::{
20    externals::TreeNodeTypes, noop::NoopBlockchainTree, BlockchainTree, BlockchainTreeConfig,
21    ShareableBlockchainTree, TreeExternals,
22};
23use reth_chainspec::EthChainSpec;
24use reth_consensus_debug_client::{DebugConsensusClient, EtherscanBlockProvider, RpcBlockProvider};
25use reth_engine_util::EngineMessageStreamExt;
26use reth_exex::ExExManagerHandle;
27use reth_network::BlockDownloaderProvider;
28use reth_node_api::{AddOnsContext, FullNodeTypes, NodeTypesWithEngine};
29use reth_node_core::{
30    dirs::{ChainPath, DataDirPath},
31    exit::NodeExitFuture,
32};
33use reth_node_events::{cl::ConsensusLayerHealthEvents, node};
34use reth_provider::providers::{BlockchainProvider, ProviderNodeTypes};
35use reth_rpc::eth::RpcNodeCore;
36use reth_tasks::TaskExecutor;
37use reth_tracing::tracing::{debug, info};
38use tokio::sync::{mpsc::unbounded_channel, oneshot};
39use tokio_stream::wrappers::UnboundedReceiverStream;
40
41use crate::{
42    builder::{NodeAdapter, NodeTypesAdapter},
43    components::{NodeComponents, NodeComponentsBuilder},
44    hooks::NodeHooks,
45    node::FullNode,
46    rpc::{RethRpcAddOns, RpcHandle},
47    AddOns, NodeBuilderWithComponents, NodeHandle,
48};
49
50/// Alias for [`reth_rpc_eth_types::EthApiBuilderCtx`], adapter for [`RpcNodeCore`].
51pub type EthApiBuilderCtx<N> = reth_rpc_eth_types::EthApiBuilderCtx<
52    <N as RpcNodeCore>::Provider,
53    <N as RpcNodeCore>::Pool,
54    <N as RpcNodeCore>::Evm,
55    <N as RpcNodeCore>::Network,
56    TaskExecutor,
57    <N as RpcNodeCore>::Provider,
58>;
59
60/// A general purpose trait that launches a new node of any kind.
61///
62/// Acts as a node factory that targets a certain node configuration and returns a handle to the
63/// node.
64///
65/// This is essentially the launch logic for a node.
66///
67/// See also [`DefaultNodeLauncher`] and [`NodeBuilderWithComponents::launch_with`]
68pub trait LaunchNode<Target> {
69    /// The node type that is created.
70    type Node;
71
72    /// Create and return a new node asynchronously.
73    fn launch_node(self, target: Target) -> impl Future<Output = eyre::Result<Self::Node>>;
74}
75
76impl<F, Target, Fut, Node> LaunchNode<Target> for F
77where
78    F: FnOnce(Target) -> Fut + Send,
79    Fut: Future<Output = eyre::Result<Node>> + Send,
80{
81    type Node = Node;
82
83    fn launch_node(self, target: Target) -> impl Future<Output = eyre::Result<Self::Node>> {
84        self(target)
85    }
86}
87
88/// The default launcher for a node.
89#[derive(Debug)]
90pub struct DefaultNodeLauncher {
91    /// The task executor for the node.
92    pub ctx: LaunchContext,
93}
94
95impl DefaultNodeLauncher {
96    /// Create a new instance of the default node launcher.
97    pub const fn new(task_executor: TaskExecutor, data_dir: ChainPath<DataDirPath>) -> Self {
98        Self { ctx: LaunchContext::new(task_executor, data_dir) }
99    }
100}
101
102impl<Types, T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for DefaultNodeLauncher
103where
104    Types: ProviderNodeTypes + NodeTypesWithEngine + TreeNodeTypes,
105    T: FullNodeTypes<Provider = BlockchainProvider<Types>, Types = Types>,
106    CB: NodeComponentsBuilder<T>,
107    AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>,
108{
109    type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
110
111    async fn launch_node(
112        self,
113        target: NodeBuilderWithComponents<T, CB, AO>,
114    ) -> eyre::Result<Self::Node> {
115        let Self { ctx } = self;
116        let NodeBuilderWithComponents {
117            adapter: NodeTypesAdapter { database },
118            components_builder,
119            add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
120            config,
121        } = target;
122        let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
123
124        // TODO: remove tree and move tree_config and canon_state_notification_sender
125        // initialization to with_blockchain_db once the engine revamp is done
126        // https://github.com/paradigmxyz/reth/issues/8742
127        let tree_config = BlockchainTreeConfig::default();
128
129        // NOTE: This is a temporary workaround to provide the canon state notification sender to the components builder because there's a cyclic dependency between the blockchain provider and the tree component. This will be removed once the Blockchain provider no longer depends on an instance of the tree: <https://github.com/paradigmxyz/reth/issues/7154>
130        let (canon_state_notification_sender, _receiver) =
131            tokio::sync::broadcast::channel(tree_config.max_reorg_depth() as usize * 2);
132
133        let tree = Arc::new(NoopBlockchainTree::with_canon_state_notifications(
134            canon_state_notification_sender.clone(),
135        ));
136
137        // setup the launch context
138        let mut ctx = ctx
139            .with_configured_globals()
140            // load the toml config
141            .with_loaded_toml_config(config)?
142            // add resolved peers
143            .with_resolved_peers().await?
144            // attach the database
145            .attach(database.clone())
146            // ensure certain settings take effect
147            .with_adjusted_configs()
148            // Create the provider factory
149            .with_provider_factory().await?
150            .inspect(|_| {
151                info!(target: "reth::cli", "Database opened");
152            })
153            .with_prometheus_server().await?
154            .inspect(|this| {
155                debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
156            })
157            .with_genesis()?
158            .inspect(|this: &LaunchContextWith<Attached<WithConfigs<Types::ChainSpec>, _>>| {
159                info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
160            })
161            .with_metrics_task()
162            // passing FullNodeTypes as type parameter here so that we can build
163            // later the components.
164            .with_blockchain_db::<T, _>(move |provider_factory| {
165                Ok(BlockchainProvider::new(provider_factory, tree)?)
166            })?
167            .with_components(components_builder, on_component_initialized).await?;
168
169        let consensus = Arc::new(ctx.components().consensus().clone());
170
171        let tree_externals = TreeExternals::new(
172            ctx.provider_factory().clone(),
173            consensus.clone(),
174            ctx.components().block_executor().clone(),
175        );
176        let tree = BlockchainTree::new(tree_externals, tree_config)?
177            .with_sync_metrics_tx(ctx.sync_metrics_tx())
178            // Note: This is required because we need to ensure that both the components and the
179            // tree are using the same channel for canon state notifications. This will be removed
180            // once the Blockchain provider no longer depends on an instance of the tree
181            .with_canon_state_notification_sender(canon_state_notification_sender);
182
183        let blockchain_tree = Arc::new(ShareableBlockchainTree::new(tree));
184
185        ctx.node_adapter_mut().provider = ctx.blockchain_db().clone().with_tree(blockchain_tree);
186
187        debug!(target: "reth::cli", "configured blockchain tree");
188
189        // spawn exexs
190        let exex_manager_handle = ExExLauncher::new(
191            ctx.head(),
192            ctx.node_adapter().clone(),
193            installed_exex,
194            ctx.configs().clone(),
195        )
196        .launch()
197        .await?;
198
199        // create pipeline
200        let network_client = ctx.components().network().fetch_client().await?;
201        let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
202
203        let node_config = ctx.node_config();
204        let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
205            .maybe_skip_fcu(node_config.debug.skip_fcu)
206            .maybe_skip_new_payload(node_config.debug.skip_new_payload)
207            .maybe_reorg(
208                ctx.blockchain_db().clone(),
209                ctx.components().evm_config().clone(),
210                reth_payload_validator::ExecutionPayloadValidator::new(ctx.chain_spec()),
211                node_config.debug.reorg_frequency,
212                node_config.debug.reorg_depth,
213            )
214            // Store messages _after_ skipping so that `replay-engine` command
215            // would replay only the messages that were observed by the engine
216            // during this run.
217            .maybe_store_messages(node_config.debug.engine_api_store.clone());
218
219        let max_block = ctx.max_block(network_client.clone()).await?;
220        let mut hooks = EngineHooks::new();
221
222        let static_file_producer = ctx.static_file_producer();
223        let static_file_producer_events = static_file_producer.lock().events();
224        hooks.add(StaticFileHook::new(
225            static_file_producer.clone(),
226            Box::new(ctx.task_executor().clone()),
227        ));
228        info!(target: "reth::cli", "StaticFileProducer initialized");
229
230        // Configure the pipeline
231        let pipeline_exex_handle =
232            exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty);
233        let (pipeline, client) = if ctx.is_dev() {
234            eyre::bail!("Dev mode is not supported for legacy engine")
235        } else {
236            let pipeline = crate::setup::build_networked_pipeline(
237                &ctx.toml_config().stages,
238                network_client.clone(),
239                consensus.clone(),
240                ctx.provider_factory().clone(),
241                ctx.task_executor(),
242                ctx.sync_metrics_tx(),
243                ctx.prune_config(),
244                max_block,
245                static_file_producer,
246                ctx.components().block_executor().clone(),
247                pipeline_exex_handle,
248            )?;
249
250            (pipeline, network_client.clone())
251        };
252
253        let pipeline_events = pipeline.events();
254
255        let initial_target = ctx.node_config().debug.tip;
256
257        let mut pruner_builder = ctx.pruner_builder();
258        if let Some(exex_manager_handle) = &exex_manager_handle {
259            pruner_builder =
260                pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
261        }
262        let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
263
264        let pruner_events = pruner.events();
265        info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized");
266        hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor().clone())));
267
268        // Configure the consensus engine
269        let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
270            client,
271            pipeline,
272            ctx.blockchain_db().clone(),
273            Box::new(ctx.task_executor().clone()),
274            Box::new(ctx.components().network().clone()),
275            max_block,
276            ctx.components().payload_builder().clone(),
277            initial_target,
278            reth_beacon_consensus::MIN_BLOCKS_FOR_PIPELINE_RUN,
279            consensus_engine_tx,
280            Box::pin(consensus_engine_stream),
281            hooks,
282        )?;
283        info!(target: "reth::cli", "Consensus engine initialized");
284
285        let events = stream_select!(
286            pipeline_events.map(Into::into),
287            if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() {
288                Either::Left(
289                    ConsensusLayerHealthEvents::new(Box::new(ctx.blockchain_db().clone()))
290                        .map(Into::into),
291                )
292            } else {
293                Either::Right(stream::empty())
294            },
295            pruner_events.map(Into::into),
296            static_file_producer_events.map(Into::into),
297        );
298        ctx.task_executor().spawn_critical(
299            "events task",
300            node::handle_events(
301                Some(Box::new(ctx.components().network().clone())),
302                Some(ctx.head().number),
303                events,
304            ),
305        );
306
307        // extract the jwt secret from the args if possible
308        let jwt_secret = ctx.auth_jwt_secret()?;
309
310        let add_ons_ctx = AddOnsContext {
311            node: ctx.node_adapter().clone(),
312            config: ctx.node_config(),
313            beacon_engine_handle,
314            jwt_secret,
315        };
316
317        let RpcHandle { rpc_server_handles, rpc_registry } =
318            add_ons.launch_add_ons(add_ons_ctx).await?;
319
320        // Run consensus engine to completion
321        let (tx, rx) = oneshot::channel();
322        info!(target: "reth::cli", "Starting consensus engine");
323        ctx.task_executor().spawn_critical_blocking("consensus engine", async move {
324            let res = beacon_consensus_engine.await;
325            let _ = tx.send(res);
326        });
327
328        if let Some(maybe_custom_etherscan_url) = ctx.node_config().debug.etherscan.clone() {
329            info!(target: "reth::cli", "Using etherscan as consensus client");
330
331            let chain = ctx.node_config().chain.chain();
332            let etherscan_url = maybe_custom_etherscan_url.map(Ok).unwrap_or_else(|| {
333                // If URL isn't provided, use default Etherscan URL for the chain if it is known
334                chain
335                    .etherscan_urls()
336                    .map(|urls| urls.0.to_string())
337                    .ok_or_else(|| eyre::eyre!("failed to get etherscan url for chain: {chain}"))
338            })?;
339
340            let block_provider = EtherscanBlockProvider::new(
341                etherscan_url,
342                chain.etherscan_api_key().ok_or_else(|| {
343                    eyre::eyre!(
344                        "etherscan api key not found for rpc consensus client for chain: {chain}"
345                    )
346                })?,
347            );
348            let rpc_consensus_client = DebugConsensusClient::new(
349                rpc_server_handles.auth.clone(),
350                Arc::new(block_provider),
351            );
352            ctx.task_executor().spawn_critical("etherscan consensus client", async move {
353                rpc_consensus_client.run::<Types::Engine>().await
354            });
355        }
356
357        if let Some(rpc_ws_url) = ctx.node_config().debug.rpc_consensus_ws.clone() {
358            info!(target: "reth::cli", "Using rpc provider as consensus client");
359
360            let block_provider = RpcBlockProvider::new(rpc_ws_url);
361            let rpc_consensus_client = DebugConsensusClient::new(
362                rpc_server_handles.auth.clone(),
363                Arc::new(block_provider),
364            );
365            ctx.task_executor().spawn_critical("rpc consensus client", async move {
366                rpc_consensus_client.run::<Types::Engine>().await
367            });
368        }
369
370        let full_node = FullNode {
371            evm_config: ctx.components().evm_config().clone(),
372            block_executor: ctx.components().block_executor().clone(),
373            pool: ctx.components().pool().clone(),
374            network: ctx.components().network().clone(),
375            provider: ctx.node_adapter().provider.clone(),
376            payload_builder: ctx.components().payload_builder().clone(),
377            task_executor: ctx.task_executor().clone(),
378            config: ctx.node_config().clone(),
379            data_dir: ctx.data_dir().clone(),
380            add_ons_handle: RpcHandle { rpc_server_handles, rpc_registry },
381        };
382        // Notify on node started
383        on_node_started.on_event(FullNode::clone(&full_node))?;
384
385        let handle = NodeHandle {
386            node_exit_future: NodeExitFuture::new(
387                async { Ok(rx.await??) },
388                full_node.config.debug.terminate,
389            ),
390            node: full_node,
391        };
392
393        Ok(handle)
394    }
395}