reth_cli_commands/
download.rs1use crate::common::EnvironmentArgs;
2use clap::Parser;
3use eyre::Result;
4use lz4::Decoder;
5use reqwest::Client;
6use reth_chainspec::{EthChainSpec, EthereumHardforks};
7use reth_cli::chainspec::ChainSpecParser;
8use reth_fs_util as fs;
9use std::{
10 io::{self, Read, Write},
11 path::Path,
12 sync::Arc,
13 time::{Duration, Instant},
14};
15use tar::Archive;
16use tokio::task;
17use tracing::info;
18
/// Unit suffixes used when rendering byte counts, ordered from smallest to largest.
///
/// `TB` is included so that multi-terabyte snapshots are not rendered as thousands of `GB`.
const BYTE_UNITS: [&str; 5] = ["B", "KB", "MB", "GB", "TB"];
/// Base URL of the default snapshot provider.
const MERKLE_BASE_URL: &str = "https://downloads.merkle.io";
/// Suffix a snapshot archive file name is expected to end with.
const EXTENSION_TAR_FILE: &str = ".tar.lz4";
22
23#[derive(Debug, Parser)]
24pub struct DownloadCommand<C: ChainSpecParser> {
25 #[command(flatten)]
26 env: EnvironmentArgs<C>,
27
28 #[arg(
29 long,
30 short,
31 help = "Custom URL to download the snapshot from",
32 long_help = "Specify a snapshot URL or let the command propose a default one.\n\
33 \n\
34 Available snapshot sources:\n\
35 - https://downloads.merkle.io (default, mainnet archive)\n\
36 - https://publicnode.com/snapshots (full nodes & testnets)\n\
37 \n\
38 If no URL is provided, the latest mainnet archive snapshot\n\
39 will be proposed for download from merkle.io"
40 )]
41 url: Option<String>,
42}
43
44impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCommand<C> {
45 pub async fn execute<N>(self) -> Result<()> {
46 let data_dir = self.env.datadir.resolve_datadir(self.env.chain.chain());
47 fs::create_dir_all(&data_dir)?;
48
49 let url = match self.url {
50 Some(url) => url,
51 None => {
52 let url = get_latest_snapshot_url().await?;
53 info!(target: "reth::cli", "Using default snapshot URL: {}", url);
54 url
55 }
56 };
57
58 info!(target: "reth::cli",
59 chain = %self.env.chain.chain(),
60 dir = ?data_dir.data_dir(),
61 url = %url,
62 "Starting snapshot download and extraction"
63 );
64
65 stream_and_extract(&url, data_dir.data_dir()).await?;
66 info!(target: "reth::cli", "Snapshot downloaded and extracted successfully");
67
68 Ok(())
69 }
70}
71
72impl<C: ChainSpecParser> DownloadCommand<C> {
73 pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
75 Some(&self.env.chain)
76 }
77}
78
79struct DownloadProgress {
82 downloaded: u64,
83 total_size: u64,
84 last_displayed: Instant,
85}
86
87impl DownloadProgress {
88 fn new(total_size: u64) -> Self {
90 Self { downloaded: 0, total_size, last_displayed: Instant::now() }
91 }
92
93 fn format_size(size: u64) -> String {
95 let mut size = size as f64;
96 let mut unit_index = 0;
97
98 while size >= 1024.0 && unit_index < BYTE_UNITS.len() - 1 {
99 size /= 1024.0;
100 unit_index += 1;
101 }
102
103 format!("{:.2} {}", size, BYTE_UNITS[unit_index])
104 }
105
106 fn update(&mut self, chunk_size: u64) -> Result<()> {
108 self.downloaded += chunk_size;
109
110 if self.last_displayed.elapsed() >= Duration::from_millis(100) {
112 let formatted_downloaded = Self::format_size(self.downloaded);
113 let formatted_total = Self::format_size(self.total_size);
114 let progress = (self.downloaded as f64 / self.total_size as f64) * 100.0;
115
116 print!(
117 "\rDownloading and extracting... {progress:.2}% ({formatted_downloaded} / {formatted_total})",
118 );
119 io::stdout().flush()?;
120 self.last_displayed = Instant::now();
121 }
122
123 Ok(())
124 }
125}
126
127struct ProgressReader<R> {
129 reader: R,
130 progress: DownloadProgress,
131}
132
133impl<R: Read> ProgressReader<R> {
134 fn new(reader: R, total_size: u64) -> Self {
135 Self { reader, progress: DownloadProgress::new(total_size) }
136 }
137}
138
139impl<R: Read> Read for ProgressReader<R> {
140 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
141 let bytes = self.reader.read(buf)?;
142 if bytes > 0 {
143 if let Err(e) = self.progress.update(bytes as u64) {
144 return Err(io::Error::other(e));
145 }
146 }
147 Ok(bytes)
148 }
149}
150
151fn blocking_download_and_extract(url: &str, target_dir: &Path) -> Result<()> {
153 let client = reqwest::blocking::Client::builder().build()?;
154 let response = client.get(url).send()?.error_for_status()?;
155
156 let total_size = response.content_length().ok_or_else(|| {
157 eyre::eyre!(
158 "Server did not provide Content-Length header. This is required for snapshot downloads"
159 )
160 })?;
161
162 let progress_reader = ProgressReader::new(response, total_size);
163
164 let decoder = Decoder::new(progress_reader)?;
165 let mut archive = Archive::new(decoder);
166
167 archive.unpack(target_dir)?;
168
169 info!(target: "reth::cli", "Extraction complete.");
170 Ok(())
171}
172
173async fn stream_and_extract(url: &str, target_dir: &Path) -> Result<()> {
174 let target_dir = target_dir.to_path_buf();
175 let url = url.to_string();
176 task::spawn_blocking(move || blocking_download_and_extract(&url, &target_dir)).await??;
177
178 Ok(())
179}
180
181async fn get_latest_snapshot_url() -> Result<String> {
183 let latest_url = format!("{MERKLE_BASE_URL}/latest.txt");
184 let filename = Client::new()
185 .get(latest_url)
186 .send()
187 .await?
188 .error_for_status()?
189 .text()
190 .await?
191 .trim()
192 .to_string();
193
194 if !filename.ends_with(EXTENSION_TAR_FILE) {
195 return Err(eyre::eyre!("Unexpected snapshot filename format: {}", filename));
196 }
197
198 Ok(format!("{MERKLE_BASE_URL}/{filename}"))
199}