// reth_exex/backfill/job.rs

use crate::StreamBackfillJob;
use std::{
    ops::RangeInclusive,
    time::{Duration, Instant},
};

use alloy_consensus::BlockHeader;
use alloy_primitives::BlockNumber;
use reth_evm::execute::{
    BatchExecutor, BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider, Executor,
};
use reth_node_api::{Block as _, BlockBody as _, NodePrimitives};
use reth_primitives::{BlockExt, BlockWithSenders, Receipt};
use reth_primitives_traits::{format_gas_throughput, SignedTransaction};
use reth_provider::{
    BlockReader, Chain, HeaderProvider, ProviderError, StateProviderFactory, TransactionVariant,
};
use reth_prune_types::PruneModes;
use reth_revm::database::StateProviderDatabase;
use reth_stages_api::ExecutionStageThresholds;
use reth_tracing::tracing::{debug, trace};

pub(super) type BackfillJobResult<T> = Result<T, BlockExecutionError>;

/// Backfill job started for a specific range.
///
/// It implements [`Iterator`], executing blocks in batches according to the provided thresholds
/// and yielding a [`Chain`] per batch. In other words, this iterator can yield multiple items
/// for the given range depending on the configured thresholds.
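///
/// A minimal usage sketch, mirroring the `test_backfill` test at the bottom of this file; the
/// `executor` and `provider` values are assumed to be an already configured
/// [`BlockExecutorProvider`] and blockchain provider:
///
/// ```ignore
/// // Build a job over an inclusive block range; each yielded item is one executed batch.
/// let factory = BackfillJobFactory::new(executor, provider);
/// let job = factory.backfill(1..=100);
/// let chains = job.collect::<Result<Vec<_>, _>>()?;
/// ```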
#[derive(Debug)]
pub struct BackfillJob<E, P> {
    pub(crate) executor: E,
    pub(crate) provider: P,
    pub(crate) prune_modes: PruneModes,
    pub(crate) thresholds: ExecutionStageThresholds,
    pub(crate) range: RangeInclusive<BlockNumber>,
    pub(crate) stream_parallelism: usize,
}

impl<E, P> Iterator for BackfillJob<E, P>
where
    E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>,
    P: HeaderProvider + BlockReader<Transaction: SignedTransaction> + StateProviderFactory,
{
    type Item = BackfillJobResult<Chain<E::Primitives>>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.range.is_empty() {
            return None
        }

        Some(self.execute_range())
    }
}

impl<E, P> BackfillJob<E, P>
where
    E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>,
    P: BlockReader<Transaction: SignedTransaction> + HeaderProvider + StateProviderFactory,
{
    /// Converts the backfill job into a single block backfill job.
    pub fn into_single_blocks(self) -> SingleBlockBackfillJob<E, P> {
        self.into()
    }

    /// Converts the backfill job into a stream.
    pub fn into_stream(self) -> StreamBackfillJob<E, P, Chain<E::Primitives>> {
        self.into()
    }

    fn execute_range(&mut self) -> BackfillJobResult<Chain<E::Primitives>> {
        debug!(
            target: "exex::backfill",
            range = ?self.range,
            "Executing block range"
        );

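        // Create the batch executor on the historical state as of the block just before the
        // start of the range, so the first block executes on top of its parent state.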
        let mut executor = self.executor.batch_executor(StateProviderDatabase::new(
            self.provider.history_by_block_number(self.range.start().saturating_sub(1))?,
        ));
        executor.set_prune_modes(self.prune_modes.clone());

        let mut fetch_block_duration = Duration::default();
        let mut execution_duration = Duration::default();
        let mut cumulative_gas = 0;
        let batch_start = Instant::now();

        let mut blocks = Vec::new();
        for block_number in self.range.clone() {
            // Fetch the block
            let fetch_block_start = Instant::now();

            let td = self
                .provider
                .header_td_by_number(block_number)?
                .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;

            // we need the block's transactions along with their hashes
            let block = self
                .provider
                .sealed_block_with_senders(block_number.into(), TransactionVariant::WithHash)?
                .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;

            fetch_block_duration += fetch_block_start.elapsed();

            cumulative_gas += block.gas_used();

            // Configure the executor to use the current state.
            trace!(target: "exex::backfill", number = block_number, txs = block.body.transactions().len(), "Executing block");

            // Execute the block
            let execute_start = Instant::now();

            // Unseal the block for execution
            let (block, senders) = block.into_components();
            let (unsealed_header, hash) = block.header.split();
            let block = P::Block::new(unsealed_header, block.body).with_senders_unchecked(senders);

            executor.execute_and_verify_one((&block, td).into())?;
            execution_duration += execute_start.elapsed();

            // TODO(alexey): report gas metrics using `block.header.gas_used`

            // Seal the block back and save it
            blocks.push(block.seal(hash));

            // Check if we should commit now
            let bundle_size_hint = executor.size_hint().unwrap_or_default() as u64;
            if self.thresholds.is_end_of_batch(
                block_number - *self.range.start(),
                bundle_size_hint,
                cumulative_gas,
                batch_start.elapsed(),
            ) {
                break
            }
        }

        let last_block_number = blocks.last().expect("blocks should not be empty").number();
        debug!(
            target: "exex::backfill",
            range = ?*self.range.start()..=last_block_number,
            block_fetch = ?fetch_block_duration,
            execution = ?execution_duration,
            throughput = format_gas_throughput(cumulative_gas, execution_duration),
            "Finished executing block range"
        );
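        // Advance the range so the next call resumes at the block after the last one executed
        // in this batch.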
        self.range = last_block_number + 1..=*self.range.end();

        let chain = Chain::new(blocks, executor.finalize(), None);
        Ok(chain)
    }
}

/// Single block backfill job started for a specific range.
///
/// It implements [`Iterator`], executing one block each time the iterator is advanced and
/// yielding a ([`BlockWithSenders`], [`BlockExecutionOutput`]) tuple.
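///
/// A minimal usage sketch, following the `test_single_block_backfill` test at the bottom of
/// this file; `factory` is assumed to be an already configured `BackfillJobFactory`:
///
/// ```ignore
/// // Convert a range job into a per-block job and execute each block individually.
/// let single_job = factory.backfill(1..=10).into_single_blocks();
/// for result in single_job {
///     let (block_with_senders, execution_output) = result?;
///     // inspect the executed block and its execution output here
/// }
/// ```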
#[derive(Debug, Clone)]
pub struct SingleBlockBackfillJob<E, P> {
    pub(crate) executor: E,
    pub(crate) provider: P,
    pub(crate) range: RangeInclusive<BlockNumber>,
    pub(crate) stream_parallelism: usize,
}

impl<E, P> Iterator for SingleBlockBackfillJob<E, P>
where
    E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>,
    P: HeaderProvider + BlockReader + StateProviderFactory,
{
    type Item = BackfillJobResult<(
        BlockWithSenders<P::Block>,
        BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
    )>;

    fn next(&mut self) -> Option<Self::Item> {
        self.range.next().map(|block_number| self.execute_block(block_number))
    }
}

impl<E, P> SingleBlockBackfillJob<E, P>
where
    E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>,
    P: HeaderProvider + BlockReader + StateProviderFactory,
{
    /// Converts the single block backfill job into a stream.
    pub fn into_stream(
        self,
    ) -> StreamBackfillJob<E, P, (BlockWithSenders, BlockExecutionOutput<Receipt>)> {
        self.into()
    }

    #[expect(clippy::type_complexity)]
    pub(crate) fn execute_block(
        &self,
        block_number: u64,
    ) -> BackfillJobResult<(
        BlockWithSenders<P::Block>,
        BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
    )> {
        let td = self
            .provider
            .header_td_by_number(block_number)?
            .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;

        // Fetch the block with senders for execution.
        let block_with_senders = self
            .provider
            .block_with_senders(block_number.into(), TransactionVariant::WithHash)?
            .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))?;

        // Configure the executor to use the previous block's state.
        let executor = self.executor.executor(StateProviderDatabase::new(
            self.provider.history_by_block_number(block_number.saturating_sub(1))?,
        ));

        trace!(target: "exex::backfill", number = block_number, txs = block_with_senders.block.body().transactions().len(), "Executing block");

        let block_execution_output = executor.execute((&block_with_senders, td).into())?;

        Ok((block_with_senders, block_execution_output))
    }
}

impl<E, P> From<BackfillJob<E, P>> for SingleBlockBackfillJob<E, P> {
    fn from(job: BackfillJob<E, P>) -> Self {
        Self {
            executor: job.executor,
            provider: job.provider,
            range: job.range,
            stream_parallelism: job.stream_parallelism,
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::{
        backfill::test_utils::{blocks_and_execution_outputs, chain_spec, to_execution_outcome},
        BackfillJobFactory,
    };
    use reth_blockchain_tree::noop::NoopBlockchainTree;
    use reth_db_common::init::init_genesis;
    use reth_evm_ethereum::execute::EthExecutorProvider;
    use reth_primitives::public_key_to_address;
    use reth_provider::{
        providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
    };
    use reth_testing_utils::generators;
    use secp256k1::Keypair;

    #[test]
    fn test_backfill() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Create a key pair for the sender
        let key_pair = Keypair::new_global(&mut generators::rng());
        let address = public_key_to_address(key_pair.public_key());

        let chain_spec = chain_spec(address);

        let executor = EthExecutorProvider::ethereum(chain_spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(
            provider_factory.clone(),
            Arc::new(NoopBlockchainTree::default()),
        )?;

        let blocks_and_execution_outputs =
            blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;
        let (block, block_execution_output) = blocks_and_execution_outputs.first().unwrap();
        let execution_outcome = to_execution_outcome(block.number, block_execution_output);

        // Backfill the first block
        let factory = BackfillJobFactory::new(executor, blockchain_db);
        let job = factory.backfill(1..=1);
        let chains = job.collect::<Result<Vec<_>, _>>()?;

        // Assert that the backfill job produced the same chain that we got earlier when
        // executing only the first block
        assert_eq!(chains.len(), 1);
        let mut chain = chains.into_iter().next().unwrap();
        chain.execution_outcome_mut().bundle.reverts.sort();
        assert_eq!(chain.blocks(), &[(1, block.clone())].into());
        assert_eq!(chain.execution_outcome(), &execution_outcome);

        Ok(())
    }

    #[test]
    fn test_single_block_backfill() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Create a key pair for the sender
        let key_pair = Keypair::new_global(&mut generators::rng());
        let address = public_key_to_address(key_pair.public_key());

        let chain_spec = chain_spec(address);

        let executor = EthExecutorProvider::ethereum(chain_spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(
            provider_factory.clone(),
            Arc::new(NoopBlockchainTree::default()),
        )?;

        let blocks_and_execution_outcomes =
            blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;

        // Backfill the first block
        let factory = BackfillJobFactory::new(executor, blockchain_db);
        let job = factory.backfill(1..=1);
        let single_job = job.into_single_blocks();
        let block_execution_it = single_job.into_iter();

        // Assert that the backfill job only produces a single block
        let blocks_and_outcomes = block_execution_it.collect::<Vec<_>>();
        assert_eq!(blocks_and_outcomes.len(), 1);

        // Assert that the backfill job single block iterator produces the expected output for each
        // block
        for (i, res) in blocks_and_outcomes.into_iter().enumerate() {
            let (block, mut execution_output) = res?;
            execution_output.state.reverts.sort();

            let sealed_block_with_senders = blocks_and_execution_outcomes[i].0.clone();
            let expected_block = sealed_block_with_senders.unseal();
            let expected_output = &blocks_and_execution_outcomes[i].1;

            assert_eq!(block, expected_block);
            assert_eq!(&execution_output, expected_output);
        }

        Ok(())
    }
}