reth_era_downloader/
client.rs1use alloy_primitives::{hex, hex::ToHexExt};
2use bytes::Bytes;
3use eyre::{eyre, OptionExt};
4use futures_util::{stream::StreamExt, Stream, TryStreamExt};
5use reqwest::{Client, IntoUrl, Url};
6use sha2::{Digest, Sha256};
7use std::{future::Future, path::Path, str::FromStr};
8use tokio::{
9 fs::{self, File},
10 io::{self, AsyncBufReadExt, AsyncWriteExt},
11 join, try_join,
12};
13
14pub trait HttpClient {
16 fn get<U: IntoUrl + Send + Sync>(
18 &self,
19 url: U,
20 ) -> impl Future<
21 Output = eyre::Result<impl Stream<Item = eyre::Result<Bytes>> + Send + Sync + Unpin>,
22 > + Send
23 + Sync;
24}
25
26impl HttpClient for Client {
27 async fn get<U: IntoUrl + Send + Sync>(
28 &self,
29 url: U,
30 ) -> eyre::Result<impl Stream<Item = eyre::Result<Bytes>> + Unpin> {
31 let response = Self::get(self, url).send().await?;
32
33 Ok(response.bytes_stream().map_err(|e| eyre::Error::new(e)))
34 }
35}
36
37#[derive(Debug, Clone)]
40pub struct EraClient<Http> {
41 client: Http,
42 url: Url,
43 folder: Box<Path>,
44}
45
46impl<Http: HttpClient + Clone> EraClient<Http> {
47 const CHECKSUMS: &'static str = "checksums.txt";
48
49 pub const fn new(client: Http, url: Url, folder: Box<Path>) -> Self {
51 Self { client, url, folder }
52 }
53
54 pub async fn download_to_file(&mut self, url: impl IntoUrl) -> eyre::Result<Box<Path>> {
57 let path = self.folder.to_path_buf();
58
59 let url = url.into_url()?;
60 let client = self.client.clone();
61 let file_name = url
62 .path_segments()
63 .ok_or_eyre("cannot-be-a-base")?
64 .next_back()
65 .ok_or_eyre("empty path segments")?;
66 let path = path.join(file_name);
67
68 let number =
69 self.file_name_to_number(file_name).ok_or_eyre("Cannot parse number from file name")?;
70 let mut stream = client.get(url).await?;
71 let mut file = File::create(&path).await?;
72 let mut hasher = Sha256::new();
73
74 while let Some(item) = stream.next().await.transpose()? {
75 io::copy(&mut item.as_ref(), &mut file).await?;
76 hasher.update(item);
77 }
78
79 let actual_checksum = hasher.finalize().to_vec();
80
81 let file = File::open(self.folder.join(Self::CHECKSUMS)).await?;
82 let reader = io::BufReader::new(file);
83 let mut lines = reader.lines();
84
85 for _ in 0..number {
86 lines.next_line().await?;
87 }
88 let expected_checksum =
89 lines.next_line().await?.ok_or_else(|| eyre!("Missing hash for number {number}"))?;
90 let expected_checksum = hex::decode(expected_checksum)?;
91
92 if actual_checksum != expected_checksum {
93 return Err(eyre!(
94 "Checksum mismatch, got: {}, expected: {}",
95 actual_checksum.encode_hex(),
96 expected_checksum.encode_hex()
97 ));
98 }
99
100 Ok(path.into_boxed_path())
101 }
102
103 pub async fn recover_index(&self) -> u64 {
105 let mut max = None;
106
107 if let Ok(mut dir) = fs::read_dir(&self.folder).await {
108 while let Ok(Some(entry)) = dir.next_entry().await {
109 if let Some(name) = entry.file_name().to_str() {
110 if let Some(number) = self.file_name_to_number(name) {
111 if max.is_none() || matches!(max, Some(max) if number > max) {
112 max.replace(number);
113 }
114 }
115 }
116 }
117 }
118
119 max.map(|v| v + 1).unwrap_or(0)
120 }
121
122 pub async fn url(&self, number: u64) -> eyre::Result<Option<Url>> {
124 Ok(self.number_to_file_name(number).await?.map(|name| self.url.join(&name)).transpose()?)
125 }
126
127 pub async fn files_count(&self) -> usize {
129 let mut count = 0usize;
130
131 if let Ok(mut dir) = fs::read_dir(&self.folder).await {
132 while let Ok(Some(entry)) = dir.next_entry().await {
133 if entry.path().extension() == Some("era1".as_ref()) {
134 count += 1;
135 }
136 }
137 }
138
139 count
140 }
141
142 pub async fn fetch_file_list(&self) -> eyre::Result<()> {
144 let (mut index, mut checksums) = try_join!(
145 self.client.get(self.url.clone().join("index.html")?),
146 self.client.get(self.url.clone().join(Self::CHECKSUMS)?),
147 )?;
148
149 let index_path = self.folder.to_path_buf().join("index.html");
150 let checksums_path = self.folder.to_path_buf().join(Self::CHECKSUMS);
151
152 let (mut index_file, mut checksums_file) =
153 try_join!(File::create(&index_path), File::create(&checksums_path))?;
154
155 loop {
156 let (index, checksums) = join!(index.next(), checksums.next());
157 let (index, checksums) = (index.transpose()?, checksums.transpose()?);
158
159 if index.is_none() && checksums.is_none() {
160 break;
161 }
162 let index_file = &mut index_file;
163 let checksums_file = &mut checksums_file;
164
165 try_join!(
166 async move {
167 if let Some(index) = index {
168 io::copy(&mut index.as_ref(), index_file).await?;
169 }
170 Ok::<(), eyre::Error>(())
171 },
172 async move {
173 if let Some(checksums) = checksums {
174 io::copy(&mut checksums.as_ref(), checksums_file).await?;
175 }
176 Ok::<(), eyre::Error>(())
177 },
178 )?;
179 }
180
181 let file = File::open(&index_path).await?;
182 let reader = io::BufReader::new(file);
183 let mut lines = reader.lines();
184
185 let path = self.folder.to_path_buf().join("index");
186 let file = File::create(&path).await?;
187 let mut writer = io::BufWriter::new(file);
188
189 while let Some(line) = lines.next_line().await? {
190 if let Some(j) = line.find(".era1") {
191 if let Some(i) = line[..j].rfind(|c: char| !c.is_alphanumeric() && c != '-') {
192 let era = &line[i + 1..j + 5];
193 writer.write_all(era.as_bytes()).await?;
194 writer.write_all(b"\n").await?;
195 }
196 }
197 }
198 writer.flush().await?;
199
200 Ok(())
201 }
202
203 pub async fn number_to_file_name(&self, number: u64) -> eyre::Result<Option<String>> {
205 let path = self.folder.to_path_buf().join("index");
206 let file = File::open(&path).await?;
207 let reader = io::BufReader::new(file);
208 let mut lines = reader.lines();
209 for _ in 0..number {
210 lines.next_line().await?;
211 }
212
213 Ok(lines.next_line().await?)
214 }
215
216 fn file_name_to_number(&self, file_name: &str) -> Option<u64> {
217 file_name.split('-').nth(1).and_then(|v| u64::from_str(v).ok())
218 }
219}
220
221#[cfg(test)]
222mod tests {
223 use super::*;
224 use std::path::PathBuf;
225 use test_case::test_case;
226
227 impl EraClient<Client> {
228 fn empty() -> Self {
229 Self::new(
230 Client::new(),
231 Url::from_str("file:///").unwrap(),
232 PathBuf::new().into_boxed_path(),
233 )
234 }
235 }
236
237 #[test_case("mainnet-00600-a81ae85f.era1", Some(600))]
238 #[test_case("mainnet-00000-a81ae85f.era1", Some(0))]
239 #[test_case("00000-a81ae85f.era1", None)]
240 #[test_case("", None)]
241 fn test_file_name_to_number(file_name: &str, expected_number: Option<u64>) {
242 let client = EraClient::empty();
243
244 let actual_number = client.file_name_to_number(file_name);
245
246 assert_eq!(actual_number, expected_number);
247 }
248}