use std::{collections::HashMap, io, path::Path};

use alloy_consensus::BlockHeader;
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
use futures::Future;
use itertools::Either;
use reth_network_p2p::{
    bodies::client::{BodiesClient, BodiesFut},
    download::DownloadClient,
    error::RequestError,
    headers::client::{HeadersClient, HeadersDirection, HeadersFut, HeadersRequest},
    priority::Priority,
};
use reth_network_peers::PeerId;
use reth_primitives::SealedHeader;
use reth_primitives_traits::{Block, BlockBody, FullBlock};
use thiserror::Error;
use tokio::{fs::File, io::AsyncReadExt};
use tokio_stream::StreamExt;
use tokio_util::codec::FramedRead;
use tracing::{debug, trace, warn};

use super::file_codec::BlockFileCodec;
use crate::receipt_file_client::FromReceiptReader;

/// Default byte length of a chunk to read from a chain file (1 GB).
pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000_000_000;

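/// Front-end API for fetching chain data from a file.
///
/// Blocks are expected to be written one after another as RLP bytes, e.g. a file with three
/// blocks is decoded as `rlp(block1) || rlp(block2) || rlp(block3)`.
///
/// The whole file is read into memory, so this type on its own is only suitable for files that
/// fit in memory; for larger files see [`ChunkedFileReader`] below.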
#[derive(Debug)]
pub struct FileClient<B: Block = reth_primitives::Block> {
    /// Buffered block headers, keyed by block number.
    headers: HashMap<BlockNumber, B::Header>,

    /// Maps block hash to block number.
    hash_to_number: HashMap<BlockHash, BlockNumber>,

    /// Buffered block bodies, keyed by block hash.
    bodies: HashMap<BlockHash, B::Body>,
}

/// An error that can occur when constructing and using a [`FileClient`].
#[derive(Debug, Error)]
pub enum FileClientError {
    /// An error occurred while opening or reading the file.
    #[error(transparent)]
    Io(#[from] std::io::Error),

    /// An error occurred while decoding blocks from the file, together with the bytes that
    /// could not be decoded.
    #[error("{0}")]
    Rlp(alloy_rlp::Error, Vec<u8>),

    /// A custom error message.
    #[error("{0}")]
    Custom(&'static str),
}

impl From<&'static str> for FileClientError {
    fn from(value: &'static str) -> Self {
        Self::Custom(value)
    }
}

impl<B: FullBlock> FileClient<B> {
    /// Creates a new file client from a file path.
    pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self, FileClientError> {
        let file = File::open(path).await?;
        Self::from_file(file).await
    }

    /// Initializes the client from an already opened file, reading it fully into memory.
    pub(crate) async fn from_file(mut file: File) -> Result<Self, FileClientError> {
        // get the file length from metadata before reading
        let metadata = file.metadata().await?;
        let file_len = metadata.len();

        let mut reader = vec![];
        file.read_to_end(&mut reader).await?;

        Ok(Self::from_reader(&reader[..], file_len).await?.file_client)
    }

    /// Returns the hash of the highest buffered block, or `None` if the client is empty.
    pub fn tip(&self) -> Option<B256> {
        self.headers.get(&self.max_block()?).map(|h| h.hash_slow())
    }

    /// Returns the hash of the lowest buffered block, or `None` if the client is empty.
    pub fn start(&self) -> Option<B256> {
        self.headers.get(&self.min_block()?).map(|h| h.hash_slow())
    }

    /// Returns the highest buffered block number, or `None` if the client is empty.
    pub fn max_block(&self) -> Option<u64> {
        self.headers.keys().max().copied()
    }

    /// Returns the lowest buffered block number, or `None` if the client is empty.
    pub fn min_block(&self) -> Option<u64> {
        self.headers.keys().min().copied()
    }

    /// Clones and seals the header of the highest buffered block, or returns `None` if the
    /// client is empty.
    pub fn tip_header(&self) -> Option<SealedHeader<B::Header>> {
        self.headers.get(&self.max_block()?).map(|h| SealedHeader::seal(h.clone()))
    }

    /// Returns `true` if the buffered block numbers form a contiguous range without gaps
    /// (also `true` when the client is empty).
    pub fn has_canonical_blocks(&self) -> bool {
        if self.headers.is_empty() {
            return true
        }
        let mut nums = self.headers.keys().copied().collect::<Vec<_>>();
        nums.sort_unstable();
        let mut iter = nums.into_iter();
        let mut lowest = iter.next().expect("not empty");
        for next in iter {
            if next != lowest + 1 {
                return false
            }
            lowest = next;
        }
        true
    }

    /// Uses the provided bodies as the client's block body buffer.
    pub fn with_bodies(mut self, bodies: HashMap<BlockHash, B::Body>) -> Self {
        self.bodies = bodies;
        self
    }

    /// Uses the provided headers as the client's header buffer and indexes them by hash.
    pub fn with_headers(mut self, headers: HashMap<BlockNumber, B::Header>) -> Self {
        self.headers = headers;
        for (number, header) in &self.headers {
            self.hash_to_number.insert(header.hash_slow(), *number);
        }
        self
    }

    /// Returns the current number of buffered headers.
    pub fn headers_len(&self) -> usize {
        self.headers.len()
    }

    /// Returns the current number of buffered bodies.
    pub fn bodies_len(&self) -> usize {
        self.bodies.len()
    }

    /// Returns an iterator over the buffered headers.
    pub fn headers_iter(&self) -> impl Iterator<Item = &B::Header> {
        self.headers.values()
    }

    /// Returns a mutable iterator over the buffered bodies, together with their block numbers.
    ///
    /// Panics if a body's hash has no entry in the hash-to-number index, i.e. if headers and
    /// bodies do not map one to one.
    pub fn bodies_iter_mut(&mut self) -> impl Iterator<Item = (u64, &mut B::Body)> {
        let bodies = &mut self.bodies;
        let numbers = &self.hash_to_number;
        bodies.iter_mut().map(|(hash, body)| (numbers[hash], body))
    }

    /// Returns the total number of transactions across all buffered bodies.
    pub fn total_transactions(&self) -> usize {
        self.bodies.iter().fold(0, |acc, (_, body)| acc + body.transactions().len())
    }
}

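// Rough usage sketch, mirroring the tests at the bottom of this file. `path`, `consensus` and
// `local_head` are placeholders that are not defined here:
//
// let client: Arc<FileClient> = Arc::new(FileClient::new(path).await?);
// let mut downloader = ReverseHeadersDownloaderBuilder::default()
//     .build(client.clone(), consensus);
// downloader.update_local_head(local_head);
// downloader.update_sync_target(SyncTarget::Tip(client.tip().unwrap()));
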
impl<B: FullBlock> FromReader for FileClient<B> {
    type Error = FileClientError;

    fn from_reader<R>(
        reader: R,
        num_bytes: u64,
    ) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
    where
        R: AsyncReadExt + Unpin,
    {
        let mut headers = HashMap::default();
        let mut hash_to_number = HashMap::default();
        let mut bodies = HashMap::default();

        // use with_capacity to make sure the internal buffer can hold the entire chunk
        let mut stream =
            FramedRead::with_capacity(reader, BlockFileCodec::<B>::default(), num_bytes as usize);

        trace!(target: "downloaders::file",
            target_num_bytes=num_bytes,
            capacity=stream.read_buffer().capacity(),
            "init decode stream"
        );

        let mut remaining_bytes = vec![];

        let mut log_interval = 0;
        let mut log_interval_start_block = 0;

        async move {
            while let Some(block_res) = stream.next().await {
                let block = match block_res {
                    Ok(block) => block,
                    Err(FileClientError::Rlp(err, bytes)) => {
                        trace!(target: "downloaders::file",
                            %err,
                            bytes_len=bytes.len(),
                            "partial block returned from decoding chunk"
                        );
                        // keep the undecodable tail so it can be prepended to the next chunk
                        remaining_bytes = bytes;
                        break
                    }
                    Err(err) => return Err(err),
                };
                let block_number = block.header().number();
                let block_hash = block.header().hash_slow();

                // add to the internal maps
                headers.insert(block.header().number(), block.header().clone());
                hash_to_number.insert(block_hash, block.header().number());
                bodies.insert(block_hash, block.body().clone());

                if log_interval == 0 {
                    trace!(target: "downloaders::file",
                        block_number,
                        "read first block"
                    );
                    log_interval_start_block = block_number;
                } else if log_interval % 100_000 == 0 {
                    trace!(target: "downloaders::file",
                        blocks=?log_interval_start_block..=block_number,
                        "read blocks from file"
                    );
                    log_interval_start_block = block_number + 1;
                }
                log_interval += 1;
            }

            trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");

            Ok(DecodedFileChunk {
                file_client: Self { headers, hash_to_number, bodies },
                remaining_bytes,
                highest_block: None,
            })
        }
    }
}

impl<B: FullBlock> HeadersClient for FileClient<B> {
    type Header = B::Header;
    type Output = HeadersFut<B::Header>;

    fn get_headers_with_priority(
        &self,
        request: HeadersRequest,
        _priority: Priority,
    ) -> Self::Output {
        // this only searches the in-memory buffer and fails if a header cannot be found
        let mut headers = Vec::new();
        trace!(target: "downloaders::file", request=?request, "Getting headers");

        let start_num = match request.start {
            BlockHashOrNumber::Hash(hash) => match self.hash_to_number.get(&hash) {
                Some(num) => *num,
                None => {
                    warn!(%hash, "Could not find starting block number for requested header hash");
                    return Box::pin(async move { Err(RequestError::BadResponse) })
                }
            },
            BlockHashOrNumber::Number(num) => num,
        };

        let range = if request.limit == 1 {
            Either::Left(start_num..start_num + 1)
        } else {
            match request.direction {
                HeadersDirection::Rising => Either::Left(start_num..start_num + request.limit),
                HeadersDirection::Falling => {
                    Either::Right((start_num - request.limit + 1..=start_num).rev())
                }
            }
        };

        trace!(target: "downloaders::file", range=?range, "Getting headers with range");

        for block_number in range {
            match self.headers.get(&block_number).cloned() {
                Some(header) => headers.push(header),
                None => {
                    warn!(number=%block_number, "Could not find header");
                    return Box::pin(async move { Err(RequestError::BadResponse) })
                }
            }
        }

        Box::pin(async move { Ok((PeerId::default(), headers).into()) })
    }
}

impl<B: FullBlock> BodiesClient for FileClient<B> {
    type Body = B::Body;
    type Output = BodiesFut<B::Body>;

    fn get_block_bodies_with_priority(
        &self,
        hashes: Vec<B256>,
        _priority: Priority,
    ) -> Self::Output {
        let mut bodies = Vec::new();

        // fail the whole request if any requested body is missing from the buffer
        for hash in hashes {
            match self.bodies.get(&hash).cloned() {
                Some(body) => bodies.push(body),
                None => return Box::pin(async move { Err(RequestError::BadResponse) }),
            }
        }

        Box::pin(async move { Ok((PeerId::default(), bodies).into()) })
    }
}

impl<B: FullBlock> DownloadClient for FileClient<B> {
    fn report_bad_message(&self, _peer_id: PeerId) {
        warn!("Reported a bad message on a file client, the file may be corrupted or invalid");
        // noop
    }

    fn num_connected_peers(&self) -> usize {
        // there are no connected peers when reading from a file
        1
    }
}

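/// Reads a chain file with a maximum chunk byte length, so the whole file does not have to be
/// held in memory at once.
///
/// Rough usage sketch (not a compiled doctest; error handling elided and the path is a
/// placeholder):
///
/// ```ignore
/// let mut reader = ChunkedFileReader::new("chain.rlp", None).await?;
/// while let Some(client) = reader.next_chunk::<FileClient>().await? {
///     // each `client` holds the blocks decoded from one chunk of the file
/// }
/// ```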
#[derive(Debug)]
pub struct ChunkedFileReader {
    /// File to read from.
    file: File,
    /// Number of bytes that have not yet been read from the file.
    file_byte_len: u64,
    /// Bytes read from the file but not yet decoded, including any incomplete trailing item
    /// carried over from the previous chunk.
    chunk: Vec<u8>,
    /// Maximum number of bytes per chunk.
    chunk_byte_len: u64,
    /// Highest block number decoded so far, if known. Used when reading receipt files.
    highest_block: Option<u64>,
}

impl ChunkedFileReader {
    /// Returns the number of file bytes that have not been read yet.
    pub const fn file_len(&self) -> u64 {
        self.file_byte_len
    }

    /// Opens the file at the given path. If no chunk byte length is passed, chunks have
    /// [`DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE`] byte length.
    pub async fn new<P: AsRef<Path>>(
        path: P,
        chunk_byte_len: Option<u64>,
    ) -> Result<Self, FileClientError> {
        let file = File::open(path).await?;
        let chunk_byte_len = chunk_byte_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE);

        Self::from_file(file, chunk_byte_len).await
    }

    /// Creates a reader from an already opened file and a chunk byte length.
    pub async fn from_file(file: File, chunk_byte_len: u64) -> Result<Self, FileClientError> {
        // get the file length from metadata before reading
        let metadata = file.metadata().await?;
        let file_byte_len = metadata.len();

        Ok(Self { file, file_byte_len, chunk: vec![], chunk_byte_len, highest_block: None })
    }

    /// Returns the target byte length of the next chunk: the configured chunk size, capped at
    /// the bytes still available (unread file bytes plus carried-over bytes).
    fn chunk_len(&self) -> u64 {
        let Self { chunk_byte_len, file_byte_len, .. } = *self;
        let file_byte_len = file_byte_len + self.chunk.len() as u64;

        if chunk_byte_len > file_byte_len {
            // last chunk
            file_byte_len
        } else {
            chunk_byte_len
        }
    }

    /// Reads the next chunk from the file into the internal buffer, appending to any
    /// carried-over bytes. Returns the buffered chunk length, or `None` once the file and
    /// buffer are exhausted.
    async fn read_next_chunk(&mut self) -> Result<Option<u64>, io::Error> {
        if self.file_byte_len == 0 && self.chunk.is_empty() {
            // eof
            return Ok(None)
        }

        let chunk_target_len = self.chunk_len();
        let old_bytes_len = self.chunk.len() as u64;

        // number of new bytes to read from the file for this chunk
        let new_read_bytes_target_len = chunk_target_len - old_bytes_len;

        // reserve space in the chunk buffer and read into it
        let prev_read_bytes_len = self.chunk.len();
        self.chunk.extend(std::iter::repeat(0).take(new_read_bytes_target_len as usize));
        let reader = &mut self.chunk[prev_read_bytes_len..];

        let new_read_bytes_len = self.file.read_exact(reader).await? as u64;
        let next_chunk_byte_len = self.chunk.len();

        // update the remaining file length
        self.file_byte_len -= new_read_bytes_len;

        debug!(target: "downloaders::file",
            max_chunk_byte_len=self.chunk_byte_len,
            prev_read_bytes_len,
            new_read_bytes_target_len,
            new_read_bytes_len,
            next_chunk_byte_len,
            remaining_file_byte_len=self.file_byte_len,
            "new bytes were read from file"
        );

        Ok(Some(next_chunk_byte_len as u64))
    }

    /// Decodes the next chunk from the file. Returns `None` once the whole file has been
    /// decoded. Trailing bytes that did not form a complete item are kept for the next chunk.
    pub async fn next_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
    where
        T: FromReader,
    {
        let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };

        // decode a new file client from the chunk
        let DecodedFileChunk { file_client, remaining_bytes, .. } =
            T::from_reader(&self.chunk[..], next_chunk_byte_len).await?;

        // save the left-over bytes
        self.chunk = remaining_bytes;

        Ok(Some(file_client))
    }

    /// Like [`Self::next_chunk`], but for types decoded with [`FromReceiptReader`], threading
    /// the highest decoded block number between chunks.
    pub async fn next_receipts_chunk<T, D>(&mut self) -> Result<Option<T>, T::Error>
    where
        T: FromReceiptReader<D>,
    {
        let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };

        let DecodedFileChunk { file_client, remaining_bytes, highest_block } =
            T::from_receipt_reader(&self.chunk[..], next_chunk_byte_len, self.highest_block)
                .await?;

        // save the left-over bytes and the highest decoded block
        self.chunk = remaining_bytes;
        self.highest_block = highest_block;

        Ok(Some(file_client))
    }
}

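/// Trait for types that can be decoded from a chunk of file bytes, returning the decoded value
/// together with any trailing bytes that did not form a complete item. Implemented for
/// [`FileClient`] and used by [`ChunkedFileReader::next_chunk`].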
pub trait FromReader {
    /// Error returned while reading or decoding.
    type Error: From<io::Error>;

    /// Decodes `Self` from `num_bytes` of `reader`.
    fn from_reader<B>(
        reader: B,
        num_bytes: u64,
    ) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
    where
        Self: Sized,
        B: AsyncReadExt + Unpin;
}

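/// The result of decoding one chunk of a file via [`FromReader`] or [`FromReceiptReader`].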
#[derive(Debug)]
pub struct DecodedFileChunk<T> {
    /// The file client holding the items decoded from this chunk.
    pub file_client: T,
    /// Trailing bytes that could not be decoded into a complete item; they are prepended to the
    /// next chunk by [`ChunkedFileReader`].
    pub remaining_bytes: Vec<u8>,
    /// Highest block number decoded so far, if known. Only set when decoding receipt files.
    pub highest_block: Option<u64>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        bodies::{
            bodies::BodiesDownloaderBuilder,
            test_utils::{insert_headers, zip_blocks},
        },
        headers::{reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::child_header},
        test_utils::{generate_bodies, generate_bodies_file},
    };
    use assert_matches::assert_matches;
    use futures_util::stream::StreamExt;
    use rand::Rng;
    use reth_consensus::test_utils::TestConsensus;
    use reth_network_p2p::{
        bodies::downloader::BodyDownloader,
        headers::downloader::{HeaderDownloader, SyncTarget},
    };
    use reth_provider::test_utils::create_test_provider_factory;
    use std::sync::Arc;

    #[tokio::test]
    async fn streams_bodies_from_buffer() {
        // generate some random blocks
        let factory = create_test_provider_factory();
        let (headers, mut bodies) = generate_bodies(0..=19);

        insert_headers(factory.db_ref().db(), &headers);

        // create an empty file
        let file = tempfile::tempfile().unwrap();

        let client: Arc<FileClient> =
            Arc::new(FileClient::from_file(file.into()).await.unwrap().with_bodies(bodies.clone()));
        let mut downloader = BodiesDownloaderBuilder::default().build(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    #[tokio::test]
    async fn download_headers_at_fork_head() {
        reth_tracing::init_test_tracing();

        let p3 = SealedHeader::default();
        let p2 = child_header(&p3);
        let p1 = child_header(&p2);
        let p0 = child_header(&p1);

        let file = tempfile::tempfile().unwrap();
        let client: Arc<FileClient> = Arc::new(
            FileClient::from_file(file.into()).await.unwrap().with_headers(HashMap::from([
                (0u64, p0.clone().unseal()),
                (1, p1.clone().unseal()),
                (2, p2.clone().unseal()),
                (3, p3.clone().unseal()),
            ])),
        );

        let mut downloader = ReverseHeadersDownloaderBuilder::default()
            .stream_batch_size(3)
            .request_limit(3)
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        downloader.update_local_head(p3.clone());
        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));

        let headers = downloader.next().await.unwrap();
        assert_eq!(headers, Ok(vec![p0, p1, p2]));
        assert!(downloader.next().await.is_none());
        assert!(downloader.next().await.is_none());
    }

    #[tokio::test]
    async fn test_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        // generate some random blocks and write them to a file
        let (file, headers, _) = generate_bodies_file(0..=19).await;
        // now try to read them back
        let client: Arc<FileClient> = Arc::new(FileClient::from_file(file).await.unwrap());

        // construct the headers downloader, starting from the first header
        let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
        header_downloader.update_local_head(headers.first().unwrap().clone());
        header_downloader.update_sync_target(SyncTarget::Tip(headers.last().unwrap().hash()));

        let mut downloaded_headers = header_downloader.next().await.unwrap().unwrap();

        // reverse to make sure it's in the right order before comparing
        downloaded_headers.reverse();

        // the local head is not included in the response
        assert_eq!(downloaded_headers, headers[1..]);
    }

    #[tokio::test]
    async fn test_download_bodies_from_file() {
        // generate some random blocks and write them to a file
        let factory = create_test_provider_factory();
        let (file, headers, mut bodies) = generate_bodies_file(0..=19).await;

        // now try to read them back
        let client: Arc<FileClient> = Arc::new(FileClient::from_file(file).await.unwrap());

        // insert the headers in the db for the bodies downloader
        insert_headers(factory.db_ref().db(), &headers);

        let mut downloader = BodiesDownloaderBuilder::default().build(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
    }

    #[tokio::test]
    async fn test_chunk_download_headers_from_file() {
        reth_tracing::init_test_tracing();

        // generate some random blocks and write them to a file
        let (file, headers, _) = generate_bodies_file(0..=14).await;

        // pick a random chunk byte length so different runs exercise different chunk boundaries
        let chunk_byte_len = rand::thread_rng().gen_range(2000..=10_000);
        trace!(target: "downloaders::file::test", chunk_byte_len);

        // init the chunked reader
        let mut reader = ChunkedFileReader::from_file(file, chunk_byte_len as u64).await.unwrap();

        let mut downloaded_headers: Vec<SealedHeader> = vec![];

        let mut local_header = headers.first().unwrap().clone();

        // download headers chunk by chunk
        while let Some(client) = reader.next_chunk::<FileClient>().await.unwrap() {
            let sync_target = client.tip_header().unwrap();

            let sync_target_hash = sync_target.hash();

            // construct a headers downloader for this chunk
            let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
                .build(Arc::clone(&Arc::new(client)), Arc::new(TestConsensus::default()));
            header_downloader.update_local_head(local_header.clone());
            header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));

            let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();

            // continue from the tip of this chunk in the next iteration
            local_header = sync_target;

            // reverse to make sure it's in the right order before comparing
            downloaded_headers_chunk.reverse();
            downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
        }

        // the local head is not included in the response
        assert_eq!(headers[1..], downloaded_headers);
    }
}