1use crate::args::NetworkArgs;
2use clap::Parser;
3use eyre::Context;
4use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig};
5use reth_beacon_consensus::{hooks::EngineHooks, BeaconConsensusEngine, EthBeaconConsensus};
6use reth_blockchain_tree::{
7 BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree, TreeExternals,
8};
9use reth_chainspec::ChainSpec;
10use reth_cli::chainspec::ChainSpecParser;
11use reth_cli_commands::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
12use reth_cli_runner::CliContext;
13use reth_cli_util::get_secret_key;
14use reth_config::Config;
15use reth_consensus::FullConsensus;
16use reth_db::DatabaseEnv;
17use reth_engine_util::engine_store::{EngineMessageStore, StoredEngineApiMessage};
18use reth_ethereum_payload_builder::EthereumBuilderConfig;
19use reth_fs_util as fs;
20use reth_network::{BlockDownloaderProvider, NetworkHandle};
21use reth_network_api::NetworkInfo;
22use reth_node_api::{EngineApiMessageVersion, NodePrimitives, NodeTypesWithDBAdapter};
23use reth_node_ethereum::{EthEngineTypes, EthEvmConfig, EthExecutorProvider};
24use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService};
25use reth_provider::{
26 providers::{BlockchainProvider, ProviderNodeTypes},
27 CanonStateSubscriptions, ChainSpecProvider, ProviderFactory,
28};
29use reth_prune::PruneModes;
30use reth_stages::Pipeline;
31use reth_static_file::StaticFileProducer;
32use reth_tasks::TaskExecutor;
33use reth_transaction_pool::noop::NoopTransactionPool;
34use std::{path::PathBuf, sync::Arc, time::Duration};
35use tokio::sync::oneshot;
36use tracing::*;
37
38#[derive(Debug, Parser)]
42pub struct Command<C: ChainSpecParser> {
43 #[command(flatten)]
44 env: EnvironmentArgs<C>,
45
46 #[command(flatten)]
47 network: NetworkArgs,
48
49 #[arg(long = "engine-api-store", value_name = "PATH")]
51 engine_api_store: PathBuf,
52
53 #[arg(long = "interval", default_value_t = 1_000)]
55 interval: u64,
56}
57
58impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
59 async fn build_network<
60 N: ProviderNodeTypes<
61 ChainSpec = C::ChainSpec,
62 Primitives: NodePrimitives<
63 Block = reth_primitives::Block,
64 Receipt = reth_primitives::Receipt,
65 BlockHeader = reth_primitives::Header,
66 >,
67 >,
68 >(
69 &self,
70 config: &Config,
71 task_executor: TaskExecutor,
72 provider_factory: ProviderFactory<N>,
73 network_secret_path: PathBuf,
74 default_peers_path: PathBuf,
75 ) -> eyre::Result<NetworkHandle> {
76 let secret_key = get_secret_key(&network_secret_path)?;
77 let network = self
78 .network
79 .network_config(config, provider_factory.chain_spec(), secret_key, default_peers_path)
80 .with_task_executor(Box::new(task_executor))
81 .build(provider_factory)
82 .start_network()
83 .await?;
84 info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
85 debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID");
86 Ok(network)
87 }
88
89 pub async fn execute<N: CliNodeTypes<Engine = EthEngineTypes, ChainSpec = C::ChainSpec>>(
91 self,
92 ctx: CliContext,
93 ) -> eyre::Result<()> {
94 let Environment { provider_factory, config, data_dir } =
95 self.env.init::<N>(AccessRights::RW)?;
96
97 let consensus: Arc<dyn FullConsensus> =
98 Arc::new(EthBeaconConsensus::new(provider_factory.chain_spec()));
99
100 let executor = EthExecutorProvider::ethereum(provider_factory.chain_spec());
101
102 let tree_externals =
104 TreeExternals::new(provider_factory.clone(), Arc::clone(&consensus), executor);
105 let tree = BlockchainTree::new(tree_externals, BlockchainTreeConfig::default())?;
106 let blockchain_tree = Arc::new(ShareableBlockchainTree::new(tree));
107
108 let blockchain_db = BlockchainProvider::new(provider_factory.clone(), blockchain_tree)?;
110
111 let network_secret_path =
113 self.network.p2p_secret_key.clone().unwrap_or_else(|| data_dir.p2p_secret());
114 let network = self
115 .build_network(
116 &config,
117 ctx.task_executor.clone(),
118 provider_factory.clone(),
119 network_secret_path,
120 data_dir.known_peers(),
121 )
122 .await?;
123
124 let payload_builder = reth_ethereum_payload_builder::EthereumPayloadBuilder::new(
126 EthEvmConfig::new(provider_factory.chain_spec()),
127 EthereumBuilderConfig::new(Default::default()),
128 );
129
130 let payload_generator = BasicPayloadJobGenerator::with_builder(
131 blockchain_db.clone(),
132 NoopTransactionPool::default(),
133 ctx.task_executor.clone(),
134 BasicPayloadJobGeneratorConfig::default(),
135 payload_builder,
136 );
137
138 let (payload_service, payload_builder): (_, PayloadBuilderHandle<EthEngineTypes>) =
139 PayloadBuilderService::new(payload_generator, blockchain_db.canonical_state_stream());
140
141 ctx.task_executor.spawn_critical("payload builder service", payload_service);
142
143 let network_client = network.fetch_client().await?;
145 let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::new(
146 network_client,
147 Pipeline::<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>::builder().build(
148 provider_factory.clone(),
149 StaticFileProducer::new(provider_factory.clone(), PruneModes::none()),
150 ),
151 blockchain_db.clone(),
152 Box::new(ctx.task_executor.clone()),
153 Box::new(network),
154 None,
155 payload_builder,
156 None,
157 u64::MAX,
158 EngineHooks::new(),
159 )?;
160 info!(target: "reth::cli", "Consensus engine initialized");
161
162 let (tx, rx) = oneshot::channel();
164 info!(target: "reth::cli", "Starting consensus engine");
165 ctx.task_executor.spawn_critical_blocking("consensus engine", async move {
166 let res = beacon_consensus_engine.await;
167 let _ = tx.send(res);
168 });
169
170 let engine_api_store = EngineMessageStore::new(self.engine_api_store.clone());
171 for filepath in engine_api_store.engine_messages_iter()? {
172 let contents =
173 fs::read(&filepath).wrap_err(format!("failed to read: {}", filepath.display()))?;
174 let message = serde_json::from_slice(&contents)
175 .wrap_err(format!("failed to parse: {}", filepath.display()))?;
176 debug!(target: "reth::cli", filepath = %filepath.display(), ?message, "Forwarding Engine API message");
177 match message {
178 StoredEngineApiMessage::ForkchoiceUpdated { state, payload_attrs } => {
179 let response = beacon_engine_handle
180 .fork_choice_updated(
181 state,
182 payload_attrs,
183 EngineApiMessageVersion::default(),
184 )
185 .await?;
186 debug!(target: "reth::cli", ?response, "Received for forkchoice updated");
187 }
188 StoredEngineApiMessage::NewPayload { payload, sidecar } => {
189 let response = beacon_engine_handle.new_payload(payload, sidecar).await?;
190 debug!(target: "reth::cli", ?response, "Received for new payload");
191 }
192 };
193
194 tokio::time::sleep(Duration::from_millis(self.interval)).await;
196 }
197
198 info!(target: "reth::cli", "Finished replaying engine API messages");
199
200 match rx.await? {
201 Ok(()) => info!("Beacon consensus engine exited successfully"),
202 Err(error) => {
203 error!(target: "reth::cli", %error, "Beacon consensus engine exited with an error")
204 }
205 };
206
207 Ok(())
208 }
209}