1use crate::StreamBackfillJob;
2use std::{
3 ops::RangeInclusive,
4 time::{Duration, Instant},
5};
6
7use alloy_consensus::BlockHeader;
8use alloy_primitives::BlockNumber;
9use reth_evm::execute::{
10 BatchExecutor, BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider, Executor,
11};
12use reth_node_api::{Block as _, BlockBody as _, NodePrimitives};
13use reth_primitives::{BlockExt, BlockWithSenders, Receipt};
14use reth_primitives_traits::{format_gas_throughput, SignedTransaction};
15use reth_provider::{
16 BlockReader, Chain, HeaderProvider, ProviderError, StateProviderFactory, TransactionVariant,
17};
18use reth_prune_types::PruneModes;
19use reth_revm::database::StateProviderDatabase;
20use reth_stages_api::ExecutionStageThresholds;
21use reth_tracing::tracing::{debug, trace};
22
23pub(super) type BackfillJobResult<T> = Result<T, BlockExecutionError>;
24
25#[derive(Debug)]
31pub struct BackfillJob<E, P> {
32 pub(crate) executor: E,
33 pub(crate) provider: P,
34 pub(crate) prune_modes: PruneModes,
35 pub(crate) thresholds: ExecutionStageThresholds,
36 pub(crate) range: RangeInclusive<BlockNumber>,
37 pub(crate) stream_parallelism: usize,
38}
39
40impl<E, P> Iterator for BackfillJob<E, P>
41where
42 E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>,
43 P: HeaderProvider + BlockReader<Transaction: SignedTransaction> + StateProviderFactory,
44{
45 type Item = BackfillJobResult<Chain<E::Primitives>>;
46
47 fn next(&mut self) -> Option<Self::Item> {
48 if self.range.is_empty() {
49 return None
50 }
51
52 Some(self.execute_range())
53 }
54}
55
56impl<E, P> BackfillJob<E, P>
57where
58 E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>,
59 P: BlockReader<Transaction: SignedTransaction> + HeaderProvider + StateProviderFactory,
60{
61 pub fn into_single_blocks(self) -> SingleBlockBackfillJob<E, P> {
63 self.into()
64 }
65
66 pub fn into_stream(self) -> StreamBackfillJob<E, P, Chain<E::Primitives>> {
68 self.into()
69 }
70
71 fn execute_range(&mut self) -> BackfillJobResult<Chain<E::Primitives>> {
72 debug!(
73 target: "exex::backfill",
74 range = ?self.range,
75 "Executing block range"
76 );
77
78 let mut executor = self.executor.batch_executor(StateProviderDatabase::new(
79 self.provider.history_by_block_number(self.range.start().saturating_sub(1))?,
80 ));
81 executor.set_prune_modes(self.prune_modes.clone());
82
83 let mut fetch_block_duration = Duration::default();
84 let mut execution_duration = Duration::default();
85 let mut cumulative_gas = 0;
86 let batch_start = Instant::now();
87
88 let mut blocks = Vec::new();
89 for block_number in self.range.clone() {
90 let fetch_block_start = Instant::now();
92
93 let td = self
94 .provider
95 .header_td_by_number(block_number)?
96 .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;
97
98 let block = self
100 .provider
101 .sealed_block_with_senders(block_number.into(), TransactionVariant::WithHash)?
102 .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;
103
104 fetch_block_duration += fetch_block_start.elapsed();
105
106 cumulative_gas += block.gas_used();
107
108 trace!(target: "exex::backfill", number = block_number, txs = block.body.transactions().len(), "Executing block");
110
111 let execute_start = Instant::now();
113
114 let (block, senders) = block.into_components();
116 let (unsealed_header, hash) = block.header.split();
117 let block = P::Block::new(unsealed_header, block.body).with_senders_unchecked(senders);
118
119 executor.execute_and_verify_one((&block, td).into())?;
120 execution_duration += execute_start.elapsed();
121
122 blocks.push(block.seal(hash));
126
127 let bundle_size_hint = executor.size_hint().unwrap_or_default() as u64;
129 if self.thresholds.is_end_of_batch(
130 block_number - *self.range.start(),
131 bundle_size_hint,
132 cumulative_gas,
133 batch_start.elapsed(),
134 ) {
135 break
136 }
137 }
138
139 let last_block_number = blocks.last().expect("blocks should not be empty").number();
140 debug!(
141 target: "exex::backfill",
142 range = ?*self.range.start()..=last_block_number,
143 block_fetch = ?fetch_block_duration,
144 execution = ?execution_duration,
145 throughput = format_gas_throughput(cumulative_gas, execution_duration),
146 "Finished executing block range"
147 );
148 self.range = last_block_number + 1..=*self.range.end();
149
150 let chain = Chain::new(blocks, executor.finalize(), None);
151 Ok(chain)
152 }
153}
154
155#[derive(Debug, Clone)]
160pub struct SingleBlockBackfillJob<E, P> {
161 pub(crate) executor: E,
162 pub(crate) provider: P,
163 pub(crate) range: RangeInclusive<BlockNumber>,
164 pub(crate) stream_parallelism: usize,
165}
166
167impl<E, P> Iterator for SingleBlockBackfillJob<E, P>
168where
169 E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>,
170 P: HeaderProvider + BlockReader + StateProviderFactory,
171{
172 type Item = BackfillJobResult<(
173 BlockWithSenders<P::Block>,
174 BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
175 )>;
176
177 fn next(&mut self) -> Option<Self::Item> {
178 self.range.next().map(|block_number| self.execute_block(block_number))
179 }
180}
181
182impl<E, P> SingleBlockBackfillJob<E, P>
183where
184 E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>,
185 P: HeaderProvider + BlockReader + StateProviderFactory,
186{
187 pub fn into_stream(
189 self,
190 ) -> StreamBackfillJob<E, P, (BlockWithSenders, BlockExecutionOutput<Receipt>)> {
191 self.into()
192 }
193
194 #[expect(clippy::type_complexity)]
195 pub(crate) fn execute_block(
196 &self,
197 block_number: u64,
198 ) -> BackfillJobResult<(
199 BlockWithSenders<P::Block>,
200 BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
201 )> {
202 let td = self
203 .provider
204 .header_td_by_number(block_number)?
205 .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;
206
207 let block_with_senders = self
209 .provider
210 .block_with_senders(block_number.into(), TransactionVariant::WithHash)?
211 .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;
212
213 let executor = self.executor.executor(StateProviderDatabase::new(
215 self.provider.history_by_block_number(block_number.saturating_sub(1))?,
216 ));
217
218 trace!(target: "exex::backfill", number = block_number, txs = block_with_senders.block.body().transactions().len(), "Executing block");
219
220 let block_execution_output = executor.execute((&block_with_senders, td).into())?;
221
222 Ok((block_with_senders, block_execution_output))
223 }
224}
225
226impl<E, P> From<BackfillJob<E, P>> for SingleBlockBackfillJob<E, P> {
227 fn from(job: BackfillJob<E, P>) -> Self {
228 Self {
229 executor: job.executor,
230 provider: job.provider,
231 range: job.range,
232 stream_parallelism: job.stream_parallelism,
233 }
234 }
235}
236
237#[cfg(test)]
238mod tests {
239 use std::sync::Arc;
240
241 use crate::{
242 backfill::test_utils::{blocks_and_execution_outputs, chain_spec, to_execution_outcome},
243 BackfillJobFactory,
244 };
245 use reth_blockchain_tree::noop::NoopBlockchainTree;
246 use reth_db_common::init::init_genesis;
247 use reth_evm_ethereum::execute::EthExecutorProvider;
248 use reth_primitives::public_key_to_address;
249 use reth_provider::{
250 providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
251 };
252 use reth_testing_utils::generators;
253 use secp256k1::Keypair;
254
255 #[test]
256 fn test_backfill() -> eyre::Result<()> {
257 reth_tracing::init_test_tracing();
258
259 let key_pair = Keypair::new_global(&mut generators::rng());
261 let address = public_key_to_address(key_pair.public_key());
262
263 let chain_spec = chain_spec(address);
264
265 let executor = EthExecutorProvider::ethereum(chain_spec.clone());
266 let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
267 init_genesis(&provider_factory)?;
268 let blockchain_db = BlockchainProvider::new(
269 provider_factory.clone(),
270 Arc::new(NoopBlockchainTree::default()),
271 )?;
272
273 let blocks_and_execution_outputs =
274 blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;
275 let (block, block_execution_output) = blocks_and_execution_outputs.first().unwrap();
276 let execution_outcome = to_execution_outcome(block.number, block_execution_output);
277
278 let factory = BackfillJobFactory::new(executor, blockchain_db);
280 let job = factory.backfill(1..=1);
281 let chains = job.collect::<Result<Vec<_>, _>>()?;
282
283 assert_eq!(chains.len(), 1);
286 let mut chain = chains.into_iter().next().unwrap();
287 chain.execution_outcome_mut().bundle.reverts.sort();
288 assert_eq!(chain.blocks(), &[(1, block.clone())].into());
289 assert_eq!(chain.execution_outcome(), &execution_outcome);
290
291 Ok(())
292 }
293
294 #[test]
295 fn test_single_block_backfill() -> eyre::Result<()> {
296 reth_tracing::init_test_tracing();
297
298 let key_pair = Keypair::new_global(&mut generators::rng());
300 let address = public_key_to_address(key_pair.public_key());
301
302 let chain_spec = chain_spec(address);
303
304 let executor = EthExecutorProvider::ethereum(chain_spec.clone());
305 let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
306 init_genesis(&provider_factory)?;
307 let blockchain_db = BlockchainProvider::new(
308 provider_factory.clone(),
309 Arc::new(NoopBlockchainTree::default()),
310 )?;
311
312 let blocks_and_execution_outcomes =
313 blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;
314
315 let factory = BackfillJobFactory::new(executor, blockchain_db);
317 let job = factory.backfill(1..=1);
318 let single_job = job.into_single_blocks();
319 let block_execution_it = single_job.into_iter();
320
321 let blocks_and_outcomes = block_execution_it.collect::<Vec<_>>();
323 assert_eq!(blocks_and_outcomes.len(), 1);
324
325 for (i, res) in blocks_and_outcomes.into_iter().enumerate() {
328 let (block, mut execution_output) = res?;
329 execution_output.state.reverts.sort();
330
331 let sealed_block_with_senders = blocks_and_execution_outcomes[i].0.clone();
332 let expected_block = sealed_block_with_senders.unseal();
333 let expected_output = &blocks_and_execution_outcomes[i].1;
334
335 assert_eq!(block, expected_block);
336 assert_eq!(&execution_output, expected_output);
337 }
338
339 Ok(())
340 }
341}