reth_cli_commands/
import.rs

1//! Command that initializes the node by importing a chain from a file.
2use crate::common::{AccessRights, CliNodeComponents, CliNodeTypes, Environment, EnvironmentArgs};
3use alloy_primitives::B256;
4use clap::Parser;
5use futures::{Stream, StreamExt};
6use reth_chainspec::{EthChainSpec, EthereumHardforks};
7use reth_cli::chainspec::ChainSpecParser;
8use reth_config::Config;
9use reth_consensus::{ConsensusError, FullConsensus};
10use reth_db_api::{tables, transaction::DbTx};
11use reth_downloaders::{
12    bodies::bodies::BodiesDownloaderBuilder,
13    file_client::{ChunkedFileReader, FileClient, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE},
14    headers::reverse_headers::ReverseHeadersDownloaderBuilder,
15};
16use reth_evm::ConfigureEvm;
17use reth_network_p2p::{
18    bodies::downloader::BodyDownloader,
19    headers::downloader::{HeaderDownloader, SyncTarget},
20};
21use reth_node_api::BlockTy;
22use reth_node_core::version::SHORT_VERSION;
23use reth_node_events::node::NodeEvent;
24use reth_provider::{
25    providers::ProviderNodeTypes, BlockNumReader, ChainSpecProvider, HeaderProvider, ProviderError,
26    ProviderFactory, StageCheckpointReader,
27};
28use reth_prune::PruneModes;
29use reth_stages::{prelude::*, Pipeline, StageId, StageSet};
30use reth_static_file::StaticFileProducer;
31use std::{path::PathBuf, sync::Arc};
32use tokio::sync::watch;
33use tracing::{debug, error, info};
34
/// Syncs RLP encoded blocks from a file.
// NOTE: the `///` comments on this struct and on its fields are also picked up by the clap
// derive below as CLI help text (the fields opt into `verbatim_doc_comment`), so rewording
// them changes the command's help output. Maintainer-only notes therefore use `//`.
#[derive(Debug, Parser)]
pub struct ImportCommand<C: ChainSpecParser> {
    // Shared environment arguments, flattened into this command's own argument list.
    // `execute` initializes the environment from them with read-write access, and
    // `chain_spec` exposes their `chain` field.
    #[command(flatten)]
    env: EnvironmentArgs<C>,

    /// Disables stages that require state.
    // Forwarded by `execute` to `build_import_pipeline` as `disable_exec`.
    #[arg(long, verbatim_doc_comment)]
    no_state: bool,

    /// Chunk byte length to read from file.
    // Passed as-is (possibly `None`) to `ChunkedFileReader::new`; the debug log in `execute`
    // reports `DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE` when it is unset.
    #[arg(long, value_name = "CHUNK_LEN", verbatim_doc_comment)]
    chunk_len: Option<u64>,

