reth_downloaders/
file_client.rs

1use std::{collections::HashMap, io, path::Path};
2
3use alloy_consensus::BlockHeader;
4use alloy_eips::BlockHashOrNumber;
5use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
6use futures::Future;
7use itertools::Either;
8use reth_network_p2p::{
9    bodies::client::{BodiesClient, BodiesFut},
10    download::DownloadClient,
11    error::RequestError,
12    headers::client::{HeadersClient, HeadersDirection, HeadersFut, HeadersRequest},
13    priority::Priority,
14};
15use reth_network_peers::PeerId;
16use reth_primitives::SealedHeader;
17use reth_primitives_traits::{Block, BlockBody, FullBlock};
18use thiserror::Error;
19use tokio::{fs::File, io::AsyncReadExt};
20use tokio_stream::StreamExt;
21use tokio_util::codec::FramedRead;
22use tracing::{debug, trace, warn};
23
24use super::file_codec::BlockFileCodec;
25use crate::receipt_file_client::FromReceiptReader;
26
/// Default maximum number of bytes read from a chain file per chunk.
///
/// Equals 10^9 bytes, i.e. 1 GB.
pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000 * 1_000 * 1_000;
31
/// Front-end API for fetching chain data from a file.
///
/// Blocks are assumed to be written one after another in a file, as rlp bytes.
///
/// For example, if the file contains 3 blocks, the file is assumed to be encoded as follows:
/// rlp(block1) || rlp(block2) || rlp(block3)
///
/// Blocks are assumed to have populated transactions, so reading headers will also buffer
/// transactions in memory for use in the bodies stage.
///
/// This reads the entire file into memory, so it is not suitable for large files; use
/// [`ChunkedFileReader`] to decode a large file chunk by chunk instead.
#[derive(Debug)]
pub struct FileClient<B: Block = reth_primitives::Block> {
    /// Buffered block headers, keyed by block number. Served to header requests.
    headers: HashMap<BlockNumber, B::Header>,

    /// Index from block hash to block number for the buffered headers, used to resolve
    /// hash-based header requests.
    hash_to_number: HashMap<BlockHash, BlockNumber>,

