reth_exex/backfill/
stream.rs

1use super::job::BackfillJobResult;
2use crate::{BackfillJob, SingleBlockBackfillJob};
3use alloy_primitives::BlockNumber;
4use futures::{
5    stream::{FuturesOrdered, Stream},
6    StreamExt,
7};
8use reth_ethereum_primitives::EthPrimitives;
9use reth_evm::{
10    execute::{BlockExecutionError, BlockExecutionOutput},
11    ConfigureEvm,
12};
13use reth_node_api::NodePrimitives;
14use reth_primitives_traits::RecoveredBlock;
15use reth_provider::{BlockReader, Chain, StateProviderFactory};
16use reth_prune_types::PruneModes;
17use reth_stages_api::ExecutionStageThresholds;
18use reth_tracing::tracing::debug;
19use std::{
20    ops::RangeInclusive,
21    pin::Pin,
22    task::{ready, Context, Poll},
23};
24use tokio::task::JoinHandle;
25
/// Default number of backfill tasks a [`StreamBackfillJob`] keeps in flight at once.
pub(crate) const DEFAULT_PARALLELISM: usize = 4;
/// Blocks per batch used by a [`StreamBackfillJob`] when the execution thresholds leave
/// `max_blocks` unset.
const DEFAULT_BATCH_SIZE: usize = 100;
30
31/// Boxed thread-safe iterator that yields [`BackfillJobResult`]s.
32type BackfillTaskIterator<T> =
33    Box<dyn Iterator<Item = BackfillJobResult<T>> + Send + Sync + 'static>;
34
35/// Backfill task output.
36struct BackfillTaskOutput<T> {
37    job: BackfillTaskIterator<T>,
38    result: Option<BackfillJobResult<T>>,
39}
40
41/// Ordered queue of [`JoinHandle`]s that yield [`BackfillTaskOutput`]s.
42type BackfillTasks<T> = FuturesOrdered<JoinHandle<BackfillTaskOutput<T>>>;
43
44type SingleBlockStreamItem<N = EthPrimitives> = (
45    RecoveredBlock<<N as NodePrimitives>::Block>,
46    BlockExecutionOutput<<N as NodePrimitives>::Receipt>,
47);
48type BatchBlockStreamItem<N = EthPrimitives> = Chain<N>;
49
50/// Stream for processing backfill jobs asynchronously.
51///
52/// This struct manages the execution of [`SingleBlockBackfillJob`] tasks, allowing blocks to be
53/// processed asynchronously but in order within a specified range.
54#[derive(Debug)]
55pub struct StreamBackfillJob<E, P, T> {
56    evm_config: E,
57    provider: P,
58    prune_modes: PruneModes,
59    range: RangeInclusive<BlockNumber>,
60    tasks: BackfillTasks<T>,
61    parallelism: usize,
62    batch_size: usize,
63    thresholds: ExecutionStageThresholds,
64}
65
66impl<E, P, T> StreamBackfillJob<E, P, T>
67where
68    T: Send + Sync + 'static,
69{
70    /// Configures the parallelism of the [`StreamBackfillJob`] to handle active tasks.
71    pub const fn with_parallelism(mut self, parallelism: usize) -> Self {
72        self.parallelism = parallelism;
73        self
74    }
75
76    /// Configures the batch size for the [`StreamBackfillJob`].
77    pub const fn with_batch_size(mut self, batch_size: usize) -> Self {
78        self.batch_size = batch_size;
79        self
80    }
81
82    /// Spawns a new task calling the [`BackfillTaskIterator::next`] method and pushes it to the end
83    /// of the [`BackfillTasks`] queue.
84    fn push_back(&mut self, mut job: BackfillTaskIterator<T>) {
85        self.tasks.push_back(tokio::task::spawn_blocking(move || BackfillTaskOutput {
86            result: job.next(),
87            job,
88        }));
89    }
90
91    /// Spawns a new task calling the [`BackfillTaskIterator::next`] method and pushes it to the
92    /// front of the  [`BackfillTasks`] queue.
93    fn push_front(&mut self, mut job: BackfillTaskIterator<T>) {
94        self.tasks.push_front(tokio::task::spawn_blocking(move || BackfillTaskOutput {
95            result: job.next(),
96            job,
97        }));
98    }
99
100    /// Polls the next task in the [`BackfillTasks`] queue until it returns a non-empty result.
101    fn poll_next_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<BackfillJobResult<T>>> {
102        while let Some(res) = ready!(self.tasks.poll_next_unpin(cx)) {
103            let task_result = res.map_err(BlockExecutionError::other)?;
104
105            if let BackfillTaskOutput { result: Some(job_result), job } = task_result {
106                // If the task returned a non-empty result, a new task advancing the job is created
107                // and pushed to the __front__ of the queue, so that the next item of this returned
108                // next.
109                self.push_front(job);
110
111                return Poll::Ready(Some(job_result))
112            };
113        }
114
115        Poll::Ready(None)
116    }
117}
118
119impl<E, P> Stream for StreamBackfillJob<E, P, SingleBlockStreamItem<E::Primitives>>
120where
121    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
122    P: BlockReader + StateProviderFactory + Clone + Unpin + 'static,
123{
124    type Item = BackfillJobResult<SingleBlockStreamItem<E::Primitives>>;
125
126    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
127        let this = self.get_mut();
128
129        // Spawn new tasks only if we are below the parallelism configured.
130        while this.tasks.len() < this.parallelism {
131            // Get the next block number from the range. If it is empty, we are done.
132            let Some(block_number) = this.range.next() else {
133                debug!(target: "exex::backfill", tasks = %this.tasks.len(), range = ?this.range, "No more single blocks to backfill");
134                break;
135            };
136
137            // Spawn a new task for that block
138            debug!(target: "exex::backfill", tasks = %this.tasks.len(), ?block_number, "Spawning new single block backfill task");
139            let job = Box::new(SingleBlockBackfillJob {
140                evm_config: this.evm_config.clone(),
141                provider: this.provider.clone(),
142                range: block_number..=block_number,
143                stream_parallelism: this.parallelism,
144            }) as BackfillTaskIterator<_>;
145            this.push_back(job);
146        }
147
148        this.poll_next_task(cx)
149    }
150}
151
152impl<E, P> Stream for StreamBackfillJob<E, P, BatchBlockStreamItem<E::Primitives>>
153where
154    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
155    P: BlockReader + StateProviderFactory + Clone + Unpin + 'static,
156{
157    type Item = BackfillJobResult<BatchBlockStreamItem<E::Primitives>>;
158
159    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
160        let this = self.get_mut();
161
162        loop {
163            // Spawn new tasks only if we are below the parallelism configured.
164            while this.tasks.len() < this.parallelism {
165                // Take the next `batch_size` blocks from the range and calculate the range bounds
166                let mut range = this.range.by_ref().take(this.batch_size);
167                let start = range.next();
168                let range_bounds = start.zip(range.last().or(start));
169
170                // Create the range from the range bounds. If it is empty, we are done.
171                let Some(range) = range_bounds.map(|(first, last)| first..=last) else {
172                    debug!(target: "exex::backfill", tasks = %this.tasks.len(), range = ?this.range, "No more block batches to backfill");
173                    break;
174                };
175
176                // Spawn a new task for that range
177                debug!(target: "exex::backfill", tasks = %this.tasks.len(), ?range, "Spawning new block batch backfill task");
178                let job = Box::new(BackfillJob {
179                    evm_config: this.evm_config.clone(),
180                    provider: this.provider.clone(),
181                    prune_modes: this.prune_modes.clone(),
182                    thresholds: this.thresholds.clone(),
183                    range,
184                    stream_parallelism: this.parallelism,
185                }) as BackfillTaskIterator<_>;
186                this.push_back(job);
187            }
188
189            let res = ready!(this.poll_next_task(cx));
190
191            if res.is_some() {
192                return Poll::Ready(res);
193            }
194
195            if this.range.is_empty() {
196                // only terminate the stream if there are no more blocks to process
197                return Poll::Ready(None);
198            }
199        }
200    }
201}
202
203impl<E, P> From<SingleBlockBackfillJob<E, P>> for StreamBackfillJob<E, P, SingleBlockStreamItem> {
204    fn from(job: SingleBlockBackfillJob<E, P>) -> Self {
205        Self {
206            evm_config: job.evm_config,
207            provider: job.provider,
208            prune_modes: PruneModes::default(),
209            range: job.range,
210            tasks: FuturesOrdered::new(),
211            parallelism: job.stream_parallelism,
212            batch_size: 1,
213            thresholds: ExecutionStageThresholds { max_blocks: Some(1), ..Default::default() },
214        }
215    }
216}
217
218impl<E, P> From<BackfillJob<E, P>> for StreamBackfillJob<E, P, BatchBlockStreamItem<E::Primitives>>
219where
220    E: ConfigureEvm,
221{
222    fn from(job: BackfillJob<E, P>) -> Self {
223        let batch_size = job.thresholds.max_blocks.map_or(DEFAULT_BATCH_SIZE, |max| max as usize);
224        Self {
225            evm_config: job.evm_config,
226            provider: job.provider,
227            prune_modes: job.prune_modes,
228            range: job.range,
229            tasks: FuturesOrdered::new(),
230            parallelism: job.stream_parallelism,
231            batch_size,
232            thresholds: ExecutionStageThresholds {
233                max_blocks: Some(batch_size as u64),
234                ..job.thresholds
235            },
236        }
237    }
238}
239
#[cfg(test)]
mod tests {
    use crate::{
        backfill::test_utils::{
            blocks_and_execution_outcome, blocks_and_execution_outputs, chain_spec,
        },
        BackfillJobFactory,
    };
    use futures::StreamExt;
    use reth_db_common::init::init_genesis;
    use reth_evm_ethereum::execute::EthExecutorProvider;
    use reth_primitives_traits::crypto::secp256k1::public_key_to_address;
    use reth_provider::{
        providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
    };
    use reth_stages_api::ExecutionStageThresholds;
    use reth_testing_utils::generators;

