use super::job::BackfillJobResult;
use crate::{BackfillJob, SingleBlockBackfillJob};
use alloy_primitives::BlockNumber;
use futures::{
    stream::{FuturesOrdered, Stream},
    StreamExt,
};
use reth_ethereum_primitives::EthPrimitives;
use reth_evm::{
    execute::{BlockExecutionError, BlockExecutionOutput},
    ConfigureEvm,
};
use reth_node_api::NodePrimitives;
use reth_primitives_traits::RecoveredBlock;
use reth_provider::{BlockReader, Chain, StateProviderFactory};
use reth_prune_types::PruneModes;
use reth_stages_api::ExecutionStageThresholds;
use reth_tracing::tracing::debug;
use std::{
    ops::RangeInclusive,
    pin::Pin,
    task::{ready, Context, Poll},
};
use tokio::task::JoinHandle;

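/// The default number of tasks that a [`StreamBackfillJob`] keeps active at once.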
pub(crate) const DEFAULT_PARALLELISM: usize = 4;
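/// The default number of blocks executed per batch task in a [`StreamBackfillJob`].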
const DEFAULT_BATCH_SIZE: usize = 100;

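/// Boxed thread-safe iterator that yields [`BackfillJobResult`]s.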
type BackfillTaskIterator<T> =
    Box<dyn Iterator<Item = BackfillJobResult<T>> + Send + Sync + 'static>;

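/// Output of a spawned backfill task: the job iterator handed back for reuse, along with the item
/// it produced, if any.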
struct BackfillTaskOutput<T> {
    job: BackfillTaskIterator<T>,
    result: Option<BackfillJobResult<T>>,
}

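/// Ordered queue of [`JoinHandle`]s that resolve to [`BackfillTaskOutput`]s.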
type BackfillTasks<T> = FuturesOrdered<JoinHandle<BackfillTaskOutput<T>>>;

type SingleBlockStreamItem<N = EthPrimitives> = (
    RecoveredBlock<<N as NodePrimitives>::Block>,
    BlockExecutionOutput<<N as NodePrimitives>::Receipt>,
);
type BatchBlockStreamItem<N = EthPrimitives> = Chain<N>;

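/// Stream that executes backfill jobs asynchronously.
///
/// Blocks in the configured range are executed on blocking tasks, up to `parallelism` tasks at a
/// time, and their results are yielded in order.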
#[derive(Debug)]
pub struct StreamBackfillJob<E, P, T> {
    evm_config: E,
    provider: P,
    prune_modes: PruneModes,
    range: RangeInclusive<BlockNumber>,
    tasks: BackfillTasks<T>,
    parallelism: usize,
    batch_size: usize,
    thresholds: ExecutionStageThresholds,
}

impl<E, P, T> StreamBackfillJob<E, P, T>
where
    T: Send + Sync + 'static,
{
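    /// Configures the maximum number of tasks that run concurrently.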
    pub const fn with_parallelism(mut self, parallelism: usize) -> Self {
        self.parallelism = parallelism;
        self
    }

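    /// Configures the number of blocks executed per batch task.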
    pub const fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }

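    /// Spawns a blocking task that advances the job iterator once and pushes it to the back of
    /// the task queue.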
    fn push_back(&mut self, mut job: BackfillTaskIterator<T>) {
        self.tasks.push_back(tokio::task::spawn_blocking(move || BackfillTaskOutput {
            result: job.next(),
            job,
        }));
    }

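    /// Spawns a blocking task that advances the job iterator once and pushes it to the front of
    /// the task queue.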
    fn push_front(&mut self, mut job: BackfillTaskIterator<T>) {
        self.tasks.push_front(tokio::task::spawn_blocking(move || BackfillTaskOutput {
            result: job.next(),
            job,
        }));
    }

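    /// Polls the task queue until a task yields an item, returning `Poll::Ready(None)` once the
    /// queue has been drained.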
    fn poll_next_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<BackfillJobResult<T>>> {
        while let Some(res) = ready!(self.tasks.poll_next_unpin(cx)) {
            let task_result = res.map_err(BlockExecutionError::other)?;

            if let BackfillTaskOutput { result: Some(job_result), job } = task_result {
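                // The job may yield more items, so push it back to the front of the queue so its
                // next item is returned before any newer tasks.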
                self.push_front(job);

                return Poll::Ready(Some(job_result))
            };
        }

        Poll::Ready(None)
    }
}

impl<E, P> Stream for StreamBackfillJob<E, P, SingleBlockStreamItem<E::Primitives>>
where
    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
    P: BlockReader + StateProviderFactory + Clone + Unpin + 'static,
{
    type Item = BackfillJobResult<SingleBlockStreamItem<E::Primitives>>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

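        // Spawn new tasks only while the number of active tasks is below the configured
        // parallelism.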
        while this.tasks.len() < this.parallelism {
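            // Take the next block number from the range. If the range is exhausted, stop
            // spawning tasks.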
            let Some(block_number) = this.range.next() else {
                debug!(target: "exex::backfill", tasks = %this.tasks.len(), range = ?this.range, "No more single blocks to backfill");
                break;
            };

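            // Spawn a new task for that block.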
            debug!(target: "exex::backfill", tasks = %this.tasks.len(), ?block_number, "Spawning new single block backfill task");
            let job = Box::new(SingleBlockBackfillJob {
                evm_config: this.evm_config.clone(),
                provider: this.provider.clone(),
                range: block_number..=block_number,
                stream_parallelism: this.parallelism,
            }) as BackfillTaskIterator<_>;
            this.push_back(job);
        }

        this.poll_next_task(cx)
    }
}

impl<E, P> Stream for StreamBackfillJob<E, P, BatchBlockStreamItem<E::Primitives>>
where
    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
    P: BlockReader + StateProviderFactory + Clone + Unpin + 'static,
{
    type Item = BackfillJobResult<BatchBlockStreamItem<E::Primitives>>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        loop {
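            // Spawn new tasks only while the number of active tasks is below the configured
            // parallelism.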
            while this.tasks.len() < this.parallelism {
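                // Take up to `batch_size` blocks from the range and compute the batch bounds.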
                let mut range = this.range.by_ref().take(this.batch_size);
                let start = range.next();
                let range_bounds = start.zip(range.last().or(start));

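                // Build the inclusive range from the bounds. If the range is exhausted, stop
                // spawning tasks.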
                let Some(range) = range_bounds.map(|(first, last)| first..=last) else {
                    debug!(target: "exex::backfill", tasks = %this.tasks.len(), range = ?this.range, "No more block batches to backfill");
                    break;
                };

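                // Spawn a new task for that block range.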
                debug!(target: "exex::backfill", tasks = %this.tasks.len(), ?range, "Spawning new block batch backfill task");
                let job = Box::new(BackfillJob {
                    evm_config: this.evm_config.clone(),
                    provider: this.provider.clone(),
                    prune_modes: this.prune_modes.clone(),
                    thresholds: this.thresholds.clone(),
                    range,
                    stream_parallelism: this.parallelism,
                }) as BackfillTaskIterator<_>;
                this.push_back(job);
            }

            let res = ready!(this.poll_next_task(cx));

            if res.is_some() {
                return Poll::Ready(res);
            }

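            // All tasks are drained and the range is exhausted, so the stream is finished.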
            if this.range.is_empty() {
                return Poll::Ready(None);
            }
        }
    }
}

impl<E, P> From<SingleBlockBackfillJob<E, P>> for StreamBackfillJob<E, P, SingleBlockStreamItem> {
    fn from(job: SingleBlockBackfillJob<E, P>) -> Self {
        Self {
            evm_config: job.evm_config,
            provider: job.provider,
            prune_modes: PruneModes::default(),
            range: job.range,
            tasks: FuturesOrdered::new(),
            parallelism: job.stream_parallelism,
            batch_size: 1,
            thresholds: ExecutionStageThresholds { max_blocks: Some(1), ..Default::default() },
        }
    }
}

impl<E, P> From<BackfillJob<E, P>> for StreamBackfillJob<E, P, BatchBlockStreamItem<E::Primitives>>
where
    E: ConfigureEvm,
{
    fn from(job: BackfillJob<E, P>) -> Self {
        let batch_size = job.thresholds.max_blocks.map_or(DEFAULT_BATCH_SIZE, |max| max as usize);
        Self {
            evm_config: job.evm_config,
            provider: job.provider,
            prune_modes: job.prune_modes,
            range: job.range,
            tasks: FuturesOrdered::new(),
            parallelism: job.stream_parallelism,
            batch_size,
            thresholds: ExecutionStageThresholds {
                max_blocks: Some(batch_size as u64),
                ..job.thresholds
            },
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::{
        backfill::test_utils::{
            blocks_and_execution_outcome, blocks_and_execution_outputs, chain_spec,
        },
        BackfillJobFactory,
    };
    use futures::StreamExt;
    use reth_db_common::init::init_genesis;
    use reth_evm_ethereum::execute::EthExecutorProvider;
    use reth_primitives_traits::crypto::secp256k1::public_key_to_address;
    use reth_provider::{
        providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
    };
    use reth_stages_api::ExecutionStageThresholds;
    use reth_testing_utils::generators;

    #[tokio::test(flavor = "multi_thread")]
    async fn test_single_blocks() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        let key_pair = generators::generate_key(&mut generators::rng());
        let address = public_key_to_address(key_pair.public_key());

        let chain_spec = chain_spec(address);

        let executor = EthExecutorProvider::ethereum(chain_spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        let blocks_and_execution_outcomes =
            blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;

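        // Backfill block 1 as a stream of single blocks.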
        let factory = BackfillJobFactory::new(executor.clone(), blockchain_db.clone());
        let mut backfill_stream = factory.backfill(1..=1).into_single_blocks().into_stream();

        let (block, mut execution_output) = backfill_stream.next().await.unwrap().unwrap();
        execution_output.state.reverts.sort();
        let expected_block = blocks_and_execution_outcomes[0].0.clone();
        let expected_output = &blocks_and_execution_outcomes[0].1;
        assert_eq!(block, expected_block);
        assert_eq!(&execution_output, expected_output);

        assert!(backfill_stream.next().await.is_none());

        Ok(())
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        let key_pair = generators::generate_key(&mut generators::rng());
        let address = public_key_to_address(key_pair.public_key());

        let chain_spec = chain_spec(address);

        let executor = EthExecutorProvider::ethereum(chain_spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        let (blocks, execution_outcome) =
            blocks_and_execution_outcome(provider_factory, chain_spec, key_pair)?;

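        // Backfill blocks 1..=2 as one batch, processed by a single task.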
        let factory = BackfillJobFactory::new(executor.clone(), blockchain_db.clone())
            .with_thresholds(ExecutionStageThresholds { max_blocks: Some(2), ..Default::default() })
            .with_stream_parallelism(1);
        let mut backfill_stream = factory.backfill(1..=2).into_stream();
        let mut chain = backfill_stream.next().await.unwrap().unwrap();
        chain.execution_outcome_mut().state_mut().reverts.sort();

        assert!(chain.blocks_iter().eq(&blocks));
        assert_eq!(chain.execution_outcome(), &execution_outcome);

        assert!(backfill_stream.next().await.is_none());

        Ok(())
    }
}