reth_exex/backfill/
job.rs

1use crate::StreamBackfillJob;
2use reth_evm::ConfigureEvm;
3use std::{
4    ops::RangeInclusive,
5    time::{Duration, Instant},
6};
7
8use alloy_consensus::BlockHeader;
9use alloy_primitives::BlockNumber;
10use reth_ethereum_primitives::Receipt;
11use reth_evm::execute::{BlockExecutionError, BlockExecutionOutput, Executor};
12use reth_node_api::{Block as _, BlockBody as _, NodePrimitives};
13use reth_primitives_traits::{format_gas_throughput, RecoveredBlock, SignedTransaction};
14use reth_provider::{
15    BlockReader, Chain, ExecutionOutcome, HeaderProvider, ProviderError, StateProviderFactory,
16    TransactionVariant,
17};
18use reth_prune_types::PruneModes;
19use reth_revm::database::StateProviderDatabase;
20use reth_stages_api::ExecutionStageThresholds;
21use reth_tracing::tracing::{debug, trace};
22
23pub(super) type BackfillJobResult<T> = Result<T, BlockExecutionError>;
24
25/// Backfill job started for a specific range.
26///
27/// It implements [`Iterator`] that executes blocks in batches according to the provided thresholds
28/// and yields [`Chain`]. In other words, this iterator can yield multiple items for the given range
29/// depending on the configured thresholds.
30#[derive(Debug)]
31pub struct BackfillJob<E, P> {
32    pub(crate) evm_config: E,
33    pub(crate) provider: P,
34    pub(crate) prune_modes: PruneModes,
35    pub(crate) thresholds: ExecutionStageThresholds,
36    pub(crate) range: RangeInclusive<BlockNumber>,
37    pub(crate) stream_parallelism: usize,
38}
39
40impl<E, P> Iterator for BackfillJob<E, P>
41where
42    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
43    P: HeaderProvider + BlockReader<Transaction: SignedTransaction> + StateProviderFactory,
44{
45    type Item = BackfillJobResult<Chain<E::Primitives>>;
46
47    fn next(&mut self) -> Option<Self::Item> {
48        if self.range.is_empty() {
49            return None
50        }
51
52        Some(self.execute_range())
53    }
54}
55
56impl<E, P> BackfillJob<E, P>
57where
58    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
59    P: BlockReader<Transaction: SignedTransaction> + HeaderProvider + StateProviderFactory,
60{
61    /// Converts the backfill job into a single block backfill job.
62    pub fn into_single_blocks(self) -> SingleBlockBackfillJob<E, P> {
63        self.into()
64    }
65
66    /// Converts the backfill job into a stream.
67    pub fn into_stream(self) -> StreamBackfillJob<E, P, Chain<E::Primitives>> {
68        self.into()
69    }
70
71    fn execute_range(&mut self) -> BackfillJobResult<Chain<E::Primitives>> {
72        debug!(
73            target: "exex::backfill",
74            range = ?self.range,
75            "Executing block range"
76        );
77
78        let mut executor = self.evm_config.batch_executor(StateProviderDatabase::new(
79            self.provider
80                .history_by_block_number(self.range.start().saturating_sub(1))
81                .map_err(BlockExecutionError::other)?,
82        ));
83
84        let mut fetch_block_duration = Duration::default();
85        let mut execution_duration = Duration::default();
86        let mut cumulative_gas = 0;
87        let batch_start = Instant::now();
88
89        let mut blocks = Vec::new();
90        let mut results = Vec::new();
91        for block_number in self.range.clone() {
92            // Fetch the block
93            let fetch_block_start = Instant::now();
94
95            // we need the block's transactions along with their hashes
96            let block = self
97                .provider
98                .sealed_block_with_senders(block_number.into(), TransactionVariant::WithHash)
99                .map_err(BlockExecutionError::other)?
100                .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))
101                .map_err(BlockExecutionError::other)?;
102
103            fetch_block_duration += fetch_block_start.elapsed();
104
105            cumulative_gas += block.gas_used();
106
107            // Configure the executor to use the current state.
108            trace!(target: "exex::backfill", number = block_number, txs = block.body().transactions().len(), "Executing block");
109
110            // Execute the block
111            let execute_start = Instant::now();
112
113            // Unseal the block for execution
114            let (block, senders) = block.split_sealed();
115            let (header, body) = block.split_sealed_header_body();
116            let block = P::Block::new_sealed(header, body).with_senders(senders);
117
118            results.push(executor.execute_one(&block)?);
119            execution_duration += execute_start.elapsed();
120
121            // TODO(alexey): report gas metrics using `block.header.gas_used`
122
123            // Seal the block back and save it
124            blocks.push(block);
125            // Check if we should commit now
126            if self.thresholds.is_end_of_batch(
127                block_number - *self.range.start(),
128                executor.size_hint() as u64,
129                cumulative_gas,
130                batch_start.elapsed(),
131            ) {
132                break
133            }
134        }
135
136        let first_block_number = blocks.first().expect("blocks should not be empty").number();
137        let last_block_number = blocks.last().expect("blocks should not be empty").number();
138        debug!(
139            target: "exex::backfill",
140            range = ?*self.range.start()..=last_block_number,
141            block_fetch = ?fetch_block_duration,
142            execution = ?execution_duration,
143            throughput = format_gas_throughput(cumulative_gas, execution_duration),
144            "Finished executing block range"
145        );
146        self.range = last_block_number + 1..=*self.range.end();
147
148        let outcome = ExecutionOutcome::from_blocks(
149            first_block_number,
150            executor.into_state().take_bundle(),
151            results,
152        );
153        let chain = Chain::new(blocks, outcome, None);
154        Ok(chain)
155    }
156}
157
158/// Single block Backfill job started for a specific range.
159///
160/// It implements [`Iterator`] which executes a block each time the
161/// iterator is advanced and yields ([`RecoveredBlock`], [`BlockExecutionOutput`])
162#[derive(Debug, Clone)]
163pub struct SingleBlockBackfillJob<E, P> {
164    pub(crate) evm_config: E,
165    pub(crate) provider: P,
166    pub(crate) range: RangeInclusive<BlockNumber>,
167    pub(crate) stream_parallelism: usize,
168}
169
170impl<E, P> Iterator for SingleBlockBackfillJob<E, P>
171where
172    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
173    P: HeaderProvider + BlockReader + StateProviderFactory,
174{
175    type Item = BackfillJobResult<(
176        RecoveredBlock<P::Block>,
177        BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
178    )>;
179
180    fn next(&mut self) -> Option<Self::Item> {
181        self.range.next().map(|block_number| self.execute_block(block_number))
182    }
183}
184
185impl<E, P> SingleBlockBackfillJob<E, P>
186where
187    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
188    P: HeaderProvider + BlockReader + StateProviderFactory,
189{
190    /// Converts the single block backfill job into a stream.
191    pub fn into_stream(
192        self,
193    ) -> StreamBackfillJob<
194        E,
195        P,
196        (RecoveredBlock<reth_ethereum_primitives::Block>, BlockExecutionOutput<Receipt>),
197    > {
198        self.into()
199    }
200
201    #[expect(clippy::type_complexity)]
202    pub(crate) fn execute_block(
203        &self,
204        block_number: u64,
205    ) -> BackfillJobResult<(
206        RecoveredBlock<P::Block>,
207        BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
208    )> {
209        // Fetch the block with senders for execution.
210        let block_with_senders = self
211            .provider
212            .recovered_block(block_number.into(), TransactionVariant::WithHash)
213            .map_err(BlockExecutionError::other)?
214            .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))
215            .map_err(BlockExecutionError::other)?;
216
217        // Configure the executor to use the previous block's state.
218        let executor = self.evm_config.batch_executor(StateProviderDatabase::new(
219            self.provider
220                .history_by_block_number(block_number.saturating_sub(1))
221                .map_err(BlockExecutionError::other)?,
222        ));
223
224        trace!(target: "exex::backfill", number = block_number, txs = block_with_senders.body().transaction_count(), "Executing block");
225
226        let block_execution_output = executor.execute(&block_with_senders)?;
227
228        Ok((block_with_senders, block_execution_output))
229    }
230}
231
232impl<E, P> From<BackfillJob<E, P>> for SingleBlockBackfillJob<E, P> {
233    fn from(job: BackfillJob<E, P>) -> Self {
234        Self {
235            evm_config: job.evm_config,
236            provider: job.provider,
237            range: job.range,
238            stream_parallelism: job.stream_parallelism,
239        }
240    }
241}
242
#[cfg(test)]
mod tests {
    use crate::{
        backfill::test_utils::{blocks_and_execution_outputs, chain_spec, to_execution_outcome},
        BackfillJobFactory,
    };
    use reth_db_common::init::init_genesis;
    use reth_evm_ethereum::execute::EthExecutorProvider;
    use reth_primitives_traits::crypto::secp256k1::public_key_to_address;
    use reth_provider::{
        providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
    };
    use reth_testing_utils::generators;

