1use crate::{
9 e2s_file::{E2StoreReader, E2StoreWriter},
10 e2s_types::{E2sError, Entry, Version},
11 era1_types::{BlockIndex, Era1Group, Era1Id, BLOCK_INDEX},
12 execution_types::{
13 self, Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedReceipts,
14 TotalDifficulty, MAX_BLOCKS_PER_ERA1,
15 },
16};
17use alloy_primitives::BlockNumber;
18use std::{
19 collections::VecDeque,
20 fs::File,
21 io::{Read, Seek, Write},
22 path::Path,
23};
24
25#[derive(Debug)]
27pub struct Era1File {
28 pub version: Version,
30
31 pub group: Era1Group,
33
34 pub id: Era1Id,
36}
37
38impl Era1File {
39 pub const fn new(group: Era1Group, id: Era1Id) -> Self {
41 Self { version: Version, group, id }
42 }
43
44 pub fn get_block_by_number(&self, number: BlockNumber) -> Option<&BlockTuple> {
46 let index = (number - self.group.block_index.starting_number) as usize;
47 (index < self.group.blocks.len()).then(|| &self.group.blocks[index])
48 }
49
50 pub fn block_range(&self) -> std::ops::RangeInclusive<BlockNumber> {
52 let start = self.group.block_index.starting_number;
53 let end = start + (self.group.blocks.len() as u64) - 1;
54 start..=end
55 }
56
57 pub fn contains_block(&self, number: BlockNumber) -> bool {
59 self.block_range().contains(&number)
60 }
61}
62#[derive(Debug)]
64pub struct Era1Reader<R: Read> {
65 reader: E2StoreReader<R>,
66}
67
68#[derive(Debug)]
70pub struct BlockTupleIterator<'r, R: Read> {
71 reader: &'r mut E2StoreReader<R>,
72 headers: VecDeque<CompressedHeader>,
73 bodies: VecDeque<CompressedBody>,
74 receipts: VecDeque<CompressedReceipts>,
75 difficulties: VecDeque<TotalDifficulty>,
76 other_entries: Vec<Entry>,
77 accumulator: Option<Accumulator>,
78 block_index: Option<BlockIndex>,
79}
80
81impl<'r, R: Read> BlockTupleIterator<'r, R> {
82 fn new(reader: &'r mut E2StoreReader<R>) -> Self {
83 Self {
84 reader,
85 headers: Default::default(),
86 bodies: Default::default(),
87 receipts: Default::default(),
88 difficulties: Default::default(),
89 other_entries: Default::default(),
90 accumulator: None,
91 block_index: None,
92 }
93 }
94}
95
96impl<'r, R: Read + Seek> Iterator for BlockTupleIterator<'r, R> {
97 type Item = Result<BlockTuple, E2sError>;
98
99 fn next(&mut self) -> Option<Self::Item> {
100 self.next_result().transpose()
101 }
102}
103
104impl<'r, R: Read + Seek> BlockTupleIterator<'r, R> {
105 fn next_result(&mut self) -> Result<Option<BlockTuple>, E2sError> {
106 loop {
107 let Some(entry) = self.reader.read_next_entry()? else {
108 return Ok(None);
109 };
110
111 match entry.entry_type {
112 execution_types::COMPRESSED_HEADER => {
113 self.headers.push_back(CompressedHeader::from_entry(&entry)?);
114 }
115 execution_types::COMPRESSED_BODY => {
116 self.bodies.push_back(CompressedBody::from_entry(&entry)?);
117 }
118 execution_types::COMPRESSED_RECEIPTS => {
119 self.receipts.push_back(CompressedReceipts::from_entry(&entry)?);
120 }
121 execution_types::TOTAL_DIFFICULTY => {
122 self.difficulties.push_back(TotalDifficulty::from_entry(&entry)?);
123 }
124 execution_types::ACCUMULATOR => {
125 if self.accumulator.is_some() {
126 return Err(E2sError::Ssz("Multiple accumulator entries found".to_string()));
127 }
128 self.accumulator = Some(Accumulator::from_entry(&entry)?);
129 }
130 BLOCK_INDEX => {
131 if self.block_index.is_some() {
132 return Err(E2sError::Ssz("Multiple block index entries found".to_string()));
133 }
134 self.block_index = Some(BlockIndex::from_entry(&entry)?);
135 }
136 _ => {
137 self.other_entries.push(entry);
138 }
139 }
140
141 if !self.headers.is_empty() &&
142 !self.bodies.is_empty() &&
143 !self.receipts.is_empty() &&
144 !self.difficulties.is_empty()
145 {
146 let header = self.headers.pop_front().unwrap();
147 let body = self.bodies.pop_front().unwrap();
148 let receipt = self.receipts.pop_front().unwrap();
149 let difficulty = self.difficulties.pop_front().unwrap();
150
151 return Ok(Some(BlockTuple::new(header, body, receipt, difficulty)));
152 }
153 }
154 }
155}
156
157impl<R: Read + Seek> Era1Reader<R> {
158 pub fn new(reader: R) -> Self {
160 Self { reader: E2StoreReader::new(reader) }
161 }
162
163 pub fn iter(&mut self) -> BlockTupleIterator<'_, R> {
165 BlockTupleIterator::new(&mut self.reader)
166 }
167
168 pub fn read(&mut self, network_name: String) -> Result<Era1File, E2sError> {
171 let _version_entry = match self.reader.read_version()? {
173 Some(entry) if entry.is_version() => entry,
174 Some(_) => return Err(E2sError::Ssz("First entry is not a Version entry".to_string())),
175 None => return Err(E2sError::Ssz("Empty Era1 file".to_string())),
176 };
177
178 let mut iter = self.iter();
179 let blocks = (&mut iter).collect::<Result<Vec<_>, _>>()?;
180
181 let BlockTupleIterator {
182 headers,
183 bodies,
184 receipts,
185 difficulties,
186 other_entries,
187 accumulator,
188 block_index,
189 ..
190 } = iter;
191
192 if headers.len() != bodies.len() ||
194 headers.len() != receipts.len() ||
195 headers.len() != difficulties.len()
196 {
197 return Err(E2sError::Ssz(format!(
198 "Mismatched block component counts: headers={}, bodies={}, receipts={}, difficulties={}",
199 headers.len(), bodies.len(), receipts.len(), difficulties.len()
200 )));
201 }
202
203 let accumulator = accumulator
204 .ok_or_else(|| E2sError::Ssz("Era1 file missing accumulator entry".to_string()))?;
205
206 let block_index = block_index
207 .ok_or_else(|| E2sError::Ssz("Era1 file missing block index entry".to_string()))?;
208
209 let mut group = Era1Group::new(blocks, accumulator, block_index.clone());
210
211 for entry in other_entries {
213 group.add_entry(entry);
214 }
215
216 let id = Era1Id::new(
217 network_name,
218 block_index.starting_number,
219 block_index.offsets.len() as u32,
220 );
221
222 Ok(Era1File::new(group, id))
223 }
224}
225
226impl Era1Reader<File> {
227 pub fn open<P: AsRef<Path>>(
229 path: P,
230 network_name: impl Into<String>,
231 ) -> Result<Era1File, E2sError> {
232 let file = File::open(path).map_err(E2sError::Io)?;
233 let mut reader = Self::new(file);
234 reader.read(network_name.into())
235 }
236}
237
238#[derive(Debug)]
240pub struct Era1Writer<W: Write> {
241 writer: E2StoreWriter<W>,
242 has_written_version: bool,
243 has_written_blocks: bool,
244 has_written_accumulator: bool,
245 has_written_block_index: bool,
246}
247
248impl<W: Write> Era1Writer<W> {
249 pub fn new(writer: W) -> Self {
251 Self {
252 writer: E2StoreWriter::new(writer),
253 has_written_version: false,
254 has_written_blocks: false,
255 has_written_accumulator: false,
256 has_written_block_index: false,
257 }
258 }
259
260 pub fn write_version(&mut self) -> Result<(), E2sError> {
262 if self.has_written_version {
263 return Ok(());
264 }
265
266 self.writer.write_version()?;
267 self.has_written_version = true;
268 Ok(())
269 }
270
271 pub fn write_era1_file(&mut self, era1_file: &Era1File) -> Result<(), E2sError> {
273 self.write_version()?;
275
276 if era1_file.group.blocks.len() > MAX_BLOCKS_PER_ERA1 {
278 return Err(E2sError::Ssz("Era1 file cannot contain more than 8192 blocks".to_string()));
279 }
280
281 for block in &era1_file.group.blocks {
283 self.write_block(block)?;
284 }
285
286 for entry in &era1_file.group.other_entries {
288 self.writer.write_entry(entry)?;
289 }
290
291 self.write_accumulator(&era1_file.group.accumulator)?;
293
294 self.write_block_index(&era1_file.group.block_index)?;
296
297 self.writer.flush()?;
299
300 Ok(())
301 }
302
303 pub fn write_block(
305 &mut self,
306 block_tuple: &crate::execution_types::BlockTuple,
307 ) -> Result<(), E2sError> {
308 if !self.has_written_version {
309 self.write_version()?;
310 }
311
312 if self.has_written_accumulator || self.has_written_block_index {
313 return Err(E2sError::Ssz(
314 "Cannot write blocks after accumulator or block index".to_string(),
315 ));
316 }
317
318 let header_entry = block_tuple.header.to_entry();
320 self.writer.write_entry(&header_entry)?;
321
322 let body_entry = block_tuple.body.to_entry();
324 self.writer.write_entry(&body_entry)?;
325
326 let receipts_entry = block_tuple.receipts.to_entry();
328 self.writer.write_entry(&receipts_entry)?;
329
330 let difficulty_entry = block_tuple.total_difficulty.to_entry();
332 self.writer.write_entry(&difficulty_entry)?;
333
334 self.has_written_blocks = true;
335
336 Ok(())
337 }
338
339 pub fn write_accumulator(&mut self, accumulator: &Accumulator) -> Result<(), E2sError> {
341 if !self.has_written_version {
342 self.write_version()?;
343 }
344
345 if self.has_written_accumulator {
346 return Err(E2sError::Ssz("Accumulator already written".to_string()));
347 }
348
349 if self.has_written_block_index {
350 return Err(E2sError::Ssz("Cannot write accumulator after block index".to_string()));
351 }
352
353 let accumulator_entry = accumulator.to_entry();
354 self.writer.write_entry(&accumulator_entry)?;
355 self.has_written_accumulator = true;
356
357 Ok(())
358 }
359
360 pub fn write_block_index(&mut self, block_index: &BlockIndex) -> Result<(), E2sError> {
362 if !self.has_written_version {
363 self.write_version()?;
364 }
365
366 if self.has_written_block_index {
367 return Err(E2sError::Ssz("Block index already written".to_string()));
368 }
369
370 let block_index_entry = block_index.to_entry();
371 self.writer.write_entry(&block_index_entry)?;
372 self.has_written_block_index = true;
373
374 Ok(())
375 }
376
377 pub fn flush(&mut self) -> Result<(), E2sError> {
379 self.writer.flush()
380 }
381}
382
383impl Era1Writer<File> {
384 pub fn create<P: AsRef<Path>>(path: P, era1_file: &Era1File) -> Result<(), E2sError> {
386 let file = File::create(path).map_err(E2sError::Io)?;
387 let mut writer = Self::new(file);
388 writer.write_era1_file(era1_file)?;
389 Ok(())
390 }
391
392 pub fn create_with_id<P: AsRef<Path>>(
395 directory: P,
396 era1_file: &Era1File,
397 ) -> Result<(), E2sError> {
398 let filename = era1_file.id.to_file_name();
399 let path = directory.as_ref().join(filename);
400 Self::create(path, era1_file)
401 }
402}
403
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution_types::{
        Accumulator, BlockTuple, CompressedBody, CompressedHeader, CompressedReceipts,
        TotalDifficulty,
    };
    use alloy_primitives::{B256, U256};
    use std::io::Cursor;
    use tempfile::tempdir;

    /// Builds a block tuple whose payload bytes are derived from `number`, so
    /// each block is recognisable after a roundtrip.
    fn create_test_block(number: BlockNumber, data_size: usize) -> BlockTuple {
        // Fill byte for a component: the block number shifted per component, modulo 256.
        let fill = |shift: u64| ((number + shift) % 256) as u8;

        BlockTuple::new(
            CompressedHeader::new(vec![fill(0); data_size]),
            CompressedBody::new(vec![fill(1); data_size * 2]),
            CompressedReceipts::new(vec![fill(2); data_size]),
            TotalDifficulty::new(U256::from(number * 1000)),
        )
    }

    /// Builds an [`Era1File`] with `block_count` consecutive test blocks
    /// starting at `start_block`.
    fn create_test_era1_file(
        start_block: BlockNumber,
        block_count: usize,
        network: &str,
    ) -> Era1File {
        let blocks: Vec<_> =
            (0..block_count).map(|i| create_test_block(start_block + i as u64, 32)).collect();

        // Synthetic offsets; only their count matters to these tests.
        let offsets: Vec<i64> = (0..block_count).map(|i| i as i64 * 100).collect();

        let group = Era1Group::new(
            blocks,
            Accumulator::new(B256::from([0xAA; 32])),
            BlockIndex::new(start_block, offsets),
        );
        let id = Era1Id::new(network, start_block, block_count as u32);

        Era1File::new(group, id)
    }

    #[test]
    fn test_era1_roundtrip_memory() -> Result<(), E2sError> {
        let start_block: BlockNumber = 1000;
        let era1_file = create_test_era1_file(start_block, 5, "testnet");

        // Write into an in-memory buffer.
        let mut buffer = Vec::new();
        {
            let mut writer = Era1Writer::new(&mut buffer);
            writer.write_era1_file(&era1_file)?;
        }

        // Read the buffer back.
        let mut reader = Era1Reader::new(Cursor::new(&buffer));
        let read_era1 = reader.read("testnet".to_string())?;

        // Identifier and block count survive the roundtrip.
        assert_eq!(read_era1.id.network_name, "testnet");
        assert_eq!(read_era1.id.start_block, 1000);
        assert_eq!(read_era1.id.block_count, 5);
        assert_eq!(read_era1.group.blocks.len(), 5);

        // Total difficulties.
        assert_eq!(read_era1.group.blocks[0].total_difficulty.value, U256::from(1000 * 1000));
        assert_eq!(read_era1.group.blocks[1].total_difficulty.value, U256::from(1001 * 1000));

        // Payload bytes of the first block.
        let first = &read_era1.group.blocks[0];
        assert_eq!(first.header.data, vec![(start_block % 256) as u8; 32]);
        assert_eq!(first.body.data, vec![((start_block + 1) % 256) as u8; 64]);
        assert_eq!(first.receipts.data, vec![((start_block + 2) % 256) as u8; 32]);

        // Range membership at and around the boundaries.
        assert!(read_era1.contains_block(1000));
        assert!(read_era1.contains_block(1004));
        assert!(!read_era1.contains_block(999));
        assert!(!read_era1.contains_block(1005));

        // Lookup by block number.
        let block_1002 = read_era1.get_block_by_number(1002);
        assert!(block_1002.is_some());
        assert_eq!(block_1002.unwrap().header.data, vec![((start_block + 2) % 256) as u8; 32]);

        Ok(())
    }

    #[test]
    fn test_era1_roundtrip_file() -> Result<(), E2sError> {
        let temp_dir = tempdir().expect("Failed to create temp directory");
        let file_path = temp_dir.path().join("test_roundtrip.era1");

        // Write to disk, then read back.
        let era1_file = create_test_era1_file(2000, 3, "mainnet");
        Era1Writer::create(&file_path, &era1_file)?;

        let read_era1 = Era1Reader::open(&file_path, "mainnet")?;

        assert_eq!(read_era1.id.network_name, "mainnet");
        assert_eq!(read_era1.id.start_block, 2000);
        assert_eq!(read_era1.id.block_count, 3);
        assert_eq!(read_era1.group.blocks.len(), 3);

        // Every block is retrievable and carries the expected header bytes.
        for block_num in 2000u64..2003 {
            let block = read_era1.get_block_by_number(block_num);
            assert!(block.is_some());
            assert_eq!(block.unwrap().header.data, vec![block_num as u8; 32]);
        }

        Ok(())
    }
}