    /// Buffered block bodies, keyed by block hash. Served to body requests.
    bodies: HashMap<BlockHash, B::Body>,
}
54
/// An error that can occur when constructing and using a [`FileClient`].
#[derive(Debug, Error)]
pub enum FileClientError {
    /// An error occurred when opening or reading the file.
    #[error(transparent)]
    Io(#[from] std::io::Error),

    /// An error occurred when decoding blocks, headers, or rlp headers from the file.
    ///
    /// The second field carries the bytes that could not be decoded. While decoding a chunk this
    /// error is treated as a trailing partial item, and those bytes are handed back to the caller
    /// as the chunk's remaining bytes.
    #[error("{0}")]
    Rlp(alloy_rlp::Error, Vec<u8>),

    /// Custom error message.
    #[error("{0}")]
    Custom(&'static str),
}
70
71impl From<&'static str> for FileClientError {
72    fn from(value: &'static str) -> Self {
73        Self::Custom(value)
74    }
75}
76
77impl<B: FullBlock> FileClient<B> {
78    /// Create a new file client from a file path.
79    pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self, FileClientError> {
80        let file = File::open(path).await?;
81        Self::from_file(file).await
82    }
83
84    /// Initialize the [`FileClient`] with a file directly.
85    pub(crate) async fn from_file(mut file: File) -> Result<Self, FileClientError> {
86        // get file len from metadata before reading
87        let metadata = file.metadata().await?;
88        let file_len = metadata.len();
89
90        let mut reader = vec![];
91        file.read_to_end(&mut reader).await?;
92
93        Ok(Self::from_reader(&reader[..], file_len).await?.file_client)
94    }
95
96    /// Get the tip hash of the chain.
97    pub fn tip(&self) -> Option<B256> {
98        self.headers.get(&self.max_block()?).map(|h| h.hash_slow())
99    }
100
101    /// Get the start hash of the chain.
102    pub fn start(&self) -> Option<B256> {
103        self.headers.get(&self.min_block()?).map(|h| h.hash_slow())
104    }
105
106    /// Returns the highest block number of this client has or `None` if empty
107    pub fn max_block(&self) -> Option<u64> {
108        self.headers.keys().max().copied()
109    }
110
111    /// Returns the lowest block number of this client has or `None` if empty
112    pub fn min_block(&self) -> Option<u64> {
113        self.headers.keys().min().copied()
114    }
115
116    /// Clones and returns the highest header of this client has or `None` if empty. Seals header
117    /// before returning.
118    pub fn tip_header(&self) -> Option<SealedHeader<B::Header>> {
119        self.headers.get(&self.max_block()?).map(|h| SealedHeader::seal(h.clone()))
120    }
121
122    /// Returns true if all blocks are canonical (no gaps)
123    pub fn has_canonical_blocks(&self) -> bool {
124        if self.headers.is_empty() {
125            return true
126        }
127        let mut nums = self.headers.keys().copied().collect::<Vec<_>>();
128        nums.sort_unstable();
129        let mut iter = nums.into_iter();
130        let mut lowest = iter.next().expect("not empty");
131        for next in iter {
132            if next != lowest + 1 {
133                return false
134            }
135            lowest = next;
136        }
137        true
138    }
139
140    /// Use the provided bodies as the file client's block body buffer.
141    pub fn with_bodies(mut self, bodies: HashMap<BlockHash, B::Body>) -> Self {
142        self.bodies = bodies;
143        self
144    }
145
146    /// Use the provided headers as the file client's block body buffer.
147    pub fn with_headers(mut self, headers: HashMap<BlockNumber, B::Header>) -> Self {
148        self.headers = headers;
149        for (number, header) in &self.headers {
150            self.hash_to_number.insert(header.hash_slow(), *number);
151        }
152        self
153    }
154
155    /// Returns the current number of headers in the client.
156    pub fn headers_len(&self) -> usize {
157        self.headers.len()
158    }
159
160    /// Returns the current number of bodies in the client.
161    pub fn bodies_len(&self) -> usize {
162        self.bodies.len()
163    }
164
165    /// Returns an iterator over headers in the client.
166    pub fn headers_iter(&self) -> impl Iterator<Item = &B::Header> {
167        self.headers.values()
168    }
169
170    /// Returns a mutable iterator over bodies in the client.
171    ///
172    /// Panics, if file client headers and bodies are not mapping 1-1.
173    pub fn bodies_iter_mut(&mut self) -> impl Iterator<Item = (u64, &mut B::Body)> {
174        let bodies = &mut self.bodies;
175        let numbers = &self.hash_to_number;
176        bodies.iter_mut().map(|(hash, body)| (numbers[hash], body))
177    }
178
179    /// Returns the current number of transactions in the client.
180    pub fn total_transactions(&self) -> usize {
181        self.bodies.iter().fold(0, |acc, (_, body)| acc + body.transactions().len())
182    }
183}
184
185impl<B: FullBlock> FromReader for FileClient<B> {
186    type Error = FileClientError;
187
188    /// Initialize the [`FileClient`] from bytes that have been read from file.
189    fn from_reader<R>(
190        reader: R,
191        num_bytes: u64,
192    ) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
193    where
194        R: AsyncReadExt + Unpin,
195    {
196        let mut headers = HashMap::default();
197        let mut hash_to_number = HashMap::default();
198        let mut bodies = HashMap::default();
199
200        // use with_capacity to make sure the internal buffer contains the entire chunk
201        let mut stream =
202            FramedRead::with_capacity(reader, BlockFileCodec::<B>::default(), num_bytes as usize);
203
204        trace!(target: "downloaders::file",
205            target_num_bytes=num_bytes,
206            capacity=stream.read_buffer().capacity(),
207            "init decode stream"
208        );
209
210        let mut remaining_bytes = vec![];
211
212        let mut log_interval = 0;
213        let mut log_interval_start_block = 0;
214
215        async move {
216            while let Some(block_res) = stream.next().await {
217                let block = match block_res {
218                    Ok(block) => block,
219                    Err(FileClientError::Rlp(err, bytes)) => {
220                        trace!(target: "downloaders::file",
221                            %err,
222                            bytes_len=bytes.len(),
223                            "partial block returned from decoding chunk"
224                        );
225                        remaining_bytes = bytes;
226                        break
227                    }
228                    Err(err) => return Err(err),
229                };
230                let block_number = block.header().number();
231                let block_hash = block.header().hash_slow();
232
233                // add to the internal maps
234                headers.insert(block.header().number(), block.header().clone());
235                hash_to_number.insert(block_hash, block.header().number());
236                bodies.insert(block_hash, block.body().clone());
237
238                if log_interval == 0 {
239                    trace!(target: "downloaders::file",
240                        block_number,
241                        "read first block"
242                    );
243                    log_interval_start_block = block_number;
244                } else if log_interval % 100_000 == 0 {
245                    trace!(target: "downloaders::file",
246                        blocks=?log_interval_start_block..=block_number,
247                        "read blocks from file"
248                    );
249                    log_interval_start_block = block_number + 1;
250                }
251                log_interval += 1;
252            }
253
254            trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");
255
256            Ok(DecodedFileChunk {
257                file_client: Self { headers, hash_to_number, bodies },
258                remaining_bytes,
259                highest_block: None,
260            })
261        }
262    }
263}
264
265impl<B: FullBlock> HeadersClient for FileClient<B> {
266    type Header = B::Header;
267    type Output = HeadersFut<B::Header>;
268
269    fn get_headers_with_priority(
270        &self,
271        request: HeadersRequest,
272        _priority: Priority,
273    ) -> Self::Output {
274        // this just searches the buffer, and fails if it can't find the header
275        let mut headers = Vec::new();
276        trace!(target: "downloaders::file", request=?request, "Getting headers");
277
278        let start_num = match request.start {
279            BlockHashOrNumber::Hash(hash) => match self.hash_to_number.get(&hash) {
280                Some(num) => *num,
281                None => {
282                    warn!(%hash, "Could not find starting block number for requested header hash");
283                    return Box::pin(async move { Err(RequestError::BadResponse) })
284                }
285            },
286            BlockHashOrNumber::Number(num) => num,
287        };
288
289        let range = if request.limit == 1 {
290            Either::Left(start_num..start_num + 1)
291        } else {
292            match request.direction {
293                HeadersDirection::Rising => Either::Left(start_num..start_num + request.limit),
294                HeadersDirection::Falling => {
295                    Either::Right((start_num - request.limit + 1..=start_num).rev())
296                }
297            }
298        };
299
300        trace!(target: "downloaders::file", range=?range, "Getting headers with range");
301
302        for block_number in range {
303            match self.headers.get(&block_number).cloned() {
304                Some(header) => headers.push(header),
305                None => {
306                    warn!(number=%block_number, "Could not find header");
307                    return Box::pin(async move { Err(RequestError::BadResponse) })
308                }
309            }
310        }
311
312        Box::pin(async move { Ok((PeerId::default(), headers).into()) })
313    }
314}
315
316impl<B: FullBlock> BodiesClient for FileClient<B> {
317    type Body = B::Body;
318    type Output = BodiesFut<B::Body>;
319
320    fn get_block_bodies_with_priority(
321        &self,
322        hashes: Vec<B256>,
323        _priority: Priority,
324    ) -> Self::Output {
325        // this just searches the buffer, and fails if it can't find the block
326        let mut bodies = Vec::new();
327
328        // check if any are an error
329        // could unwrap here
330        for hash in hashes {
331            match self.bodies.get(&hash).cloned() {
332                Some(body) => bodies.push(body),
333                None => return Box::pin(async move { Err(RequestError::BadResponse) }),
334            }
335        }
336
337        Box::pin(async move { Ok((PeerId::default(), bodies).into()) })
338    }
339}
340
341impl<B: FullBlock> DownloadClient for FileClient<B> {
342    fn report_bad_message(&self, _peer_id: PeerId) {
343        warn!("Reported a bad message on a file client, the file may be corrupted or invalid");
344        // noop
345    }
346
347    fn num_connected_peers(&self) -> usize {
348        // no such thing as connected peers when we are just using a file
349        1
350    }
351}
352
/// Chunks file into several [`FileClient`]s.
///
/// The file is read in chunks sized by `chunk_byte_len`, and each chunk is decoded separately.
/// Bytes left undecoded at the end of a chunk (e.g. a partial block) are kept and prepended to the
/// next chunk.
#[derive(Debug)]
pub struct ChunkedFileReader {
    /// File to read from.
    file: File,
    /// Number of bytes of the file that have not been read yet; decreases as chunks are read.
    file_byte_len: u64,
    /// Buffer for the chunk to decode. Between reads it holds the leftover bytes of the previous
    /// chunk that could not be decoded yet.
    chunk: Vec<u8>,
    /// Max bytes per chunk.
    chunk_byte_len: u64,
    /// Optionally, tracks highest decoded block number. Needed when decoding data that maps * to 1
    /// with block number. Only updated by [`ChunkedFileReader::next_receipts_chunk`].
    highest_block: Option<u64>,
}
368
369impl ChunkedFileReader {
370    /// Returns the remaining file length.
371    pub const fn file_len(&self) -> u64 {
372        self.file_byte_len
373    }
374
375    /// Opens the file to import from given path. Returns a new instance. If no chunk byte length
376    /// is passed, chunks have [`DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE`] (one static file).
377    pub async fn new<P: AsRef<Path>>(
378        path: P,
379        chunk_byte_len: Option<u64>,
380    ) -> Result<Self, FileClientError> {
381        let file = File::open(path).await?;
382        let chunk_byte_len = chunk_byte_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE);
383
384        Self::from_file(file, chunk_byte_len).await
385    }
386
387    /// Opens the file to import from given path. Returns a new instance.
388    pub async fn from_file(file: File, chunk_byte_len: u64) -> Result<Self, FileClientError> {
389        // get file len from metadata before reading
390        let metadata = file.metadata().await?;
391        let file_byte_len = metadata.len();
392
393        Ok(Self { file, file_byte_len, chunk: vec![], chunk_byte_len, highest_block: None })
394    }
395
396    /// Calculates the number of bytes to read from the chain file. Returns a tuple of the chunk
397    /// length and the remaining file length.
398    fn chunk_len(&self) -> u64 {
399        let Self { chunk_byte_len, file_byte_len, .. } = *self;
400        let file_byte_len = file_byte_len + self.chunk.len() as u64;
401
402        if chunk_byte_len > file_byte_len {
403            // last chunk
404            file_byte_len
405        } else {
406            chunk_byte_len
407        }
408    }
409
410    /// Reads bytes from file and buffers as next chunk to decode. Returns byte length of next
411    /// chunk to read.
412    async fn read_next_chunk(&mut self) -> Result<Option<u64>, io::Error> {
413        if self.file_byte_len == 0 && self.chunk.is_empty() {
414            // eof
415            return Ok(None)
416        }
417
418        let chunk_target_len = self.chunk_len();
419        let old_bytes_len = self.chunk.len() as u64;
420
421        // calculate reserved space in chunk
422        let new_read_bytes_target_len = chunk_target_len - old_bytes_len;
423
424        // read new bytes from file
425        let prev_read_bytes_len = self.chunk.len();
426        self.chunk.extend(std::iter::repeat(0).take(new_read_bytes_target_len as usize));
427        let reader = &mut self.chunk[prev_read_bytes_len..];
428
429        // actual bytes that have been read
430        let new_read_bytes_len = self.file.read_exact(reader).await? as u64;
431        let next_chunk_byte_len = self.chunk.len();
432
433        // update remaining file length
434        self.file_byte_len -= new_read_bytes_len;
435
436        debug!(target: "downloaders::file",
437            max_chunk_byte_len=self.chunk_byte_len,
438            prev_read_bytes_len,
439            new_read_bytes_target_len,
440            new_read_bytes_len,
441            next_chunk_byte_len,
442            remaining_file_byte_len=self.file_byte_len,
443            "new bytes were read from file"
444        );
445
446        Ok(Some(next_chunk_byte_len as u64))
447    }
448
449    /// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
450    pub async fn next_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
451    where
452        T: FromReader,
453    {
454        let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };
455
456        // make new file client from chunk
457        let DecodedFileChunk { file_client, remaining_bytes, .. } =
458            T::from_reader(&self.chunk[..], next_chunk_byte_len).await?;
459
460        // save left over bytes
461        self.chunk = remaining_bytes;
462
463        Ok(Some(file_client))
464    }
465
466    /// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
467    pub async fn next_receipts_chunk<T, D>(&mut self) -> Result<Option<T>, T::Error>
468    where
469        T: FromReceiptReader<D>,
470    {
471        let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };
472
473        // make new file client from chunk
474        let DecodedFileChunk { file_client, remaining_bytes, highest_block } =
475            T::from_receipt_reader(&self.chunk[..], next_chunk_byte_len, self.highest_block)
476                .await?;
477
478        // save left over bytes
479        self.chunk = remaining_bytes;
480        // update highest block
481        self.highest_block = highest_block;
482
483        Ok(Some(file_client))
484    }
485}
486
487/// Constructs a file client from a reader.
488pub trait FromReader {
489    /// Error returned by file client type.
490    type Error: From<io::Error>;
491
492    /// Returns a file client
493    fn from_reader<B>(
494        reader: B,
495        num_bytes: u64,
496    ) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
497    where
498        Self: Sized,
499        B: AsyncReadExt + Unpin;
500}
501
/// Output from decoding a file chunk with [`FromReader::from_reader`].
#[derive(Debug)]
pub struct DecodedFileChunk<T> {
    /// File client, i.e. the decoded part of chunk.
    pub file_client: T,
    /// Remaining bytes that have not been decoded, e.g. a partial block or a partial receipt.
    /// [`ChunkedFileReader`] keeps these and reads the next chunk's bytes after them.
    pub remaining_bytes: Vec<u8>,
    /// Highest block of decoded chunk. This is needed when decoding data that maps * to 1 with
    /// block number, like receipts. Block decoding via [`FileClient`] always leaves it `None`.
    pub highest_block: Option<u64>,
}
513
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        bodies::{
            bodies::BodiesDownloaderBuilder,
            test_utils::{insert_headers, zip_blocks},
        },
        headers::{reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::child_header},
        test_utils::{generate_bodies, generate_bodies_file},
    };
    use assert_matches::assert_matches;
    use futures_util::stream::StreamExt;
    use rand::Rng;
    use reth_consensus::test_utils::TestConsensus;
    use reth_network_p2p::{
        bodies::downloader::BodyDownloader,
        headers::downloader::{HeaderDownloader, SyncTarget},
    };
    use reth_provider::test_utils::create_test_provider_factory;
    use std::sync::Arc;

    /// Bodies injected with `with_bodies` are served to the bodies downloader.
    #[tokio::test]
    async fn streams_bodies_from_buffer() {
        // Generate some random blocks
        let factory = create_test_provider_factory();
        let (headers, mut bodies) = generate_bodies(0..=19);

        insert_headers(factory.db_ref().db(), &headers);

        // create an empty file
        let file = tempfile::tempfile().unwrap();

        let client: Arc<FileClient> =
            Arc::new(FileClient::from_file(file.into()).await.unwrap().with_bodies(bodies.clone()));
        let mut downloader = BodiesDownloaderBuilder::default().build(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    /// Headers injected with `with_headers` are served to the reverse headers downloader.
    #[tokio::test]
    async fn download_headers_at_fork_head() {
        reth_tracing::init_test_tracing();

        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let file = tempfile::tempfile().unwrap();
        let client: Arc<FileClient> = Arc::new(
            FileClient::from_file(file.into()).await.unwrap().with_headers(HashMap::from([
                (0u64, p0.clone().unseal()),
                (1, p1.clone().unseal()),
                (2, p2.clone().unseal()),
                (3, p3.clone().unseal()),
            ])),
        );

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(3)
            .request_limit(3)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p0, p1, p2]));
        assert!(downloader.next().await.is_none());
        assert!(downloader.next().await.is_none());
    }

    /// Headers decoded from a block file are served to the reverse headers downloader.
    #[tokio::test]
    async fn test_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        // Generate some random blocks
        let (file, headers, _) = generate_bodies_file(0..=19).await;
        // now try to read them back
        let client: Arc<FileClient> = Arc::new(FileClient::from_file(file).await.unwrap());

        // construct headers downloader and use first header
        let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        header_downloader.update_local_head(headers.first().unwrap().clone());
        header_downloader.update_sync_target(SyncTarget::Tip(headers.last().unwrap().hash()));

        // get headers first
        let mut downloaded_headers = header_downloader.next().await.unwrap().unwrap();

        // reverse to make sure it's in the right order before comparing
        downloaded_headers.reverse();

        // the first header is not included in the response
        assert_eq!(downloaded_headers, headers[1..]);
    }

    /// Bodies decoded from a block file are served to the bodies downloader.
    #[tokio::test]
    async fn test_download_bodies_from_file() {
        // Generate some random blocks
        let factory = create_test_provider_factory();
        let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;

        // now try to read them back
        let client: Arc<FileClient> = Arc::new(FileClient::from_file(file).await.unwrap());

        // insert headers in db for the bodies downloader
        insert_headers(factory.db_ref().db(), &headers);

        let mut downloader = BodiesDownloaderBuilder::default().build(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    /// Reading a block file chunk by chunk yields every header exactly once, in order.
    #[tokio::test]
    async fn test_chunk_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        // Generate some random blocks
        let (file, headers, _) = generate_bodies_file(0..=14).await;

        // calculate min for chunk byte length range, pick a lower bound that guarantees at least
        // one block will be read
        let chunk_byte_len = rand::thread_rng().gen_range(2000..=10_000);
        trace!(target: "downloaders::file::test", chunk_byte_len);

        // init reader
        let mut reader = ChunkedFileReader::from_file(file, chunk_byte_len as u64).await.unwrap();

        let mut downloaded_headers: Vec<SealedHeader> = vec![];

        let mut local_header = headers.first().unwrap().clone();

        // test
        while let Some(client) = reader.next_chunk::<FileClient>().await.unwrap() {
            let sync_target = client.tip_header().unwrap();

            let sync_target_hash = sync_target.hash();

            // construct headers downloader and use first header
            let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
                .build(Arc::new(client), Arc::new(TestConsensus::default()));
            header_downloader.update_local_head(local_header.clone());
            header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));

            // get headers first
            let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();

            // export new local header to outer scope
            local_header = sync_target;

            // reverse to make sure it's in the right order before comparing
            downloaded_headers_chunk.reverse();
            downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
        }

        // the first header is not included in the response
        assert_eq!(headers[1..], downloaded_headers);
    }
}