    /// Backfilling `1..=1` as a batch yields exactly one chain equal to the reference execution
    /// of the first generated block.
    #[test]
    fn test_backfill() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Generate a sender key pair and build the chain spec for its address
        let key_pair = generators::generate_key(&mut generators::rng());
        let chain_spec = chain_spec(public_key_to_address(key_pair.public_key()));

        let executor = EthExecutorProvider::ethereum(chain_spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        let reference = blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;
        let (block, block_execution_output) = reference.first().unwrap();
        let execution_outcome = to_execution_outcome(block.number, block_execution_output);

        // Backfill only the first block
        let chains = BackfillJobFactory::new(executor, blockchain_db)
            .backfill(1..=1)
            .collect::<Result<Vec<_>, _>>()?;

        // The batch job must reproduce the reference execution of that block
        assert_eq!(chains.len(), 1);
        let mut chain = chains.into_iter().next().unwrap();
        chain.execution_outcome_mut().bundle.reverts.sort();
        assert_eq!(chain.blocks(), &[(1, block.clone())].into());
        assert_eq!(chain.execution_outcome(), &execution_outcome);

        Ok(())
    }

    /// Converting the batch job into a single-block job yields one `(block, output)` pair per
    /// block, each equal to the reference execution.
    #[test]
    fn test_single_block_backfill() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Generate a sender key pair and build the chain spec for its address
        let key_pair = generators::generate_key(&mut generators::rng());
        let chain_spec = chain_spec(public_key_to_address(key_pair.public_key()));

        let executor = EthExecutorProvider::ethereum(chain_spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        let reference = blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;

        // Backfill only the first block, one block per iteration
        let produced = BackfillJobFactory::new(executor, blockchain_db)
            .backfill(1..=1)
            .into_single_blocks()
            .collect::<Vec<_>>();
        assert_eq!(produced.len(), 1);

        // Every produced pair must match the reference pair at the same position
        for (index, result) in produced.into_iter().enumerate() {
            let (block, mut execution_output) = result?;
            execution_output.state.reverts.sort();

            let (expected_block, expected_output) = &reference[index];

            assert_eq!(block, expected_block.clone());
            assert_eq!(&execution_output, expected_output);
        }

        Ok(())
    }
}