reth_era_utils/history.rs

use alloy_primitives::{BlockHash, BlockNumber};
use futures_util::{Stream, StreamExt};
use reth_db_api::{
    cursor::{DbCursorRO, DbCursorRW},
    table::Value,
    tables,
    transaction::{DbTx, DbTxMut},
    RawKey, RawTable, RawValue,
};
use reth_era::{era1_file::Era1Reader, execution_types::DecodeCompressed};
use reth_era_downloader::EraMeta;
use reth_etl::Collector;
use reth_fs_util as fs;
use reth_primitives_traits::{Block, FullBlockBody, FullBlockHeader, NodePrimitives};
use reth_provider::{
    BlockWriter, ProviderError, StaticFileProviderFactory, StaticFileSegment, StaticFileWriter,
};
use reth_storage_api::{DBProvider, HeaderProvider, NodePrimitivesProvider, StorageLocation};
use std::sync::mpsc;
use tracing::info;

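/// Imports era1 block history streamed by `downloader` into `provider`'s static files
/// and database tables, returning the number of the last imported block.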
pub fn import<Downloader, Era, P, B, BB, BH>(
    mut downloader: Downloader,
    provider: &P,
    mut hash_collector: Collector<BlockHash, BlockNumber>,
) -> eyre::Result<BlockNumber>
where
    B: Block<Header = BH, Body = BB>,
    BH: FullBlockHeader + Value,
    BB: FullBlockBody<
        Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
        OmmerHeader = BH,
    >,
    Downloader: Stream<Item = eyre::Result<Era>> + Send + 'static + Unpin,
    Era: EraMeta + Send + 'static,
    P: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory + BlockWriter<Block = B>,
    <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
{
    let (tx, rx) = mpsc::channel();

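    // Download era1 files on a background tokio task and hand them to the blocking
    // import loop below through an mpsc channel; `None` marks the end of the stream.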
    tokio::spawn(async move {
        while let Some(file) = downloader.next().await {
            tx.send(Some(file))?;
        }
        tx.send(None)
    });

    let static_file_provider = provider.static_file_provider();

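    // Resume from the highest header already present in the Headers static files.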
    let mut last_header_number = static_file_provider
        .get_highest_static_file_block(StaticFileSegment::Headers)
        .unwrap_or_default();

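    // Running total difficulty, continued from the last header already on disk.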
    let mut td = static_file_provider
        .header_td_by_number(last_header_number)?
        .ok_or(ProviderError::TotalDifficultyNotFound(last_header_number))?;

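    // Writer that appends to the latest Headers static-file segment.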
    let mut writer = static_file_provider.latest_writer(StaticFileSegment::Headers)?;

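    // Process each downloaded era1 file as it arrives from the channel.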
    while let Some(meta) = rx.recv()? {
        let meta = meta?;
        let file = fs::open(meta.as_ref())?;
        let mut reader = Era1Reader::new(file);

        for block in reader.iter() {
            let block = block?;
            let header: BH = block.header.decode()?;
            let body: BB = block.body.decode()?;
            let number = header.number();

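            // Skip the genesis block; only blocks from height 1 onward are imported.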
            if number == 0 {
                continue;
            }

            let hash = header.hash_slow();
            last_header_number = number;

            td += header.difficulty();

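            // Append the header and its updated total difficulty to static files.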
            writer.append_header(&header, td, &hash)?;

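            // Store the block body in static files as well.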
            provider.append_block_bodies(
                vec![(header.number(), Some(body))],
                StorageLocation::StaticFiles,
            )?;

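            // Buffer the hash -> number mapping; it is flushed into the
            // HeaderNumbers table after all files have been processed.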
            hash_collector.insert(hash, number)?;
        }

        info!(target: "era::history::import", "Processed {}", meta.as_ref().to_string_lossy());

        meta.mark_as_processed()?;
    }

    let total_headers = hash_collector.len();
    info!(target: "era::history::import", total = total_headers, "Writing headers hash index");

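    // Write the collected hash -> number pairs into the HeaderNumbers table through a raw cursor.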
    let mut cursor_header_numbers =
        provider.tx_ref().cursor_write::<RawTable<tables::HeaderNumbers>>()?;
    let mut first_sync = false;

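    // If the table only contains the genesis entry, this is a first sync: fold that
    // entry into the collector and delete it, so every row can be written with `append`.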
    if provider.tx_ref().entries::<RawTable<tables::HeaderNumbers>>()? == 1 {
        if let Some((hash, block_number)) = cursor_header_numbers.last()? {
            if block_number.value()? == 0 {
                hash_collector.insert(hash.key()?, 0)?;
                cursor_header_numbers.delete_current()?;
                first_sync = true;
            }
        }
    }

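    // Log progress roughly every 10% of the collected entries.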
    let interval = (total_headers / 10).max(1);

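    // The collector yields its entries sorted by hash, which is what makes
    // append-only writes valid on a fresh table.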
    for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
        let (hash, number) = hash_to_number?;

        if index > 0 && index % interval == 0 && total_headers > 100 {
            info!(target: "era::history::import", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers hash index");
        }

        let hash = RawKey::<BlockHash>::from_vec(hash);
        let number = RawValue::<BlockNumber>::from_vec(number);

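        // On a first sync the table starts out empty, so appending suffices;
        // otherwise upsert to overwrite any entries that already exist.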
        if first_sync {
            cursor_header_numbers.append(hash, &number)?;
        } else {
            cursor_header_numbers.upsert(hash, &number)?;
        }
    }

    Ok(last_header_number)
}