1use crate::{
4 args::NetworkArgs,
5 utils::{get_single_body, get_single_header},
6};
7use alloy_eips::BlockHashOrNumber;
8use backon::{ConstantBuilder, Retryable};
9use clap::Parser;
10use reth_beacon_consensus::EthBeaconConsensus;
11use reth_chainspec::ChainSpec;
12use reth_cli::chainspec::ChainSpecParser;
13use reth_cli_commands::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
14use reth_cli_runner::CliContext;
15use reth_cli_util::get_secret_key;
16use reth_config::Config;
17use reth_errors::BlockValidationError;
18use reth_evm::execute::{BlockExecutorProvider, Executor};
19use reth_execution_types::ExecutionOutcome;
20use reth_network::{BlockDownloaderProvider, NetworkHandle};
21use reth_network_api::NetworkInfo;
22use reth_node_api::{BlockTy, NodePrimitives};
23use reth_node_ethereum::EthExecutorProvider;
24use reth_primitives::BlockExt;
25use reth_provider::{
26 providers::ProviderNodeTypes, AccountExtReader, ChainSpecProvider, DatabaseProviderFactory,
27 HashedPostStateProvider, HashingWriter, HeaderProvider, LatestStateProviderRef,
28 OriginalValuesKnown, ProviderFactory, StageCheckpointReader, StateWriter, StorageLocation,
29 StorageReader,
30};
31use reth_revm::database::StateProviderDatabase;
32use reth_stages::StageId;
33use reth_tasks::TaskExecutor;
34use reth_trie::StateRoot;
35use reth_trie_db::DatabaseStateRoot;
36use std::{path::PathBuf, sync::Arc};
37use tracing::*;
38
/// Debug command that verifies the state root of the block following the Merkle checkpoint.
///
/// The command reads the `MerkleExecute` stage checkpoint, downloads the next block from the P2P
/// network, executes it on top of the latest state and compares the state root computed in memory
/// with the one in the block header. If they differ, the root is recomputed incrementally against
/// the database and the account trie updates of both computations are compared.
#[derive(Debug, Parser)]
pub struct Command<C: ChainSpecParser> {
    // Flattened environment arguments; `execute` initialises the provider factory, the config and
    // the data directory from them.
    #[command(flatten)]
    env: EnvironmentArgs<C>,

    // Flattened network arguments used to configure the P2P network and to locate the secret key.
    #[command(flatten)]
    network: NetworkArgs,

    /// The number of retries per request (header and body downloads). Values below 1 are
    /// treated as 1.
    #[arg(long, default_value = "5")]
    retries: usize,

