reth/commands/debug_cmd/
replay_engine.rs

1use crate::args::NetworkArgs;
2use clap::Parser;
3use eyre::Context;
4use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig};
5use reth_beacon_consensus::{hooks::EngineHooks, BeaconConsensusEngine, EthBeaconConsensus};
6use reth_blockchain_tree::{
7    BlockchainTree, BlockchainTreeConfig, ShareableBlockchainTree, TreeExternals,
8};
9use reth_chainspec::ChainSpec;
10use reth_cli::chainspec::ChainSpecParser;
11use reth_cli_commands::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
12use reth_cli_runner::CliContext;
13use reth_cli_util::get_secret_key;
14use reth_config::Config;
15use reth_consensus::FullConsensus;
16use reth_db::DatabaseEnv;
17use reth_engine_util::engine_store::{EngineMessageStore, StoredEngineApiMessage};
18use reth_ethereum_payload_builder::EthereumBuilderConfig;
19use reth_fs_util as fs;
20use reth_network::{BlockDownloaderProvider, NetworkHandle};
21use reth_network_api::NetworkInfo;
22use reth_node_api::{EngineApiMessageVersion, NodePrimitives, NodeTypesWithDBAdapter};
23use reth_node_ethereum::{EthEngineTypes, EthEvmConfig, EthExecutorProvider};
24use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService};
25use reth_provider::{
26    providers::{BlockchainProvider, ProviderNodeTypes},
27    CanonStateSubscriptions, ChainSpecProvider, ProviderFactory,
28};
29use reth_prune::PruneModes;
30use reth_stages::Pipeline;
31use reth_static_file::StaticFileProducer;
32use reth_tasks::TaskExecutor;
33use reth_transaction_pool::noop::NoopTransactionPool;
34use std::{path::PathBuf, sync::Arc, time::Duration};
35use tokio::sync::oneshot;
36use tracing::*;
37
38/// `reth debug replay-engine` command
39/// This script will read stored engine API messages and replay them by the timestamp.
40/// It does not require
41#[derive(Debug, Parser)]
42pub struct Command<C: ChainSpecParser> {
43    #[command(flatten)]
44    env: EnvironmentArgs<C>,
45
46    #[command(flatten)]
47    network: NetworkArgs,
48
49    /// The path to read engine API messages from.
50    #[arg(long = "engine-api-store", value_name = "PATH")]
51    engine_api_store: PathBuf,
52
53    /// The number of milliseconds between Engine API messages.
54    #[arg(long = "interval", default_value_t = 1_000)]
55    interval: u64,
56}
57
58impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
59    async fn build_network<
60        N: ProviderNodeTypes<
61            ChainSpec = C::ChainSpec,
62            Primitives: NodePrimitives<
63                Block = reth_primitives::Block,
64                Receipt = reth_primitives::Receipt,
65                BlockHeader = reth_primitives::Header,
66            >,
67        >,
68    >(
69        &self,
70        config: &Config,
71        task_executor: TaskExecutor,
72        provider_factory: ProviderFactory<N>,
73        network_secret_path: PathBuf,
74        default_peers_path: PathBuf,
75    ) -> eyre::Result<NetworkHandle> {
76        let secret_key = get_secret_key(&network_secret_path)?;
77        let network = self
78            .network
79            .network_config(config, provider_factory.chain_spec(), secret_key, default_peers_path)
80            .with_task_executor(Box::new(task_executor))
81            .build(provider_factory)
82            .start_network()
83            .await?;
84        info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
85        debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID");
86        Ok(network)
87    }
88
89    /// Execute `debug replay-engine` command
90    pub async fn execute<N: CliNodeTypes<Engine = EthEngineTypes, ChainSpec = C::ChainSpec>>(
91        self,
92        ctx: CliContext,
93    ) -> eyre::Result<()> {
94        let Environment { provider_factory, config, data_dir } =
95            self.env.init::<N>(AccessRights::RW)?;
96
97        let consensus: Arc<dyn FullConsensus> =
98            Arc::new(EthBeaconConsensus::new(provider_factory.chain_spec()));
99
100        let executor = EthExecutorProvider::ethereum(provider_factory.chain_spec());
101
102        // Configure blockchain tree
103        let tree_externals =
104            TreeExternals::new(provider_factory.clone(), Arc::clone(&consensus), executor);
105        let tree = BlockchainTree::new(tree_externals, BlockchainTreeConfig::default())?;
106        let blockchain_tree = Arc::new(ShareableBlockchainTree::new(tree));
107
108        // Set up the blockchain provider
109        let blockchain_db = BlockchainProvider::new(provider_factory.clone(), blockchain_tree)?;
110
111        // Set up network
112        let network_secret_path =
113            self.network.p2p_secret_key.clone().unwrap_or_else(|| data_dir.p2p_secret());
114        let network = self
115            .build_network(
116                &config,
117                ctx.task_executor.clone(),
118                provider_factory.clone(),
119                network_secret_path,
120                data_dir.known_peers(),
121            )
122            .await?;
123
124        // Set up payload builder
125        let payload_builder = reth_ethereum_payload_builder::EthereumPayloadBuilder::new(
126            EthEvmConfig::new(provider_factory.chain_spec()),
127            EthereumBuilderConfig::new(Default::default()),
128        );
129
130        let payload_generator = BasicPayloadJobGenerator::with_builder(
131            blockchain_db.clone(),
132            NoopTransactionPool::default(),
133            ctx.task_executor.clone(),
134            BasicPayloadJobGeneratorConfig::default(),
135            payload_builder,
136        );
137
138        let (payload_service, payload_builder): (_, PayloadBuilderHandle<EthEngineTypes>) =
139            PayloadBuilderService::new(payload_generator, blockchain_db.canonical_state_stream());
140
141        ctx.task_executor.spawn_critical("payload builder service", payload_service);
142
143        // Configure the consensus engine
144        let network_client = network.fetch_client().await?;
145        let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::new(
146            network_client,
147            Pipeline::<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>::builder().build(
148                provider_factory.clone(),
149                StaticFileProducer::new(provider_factory.clone(), PruneModes::none()),
150            ),
151            blockchain_db.clone(),
152            Box::new(ctx.task_executor.clone()),
153            Box::new(network),
154            None,
155            payload_builder,
156            None,
157            u64::MAX,
158            EngineHooks::new(),
159        )?;
160        info!(target: "reth::cli", "Consensus engine initialized");
161
162        // Run consensus engine to completion
163        let (tx, rx) = oneshot::channel();
164        info!(target: "reth::cli", "Starting consensus engine");
165        ctx.task_executor.spawn_critical_blocking("consensus engine", async move {
166            let res = beacon_consensus_engine.await;
167            let _ = tx.send(res);
168        });
169
170        let engine_api_store = EngineMessageStore::new(self.engine_api_store.clone());
171        for filepath in engine_api_store.engine_messages_iter()? {
172            let contents =
173                fs::read(&filepath).wrap_err(format!("failed to read: {}", filepath.display()))?;
174            let message = serde_json::from_slice(&contents)
175                .wrap_err(format!("failed to parse: {}", filepath.display()))?;
176            debug!(target: "reth::cli", filepath = %filepath.display(), ?message, "Forwarding Engine API message");
177            match message {
178                StoredEngineApiMessage::ForkchoiceUpdated { state, payload_attrs } => {
179                    let response = beacon_engine_handle
180                        .fork_choice_updated(
181                            state,
182                            payload_attrs,
183                            EngineApiMessageVersion::default(),
184                        )
185                        .await?;
186                    debug!(target: "reth::cli", ?response, "Received for forkchoice updated");
187                }
188                StoredEngineApiMessage::NewPayload { payload, sidecar } => {
189                    let response = beacon_engine_handle.new_payload(payload, sidecar).await?;
190                    debug!(target: "reth::cli", ?response, "Received for new payload");
191                }
192            };
193
194            // Pause before next message
195            tokio::time::sleep(Duration::from_millis(self.interval)).await;
196        }
197
198        info!(target: "reth::cli", "Finished replaying engine API messages");
199
200        match rx.await? {
201            Ok(()) => info!("Beacon consensus engine exited successfully"),
202            Err(error) => {
203                error!(target: "reth::cli", %error, "Beacon consensus engine exited with an error")
204            }
205        };
206
207        Ok(())
208    }
209}