    /// Streaming block 1 on its own must reproduce the reference block and execution output,
    /// after which the stream is finished.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_single_blocks() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Sender key pair; the chain spec is built around the sender's address.
        let sender_key = generators::generate_key(&mut generators::rng());
        let sender = public_key_to_address(sender_key.public_key());
        let spec = chain_spec(sender);

        let evm = EthExecutorProvider::ethereum(spec.clone());
        let factory_db = create_test_provider_factory_with_chain_spec(spec.clone());
        init_genesis(&factory_db)?;
        let provider = BlockchainProvider::new(factory_db.clone())?;

        // Reference blocks together with their execution outputs.
        let reference = blocks_and_execution_outputs(factory_db, spec, sender_key)?;

        let job_factory = BackfillJobFactory::new(evm.clone(), provider.clone());
        let mut stream = job_factory.backfill(1..=1).into_single_blocks().into_stream();

        let (block, mut output) = stream.next().await.unwrap().unwrap();
        // Sort reverts so the comparison does not depend on their order.
        output.state.reverts.sort();

        let (expected_block, expected_output) = &reference[0];
        assert_eq!(&block, expected_block);
        assert_eq!(&output, expected_output);

        // Only a single block was requested.
        assert!(stream.next().await.is_none());

        Ok(())
    }

    /// Streaming blocks 1..=2 as one batch must reproduce the reference blocks and execution
    /// outcome in a single chain, after which the stream is finished.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Sender key pair; the chain spec is built around the sender's address.
        let sender_key = generators::generate_key(&mut generators::rng());
        let sender = public_key_to_address(sender_key.public_key());
        let spec = chain_spec(sender);

        let evm = EthExecutorProvider::ethereum(spec.clone());
        let factory_db = create_test_provider_factory_with_chain_spec(spec.clone());
        init_genesis(&factory_db)?;
        let provider = BlockchainProvider::new(factory_db.clone())?;

        // Reference blocks and their combined execution outcome.
        let (expected_blocks, expected_outcome) =
            blocks_and_execution_outcome(factory_db, spec, sender_key)?;

        // A batch size of two with a single task puts both blocks into one chain.
        let job_factory = BackfillJobFactory::new(evm.clone(), provider.clone())
            .with_thresholds(ExecutionStageThresholds { max_blocks: Some(2), ..Default::default() })
            .with_stream_parallelism(1);
        let mut stream = job_factory.backfill(1..=2).into_stream();

        let mut chain = stream.next().await.unwrap().unwrap();
        // Sort reverts so the comparison does not depend on their order.
        chain.execution_outcome_mut().state_mut().reverts.sort();

        assert!(chain.blocks_iter().eq(&expected_blocks));
        assert_eq!(chain.execution_outcome(), &expected_outcome);

        // The whole range fit into the first batch.
        assert!(stream.next().await.is_none());

        Ok(())
    }
}