    /// Only trie nodes whose key is longer than this value are compared. Defaults to 0 when not
    /// set.
    #[arg(long)]
    skip_node_depth: Option<usize>,
}
59
60impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
61 async fn build_network<
62 N: ProviderNodeTypes<
63 ChainSpec = C::ChainSpec,
64 Primitives: NodePrimitives<
65 Block = reth_primitives::Block,
66 Receipt = reth_primitives::Receipt,
67 BlockHeader = reth_primitives::Header,
68 >,
69 >,
70 >(
71 &self,
72 config: &Config,
73 task_executor: TaskExecutor,
74 provider_factory: ProviderFactory<N>,
75 network_secret_path: PathBuf,
76 default_peers_path: PathBuf,
77 ) -> eyre::Result<NetworkHandle> {
78 let secret_key = get_secret_key(&network_secret_path)?;
79 let network = self
80 .network
81 .network_config(config, provider_factory.chain_spec(), secret_key, default_peers_path)
82 .with_task_executor(Box::new(task_executor))
83 .build(provider_factory)
84 .start_network()
85 .await?;
86 info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
87 debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID");
88 Ok(network)
89 }
90
91 pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(
93 self,
94 ctx: CliContext,
95 ) -> eyre::Result<()> {
96 let Environment { provider_factory, config, data_dir } =
97 self.env.init::<N>(AccessRights::RW)?;
98
99 let provider = provider_factory.provider()?;
100
101 let merkle_checkpoint = provider
103 .get_stage_checkpoint(StageId::MerkleExecute)?
104 .expect("merkle checkpoint exists");
105
106 let merkle_block_number = merkle_checkpoint.block_number;
107
108 let network_secret_path =
110 self.network.p2p_secret_key.clone().unwrap_or_else(|| data_dir.p2p_secret());
111 let network = self
112 .build_network(
113 &config,
114 ctx.task_executor.clone(),
115 provider_factory.clone(),
116 network_secret_path,
117 data_dir.known_peers(),
118 )
119 .await?;
120
121 let target_block_number = merkle_block_number + 1;
122
123 info!(target: "reth::cli", target_block_number, "Downloading full block");
124 let fetch_client = network.fetch_client().await?;
125
126 let retries = self.retries.max(1);
127 let backoff = ConstantBuilder::default().with_max_times(retries);
128
129 let client = fetch_client.clone();
130 let header = (move || {
131 get_single_header(client.clone(), BlockHashOrNumber::Number(target_block_number))
132 })
133 .retry(backoff)
134 .notify(|err, _| warn!(target: "reth::cli", "Error requesting header: {err}. Retrying..."))
135 .await?;
136
137 let client = fetch_client.clone();
138 let chain = provider_factory.chain_spec();
139 let consensus = Arc::new(EthBeaconConsensus::new(chain.clone()));
140 let block = (move || get_single_body(client.clone(), header.clone(), consensus.clone()))
141 .retry(backoff)
142 .notify(
143 |err, _| warn!(target: "reth::cli", "Error requesting body: {err}. Retrying..."),
144 )
145 .await?;
146
147 let state_provider = LatestStateProviderRef::new(&provider);
148 let db = StateProviderDatabase::new(&state_provider);
149
150 let executor = EthExecutorProvider::ethereum(provider_factory.chain_spec()).executor(db);
151
152 let merkle_block_td =
153 provider.header_td_by_number(merkle_block_number)?.unwrap_or_default();
154 let block_execution_output = executor.execute(
155 (
156 &block
157 .clone()
158 .unseal::<BlockTy<N>>()
159 .with_recovered_senders()
160 .ok_or(BlockValidationError::SenderRecoveryError)?,
161 merkle_block_td + block.difficulty,
162 )
163 .into(),
164 )?;
165 let execution_outcome = ExecutionOutcome::from((block_execution_output, block.number));
166
167 let (in_memory_state_root, in_memory_updates) = StateRoot::overlay_root_with_updates(
169 provider.tx_ref(),
170 state_provider.hashed_post_state(execution_outcome.state()),
171 )?;
172
173 if in_memory_state_root == block.state_root {
174 info!(target: "reth::cli", state_root = ?in_memory_state_root, "Computed in-memory state root matches");
175 return Ok(())
176 }
177
178 let provider_rw = provider_factory.database_provider_rw()?;
179
180 provider_rw.insert_historical_block(
182 block
183 .clone()
184 .try_seal_with_senders()
185 .map_err(|_| BlockValidationError::SenderRecoveryError)?,
186 )?;
187 provider_rw.write_state(
188 execution_outcome,
189 OriginalValuesKnown::No,
190 StorageLocation::Database,
191 )?;
192 let storage_lists = provider_rw.changed_storages_with_range(block.number..=block.number)?;
193 let storages = provider_rw.plain_state_storages(storage_lists)?;
194 provider_rw.insert_storage_for_hashing(storages)?;
195 let account_lists = provider_rw.changed_accounts_with_range(block.number..=block.number)?;
196 let accounts = provider_rw.basic_accounts(account_lists)?;
197 provider_rw.insert_account_for_hashing(accounts)?;
198
199 let (state_root, incremental_trie_updates) = StateRoot::incremental_root_with_updates(
200 provider_rw.tx_ref(),
201 block.number..=block.number,
202 )?;
203 if state_root != block.state_root {
204 eyre::bail!(
205 "Computed incremental state root mismatch. Expected: {:?}. Got: {:?}",
206 block.state_root,
207 state_root
208 );
209 }
210
211 let mut in_mem_mismatched = Vec::new();
213 let mut incremental_mismatched = Vec::new();
214 let mut in_mem_updates_iter = in_memory_updates.account_nodes_ref().iter().peekable();
215 let mut incremental_updates_iter =
216 incremental_trie_updates.account_nodes_ref().iter().peekable();
217
218 while in_mem_updates_iter.peek().is_some() || incremental_updates_iter.peek().is_some() {
219 match (in_mem_updates_iter.next(), incremental_updates_iter.next()) {
220 (Some(in_mem), Some(incr)) => {
221 similar_asserts::assert_eq!(in_mem.0, incr.0, "Nibbles don't match");
222 if in_mem.1 != incr.1 &&
223 in_mem.0.len() > self.skip_node_depth.unwrap_or_default()
224 {
225 in_mem_mismatched.push(in_mem);
226 incremental_mismatched.push(incr);
227 }
228 }
229 (Some(in_mem), None) => {
230 warn!(target: "reth::cli", next = ?in_mem, "In-memory trie updates have more entries");
231 }
232 (None, Some(incr)) => {
233 tracing::warn!(target: "reth::cli", next = ?incr, "Incremental trie updates have more entries");
234 }
235 (None, None) => {
236 tracing::info!(target: "reth::cli", "Exhausted all trie updates entries");
237 }
238 }
239 }
240
241 similar_asserts::assert_eq!(
242 incremental_mismatched,
243 in_mem_mismatched,
244 "Mismatched trie updates"
245 );
246
247 drop(provider_rw);
249
250 Ok(())
251 }
252}