1use alloy_primitives::{keccak256, B256};
2use itertools::Itertools;
3use reth_config::config::{EtlConfig, HashingConfig};
4use reth_db::{tables, RawKey, RawTable, RawValue};
5use reth_db_api::{
6 cursor::{DbCursorRO, DbCursorRW},
7 transaction::{DbTx, DbTxMut},
8};
9use reth_etl::Collector;
10use reth_primitives::Account;
11use reth_provider::{AccountExtReader, DBProvider, HashingWriter, StatsReader};
12use reth_stages_api::{
13 AccountHashingCheckpoint, EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint,
14 StageError, StageId, UnwindInput, UnwindOutput,
15};
16use reth_storage_errors::provider::ProviderResult;
17use std::{
18 fmt::Debug,
19 ops::{Range, RangeInclusive},
20 sync::mpsc::{self, Receiver},
21};
22use tracing::*;
23
/// Maximum number of result channels kept pending at once: the full re-hash path flushes all
/// pending channels into the ETL collector once this many have accumulated.
const MAXIMUM_CHANNELS: usize = 10_000;

/// Number of plain-state account entries handed to a single rayon hashing job.
const WORKER_CHUNK_SIZE: usize = 100;
29
/// Stage that fills the `HashedAccounts` table with accounts keyed by the keccak256 hash of
/// their address.
///
/// Depending on the size of the block range it either rebuilds the whole table from
/// `PlainAccountState`, or only re-hashes the accounts reported as changed in that range.
#[derive(Clone, Debug)]
pub struct AccountHashingStage {
    /// Block-range size above which `execute` clears the hashed table and re-hashes every
    /// account from plain state instead of processing only the changed accounts.
    pub clean_threshold: u64,
    /// Threshold passed to `UnwindInput::unwind_block_range_with_threshold` when unwinding.
    pub commit_threshold: u64,
    /// ETL settings (file size and directory) for the collector used during a full re-hash.
    pub etl_config: EtlConfig,
}
42
43impl AccountHashingStage {
44 pub const fn new(config: HashingConfig, etl_config: EtlConfig) -> Self {
46 Self {
47 clean_threshold: config.clean_threshold,
48 commit_threshold: config.commit_threshold,
49 etl_config,
50 }
51 }
52}
53
#[cfg(any(test, feature = "test-utils"))]
impl AccountHashingStage {
    /// Seeds the database behind `provider` with random test data.
    ///
    /// Inserts random blocks for `opts.blocks`, commits the headers static-file segment, writes
    /// `opts.accounts` random EOA accounts into `PlainAccountState`, and records one account
    /// changeset per (block, account) pair holding the account with `nonce` and `balance` each
    /// reduced by one.
    ///
    /// Returns the generated accounts sorted by address.
    pub fn seed<Tx: DbTx + DbTxMut + 'static, N: reth_provider::providers::ProviderNodeTypes>(
        provider: &reth_provider::DatabaseProvider<Tx, N>,
        opts: SeedOpts,
    ) -> Result<Vec<(alloy_primitives::Address, reth_primitives::Account)>, StageError>
    where
        N::Primitives: reth_primitives_traits::FullNodePrimitives<
            BlockBody = reth_primitives::BlockBody,
            BlockHeader = reth_primitives::Header,
        >,
    {
        use alloy_primitives::U256;
        use reth_db_api::models::AccountBeforeTx;
        use reth_provider::{StaticFileProviderFactory, StaticFileWriter};
        use reth_testing_utils::{
            generators,
            generators::{random_block_range, random_eoa_accounts, BlockRangeParams},
        };

        let SeedOpts { blocks: block_range, accounts: account_count, txs: tx_count } = opts;
        let mut rng = generators::rng();

        // Random chain segment covering the requested block numbers.
        let block_params =
            BlockRangeParams { parent: Some(B256::ZERO), tx_count, ..Default::default() };
        for block in random_block_range(&mut rng, block_range.clone(), block_params) {
            provider.insert_historical_block(block.try_seal_with_senders().unwrap()).unwrap();
        }
        provider
            .static_file_provider()
            .latest_writer(reth_primitives::StaticFileSegment::Headers)
            .unwrap()
            .commit()
            .unwrap();

        // `append` is used below, so the accounts are written in ascending address order.
        let mut accounts = random_eoa_accounts(&mut rng, account_count);
        accounts.sort_by_key(|(address, _)| *address);

        {
            let mut plain_state_cursor =
                provider.tx_ref().cursor_write::<tables::PlainAccountState>()?;
            for &(address, account) in &accounts {
                plain_state_cursor.append(address, account)?;
            }

            // One changeset entry per block, pairing block numbers with accounts in order.
            let mut changeset_cursor =
                provider.tx_ref().cursor_write::<tables::AccountChangeSets>()?;
            for (block_number, (address, account)) in block_range.zip(&accounts) {
                let previous = Account {
                    nonce: account.nonce - 1,
                    balance: account.balance - U256::from(1),
                    bytecode_hash: None,
                };
                let before_tx = AccountBeforeTx { address: *address, info: Some(previous) };
                changeset_cursor.append(block_number, before_tx)?;
            }
        }

        Ok(accounts)
    }
}
123
124impl Default for AccountHashingStage {
125 fn default() -> Self {
126 Self {
127 clean_threshold: 500_000,
128 commit_threshold: 100_000,
129 etl_config: EtlConfig::default(),
130 }
131 }
132}
133
134impl<Provider> Stage<Provider> for AccountHashingStage
135where
136 Provider: DBProvider<Tx: DbTxMut> + HashingWriter + AccountExtReader + StatsReader,
137{
138 fn id(&self) -> StageId {
140 StageId::AccountHashing
141 }
142
143 fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
145 if input.target_reached() {
146 return Ok(ExecOutput::done(input.checkpoint()))
147 }
148
149 let (from_block, to_block) = input.next_block_range().into_inner();
150
151 if to_block - from_block > self.clean_threshold || from_block == 1 {
156 let tx = provider.tx_ref();
157
158 tx.clear::<tables::HashedAccounts>()?;
160
161 let mut accounts_cursor = tx.cursor_read::<RawTable<tables::PlainAccountState>>()?;
162 let mut collector =
163 Collector::new(self.etl_config.file_size, self.etl_config.dir.clone());
164 let mut channels = Vec::with_capacity(MAXIMUM_CHANNELS);
165
166 for chunk in &accounts_cursor.walk(None)?.chunks(WORKER_CHUNK_SIZE) {
168 let (tx, rx) = mpsc::channel();
170 channels.push(rx);
171
172 let chunk = chunk.collect::<Result<Vec<_>, _>>()?;
173 rayon::spawn(move || {
175 for (address, account) in chunk {
176 let address = address.key().unwrap();
177 let _ = tx.send((RawKey::new(keccak256(address)), account));
178 }
179 });
180
181 if !channels.is_empty() && channels.len() % MAXIMUM_CHANNELS == 0 {
183 collect(&mut channels, &mut collector)?;
184 }
185 }
186
187 collect(&mut channels, &mut collector)?;
188
189 let mut hashed_account_cursor =
190 tx.cursor_write::<RawTable<tables::HashedAccounts>>()?;
191
192 let total_hashes = collector.len();
193 let interval = (total_hashes / 10).max(1);
194 for (index, item) in collector.iter()?.enumerate() {
195 if index > 0 && index % interval == 0 {
196 info!(
197 target: "sync::stages::hashing_account",
198 progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
199 "Inserting hashes"
200 );
201 }
202
203 let (key, value) = item?;
204 hashed_account_cursor
205 .append(RawKey::<B256>::from_vec(key), RawValue::<Account>::from_vec(value))?;
206 }
207 } else {
208 let lists = provider.changed_accounts_with_range(from_block..=to_block)?;
211 let accounts = provider.basic_accounts(lists)?;
215 provider.insert_account_for_hashing(accounts)?;
217 }
218
219 let checkpoint = StageCheckpoint::new(input.target())
222 .with_account_hashing_stage_checkpoint(AccountHashingCheckpoint {
223 progress: stage_checkpoint_progress(provider)?,
224 ..Default::default()
225 });
226
227 Ok(ExecOutput { checkpoint, done: true })
228 }
229
230 fn unwind(
232 &mut self,
233 provider: &Provider,
234 input: UnwindInput,
235 ) -> Result<UnwindOutput, StageError> {
236 let (range, unwind_progress, _) =
237 input.unwind_block_range_with_threshold(self.commit_threshold);
238
239 provider.unwind_account_hashing_range(range)?;
241
242 let mut stage_checkpoint =
243 input.checkpoint.account_hashing_stage_checkpoint().unwrap_or_default();
244
245 stage_checkpoint.progress = stage_checkpoint_progress(provider)?;
246
247 Ok(UnwindOutput {
248 checkpoint: StageCheckpoint::new(unwind_progress)
249 .with_account_hashing_stage_checkpoint(stage_checkpoint),
250 })
251 }
252}
253
254fn collect(
256 channels: &mut Vec<Receiver<(RawKey<B256>, RawValue<Account>)>>,
257 collector: &mut Collector<RawKey<B256>, RawValue<Account>>,
258) -> Result<(), StageError> {
259 for channel in channels.iter_mut() {
260 while let Ok((key, v)) = channel.recv() {
261 collector.insert(key, v)?;
262 }
263 }
264 info!(target: "sync::stages::hashing_account", "Hashed {} entries", collector.len());
265 channels.clear();
266 Ok(())
267}
268
/// Options controlling the test data generated by `AccountHashingStage::seed`.
#[derive(Clone, Debug)]
pub struct SeedOpts {
    /// Range of block numbers for which random blocks are generated.
    pub blocks: RangeInclusive<u64>,
    /// Number of random EOA accounts to generate.
    pub accounts: usize,
    /// Range passed as `tx_count` to the random block generator.
    pub txs: Range<u8>,
}
289
290fn stage_checkpoint_progress(provider: &impl StatsReader) -> ProviderResult<EntitiesCheckpoint> {
291 Ok(EntitiesCheckpoint {
292 processed: provider.count_entries::<tables::HashedAccounts>()? as u64,
293 total: provider.count_entries::<tables::PlainAccountState>()? as u64,
294 })
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
        UnwindStageTestRunner,
    };
    use alloy_primitives::U256;
    use assert_matches::assert_matches;
    use reth_primitives::Account;
    use reth_provider::providers::StaticFileWriter;
    use reth_stages_api::StageUnitCheckpoint;
    use test_utils::*;

    stage_test_suite_ext!(AccountHashingTestRunner, account_hashing);

    /// Runs the stage with a clean threshold of 1 so the full re-hash path is taken, and checks
    /// that every plain-state account is reported as processed.
    #[tokio::test]
    async fn execute_clean_account_hashing() {
        let previous_stage = 20;
        let stage_progress = 10;

        let mut runner = AccountHashingTestRunner::default();
        // Any range longer than one block now triggers the clean path.
        runner.set_clean_threshold(1);

        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        let result = runner.execute(input).await.unwrap();

        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Account(AccountHashingCheckpoint {
                        progress: EntitiesCheckpoint {
                            processed,
                            total,
                        },
                        ..
                    })),
                },
                done: true,
            }) if block_number == previous_stage &&
                processed == total &&
                total == runner.db.table::<tables::PlainAccountState>().unwrap().len() as u64
        );

        let validation = runner.validate_execution(input, result.ok());
        assert!(validation.is_ok(), "execution validation");
    }

    mod test_utils {
        use super::*;
        use crate::test_utils::TestStageDB;
        use alloy_primitives::Address;
        use reth_provider::DatabaseProviderFactory;

        /// Test harness holding the database and the configuration of the stage under test.
        pub(crate) struct AccountHashingTestRunner {
            pub(crate) db: TestStageDB,
            commit_threshold: u64,
            clean_threshold: u64,
            etl_config: EtlConfig,
        }

        impl AccountHashingTestRunner {
            /// Overrides the clean threshold used for stages built by this runner.
            pub(crate) fn set_clean_threshold(&mut self, threshold: u64) {
                self.clean_threshold = threshold;
            }

            /// Overrides the commit threshold used for stages built by this runner.
            #[allow(dead_code)]
            pub(crate) fn set_commit_threshold(&mut self, threshold: u64) {
                self.commit_threshold = threshold;
            }

            /// For every plain-state account that has an entry in `HashedAccounts`, asserts
            /// that the hashed entry equals the plain-state account.
            pub(crate) fn check_hashed_accounts(&self) -> Result<(), TestRunnerError> {
                self.db.query(|tx| {
                    let mut plain_cursor = tx.cursor_read::<tables::PlainAccountState>()?;
                    let mut hashed_cursor = tx.cursor_read::<tables::HashedAccounts>()?;

                    loop {
                        let Some((address, expected)) = plain_cursor.next()? else { break };

                        let hashed_key = keccak256(address);
                        let Some((_, stored)) = hashed_cursor.seek_exact(hashed_key)? else {
                            // Accounts without a hashed entry are not checked.
                            continue
                        };
                        assert_eq!(stored, expected);
                    }

                    Ok(())
                })?;

                Ok(())
            }

            /// For every plain-state account that has an entry in `HashedAccounts`, asserts
            /// that the hashed entry equals the account with `nonce` and `balance` each
            /// reduced by one and no bytecode hash.
            pub(crate) fn check_old_hashed_accounts(&self) -> Result<(), TestRunnerError> {
                self.db.query(|tx| {
                    let mut plain_cursor = tx.cursor_read::<tables::PlainAccountState>()?;
                    let mut hashed_cursor = tx.cursor_read::<tables::HashedAccounts>()?;

                    loop {
                        let Some((address, current)) = plain_cursor.next()? else { break };

                        let expected = Account {
                            nonce: current.nonce - 1,
                            balance: current.balance - U256::from(1),
                            bytecode_hash: None,
                        };

                        let hashed_key = keccak256(address);
                        let Some((_, stored)) = hashed_cursor.seek_exact(hashed_key)? else {
                            // Accounts without a hashed entry are not checked.
                            continue
                        };
                        assert_eq!(stored, expected);
                    }

                    Ok(())
                })?;

                Ok(())
            }
        }

        impl Default for AccountHashingTestRunner {
            /// Fresh test database with both thresholds set to 1000 and default ETL settings.
            fn default() -> Self {
                let db = TestStageDB::default();
                let etl_config = EtlConfig::default();
                Self { db, commit_threshold: 1000, clean_threshold: 1000, etl_config }
            }
        }

        impl StageTestRunner for AccountHashingTestRunner {
            type S = AccountHashingStage;

            fn db(&self) -> &TestStageDB {
                &self.db
            }

            /// Builds a stage that mirrors the runner's current configuration.
            fn stage(&self) -> Self::S {
                let etl_config = self.etl_config.clone();
                Self::S {
                    commit_threshold: self.commit_threshold,
                    clean_threshold: self.clean_threshold,
                    etl_config,
                }
            }
        }

        impl ExecuteStageTestRunner for AccountHashingTestRunner {
            type Seed = Vec<(Address, Account)>;

            /// Seeds blocks `1..=target` and ten random accounts, then commits the provider.
            fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
                let provider = self.db.factory.database_provider_rw()?;

                let opts = SeedOpts { blocks: 1..=input.target(), accounts: 10, txs: 0..3 };
                let seeded = AccountHashingStage::seed(&provider, opts).unwrap();

                provider.commit().expect("failed to commit");
                Ok(seeded)
            }

            /// Checks the hashed accounts, unless the output shows that the stage had no
            /// blocks to process.
            fn validate_execution(
                &self,
                input: ExecInput,
                output: Option<ExecOutput>,
            ) -> Result<(), TestRunnerError> {
                match output {
                    Some(out) if input.next_block() > out.checkpoint.block_number => Ok(()),
                    _ => self.check_hashed_accounts(),
                }
            }
        }

        impl UnwindStageTestRunner for AccountHashingTestRunner {
            /// After an unwind, hashed accounts must hold the pre-change account values.
            fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
                self.check_old_hashed_accounts()
            }
        }
    }
}