1use crate::StreamBackfillJob;
2use reth_evm::ConfigureEvm;
3use std::{
4 ops::RangeInclusive,
5 time::{Duration, Instant},
6};
7
8use alloy_consensus::BlockHeader;
9use alloy_primitives::BlockNumber;
10use reth_ethereum_primitives::Receipt;
11use reth_evm::execute::{BlockExecutionError, BlockExecutionOutput, Executor};
12use reth_node_api::{Block as _, BlockBody as _, NodePrimitives};
13use reth_primitives_traits::{format_gas_throughput, RecoveredBlock, SignedTransaction};
14use reth_provider::{
15 BlockReader, Chain, ExecutionOutcome, HeaderProvider, ProviderError, StateProviderFactory,
16 TransactionVariant,
17};
18use reth_prune_types::PruneModes;
19use reth_revm::database::StateProviderDatabase;
20use reth_stages_api::ExecutionStageThresholds;
21use reth_tracing::tracing::{debug, trace};
22
23pub(super) type BackfillJobResult<T> = Result<T, BlockExecutionError>;
24
25#[derive(Debug)]
31pub struct BackfillJob<E, P> {
32 pub(crate) evm_config: E,
33 pub(crate) provider: P,
34 pub(crate) prune_modes: PruneModes,
35 pub(crate) thresholds: ExecutionStageThresholds,
36 pub(crate) range: RangeInclusive<BlockNumber>,
37 pub(crate) stream_parallelism: usize,
38}
39
40impl<E, P> Iterator for BackfillJob<E, P>
41where
42 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
43 P: HeaderProvider + BlockReader<Transaction: SignedTransaction> + StateProviderFactory,
44{
45 type Item = BackfillJobResult<Chain<E::Primitives>>;
46
47 fn next(&mut self) -> Option<Self::Item> {
48 if self.range.is_empty() {
49 return None
50 }
51
52 Some(self.execute_range())
53 }
54}
55
56impl<E, P> BackfillJob<E, P>
57where
58 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
59 P: BlockReader<Transaction: SignedTransaction> + HeaderProvider + StateProviderFactory,
60{
61 pub fn into_single_blocks(self) -> SingleBlockBackfillJob<E, P> {
63 self.into()
64 }
65
66 pub fn into_stream(self) -> StreamBackfillJob<E, P, Chain<E::Primitives>> {
68 self.into()
69 }
70
71 fn execute_range(&mut self) -> BackfillJobResult<Chain<E::Primitives>> {
72 debug!(
73 target: "exex::backfill",
74 range = ?self.range,
75 "Executing block range"
76 );
77
78 let mut executor = self.evm_config.batch_executor(StateProviderDatabase::new(
79 self.provider
80 .history_by_block_number(self.range.start().saturating_sub(1))
81 .map_err(BlockExecutionError::other)?,
82 ));
83
84 let mut fetch_block_duration = Duration::default();
85 let mut execution_duration = Duration::default();
86 let mut cumulative_gas = 0;
87 let batch_start = Instant::now();
88
89 let mut blocks = Vec::new();
90 let mut results = Vec::new();
91 for block_number in self.range.clone() {
92 let fetch_block_start = Instant::now();
94
95 let block = self
97 .provider
98 .sealed_block_with_senders(block_number.into(), TransactionVariant::WithHash)
99 .map_err(BlockExecutionError::other)?
100 .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))
101 .map_err(BlockExecutionError::other)?;
102
103 fetch_block_duration += fetch_block_start.elapsed();
104
105 cumulative_gas += block.gas_used();
106
107 trace!(target: "exex::backfill", number = block_number, txs = block.body().transactions().len(), "Executing block");
109
110 let execute_start = Instant::now();
112
113 let (block, senders) = block.split_sealed();
115 let (header, body) = block.split_sealed_header_body();
116 let block = P::Block::new_sealed(header, body).with_senders(senders);
117
118 results.push(executor.execute_one(&block)?);
119 execution_duration += execute_start.elapsed();
120
121 blocks.push(block);
125 if self.thresholds.is_end_of_batch(
127 block_number - *self.range.start(),
128 executor.size_hint() as u64,
129 cumulative_gas,
130 batch_start.elapsed(),
131 ) {
132 break
133 }
134 }
135
136 let first_block_number = blocks.first().expect("blocks should not be empty").number();
137 let last_block_number = blocks.last().expect("blocks should not be empty").number();
138 debug!(
139 target: "exex::backfill",
140 range = ?*self.range.start()..=last_block_number,
141 block_fetch = ?fetch_block_duration,
142 execution = ?execution_duration,
143 throughput = format_gas_throughput(cumulative_gas, execution_duration),
144 "Finished executing block range"
145 );
146 self.range = last_block_number + 1..=*self.range.end();
147
148 let outcome = ExecutionOutcome::from_blocks(
149 first_block_number,
150 executor.into_state().take_bundle(),
151 results,
152 );
153 let chain = Chain::new(blocks, outcome, None);
154 Ok(chain)
155 }
156}
157
158#[derive(Debug, Clone)]
163pub struct SingleBlockBackfillJob<E, P> {
164 pub(crate) evm_config: E,
165 pub(crate) provider: P,
166 pub(crate) range: RangeInclusive<BlockNumber>,
167 pub(crate) stream_parallelism: usize,
168}
169
170impl<E, P> Iterator for SingleBlockBackfillJob<E, P>
171where
172 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
173 P: HeaderProvider + BlockReader + StateProviderFactory,
174{
175 type Item = BackfillJobResult<(
176 RecoveredBlock<P::Block>,
177 BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
178 )>;
179
180 fn next(&mut self) -> Option<Self::Item> {
181 self.range.next().map(|block_number| self.execute_block(block_number))
182 }
183}
184
185impl<E, P> SingleBlockBackfillJob<E, P>
186where
187 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
188 P: HeaderProvider + BlockReader + StateProviderFactory,
189{
190 pub fn into_stream(
192 self,
193 ) -> StreamBackfillJob<
194 E,
195 P,
196 (RecoveredBlock<reth_ethereum_primitives::Block>, BlockExecutionOutput<Receipt>),
197 > {
198 self.into()
199 }
200
201 #[expect(clippy::type_complexity)]
202 pub(crate) fn execute_block(
203 &self,
204 block_number: u64,
205 ) -> BackfillJobResult<(
206 RecoveredBlock<P::Block>,
207 BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
208 )> {
209 let block_with_senders = self
211 .provider
212 .recovered_block(block_number.into(), TransactionVariant::WithHash)
213 .map_err(BlockExecutionError::other)?
214 .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))
215 .map_err(BlockExecutionError::other)?;
216
217 let executor = self.evm_config.batch_executor(StateProviderDatabase::new(
219 self.provider
220 .history_by_block_number(block_number.saturating_sub(1))
221 .map_err(BlockExecutionError::other)?,
222 ));
223
224 trace!(target: "exex::backfill", number = block_number, txs = block_with_senders.body().transaction_count(), "Executing block");
225
226 let block_execution_output = executor.execute(&block_with_senders)?;
227
228 Ok((block_with_senders, block_execution_output))
229 }
230}
231
232impl<E, P> From<BackfillJob<E, P>> for SingleBlockBackfillJob<E, P> {
233 fn from(job: BackfillJob<E, P>) -> Self {
234 Self {
235 evm_config: job.evm_config,
236 provider: job.provider,
237 range: job.range,
238 stream_parallelism: job.stream_parallelism,
239 }
240 }
241}
242
#[cfg(test)]
mod tests {
    use crate::{
        backfill::test_utils::{blocks_and_execution_outputs, chain_spec, to_execution_outcome},
        BackfillJobFactory,
    };
    use reth_db_common::init::init_genesis;
    use reth_evm_ethereum::execute::EthExecutorProvider;
    use reth_primitives_traits::crypto::secp256k1::public_key_to_address;
    use reth_provider::{
        providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
    };
    use reth_testing_utils::generators;