    /// The path to a block file for import.
    ///
    /// The online stages (headers and bodies) are replaced by a file import, after which the
    /// remaining stages are executed.
    // Declared without `long`/`short`, i.e. as a positional argument (the test below passes
    // it as a trailing ".").
    #[arg(value_name = "IMPORT_PATH", verbatim_doc_comment)]
    path: PathBuf,
}
56
57impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportCommand<C> {
58    /// Execute `import` command
59    pub async fn execute<N, Comp, F>(self, components: F) -> eyre::Result<()>
60    where
61        N: CliNodeTypes<ChainSpec = C::ChainSpec>,
62        Comp: CliNodeComponents<N>,
63        F: FnOnce(Arc<N::ChainSpec>) -> Comp,
64    {
65        info!(target: "reth::cli", "reth {} starting", SHORT_VERSION);
66
67        if self.no_state {
68            info!(target: "reth::cli", "Disabled stages requiring state");
69        }
70
71        debug!(target: "reth::cli",
72            chunk_byte_len=self.chunk_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE),
73            "Chunking chain import"
74        );
75
76        let Environment { provider_factory, config, .. } = self.env.init::<N>(AccessRights::RW)?;
77
78        let components = components(provider_factory.chain_spec());
79        let executor = components.evm_config().clone();
80        let consensus = Arc::new(components.consensus().clone());
81        info!(target: "reth::cli", "Consensus engine initialized");
82
83        // open file
84        let mut reader = ChunkedFileReader::new(&self.path, self.chunk_len).await?;
85
86        let mut total_decoded_blocks = 0;
87        let mut total_decoded_txns = 0;
88
89        let mut sealed_header = provider_factory
90            .sealed_header(provider_factory.last_block_number()?)?
91            .expect("should have genesis");
92
93        while let Some(file_client) =
94            reader.next_chunk::<BlockTy<N>>(consensus.clone(), Some(sealed_header)).await?
95        {
96            // create a new FileClient from chunk read from file
97            info!(target: "reth::cli",
98                "Importing chain file chunk"
99            );
100
101            let tip = file_client.tip().ok_or(eyre::eyre!("file client has no tip"))?;
102            info!(target: "reth::cli", "Chain file chunk read");
103
104            total_decoded_blocks += file_client.headers_len();
105            total_decoded_txns += file_client.total_transactions();
106
107            let (mut pipeline, events) = build_import_pipeline(
108                &config,
109                provider_factory.clone(),
110                &consensus,
111                Arc::new(file_client),
112                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
113                self.no_state,
114                executor.clone(),
115            )?;
116
117            // override the tip
118            pipeline.set_tip(tip);
119            debug!(target: "reth::cli", ?tip, "Tip manually set");
120
121            let provider = provider_factory.provider()?;
122
123            let latest_block_number =
124                provider.get_stage_checkpoint(StageId::Finish)?.map(|ch| ch.block_number);
125            tokio::spawn(reth_node_events::node::handle_events(None, latest_block_number, events));
126
127            // Run pipeline
128            info!(target: "reth::cli", "Starting sync pipeline");
129            tokio::select! {
130                res = pipeline.run() => res?,
131                _ = tokio::signal::ctrl_c() => {},
132            }
133
134            sealed_header = provider_factory
135                .sealed_header(provider_factory.last_block_number()?)?
136                .expect("should have genesis");
137        }
138
139        let provider = provider_factory.provider()?;
140
141        let total_imported_blocks = provider.tx_ref().entries::<tables::HeaderNumbers>()?;
142        let total_imported_txns = provider.tx_ref().entries::<tables::TransactionHashNumbers>()?;
143
144        if total_decoded_blocks != total_imported_blocks ||
145            total_decoded_txns != total_imported_txns
146        {
147            error!(target: "reth::cli",
148                total_decoded_blocks,
149                total_imported_blocks,
150                total_decoded_txns,
151                total_imported_txns,
152                "Chain was partially imported"
153            );
154        }
155
156        info!(target: "reth::cli",
157            total_imported_blocks,
158            total_imported_txns,
159            "Chain file imported"
160        );
161
162        Ok(())
163    }
164}
165
166impl<C: ChainSpecParser> ImportCommand<C> {
167    /// Returns the underlying chain being used to run this command
168    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
169        Some(&self.env.chain)
170    }
171}
172
173/// Builds import pipeline.
174///
175/// If configured to execute, all stages will run. Otherwise, only stages that don't require state
176/// will run.
177pub fn build_import_pipeline<N, C, E>(
178    config: &Config,
179    provider_factory: ProviderFactory<N>,
180    consensus: &Arc<C>,
181    file_client: Arc<FileClient<BlockTy<N>>>,
182    static_file_producer: StaticFileProducer<ProviderFactory<N>>,
183    disable_exec: bool,
184    evm_config: E,
185) -> eyre::Result<(Pipeline<N>, impl Stream<Item = NodeEvent<N::Primitives>>)>
186where
187    N: ProviderNodeTypes,
188    C: FullConsensus<N::Primitives, Error = ConsensusError> + 'static,
189    E: ConfigureEvm<Primitives = N::Primitives> + 'static,
190{
191    if !file_client.has_canonical_blocks() {
192        eyre::bail!("unable to import non canonical blocks");
193    }
194
195    // Retrieve latest header found in the database.
196    let last_block_number = provider_factory.last_block_number()?;
197    let local_head = provider_factory
198        .sealed_header(last_block_number)?
199        .ok_or_else(|| ProviderError::HeaderNotFound(last_block_number.into()))?;
200
201    let mut header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers)
202        .build(file_client.clone(), consensus.clone())
203        .into_task();
204    // TODO: The pipeline should correctly configure the downloader on its own.
205    // Find the possibility to remove unnecessary pre-configuration.
206    header_downloader.update_local_head(local_head);
207    header_downloader.update_sync_target(SyncTarget::Tip(file_client.tip().unwrap()));
208
209    let mut body_downloader = BodiesDownloaderBuilder::new(config.stages.bodies)
210        .build(file_client.clone(), consensus.clone(), provider_factory.clone())
211        .into_task();
212    // TODO: The pipeline should correctly configure the downloader on its own.
213    // Find the possibility to remove unnecessary pre-configuration.
214    body_downloader
215        .set_download_range(file_client.min_block().unwrap()..=file_client.max_block().unwrap())
216        .expect("failed to set download range");
217
218    let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
219
220    let max_block = file_client.max_block().unwrap_or(0);
221
222    let pipeline = Pipeline::builder()
223        .with_tip_sender(tip_tx)
224        // we want to sync all blocks the file client provides or 0 if empty
225        .with_max_block(max_block)
226        .with_fail_on_unwind(true)
227        .add_stages(
228            DefaultStages::new(
229                provider_factory.clone(),
230                tip_rx,
231                consensus.clone(),
232                header_downloader,
233                body_downloader,
234                evm_config,
235                config.stages.clone(),
236                PruneModes::default(),
237            )
238            .builder()
239            .disable_all_if(&StageId::STATE_REQUIRED, || disable_exec),
240        )
241        .build(provider_factory, static_file_producer);
242
243    let events = pipeline.events().map(Into::into);
244
245    Ok((pipeline, events))
246}
247
#[cfg(test)]
mod tests {
    use super::*;
    use reth_ethereum_cli::chainspec::{EthereumChainSpecParser, SUPPORTED_CHAINS};

    /// Each supported chain name given via `--chain` must yield the chain that the same name
    /// parses to directly.
    #[test]
    fn parse_common_import_command_chain_args() {
        for chain in SUPPORTED_CHAINS {
            let parsed =
                ImportCommand::<EthereumChainSpecParser>::parse_from(["reth", "--chain", chain, "."]);
            let expected = chain.parse::<reth_chainspec::Chain>();
            assert_eq!(Ok(parsed.env.chain.chain), expected, "failed to parse chain {chain}");
        }
    }
}