reth_era_utils/
history.rs

1use alloy_primitives::{BlockHash, BlockNumber};
2use futures_util::{Stream, StreamExt};
3use reth_db_api::{
4    cursor::{DbCursorRO, DbCursorRW},
5    table::Value,
6    tables,
7    transaction::{DbTx, DbTxMut},
8    RawKey, RawTable, RawValue,
9};
10use reth_era::{era1_file::Era1Reader, execution_types::DecodeCompressed};
11use reth_era_downloader::EraMeta;
12use reth_etl::Collector;
13use reth_fs_util as fs;
14use reth_primitives_traits::{Block, FullBlockBody, FullBlockHeader, NodePrimitives};
15use reth_provider::{
16    BlockWriter, ProviderError, StaticFileProviderFactory, StaticFileSegment, StaticFileWriter,
17};
18use reth_storage_api::{DBProvider, HeaderProvider, NodePrimitivesProvider, StorageLocation};
19use std::sync::mpsc;
20use tracing::info;
21
22/// Imports blocks from `downloader` using `provider`.
23///
24/// Returns current block height.
25pub fn import<Downloader, Era, P, B, BB, BH>(
26    mut downloader: Downloader,
27    provider: &P,
28    mut hash_collector: Collector<BlockHash, BlockNumber>,
29) -> eyre::Result<BlockNumber>
30where
31    B: Block<Header = BH, Body = BB>,
32    BH: FullBlockHeader + Value,
33    BB: FullBlockBody<
34        Transaction = <<P as NodePrimitivesProvider>::Primitives as NodePrimitives>::SignedTx,
35        OmmerHeader = BH,
36    >,
37    Downloader: Stream<Item = eyre::Result<Era>> + Send + 'static + Unpin,
38    Era: EraMeta + Send + 'static,
39    P: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory + BlockWriter<Block = B>,
40    <P as NodePrimitivesProvider>::Primitives: NodePrimitives<BlockHeader = BH, BlockBody = BB>,
41{
42    let (tx, rx) = mpsc::channel();
43
44    // Handle IO-bound async download in a background tokio task
45    tokio::spawn(async move {
46        while let Some(file) = downloader.next().await {
47            tx.send(Some(file))?;
48        }
49        tx.send(None)
50    });
51
52    let static_file_provider = provider.static_file_provider();
53
54    // Consistency check of expected headers in static files vs DB is done on provider::sync_gap
55    // when poll_execute_ready is polled.
56    let mut last_header_number = static_file_provider
57        .get_highest_static_file_block(StaticFileSegment::Headers)
58        .unwrap_or_default();
59
60    // Find the latest total difficulty
61    let mut td = static_file_provider
62        .header_td_by_number(last_header_number)?
63        .ok_or(ProviderError::TotalDifficultyNotFound(last_header_number))?;
64
65    // Although headers were downloaded in reverse order, the collector iterates it in ascending
66    // order
67    let mut writer = static_file_provider.latest_writer(StaticFileSegment::Headers)?;
68
69    while let Some(meta) = rx.recv()? {
70        let meta = meta?;
71        let file = fs::open(meta.as_ref())?;
72        let mut reader = Era1Reader::new(file);
73
74        for block in reader.iter() {
75            let block = block?;
76            let header: BH = block.header.decode()?;
77            let body: BB = block.body.decode()?;
78            let number = header.number();
79
80            if number == 0 {
81                continue;
82            }
83
84            let hash = header.hash_slow();
85            last_header_number = number;
86
87            // Increase total difficulty
88            td += header.difficulty();
89
90            // Append to Headers segment
91            writer.append_header(&header, td, &hash)?;
92
93            // Write bodies to database.
94            provider.append_block_bodies(
95                vec![(header.number(), Some(body))],
96                // We are writing transactions directly to static files.
97                StorageLocation::StaticFiles,
98            )?;
99
100            hash_collector.insert(hash, number)?;
101        }
102
103        info!(target: "era::history::import", "Processed {}", meta.as_ref().to_string_lossy());
104
105        meta.mark_as_processed()?;
106    }
107
108    let total_headers = hash_collector.len();
109    info!(target: "era::history::import", total = total_headers, "Writing headers hash index");
110
111    // Database cursor for hash to number index
112    let mut cursor_header_numbers =
113        provider.tx_ref().cursor_write::<RawTable<tables::HeaderNumbers>>()?;
114    let mut first_sync = false;
115
116    // If we only have the genesis block hash, then we are at first sync, and we can remove it,
117    // add it to the collector and use tx.append on all hashes.
118    if provider.tx_ref().entries::<RawTable<tables::HeaderNumbers>>()? == 1 {
119        if let Some((hash, block_number)) = cursor_header_numbers.last()? {
120            if block_number.value()? == 0 {
121                hash_collector.insert(hash.key()?, 0)?;
122                cursor_header_numbers.delete_current()?;
123                first_sync = true;
124            }
125        }
126    }
127
128    let interval = (total_headers / 10).max(1);
129
130    // Build block hash to block number index
131    for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
132        let (hash, number) = hash_to_number?;
133
134        if index > 0 && index % interval == 0 && total_headers > 100 {
135            info!(target: "era::history::import", progress = %format!("{:.2}%", (index as f64 / total_headers as f64) * 100.0), "Writing headers hash index");
136        }
137
138        let hash = RawKey::<BlockHash>::from_vec(hash);
139        let number = RawValue::<BlockNumber>::from_vec(number);
140
141        if first_sync {
142            cursor_header_numbers.append(hash, &number)?;
143        } else {
144            cursor_header_numbers.upsert(hash, &number)?;
145        }
146    }
147
148    Ok(last_header_number)
149}