    /// Backfilling the range `1..=1` must yield exactly one chain whose block and execution
    /// outcome equal those produced by the test utilities for the first block.
    #[test]
    fn test_backfill() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Key pair (and derived address) of the transaction sender.
        let key_pair = generators::generate_key(&mut generators::rng());
        let sender = public_key_to_address(key_pair.public_key());

        let spec = chain_spec(sender);

        let executor = EthExecutorProvider::ethereum(spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        // Reference data: the blocks and their execution outputs.
        let expected = blocks_and_execution_outputs(provider_factory, spec, key_pair)?;
        let (block, block_execution_output) = expected.first().unwrap();
        let execution_outcome = to_execution_outcome(block.number, block_execution_output);

        // Backfill only the first block.
        let chains = BackfillJobFactory::new(executor, blockchain_db)
            .backfill(1..=1)
            .collect::<Result<Vec<_>, _>>()?;

        // The job must have produced a single chain equal to the reference data.
        assert_eq!(chains.len(), 1);
        let mut chain = chains.into_iter().next().unwrap();
        chain.execution_outcome_mut().bundle.reverts.sort();
        assert_eq!(chain.blocks(), &[(1, block.clone())].into());
        assert_eq!(chain.execution_outcome(), &execution_outcome);

        Ok(())
    }

    /// The single block job over `1..=1` must yield exactly one item whose block and execution
    /// output equal those produced by the test utilities.
    #[test]
    fn test_single_block_backfill() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Key pair (and derived address) of the transaction sender.
        let key_pair = generators::generate_key(&mut generators::rng());
        let sender = public_key_to_address(key_pair.public_key());

        let spec = chain_spec(sender);

        let executor = EthExecutorProvider::ethereum(spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        // Reference data: the blocks and their execution outputs.
        let expected = blocks_and_execution_outputs(provider_factory, spec, key_pair)?;

        // Backfill the first block through the single block job.
        let results = BackfillJobFactory::new(executor, blockchain_db)
            .backfill(1..=1)
            .into_single_blocks()
            .collect::<Vec<_>>();

        assert_eq!(results.len(), 1);

        for (i, res) in results.into_iter().enumerate() {
            let (block, mut execution_output) = res?;
            execution_output.state.reverts.sort();

            let (expected_block, expected_output) = &expected[i];

            assert_eq!(&block, expected_block);
            assert_eq!(&execution_output, expected_output);
        }

        Ok(())
    }
}