// reth_stages/stages/hashing_account.rs

1use alloy_primitives::{keccak256, B256};
2use itertools::Itertools;
3use reth_config::config::{EtlConfig, HashingConfig};
4use reth_db::{tables, RawKey, RawTable, RawValue};
5use reth_db_api::{
6    cursor::{DbCursorRO, DbCursorRW},
7    transaction::{DbTx, DbTxMut},
8};
9use reth_etl::Collector;
10use reth_primitives::Account;
11use reth_provider::{AccountExtReader, DBProvider, HashingWriter, StatsReader};
12use reth_stages_api::{
13    AccountHashingCheckpoint, EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint,
14    StageError, StageId, UnwindInput, UnwindOutput,
15};
16use reth_storage_errors::provider::ProviderResult;
17use std::{
18    fmt::Debug,
19    ops::{Range, RangeInclusive},
20    sync::mpsc::{self, Receiver},
21};
22use tracing::*;
23
/// Maximum number of channels that can exist in memory.
///
/// During full hashing, once this many result receivers have accumulated they are
/// drained into the ETL collector before further rayon jobs are spawned, bounding memory.
const MAXIMUM_CHANNELS: usize = 10_000;

/// Maximum number of accounts to hash per rayon worker job.
///
/// Each chunk of plain-state entries of this size is handed to one `rayon::spawn` task.
const WORKER_CHUNK_SIZE: usize = 100;
29
/// Account hashing stage hashes plain account.
/// This is preparation before generating intermediate hashes and calculating Merkle tree root.
#[derive(Clone, Debug)]
pub struct AccountHashingStage {
    /// The threshold (in number of blocks) for switching between incremental
    /// hashing and full storage hashing.
    ///
    /// When the executed range spans more than this many blocks (or starts at block 1),
    /// the `HashedAccounts` table is cleared and rebuilt from the full plain state
    /// instead of applying account changesets incrementally.
    pub clean_threshold: u64,
    /// The maximum number of accounts to process before committing during unwind.
    pub commit_threshold: u64,
    /// ETL configuration (collector file size and spill directory) used when
    /// buffering hashed accounts during a full rebuild.
    pub etl_config: EtlConfig,
}
42
43impl AccountHashingStage {
44    /// Create new instance of [`AccountHashingStage`].
45    pub const fn new(config: HashingConfig, etl_config: EtlConfig) -> Self {
46        Self {
47            clean_threshold: config.clean_threshold,
48            commit_threshold: config.commit_threshold,
49            etl_config,
50        }
51    }
52}
53
#[cfg(any(test, feature = "test-utils"))]
impl AccountHashingStage {
    /// Initializes the `PlainAccountState` table with `num_accounts` having some random state
    /// at the target block, with `txs_range` transactions in each block.
    ///
    /// Proceeds to go to the `BlockTransitionIndex` end, go back `transitions` and change the
    /// account state in the `AccountChangeSets` table.
    ///
    /// Returns the generated `(address, account)` pairs, sorted by address, so tests can
    /// compare them against the hashed tables after stage execution.
    pub fn seed<Tx: DbTx + DbTxMut + 'static, N: reth_provider::providers::ProviderNodeTypes>(
        provider: &reth_provider::DatabaseProvider<Tx, N>,
        opts: SeedOpts,
    ) -> Result<Vec<(alloy_primitives::Address, reth_primitives::Account)>, StageError>
    where
        N::Primitives: reth_primitives_traits::FullNodePrimitives<
            BlockBody = reth_primitives::BlockBody,
            BlockHeader = reth_primitives::Header,
        >,
    {
        use alloy_primitives::U256;
        use reth_db_api::models::AccountBeforeTx;
        use reth_provider::{StaticFileProviderFactory, StaticFileWriter};
        use reth_testing_utils::{
            generators,
            generators::{random_block_range, random_eoa_accounts, BlockRangeParams},
        };

        let mut rng = generators::rng();

        // Generate random blocks over the requested range with `opts.txs` transactions each.
        let blocks = random_block_range(
            &mut rng,
            opts.blocks.clone(),
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: opts.txs, ..Default::default() },
        );

        for block in blocks {
            provider.insert_historical_block(block.try_seal_with_senders().unwrap()).unwrap();
        }
        // Persist the headers static-file segment so the inserted blocks are visible.
        provider
            .static_file_provider()
            .latest_writer(reth_primitives::StaticFileSegment::Headers)
            .unwrap()
            .commit()
            .unwrap();
        let mut accounts = random_eoa_accounts(&mut rng, opts.accounts);
        {
            // Account State generator
            let mut account_cursor =
                provider.tx_ref().cursor_write::<tables::PlainAccountState>()?;
            // `append` requires ascending keys, so sort by address first.
            accounts.sort_by(|a, b| a.0.cmp(&b.0));
            for (addr, acc) in &accounts {
                account_cursor.append(*addr, *acc)?;
            }

            // Write one changeset entry per (block, account) pair: the "previous" state is
            // the current account with nonce - 1 and balance - 1, which unwind tests rely on.
            let mut acc_changeset_cursor =
                provider.tx_ref().cursor_write::<tables::AccountChangeSets>()?;
            for (t, (addr, acc)) in opts.blocks.zip(&accounts) {
                let Account { nonce, balance, .. } = acc;
                let prev_acc = Account {
                    nonce: nonce - 1,
                    balance: balance - U256::from(1),
                    bytecode_hash: None,
                };
                let acc_before_tx = AccountBeforeTx { address: *addr, info: Some(prev_acc) };
                acc_changeset_cursor.append(t, acc_before_tx)?;
            }
        }

        Ok(accounts)
    }
}
123
124impl Default for AccountHashingStage {
125    fn default() -> Self {
126        Self {
127            clean_threshold: 500_000,
128            commit_threshold: 100_000,
129            etl_config: EtlConfig::default(),
130        }
131    }
132}
133
impl<Provider> Stage<Provider> for AccountHashingStage
where
    Provider: DBProvider<Tx: DbTxMut> + HashingWriter + AccountExtReader + StatsReader,
{
    /// Return the id of the stage
    fn id(&self) -> StageId {
        StageId::AccountHashing
    }

    /// Execute the stage.
    ///
    /// Either rebuilds `HashedAccounts` from scratch (parallel keccak over the full plain
    /// state, buffered through an ETL collector) or incrementally re-hashes only the
    /// accounts touched in the block range, depending on `clean_threshold`.
    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
        if input.target_reached() {
            return Ok(ExecOutput::done(input.checkpoint()))
        }

        let (from_block, to_block) = input.next_block_range().into_inner();

        // if there are more blocks then threshold it is faster to go over Plain state and hash all
        // account otherwise take changesets aggregate the sets and apply hashing to
        // AccountHashing table. Also, if we start from genesis, we need to hash from scratch, as
        // genesis accounts are not in changeset.
        if to_block - from_block > self.clean_threshold || from_block == 1 {
            let tx = provider.tx_ref();

            // clear table, load all accounts and hash it
            tx.clear::<tables::HashedAccounts>()?;

            let mut accounts_cursor = tx.cursor_read::<RawTable<tables::PlainAccountState>>()?;
            let mut collector =
                Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
            let mut channels = Vec::with_capacity(MAXIMUM_CHANNELS);

            // channels used to return result of account hashing
            for chunk in &accounts_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) {
                // An _unordered_ channel to receive results from a rayon job
                let (tx, rx) = mpsc::channel();
                channels.push(rx);

                let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
                // Spawn the hashing task onto the global rayon pool
                rayon::spawn(move || {
                    for (address, account) in chunk {
                        let address = address.key().unwrap();
                        // Send errors are ignored: they only occur if the receiver was
                        // dropped, which means the stage is already failing/terminating.
                        let _ = tx.send((RawKey::new(keccak256(address)), account));
                    }
                });

                // Flush to ETL when channels length reaches MAXIMUM_CHANNELS
                if !channels.is_empty() && channels.len() % MAXIMUM_CHANNELS == 0 {
                    collect(&mut channels, &mut collector)?;
                }
            }

            // Drain any remaining receivers from the final (partial) batch.
            collect(&mut channels, &mut collector)?;

            let mut hashed_account_cursor =
                tx.cursor_write::<RawTable<tables::HashedAccounts>>()?;

            let total_hashes = collector.len();
            let interval = (total_hashes / 10).max(1);
            // NOTE(review): `append` requires ascending keys — the ETL collector is
            // assumed to yield entries sorted by hashed address; confirm in `reth_etl`.
            for (index, item) in collector.iter()?.enumerate() {
                // Log progress roughly every 10% of the inserted entries.
                if index > 0 && index % interval == 0 {
                    info!(
                        target: "sync::stages::hashing_account",
                        progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
                        "Inserting hashes"
                    );
                }

                let (key, value) = item?;
                hashed_account_cursor
                    .append(RawKey::<B256>::from_vec(key), RawValue::<Account>::from_vec(value))?;
            }
        } else {
            // Aggregate all transition changesets and make a list of accounts that have been
            // changed.
            let lists = provider.changed_accounts_with_range(from_block..=to_block)?;
            // Iterate over plain state and get newest value.
            // Assumption we are okay to make is that plainstate represent
            // `previous_stage_progress` state.
            let accounts = provider.basic_accounts(lists)?;
            // Insert and hash accounts to hashing table
            provider.insert_account_for_hashing(accounts)?;
        }

        // We finished the hashing stage, no future iterations is expected for the same block range,
        // so no checkpoint is needed.
        let checkpoint = StageCheckpoint::new(input.target())
            .with_account_hashing_stage_checkpoint(AccountHashingCheckpoint {
                progress: stage_checkpoint_progress(provider)?,
                ..Default::default()
            });

        Ok(ExecOutput { checkpoint, done: true })
    }

    /// Unwind the stage.
    ///
    /// Reverts hashed-account entries over the unwound range (bounded by
    /// `commit_threshold` blocks per call) and refreshes the entities checkpoint.
    fn unwind(
        &mut self,
        provider: &Provider,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError> {
        let (range, unwind_progress, _) =
            input.unwind_block_range_with_threshold(self.commit_threshold);

        // Aggregate all transition changesets and make a list of accounts that have been changed.
        provider.unwind_account_hashing_range(range)?;

        let mut stage_checkpoint =
            input.checkpoint.account_hashing_stage_checkpoint().unwrap_or_default();

        stage_checkpoint.progress = stage_checkpoint_progress(provider)?;

        Ok(UnwindOutput {
            checkpoint: StageCheckpoint::new(unwind_progress)
                .with_account_hashing_stage_checkpoint(stage_checkpoint),
        })
    }
}
253
254/// Flushes channels hashes to ETL collector.
255fn collect(
256    channels: &mut Vec<Receiver<(RawKey<B256>, RawValue<Account>)>>,
257    collector: &mut Collector<RawKey<B256>, RawValue<Account>>,
258) -> Result<(), StageError> {
259    for channel in channels.iter_mut() {
260        while let Ok((key, v)) = channel.recv() {
261            collector.insert(key, v)?;
262        }
263    }
264    info!(target: "sync::stages::hashing_account", "Hashed {} entries", collector.len());
265    channels.clear();
266    Ok(())
267}
268
// TODO: Rewrite this
/// `SeedOpts` provides configuration parameters for calling `AccountHashingStage::seed`
/// in unit tests or benchmarks to generate an initial database state for running the
/// stage.
///
/// In order to check the "full hashing" mode of the stage you want to generate more
/// transitions than `AccountHashingStage.clean_threshold`. This requires:
/// 1. Creating enough blocks so there's enough transactions to generate the required transition
///    keys in the `BlockTransitionIndex` (which depends on the `TxTransitionIndex` internally)
/// 2. Setting `blocks.len() > clean_threshold` so that there's enough diffs to actually take the
///    2nd codepath
#[derive(Clone, Debug)]
pub struct SeedOpts {
    /// The range of blocks to be generated (inclusive on both ends).
    pub blocks: RangeInclusive<u64>,
    /// The number of accounts to be generated.
    pub accounts: usize,
    /// The range of transactions to be generated per block.
    pub txs: Range<u8>,
}
289
290fn stage_checkpoint_progress(provider: &impl StatsReader) -> ProviderResult<EntitiesCheckpoint> {
291    Ok(EntitiesCheckpoint {
292        processed: provider.count_entries::<tables::HashedAccounts>()? as u64,
293        total: provider.count_entries::<tables::PlainAccountState>()? as u64,
294    })
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
        UnwindStageTestRunner,
    };
    use alloy_primitives::U256;
    use assert_matches::assert_matches;
    use reth_primitives::Account;
    use reth_provider::providers::StaticFileWriter;
    use reth_stages_api::StageUnitCheckpoint;
    use test_utils::*;

    stage_test_suite_ext!(AccountHashingTestRunner, account_hashing);

    #[tokio::test]
    async fn execute_clean_account_hashing() {
        let (previous_stage, stage_progress) = (20, 10);
        // Set up the runner
        let mut runner = AccountHashingTestRunner::default();
        // clean_threshold = 1 forces the "clear and rebuild from plain state" codepath.
        runner.set_clean_threshold(1);

        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        let rx = runner.execute(input);
        let result = rx.await.unwrap();

        // After a full rebuild, every plain-state account must have been processed.
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Account(AccountHashingCheckpoint {
                        progress: EntitiesCheckpoint {
                            processed,
                            total,
                        },
                        ..
                    })),
                },
                done: true,
            }) if block_number == previous_stage &&
                processed == total &&
                total == runner.db.table::<tables::PlainAccountState>().unwrap().len() as u64
        );

        // Validate the stage execution
        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    mod test_utils {
        use super::*;
        use crate::test_utils::TestStageDB;
        use alloy_primitives::Address;
        use reth_provider::DatabaseProviderFactory;

        /// Test harness wrapping a [`TestStageDB`] plus the stage thresholds under test.
        pub(crate) struct AccountHashingTestRunner {
            pub(crate) db: TestStageDB,
            commit_threshold: u64,
            clean_threshold: u64,
            etl_config: EtlConfig,
        }

        impl AccountHashingTestRunner {
            pub(crate) fn set_clean_threshold(&mut self, threshold: u64) {
                self.clean_threshold = threshold;
            }

            #[allow(dead_code)]
            pub(crate) fn set_commit_threshold(&mut self, threshold: u64) {
                self.commit_threshold = threshold;
            }

            /// Iterates over `PlainAccount` table and checks that the accounts match the ones
            /// in the `HashedAccounts` table
            pub(crate) fn check_hashed_accounts(&self) -> Result<(), TestRunnerError> {
                self.db.query(|tx| {
                    let mut acc_cursor = tx.cursor_read::<tables::PlainAccountState>()?;
                    let mut hashed_acc_cursor = tx.cursor_read::<tables::HashedAccounts>()?;

                    while let Some((address, account)) = acc_cursor.next()? {
                        let hashed_addr = keccak256(address);
                        if let Some((_, acc)) = hashed_acc_cursor.seek_exact(hashed_addr)? {
                            assert_eq!(acc, account)
                        }
                    }
                    Ok(())
                })?;

                Ok(())
            }

            /// Same as `check_hashed_accounts`, only that checks with the old account state,
            /// namely, the same account with nonce - 1 and balance - 1.
            ///
            /// This mirrors the `prev_acc` written by `AccountHashingStage::seed`, so it is
            /// what the hashed table should contain after a successful unwind.
            pub(crate) fn check_old_hashed_accounts(&self) -> Result<(), TestRunnerError> {
                self.db.query(|tx| {
                    let mut acc_cursor = tx.cursor_read::<tables::PlainAccountState>()?;
                    let mut hashed_acc_cursor = tx.cursor_read::<tables::HashedAccounts>()?;

                    while let Some((address, account)) = acc_cursor.next()? {
                        let Account { nonce, balance, .. } = account;
                        let old_acc = Account {
                            nonce: nonce - 1,
                            balance: balance - U256::from(1),
                            bytecode_hash: None,
                        };
                        let hashed_addr = keccak256(address);
                        if let Some((_, acc)) = hashed_acc_cursor.seek_exact(hashed_addr)? {
                            assert_eq!(acc, old_acc)
                        }
                    }
                    Ok(())
                })?;

                Ok(())
            }
        }

        impl Default for AccountHashingTestRunner {
            fn default() -> Self {
                Self {
                    db: TestStageDB::default(),
                    commit_threshold: 1000,
                    clean_threshold: 1000,
                    etl_config: EtlConfig::default(),
                }
            }
        }

        impl StageTestRunner for AccountHashingTestRunner {
            type S = AccountHashingStage;

            fn db(&self) -> &TestStageDB {
                &self.db
            }

            fn stage(&self) -> Self::S {
                Self::S {
                    commit_threshold: self.commit_threshold,
                    clean_threshold: self.clean_threshold,
                    etl_config: self.etl_config.clone(),
                }
            }
        }

        impl ExecuteStageTestRunner for AccountHashingTestRunner {
            type Seed = Vec<(Address, Account)>;

            fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
                let provider = self.db.factory.database_provider_rw()?;
                let res = Ok(AccountHashingStage::seed(
                    &provider,
                    SeedOpts { blocks: 1..=input.target(), accounts: 10, txs: 0..3 },
                )
                .unwrap());
                provider.commit().expect("failed to commit");
                res
            }

            fn validate_execution(
                &self,
                input: ExecInput,
                output: Option<ExecOutput>,
            ) -> Result<(), TestRunnerError> {
                if let Some(output) = output {
                    let start_block = input.next_block();
                    let end_block = output.checkpoint.block_number;
                    // Nothing was executed for an empty range; nothing to validate.
                    if start_block > end_block {
                        return Ok(())
                    }
                }
                self.check_hashed_accounts()
            }
        }

        impl UnwindStageTestRunner for AccountHashingTestRunner {
            fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
                self.check_old_hashed_accounts()
            }
        }
    }
}