reth_era_downloader/
client.rs

1use alloy_primitives::{hex, hex::ToHexExt};
2use bytes::Bytes;
3use eyre::{eyre, OptionExt};
4use futures_util::{stream::StreamExt, Stream, TryStreamExt};
5use reqwest::{Client, IntoUrl, Url};
6use sha2::{Digest, Sha256};
7use std::{future::Future, path::Path, str::FromStr};
8use tokio::{
9    fs::{self, File},
10    io::{self, AsyncBufReadExt, AsyncWriteExt},
11    join, try_join,
12};
13
14/// Accesses the network over HTTP.
15pub trait HttpClient {
16    /// Makes an HTTP GET request to `url`. Returns a stream of response body bytes.
17    fn get<U: IntoUrl + Send + Sync>(
18        &self,
19        url: U,
20    ) -> impl Future<
21        Output = eyre::Result<impl Stream<Item = eyre::Result<Bytes>> + Send + Sync + Unpin>,
22    > + Send
23           + Sync;
24}
25
26impl HttpClient for Client {
27    async fn get<U: IntoUrl + Send + Sync>(
28        &self,
29        url: U,
30    ) -> eyre::Result<impl Stream<Item = eyre::Result<Bytes>> + Unpin> {
31        let response = Self::get(self, url).send().await?;
32
33        Ok(response.bytes_stream().map_err(|e| eyre::Error::new(e)))
34    }
35}
36
37/// An HTTP client with features for downloading ERA files from an external HTTP accessible
38/// endpoint.
39#[derive(Debug, Clone)]
40pub struct EraClient<Http> {
41    client: Http,
42    url: Url,
43    folder: Box<Path>,
44}
45
46impl<Http: HttpClient + Clone> EraClient<Http> {
47    const CHECKSUMS: &'static str = "checksums.txt";
48
49    /// Constructs [`EraClient`] using `client` to download from `url` into `folder`.
50    pub const fn new(client: Http, url: Url, folder: Box<Path>) -> Self {
51        Self { client, url, folder }
52    }
53
54    /// Performs a GET request on `url` and stores the response body into a file located within
55    /// the `folder`.
56    pub async fn download_to_file(&mut self, url: impl IntoUrl) -> eyre::Result<Box<Path>> {
57        let path = self.folder.to_path_buf();
58
59        let url = url.into_url()?;
60        let client = self.client.clone();
61        let file_name = url
62            .path_segments()
63            .ok_or_eyre("cannot-be-a-base")?
64            .next_back()
65            .ok_or_eyre("empty path segments")?;
66        let path = path.join(file_name);
67
68        let number =
69            self.file_name_to_number(file_name).ok_or_eyre("Cannot parse number from file name")?;
70        let mut stream = client.get(url).await?;
71        let mut file = File::create(&path).await?;
72        let mut hasher = Sha256::new();
73
74        while let Some(item) = stream.next().await.transpose()? {
75            io::copy(&mut item.as_ref(), &mut file).await?;
76            hasher.update(item);
77        }
78
79        let actual_checksum = hasher.finalize().to_vec();
80
81        let file = File::open(self.folder.join(Self::CHECKSUMS)).await?;
82        let reader = io::BufReader::new(file);
83        let mut lines = reader.lines();
84
85        for _ in 0..number {
86            lines.next_line().await?;
87        }
88        let expected_checksum =
89            lines.next_line().await?.ok_or_else(|| eyre!("Missing hash for number {number}"))?;
90        let expected_checksum = hex::decode(expected_checksum)?;
91
92        if actual_checksum != expected_checksum {
93            return Err(eyre!(
94                "Checksum mismatch, got: {}, expected: {}",
95                actual_checksum.encode_hex(),
96                expected_checksum.encode_hex()
97            ));
98        }
99
100        Ok(path.into_boxed_path())
101    }
102
103    /// Recovers index of file following the latest downloaded file from a different run.
104    pub async fn recover_index(&self) -> u64 {
105        let mut max = None;
106
107        if let Ok(mut dir) = fs::read_dir(&self.folder).await {
108            while let Ok(Some(entry)) = dir.next_entry().await {
109                if let Some(name) = entry.file_name().to_str() {
110                    if let Some(number) = self.file_name_to_number(name) {
111                        if max.is_none() || matches!(max, Some(max) if number > max) {
112                            max.replace(number);
113                        }
114                    }
115                }
116            }
117        }
118
119        max.map(|v| v + 1).unwrap_or(0)
120    }
121
122    /// Returns a download URL for the file corresponding to `number`.
123    pub async fn url(&self, number: u64) -> eyre::Result<Option<Url>> {
124        Ok(self.number_to_file_name(number).await?.map(|name| self.url.join(&name)).transpose()?)
125    }
126
127    /// Returns the number of files in the `folder`.
128    pub async fn files_count(&self) -> usize {
129        let mut count = 0usize;
130
131        if let Ok(mut dir) = fs::read_dir(&self.folder).await {
132            while let Ok(Some(entry)) = dir.next_entry().await {
133                if entry.path().extension() == Some("era1".as_ref()) {
134                    count += 1;
135                }
136            }
137        }
138
139        count
140    }
141
142    /// Fetches the list of ERA1 files from `url` and stores it in a file located within `folder`.
143    pub async fn fetch_file_list(&self) -> eyre::Result<()> {
144        let (mut index, mut checksums) = try_join!(
145            self.client.get(self.url.clone().join("index.html")?),
146            self.client.get(self.url.clone().join(Self::CHECKSUMS)?),
147        )?;
148
149        let index_path = self.folder.to_path_buf().join("index.html");
150        let checksums_path = self.folder.to_path_buf().join(Self::CHECKSUMS);
151
152        let (mut index_file, mut checksums_file) =
153            try_join!(File::create(&index_path), File::create(&checksums_path))?;
154
155        loop {
156            let (index, checksums) = join!(index.next(), checksums.next());
157            let (index, checksums) = (index.transpose()?, checksums.transpose()?);
158
159            if index.is_none() && checksums.is_none() {
160                break;
161            }
162            let index_file = &mut index_file;
163            let checksums_file = &mut checksums_file;
164
165            try_join!(
166                async move {
167                    if let Some(index) = index {
168                        io::copy(&mut index.as_ref(), index_file).await?;
169                    }
170                    Ok::<(), eyre::Error>(())
171                },
172                async move {
173                    if let Some(checksums) = checksums {
174                        io::copy(&mut checksums.as_ref(), checksums_file).await?;
175                    }
176                    Ok::<(), eyre::Error>(())
177                },
178            )?;
179        }
180
181        let file = File::open(&index_path).await?;
182        let reader = io::BufReader::new(file);
183        let mut lines = reader.lines();
184
185        let path = self.folder.to_path_buf().join("index");
186        let file = File::create(&path).await?;
187        let mut writer = io::BufWriter::new(file);
188
189        while let Some(line) = lines.next_line().await? {
190            if let Some(j) = line.find(".era1") {
191                if let Some(i) = line[..j].rfind(|c: char| !c.is_alphanumeric() && c != '-') {
192                    let era = &line[i + 1..j + 5];
193                    writer.write_all(era.as_bytes()).await?;
194                    writer.write_all(b"\n").await?;
195                }
196            }
197        }
198        writer.flush().await?;
199
200        Ok(())
201    }
202
203    /// Returns ERA1 file name that is ordered at `number`.
204    pub async fn number_to_file_name(&self, number: u64) -> eyre::Result<Option<String>> {
205        let path = self.folder.to_path_buf().join("index");
206        let file = File::open(&path).await?;
207        let reader = io::BufReader::new(file);
208        let mut lines = reader.lines();
209        for _ in 0..number {
210            lines.next_line().await?;
211        }
212
213        Ok(lines.next_line().await?)
214    }
215
216    fn file_name_to_number(&self, file_name: &str) -> Option<u64> {
217        file_name.split('-').nth(1).and_then(|v| u64::from_str(v).ok())
218    }
219}
220
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use test_case::test_case;

    impl EraClient<Client> {
        /// Builds a client with a placeholder URL and an empty folder path, for tests that
        /// only exercise pure helpers.
        fn empty() -> Self {
            let url = Url::from_str("file:///").unwrap();
            let folder = PathBuf::new().into_boxed_path();

            Self::new(Client::new(), url, folder)
        }
    }

    // The number is the second `-`-separated component of the file name.
    #[test_case("mainnet-00600-a81ae85f.era1", Some(600))]
    #[test_case("mainnet-00000-a81ae85f.era1", Some(0))]
    #[test_case("00000-a81ae85f.era1", None)]
    #[test_case("", None)]
    fn test_file_name_to_number(file_name: &str, expected_number: Option<u64>) {
        assert_eq!(EraClient::empty().file_name_to_number(file_name), expected_number);
    }
}
248}