reth_cli_commands/
download.rs

1use crate::common::EnvironmentArgs;
2use clap::Parser;
3use eyre::Result;
4use lz4::Decoder;
5use reqwest::Client;
6use reth_chainspec::{EthChainSpec, EthereumHardforks};
7use reth_cli::chainspec::ChainSpecParser;
8use reth_fs_util as fs;
9use std::{
10    io::{self, Read, Write},
11    path::Path,
12    sync::Arc,
13    time::{Duration, Instant},
14};
15use tar::Archive;
16use tokio::task;
17use tracing::info;
18
/// Units used when rendering byte counts, smallest first; each step is a factor of 1024.
/// `TB` is included so that downloads of a terabyte or more are not rendered as thousands of GB.
const BYTE_UNITS: [&str; 5] = ["B", "KB", "MB", "GB", "TB"];
/// Host serving the default snapshots, including the `latest.txt` pointer to the newest one.
const MERKLE_BASE_URL: &str = "https://downloads.merkle.io";
/// File extension a snapshot archive must have (an LZ4-compressed tar stream).
const EXTENSION_TAR_FILE: &str = ".tar.lz4";
22
23#[derive(Debug, Parser)]
24pub struct DownloadCommand<C: ChainSpecParser> {
25    #[command(flatten)]
26    env: EnvironmentArgs<C>,
27
28    #[arg(
29        long,
30        short,
31        help = "Custom URL to download the snapshot from",
32        long_help = "Specify a snapshot URL or let the command propose a default one.\n\
33        \n\
34        Available snapshot sources:\n\
35        - https://downloads.merkle.io (default, mainnet archive)\n\
36        - https://publicnode.com/snapshots (full nodes & testnets)\n\
37        \n\
38        If no URL is provided, the latest mainnet archive snapshot\n\
39        will be proposed for download from merkle.io"
40    )]
41    url: Option<String>,
42}
43
44impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCommand<C> {
45    pub async fn execute<N>(self) -> Result<()> {
46        let data_dir = self.env.datadir.resolve_datadir(self.env.chain.chain());
47        fs::create_dir_all(&data_dir)?;
48
49        let url = match self.url {
50            Some(url) => url,
51            None => {
52                let url = get_latest_snapshot_url().await?;
53                info!(target: "reth::cli", "Using default snapshot URL: {}", url);
54                url
55            }
56        };
57
58        info!(target: "reth::cli",
59            chain = %self.env.chain.chain(),
60            dir = ?data_dir.data_dir(),
61            url = %url,
62            "Starting snapshot download and extraction"
63        );
64
65        stream_and_extract(&url, data_dir.data_dir()).await?;
66        info!(target: "reth::cli", "Snapshot downloaded and extracted successfully");
67
68        Ok(())
69    }
70}
71
72impl<C: ChainSpecParser> DownloadCommand<C> {
73    /// Returns the underlying chain being used to run this command
74    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
75        Some(&self.env.chain)
76    }
77}
78
79// Monitor process status and display progress every 100ms
80// to avoid overwhelming stdout
81struct DownloadProgress {
82    downloaded: u64,
83    total_size: u64,
84    last_displayed: Instant,
85}
86
87impl DownloadProgress {
88    /// Creates new progress tracker with given total size
89    fn new(total_size: u64) -> Self {
90        Self { downloaded: 0, total_size, last_displayed: Instant::now() }
91    }
92
93    /// Converts bytes to human readable format (B, KB, MB, GB)
94    fn format_size(size: u64) -> String {
95        let mut size = size as f64;
96        let mut unit_index = 0;
97
98        while size >= 1024.0 && unit_index < BYTE_UNITS.len() - 1 {
99            size /= 1024.0;
100            unit_index += 1;
101        }
102
103        format!("{:.2} {}", size, BYTE_UNITS[unit_index])
104    }
105
106    /// Updates progress bar
107    fn update(&mut self, chunk_size: u64) -> Result<()> {
108        self.downloaded += chunk_size;
109
110        // Only update display at most 10 times per second for efficiency
111        if self.last_displayed.elapsed() >= Duration::from_millis(100) {
112            let formatted_downloaded = Self::format_size(self.downloaded);
113            let formatted_total = Self::format_size(self.total_size);
114            let progress = (self.downloaded as f64 / self.total_size as f64) * 100.0;
115
116            print!(
117                "\rDownloading and extracting... {progress:.2}% ({formatted_downloaded} / {formatted_total})",
118            );
119            io::stdout().flush()?;
120            self.last_displayed = Instant::now();
121        }
122
123        Ok(())
124    }
125}
126
127/// Adapter to track progress while reading
128struct ProgressReader<R> {
129    reader: R,
130    progress: DownloadProgress,
131}
132
133impl<R: Read> ProgressReader<R> {
134    fn new(reader: R, total_size: u64) -> Self {
135        Self { reader, progress: DownloadProgress::new(total_size) }
136    }
137}
138
139impl<R: Read> Read for ProgressReader<R> {
140    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
141        let bytes = self.reader.read(buf)?;
142        if bytes > 0 {
143            if let Err(e) = self.progress.update(bytes as u64) {
144                return Err(io::Error::other(e));
145            }
146        }
147        Ok(bytes)
148    }
149}
150
151/// Downloads and extracts a snapshot with blocking approach
152fn blocking_download_and_extract(url: &str, target_dir: &Path) -> Result<()> {
153    let client = reqwest::blocking::Client::builder().build()?;
154    let response = client.get(url).send()?.error_for_status()?;
155
156    let total_size = response.content_length().ok_or_else(|| {
157        eyre::eyre!(
158            "Server did not provide Content-Length header. This is required for snapshot downloads"
159        )
160    })?;
161
162    let progress_reader = ProgressReader::new(response, total_size);
163
164    let decoder = Decoder::new(progress_reader)?;
165    let mut archive = Archive::new(decoder);
166
167    archive.unpack(target_dir)?;
168
169    info!(target: "reth::cli", "Extraction complete.");
170    Ok(())
171}
172
173async fn stream_and_extract(url: &str, target_dir: &Path) -> Result<()> {
174    let target_dir = target_dir.to_path_buf();
175    let url = url.to_string();
176    task::spawn_blocking(move || blocking_download_and_extract(&url, &target_dir)).await??;
177
178    Ok(())
179}
180
181// Builds default URL for latest mainnet archive  snapshot
182async fn get_latest_snapshot_url() -> Result<String> {
183    let latest_url = format!("{MERKLE_BASE_URL}/latest.txt");
184    let filename = Client::new()
185        .get(latest_url)
186        .send()
187        .await?
188        .error_for_status()?
189        .text()
190        .await?
191        .trim()
192        .to_string();
193
194    if !filename.ends_with(EXTENSION_TAR_FILE) {
195        return Err(eyre::eyre!("Unexpected snapshot filename format: {}", filename));
196    }
197
198    Ok(format!("{MERKLE_BASE_URL}/{filename}"))
199}