reth_era/
era1_file.rs

//! Represents a complete Era1 file
//!
//! The structure of an Era1 file follows the specification:
//! `Version | block-tuple* | other-entries* | Accumulator | BlockIndex`
//!
//! See also <https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era1.md>
7
8use crate::{
9    e2s_file::{E2StoreReader, E2StoreWriter},
10    e2s_types::{E2sError, Entry, Version},
11    era1_types::{BlockIndex, Era1Group, Era1Id, BLOCK_INDEX},
12    execution_types::{
13        self, Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedReceipts,
14        TotalDifficulty, MAX_BLOCKS_PER_ERA1,
15    },
16};
17use alloy_primitives::BlockNumber;
18use std::{
19    collections::VecDeque,
20    fs::File,
21    io::{Read, Seek, Write},
22    path::Path,
23};
24
25/// Era1 file interface
26#[derive(Debug)]
27pub struct Era1File {
28    /// Version record, must be the first record in the file
29    pub version: Version,
30
31    /// Main content group of the Era1 file
32    pub group: Era1Group,
33
34    /// File identifier
35    pub id: Era1Id,
36}
37
38impl Era1File {
39    /// Create a new [`Era1File`]
40    pub const fn new(group: Era1Group, id: Era1Id) -> Self {
41        Self { version: Version, group, id }
42    }
43
44    /// Get a block by its number, if present in this file
45    pub fn get_block_by_number(&self, number: BlockNumber) -> Option<&BlockTuple> {
46        let index = (number - self.group.block_index.starting_number) as usize;
47        (index < self.group.blocks.len()).then(|| &self.group.blocks[index])
48    }
49
50    /// Get the range of block numbers contained in this file
51    pub fn block_range(&self) -> std::ops::RangeInclusive<BlockNumber> {
52        let start = self.group.block_index.starting_number;
53        let end = start + (self.group.blocks.len() as u64) - 1;
54        start..=end
55    }
56
57    /// Check if this file contains a specific block number
58    pub fn contains_block(&self, number: BlockNumber) -> bool {
59        self.block_range().contains(&number)
60    }
61}
62/// Reader for Era1 files that builds on top of [`E2StoreReader`]
63#[derive(Debug)]
64pub struct Era1Reader<R: Read> {
65    reader: E2StoreReader<R>,
66}
67
68/// An iterator of [`BlockTuple`] streaming from [`E2StoreReader`].
69#[derive(Debug)]
70pub struct BlockTupleIterator<'r, R: Read> {
71    reader: &'r mut E2StoreReader<R>,
72    headers: VecDeque<CompressedHeader>,
73    bodies: VecDeque<CompressedBody>,
74    receipts: VecDeque<CompressedReceipts>,
75    difficulties: VecDeque<TotalDifficulty>,
76    other_entries: Vec<Entry>,
77    accumulator: Option<Accumulator>,
78    block_index: Option<BlockIndex>,
79}
80
81impl<'r, R: Read> BlockTupleIterator<'r, R> {
82    fn new(reader: &'r mut E2StoreReader<R>) -> Self {
83        Self {
84            reader,
85            headers: Default::default(),
86            bodies: Default::default(),
87            receipts: Default::default(),
88            difficulties: Default::default(),
89            other_entries: Default::default(),
90            accumulator: None,
91            block_index: None,
92        }
93    }
94}
95
96impl<'r, R: Read + Seek> Iterator for BlockTupleIterator<'r, R> {
97    type Item = Result<BlockTuple, E2sError>;
98
99    fn next(&mut self) -> Option<Self::Item> {
100        self.next_result().transpose()
101    }
102}
103
104impl<'r, R: Read + Seek> BlockTupleIterator<'r, R> {
105    fn next_result(&mut self) -> Result<Option<BlockTuple>, E2sError> {
106        loop {
107            let Some(entry) = self.reader.read_next_entry()? else {
108                return Ok(None);
109            };
110
111            match entry.entry_type {
112                execution_types::COMPRESSED_HEADER => {
113                    self.headers.push_back(CompressedHeader::from_entry(&entry)?);
114                }
115                execution_types::COMPRESSED_BODY => {
116                    self.bodies.push_back(CompressedBody::from_entry(&entry)?);
117                }
118                execution_types::COMPRESSED_RECEIPTS => {
119                    self.receipts.push_back(CompressedReceipts::from_entry(&entry)?);
120                }
121                execution_types::TOTAL_DIFFICULTY => {
122                    self.difficulties.push_back(TotalDifficulty::from_entry(&entry)?);
123                }
124                execution_types::ACCUMULATOR => {
125                    if self.accumulator.is_some() {
126                        return Err(E2sError::Ssz("Multiple accumulator entries found".to_string()));
127                    }
128                    self.accumulator = Some(Accumulator::from_entry(&entry)?);
129                }
130                BLOCK_INDEX => {
131                    if self.block_index.is_some() {
132                        return Err(E2sError::Ssz("Multiple block index entries found".to_string()));
133                    }
134                    self.block_index = Some(BlockIndex::from_entry(&entry)?);
135                }
136                _ => {
137                    self.other_entries.push(entry);
138                }
139            }
140
141            if !self.headers.is_empty() &&
142                !self.bodies.is_empty() &&
143                !self.receipts.is_empty() &&
144                !self.difficulties.is_empty()
145            {
146                let header = self.headers.pop_front().unwrap();
147                let body = self.bodies.pop_front().unwrap();
148                let receipt = self.receipts.pop_front().unwrap();
149                let difficulty = self.difficulties.pop_front().unwrap();
150
151                return Ok(Some(BlockTuple::new(header, body, receipt, difficulty)));
152            }
153        }
154    }
155}
156
157impl<R: Read + Seek> Era1Reader<R> {
158    /// Create a new [`Era1Reader`]
159    pub fn new(reader: R) -> Self {
160        Self { reader: E2StoreReader::new(reader) }
161    }
162
163    /// Returns an iterator of [`BlockTuple`] streaming from `reader`.
164    pub fn iter(&mut self) -> BlockTupleIterator<'_, R> {
165        BlockTupleIterator::new(&mut self.reader)
166    }
167
168    /// Reads and parses an Era1 file from the underlying reader, assembling all components
169    /// into a complete [`Era1File`] with an [`Era1Id`] that includes the provided network name.
170    pub fn read(&mut self, network_name: String) -> Result<Era1File, E2sError> {
171        // Validate version entry
172        let _version_entry = match self.reader.read_version()? {
173            Some(entry) if entry.is_version() => entry,
174            Some(_) => return Err(E2sError::Ssz("First entry is not a Version entry".to_string())),
175            None => return Err(E2sError::Ssz("Empty Era1 file".to_string())),
176        };
177
178        let mut iter = self.iter();
179        let blocks = (&mut iter).collect::<Result<Vec<_>, _>>()?;
180
181        let BlockTupleIterator {
182            headers,
183            bodies,
184            receipts,
185            difficulties,
186            other_entries,
187            accumulator,
188            block_index,
189            ..
190        } = iter;
191
192        // Ensure we have matching counts for block components
193        if headers.len() != bodies.len() ||
194            headers.len() != receipts.len() ||
195            headers.len() != difficulties.len()
196        {
197            return Err(E2sError::Ssz(format!(
198                "Mismatched block component counts: headers={}, bodies={}, receipts={}, difficulties={}",
199                headers.len(), bodies.len(), receipts.len(), difficulties.len()
200            )));
201        }
202
203        let accumulator = accumulator
204            .ok_or_else(|| E2sError::Ssz("Era1 file missing accumulator entry".to_string()))?;
205
206        let block_index = block_index
207            .ok_or_else(|| E2sError::Ssz("Era1 file missing block index entry".to_string()))?;
208
209        let mut group = Era1Group::new(blocks, accumulator, block_index.clone());
210
211        // Add other entries
212        for entry in other_entries {
213            group.add_entry(entry);
214        }
215
216        let id = Era1Id::new(
217            network_name,
218            block_index.starting_number,
219            block_index.offsets.len() as u32,
220        );
221
222        Ok(Era1File::new(group, id))
223    }
224}
225
226impl Era1Reader<File> {
227    /// Opens and reads an Era1 file from the given path
228    pub fn open<P: AsRef<Path>>(
229        path: P,
230        network_name: impl Into<String>,
231    ) -> Result<Era1File, E2sError> {
232        let file = File::open(path).map_err(E2sError::Io)?;
233        let mut reader = Self::new(file);
234        reader.read(network_name.into())
235    }
236}
237
238/// Writer for Era1 files that builds on top of [`E2StoreWriter`]
239#[derive(Debug)]
240pub struct Era1Writer<W: Write> {
241    writer: E2StoreWriter<W>,
242    has_written_version: bool,
243    has_written_blocks: bool,
244    has_written_accumulator: bool,
245    has_written_block_index: bool,
246}
247
248impl<W: Write> Era1Writer<W> {
249    /// Create a new [`Era1Writer`]
250    pub fn new(writer: W) -> Self {
251        Self {
252            writer: E2StoreWriter::new(writer),
253            has_written_version: false,
254            has_written_blocks: false,
255            has_written_accumulator: false,
256            has_written_block_index: false,
257        }
258    }
259
260    /// Write the version entry
261    pub fn write_version(&mut self) -> Result<(), E2sError> {
262        if self.has_written_version {
263            return Ok(());
264        }
265
266        self.writer.write_version()?;
267        self.has_written_version = true;
268        Ok(())
269    }
270
271    /// Write a complete [`Era1File`] to the underlying writer
272    pub fn write_era1_file(&mut self, era1_file: &Era1File) -> Result<(), E2sError> {
273        // Write version
274        self.write_version()?;
275
276        // Ensure blocks are written before other entries
277        if era1_file.group.blocks.len() > MAX_BLOCKS_PER_ERA1 {
278            return Err(E2sError::Ssz("Era1 file cannot contain more than 8192 blocks".to_string()));
279        }
280
281        // Write all blocks
282        for block in &era1_file.group.blocks {
283            self.write_block(block)?;
284        }
285
286        // Write other entries
287        for entry in &era1_file.group.other_entries {
288            self.writer.write_entry(entry)?;
289        }
290
291        // Write accumulator
292        self.write_accumulator(&era1_file.group.accumulator)?;
293
294        // Write block index
295        self.write_block_index(&era1_file.group.block_index)?;
296
297        // Flush the writer
298        self.writer.flush()?;
299
300        Ok(())
301    }
302
303    /// Write a single block tuple
304    pub fn write_block(
305        &mut self,
306        block_tuple: &crate::execution_types::BlockTuple,
307    ) -> Result<(), E2sError> {
308        if !self.has_written_version {
309            self.write_version()?;
310        }
311
312        if self.has_written_accumulator || self.has_written_block_index {
313            return Err(E2sError::Ssz(
314                "Cannot write blocks after accumulator or block index".to_string(),
315            ));
316        }
317
318        // Write header
319        let header_entry = block_tuple.header.to_entry();
320        self.writer.write_entry(&header_entry)?;
321
322        // Write body
323        let body_entry = block_tuple.body.to_entry();
324        self.writer.write_entry(&body_entry)?;
325
326        // Write receipts
327        let receipts_entry = block_tuple.receipts.to_entry();
328        self.writer.write_entry(&receipts_entry)?;
329
330        // Write difficulty
331        let difficulty_entry = block_tuple.total_difficulty.to_entry();
332        self.writer.write_entry(&difficulty_entry)?;
333
334        self.has_written_blocks = true;
335
336        Ok(())
337    }
338
339    /// Write the accumulator
340    pub fn write_accumulator(&mut self, accumulator: &Accumulator) -> Result<(), E2sError> {
341        if !self.has_written_version {
342            self.write_version()?;
343        }
344
345        if self.has_written_accumulator {
346            return Err(E2sError::Ssz("Accumulator already written".to_string()));
347        }
348
349        if self.has_written_block_index {
350            return Err(E2sError::Ssz("Cannot write accumulator after block index".to_string()));
351        }
352
353        let accumulator_entry = accumulator.to_entry();
354        self.writer.write_entry(&accumulator_entry)?;
355        self.has_written_accumulator = true;
356
357        Ok(())
358    }
359
360    /// Write the block index
361    pub fn write_block_index(&mut self, block_index: &BlockIndex) -> Result<(), E2sError> {
362        if !self.has_written_version {
363            self.write_version()?;
364        }
365
366        if self.has_written_block_index {
367            return Err(E2sError::Ssz("Block index already written".to_string()));
368        }
369
370        let block_index_entry = block_index.to_entry();
371        self.writer.write_entry(&block_index_entry)?;
372        self.has_written_block_index = true;
373
374        Ok(())
375    }
376
377    /// Flush any buffered data to the underlying writer
378    pub fn flush(&mut self) -> Result<(), E2sError> {
379        self.writer.flush()
380    }
381}
382
383impl Era1Writer<File> {
384    /// Creates a new file at the specified path and writes the [`Era1File`] to it
385    pub fn create<P: AsRef<Path>>(path: P, era1_file: &Era1File) -> Result<(), E2sError> {
386        let file = File::create(path).map_err(E2sError::Io)?;
387        let mut writer = Self::new(file);
388        writer.write_era1_file(era1_file)?;
389        Ok(())
390    }
391
392    /// Creates a new file in the specified directory with a filename derived from the
393    /// [`Era1File`]'s ID using the standardized Era1 file naming convention
394    pub fn create_with_id<P: AsRef<Path>>(
395        directory: P,
396        era1_file: &Era1File,
397    ) -> Result<(), E2sError> {
398        let filename = era1_file.id.to_file_name();
399        let path = directory.as_ref().join(filename);
400        Self::create(path, era1_file)
401    }
402}
403
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution_types::{
        Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedReceipts,
        TotalDifficulty,
    };
    use alloy_primitives::{B256, U256};
    use std::io::Cursor;
    use tempfile::tempdir;

    /// Builds a block tuple whose payload bytes are derived from `number`.
    ///
    /// Header and receipts carry `data_size` bytes, the body twice as many.
    fn create_test_block(number: BlockNumber, data_size: usize) -> BlockTuple {
        // Fill byte for a payload: the block number plus `shift`, reduced modulo 256.
        let fill = |shift: u64| ((number + shift) % 256) as u8;

        BlockTuple::new(
            CompressedHeader::new(vec![fill(0); data_size]),
            CompressedBody::new(vec![fill(1); data_size * 2]),
            CompressedReceipts::new(vec![fill(2); data_size]),
            TotalDifficulty::new(U256::from(number * 1000)),
        )
    }

    /// Builds an [`Era1File`] holding `block_count` consecutive test blocks that start at
    /// `start_block`.
    fn create_test_era1_file(
        start_block: BlockNumber,
        block_count: usize,
        network: &str,
    ) -> Era1File {
        let blocks: Vec<_> = (0..block_count)
            .map(|i| create_test_block(start_block + i as u64, 32))
            .collect();

        let accumulator = Accumulator::new(B256::from([0xAA; 32]));

        // One synthetic offset per block, spaced 100 apart.
        let offsets: Vec<_> = (0..block_count).map(|i| i as i64 * 100).collect();
        let block_index = BlockIndex::new(start_block, offsets);

        Era1File::new(
            Era1Group::new(blocks, accumulator, block_index),
            Era1Id::new(network, start_block, block_count as u32),
        )
    }

    #[test]
    fn test_era1_roundtrip_memory() -> Result<(), E2sError> {
        let start_block: BlockNumber = 1000;
        let era1_file = create_test_era1_file(start_block, 5, "testnet");

        // Serialize into an in-memory buffer
        let mut buffer = Vec::new();
        Era1Writer::new(&mut buffer).write_era1_file(&era1_file)?;

        // Parse the buffer again
        let read_era1 = Era1Reader::new(Cursor::new(&buffer)).read("testnet".to_string())?;

        // Core properties
        assert_eq!(read_era1.id.network_name, "testnet");
        assert_eq!(read_era1.id.start_block, 1000);
        assert_eq!(read_era1.id.block_count, 5);
        assert_eq!(read_era1.group.blocks.len(), 5);

        // Total difficulty of the first two blocks
        assert_eq!(read_era1.group.blocks[0].total_difficulty.value, U256::from(1000 * 1000));
        assert_eq!(read_era1.group.blocks[1].total_difficulty.value, U256::from(1001 * 1000));

        // Payload of the first block
        let first = &read_era1.group.blocks[0];
        assert_eq!(first.header.data, vec![(start_block % 256) as u8; 32]);
        assert_eq!(first.body.data, vec![((start_block + 1) % 256) as u8; 64]);
        assert_eq!(first.receipts.data, vec![((start_block + 2) % 256) as u8; 32]);

        // Block access methods
        assert!(read_era1.contains_block(1000));
        assert!(read_era1.contains_block(1004));
        assert!(!read_era1.contains_block(999));
        assert!(!read_era1.contains_block(1005));

        let block_1002 = read_era1.get_block_by_number(1002);
        assert!(block_1002.is_some());
        assert_eq!(block_1002.unwrap().header.data, vec![((start_block + 2) % 256) as u8; 32]);

        Ok(())
    }

    #[test]
    fn test_era1_roundtrip_file() -> Result<(), E2sError> {
        // Scratch directory for the file on disk
        let temp_dir = tempdir().expect("Failed to create temp directory");
        let file_path = temp_dir.path().join("test_roundtrip.era1");

        // Write an `Era1File` to disk ...
        let era1_file = create_test_era1_file(2000, 3, "mainnet");
        Era1Writer::create(&file_path, &era1_file)?;

        // ... and read it back
        let read_era1 = Era1Reader::open(&file_path, "mainnet")?;

        // Core properties
        assert_eq!(read_era1.id.network_name, "mainnet");
        assert_eq!(read_era1.id.start_block, 2000);
        assert_eq!(read_era1.id.block_count, 3);
        assert_eq!(read_era1.group.blocks.len(), 3);

        // Every block is retrievable by number and carries the expected header bytes
        for block_num in 2000u64..2003 {
            let block = read_era1.get_block_by_number(block_num);
            assert!(block.is_some());
            assert_eq!(block.unwrap().header.data, vec![block_num as u8; 32]);
        }

        Ok(())
    }
}