reth_stages/stages/
bodies.rs

1use super::missing_static_data_error;
2use futures_util::TryStreamExt;
3use reth_codecs::Compact;
4use reth_db::{tables, transaction::DbTx};
5use reth_db_api::{cursor::DbCursorRO, transaction::DbTxMut};
6use reth_network_p2p::bodies::{downloader::BodyDownloader, response::BlockResponse};
7use reth_primitives::StaticFileSegment;
8use reth_primitives_traits::{Block, BlockBody, BlockHeader};
9use reth_provider::{
10    providers::StaticFileWriter, BlockReader, BlockWriter, DBProvider, ProviderError,
11    StaticFileProviderFactory, StatsReader, StorageLocation,
12};
13use reth_stages_api::{
14    EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
15    UnwindInput, UnwindOutput,
16};
17use reth_storage_errors::provider::ProviderResult;
18use std::{
19    cmp::Ordering,
20    task::{ready, Context, Poll},
21};
22use tracing::*;
23
24/// The body stage downloads block bodies.
25///
26/// The body stage downloads block bodies for all block headers stored locally in storage.
27///
28/// # Empty blocks
29///
30/// Blocks with an ommers hash corresponding to no ommers *and* a transaction root corresponding to
31/// no transactions will not have a block body downloaded for them, since it would be meaningless to
32/// do so.
33///
34/// This also means that if there is no body for the block in storage (assuming the
35/// block number <= the synced block of this stage), then the block can be considered empty.
36///
37/// # Tables
38///
39/// The bodies are processed and data is inserted into these tables:
40///
41/// - [`BlockOmmers`][reth_db::tables::BlockOmmers]
42/// - [`BlockBodies`][reth_db::tables::BlockBodyIndices]
43/// - [`Transactions`][reth_db::tables::Transactions]
44/// - [`TransactionBlocks`][reth_db::tables::TransactionBlocks]
45///
46/// # Genesis
47///
48/// This stage expects that the genesis has been inserted into the appropriate tables:
49///
50/// - The header tables (see [`HeaderStage`][crate::stages::HeaderStage])
51/// - The [`BlockOmmers`][reth_db::tables::BlockOmmers] table
52/// - The [`BlockBodies`][reth_db::tables::BlockBodyIndices] table
53/// - The [`Transactions`][reth_db::tables::Transactions] table
54#[derive(Debug)]
55pub struct BodyStage<D: BodyDownloader> {
56    /// The body downloader.
57    downloader: D,
58    /// Block response buffer.
59    buffer: Option<Vec<BlockResponse<D::Header, D::Body>>>,
60}
61
62impl<D: BodyDownloader> BodyStage<D> {
63    /// Create new bodies stage from downloader.
64    pub const fn new(downloader: D) -> Self {
65        Self { downloader, buffer: None }
66    }
67
68    /// Ensures that static files and database are in sync.
69    fn ensure_consistency<Provider>(
70        &self,
71        provider: &Provider,
72        unwind_block: Option<u64>,
73    ) -> Result<(), StageError>
74    where
75        Provider: DBProvider<Tx: DbTxMut> + BlockReader + StaticFileProviderFactory,
76    {
77        // Get id for the next tx_num of zero if there are no transactions.
78        let next_tx_num = provider
79            .tx_ref()
80            .cursor_read::<tables::TransactionBlocks>()?
81            .last()?
82            .map(|(id, _)| id + 1)
83            .unwrap_or_default();
84
85        let static_file_provider = provider.static_file_provider();
86
87        // Make sure Transactions static file is at the same height. If it's further, this
88        // input execution was interrupted previously and we need to unwind the static file.
89        let next_static_file_tx_num = static_file_provider
90            .get_highest_static_file_tx(StaticFileSegment::Transactions)
91            .map(|id| id + 1)
92            .unwrap_or_default();
93
94        match next_static_file_tx_num.cmp(&next_tx_num) {
95            // If static files are ahead, we are currently unwinding the stage or we didn't reach
96            // the database commit in a previous stage run. So, our only solution is to unwind the
97            // static files and proceed from the database expected height.
98            Ordering::Greater => {
99                let highest_db_block =
100                    provider.tx_ref().entries::<tables::BlockBodyIndices>()? as u64;
101                let mut static_file_producer =
102                    static_file_provider.latest_writer(StaticFileSegment::Transactions)?;
103                static_file_producer
104                    .prune_transactions(next_static_file_tx_num - next_tx_num, highest_db_block)?;
105                // Since this is a database <-> static file inconsistency, we commit the change
106                // straight away.
107                static_file_producer.commit()?;
108            }
109            // If static files are behind, then there was some corruption or loss of files. This
110            // error will trigger an unwind, that will bring the database to the same height as the
111            // static files.
112            Ordering::Less => {
113                // If we are already in the process of unwind, this might be fine because we will
114                // fix the inconsistency right away.
115                if let Some(unwind_to) = unwind_block {
116                    let next_tx_num_after_unwind = provider
117                        .tx_ref()
118                        .get::<tables::BlockBodyIndices>(unwind_to)?
119                        .map(|b| b.next_tx_num())
120                        .ok_or(ProviderError::BlockBodyIndicesNotFound(unwind_to))?;
121
122                    // This means we need a deeper unwind.
123                    if next_tx_num_after_unwind > next_static_file_tx_num {
124                        return Err(missing_static_data_error(
125                            next_static_file_tx_num.saturating_sub(1),
126                            &static_file_provider,
127                            provider,
128                            StaticFileSegment::Transactions,
129                        )?)
130                    }
131                } else {
132                    return Err(missing_static_data_error(
133                        next_static_file_tx_num.saturating_sub(1),
134                        &static_file_provider,
135                        provider,
136                        StaticFileSegment::Transactions,
137                    )?)
138                }
139            }
140            Ordering::Equal => {}
141        }
142
143        Ok(())
144    }
145}
146
147impl<Provider, D> Stage<Provider> for BodyStage<D>
148where
149    Provider: DBProvider<Tx: DbTxMut>
150        + StaticFileProviderFactory
151        + StatsReader
152        + BlockReader
153        + BlockWriter<Block: Block<Header = D::Header, Body = D::Body>>,
154    D: BodyDownloader<Header: BlockHeader, Body: BlockBody<Transaction: Compact>>,
155{
156    /// Return the id of the stage
157    fn id(&self) -> StageId {
158        StageId::Bodies
159    }
160
161    fn poll_execute_ready(
162        &mut self,
163        cx: &mut Context<'_>,
164        input: ExecInput,
165    ) -> Poll<Result<(), StageError>> {
166        if input.target_reached() || self.buffer.is_some() {
167            return Poll::Ready(Ok(()))
168        }
169
170        // Update the header range on the downloader
171        self.downloader.set_download_range(input.next_block_range())?;
172
173        // Poll next downloader item.
174        let maybe_next_result = ready!(self.downloader.try_poll_next_unpin(cx));
175
176        // Task downloader can return `None` only if the response relaying channel was closed. This
177        // is a fatal error to prevent the pipeline from running forever.
178        let response = match maybe_next_result {
179            Some(Ok(downloaded)) => {
180                self.buffer = Some(downloaded);
181                Ok(())
182            }
183            Some(Err(err)) => Err(err.into()),
184            None => Err(StageError::ChannelClosed),
185        };
186        Poll::Ready(response)
187    }
188
189    /// Download block bodies from the last checkpoint for this stage up until the latest synced
190    /// header, limited by the stage's batch size.
191    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
192        if input.target_reached() {
193            return Ok(ExecOutput::done(input.checkpoint()))
194        }
195        let (from_block, to_block) = input.next_block_range().into_inner();
196
197        self.ensure_consistency(provider, None)?;
198
199        debug!(target: "sync::stages::bodies", stage_progress = from_block, target = to_block, "Commencing sync");
200
201        let buffer = self.buffer.take().ok_or(StageError::MissingDownloadBuffer)?;
202        trace!(target: "sync::stages::bodies", bodies_len = buffer.len(), "Writing blocks");
203        let highest_block = buffer.last().map(|r| r.block_number()).unwrap_or(from_block);
204
205        // Write bodies to database.
206        provider.append_block_bodies(
207            buffer
208                .into_iter()
209                .map(|response| (response.block_number(), response.into_body()))
210                .collect(),
211            // We are writing transactions directly to static files.
212            StorageLocation::StaticFiles,
213        )?;
214
215        // The stage is "done" if:
216        // - We got fewer blocks than our target
217        // - We reached our target and the target was not limited by the batch size of the stage
218        let done = highest_block == to_block;
219        Ok(ExecOutput {
220            checkpoint: StageCheckpoint::new(highest_block)
221                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
222            done,
223        })
224    }
225
226    /// Unwind the stage.
227    fn unwind(
228        &mut self,
229        provider: &Provider,
230        input: UnwindInput,
231    ) -> Result<UnwindOutput, StageError> {
232        self.buffer.take();
233
234        self.ensure_consistency(provider, Some(input.unwind_to))?;
235        provider.remove_bodies_above(input.unwind_to, StorageLocation::Both)?;
236
237        Ok(UnwindOutput {
238            checkpoint: StageCheckpoint::new(input.unwind_to)
239                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
240        })
241    }
242}
243
244// TODO(alexey): ideally, we want to measure Bodies stage progress in bytes, but it's hard to know
245//  beforehand how many bytes we need to download. So the good solution would be to measure the
246//  progress in gas as a proxy to size. Execution stage uses a similar approach.
247fn stage_checkpoint<Provider>(provider: &Provider) -> ProviderResult<EntitiesCheckpoint>
248where
249    Provider: StatsReader + StaticFileProviderFactory,
250{
251    Ok(EntitiesCheckpoint {
252        processed: provider.count_entries::<tables::BlockBodyIndices>()? as u64,
253        // Count only static files entries. If we count the database entries too, we may have
254        // duplicates. We're sure that the static files have all entries that database has,
255        // because we run the `StaticFileProducer` before starting the pipeline.
256        total: provider.static_file_provider().count_entries::<tables::Headers>()? as u64,
257    })
258}
259
260#[cfg(test)]
261mod tests {
262    use super::*;
263    use crate::test_utils::{
264        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner,
265    };
266    use assert_matches::assert_matches;
267    use reth_provider::StaticFileProviderFactory;
268    use reth_stages_api::StageUnitCheckpoint;
269    use test_utils::*;
270
271    stage_test_suite_ext!(BodyTestRunner, body);
272
273    /// Checks that the stage downloads at most `batch_size` blocks.
274    #[tokio::test]
275    async fn partial_body_download() {
276        let (stage_progress, previous_stage) = (1, 200);
277
278        // Set up test runner
279        let mut runner = BodyTestRunner::default();
280        let input = ExecInput {
281            target: Some(previous_stage),
282            checkpoint: Some(StageCheckpoint::new(stage_progress)),
283        };
284        runner.seed_execution(input).expect("failed to seed execution");
285
286        // Set the batch size (max we sync per stage execution) to less than the number of blocks
287        // the previous stage synced (10 vs 20)
288        let batch_size = 10;
289        runner.set_batch_size(batch_size);
290
291        // Run the stage
292        let rx = runner.execute(input);
293
294        // Check that we only synced around `batch_size` blocks even though the number of blocks
295        // synced by the previous stage is higher
296        let output = rx.await.unwrap();
297        runner.db().factory.static_file_provider().commit().unwrap();
298        assert_matches!(
299            output,
300            Ok(ExecOutput { checkpoint: StageCheckpoint {
301                block_number,
302                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
303                    processed, // 1 seeded block body + batch size
304                    total // seeded headers
305                }))
306            }, done: false }) if block_number < 200 &&
307                processed == batch_size + 1 && total == previous_stage + 1
308        );
309        assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation");
310    }
311
312    /// Same as [partial_body_download] except the `batch_size` is not hit.
313    #[tokio::test]
314    async fn full_body_download() {
315        let (stage_progress, previous_stage) = (1, 20);
316
317        // Set up test runner
318        let mut runner = BodyTestRunner::default();
319        let input = ExecInput {
320            target: Some(previous_stage),
321            checkpoint: Some(StageCheckpoint::new(stage_progress)),
322        };
323        runner.seed_execution(input).expect("failed to seed execution");
324
325        // Set the batch size to more than what the previous stage synced (40 vs 20)
326        runner.set_batch_size(40);
327
328        // Run the stage
329        let rx = runner.execute(input);
330
331        // Check that we synced all blocks successfully, even though our `batch_size` allows us to
332        // sync more (if there were more headers)
333        let output = rx.await.unwrap();
334        runner.db().factory.static_file_provider().commit().unwrap();
335        assert_matches!(
336            output,
337            Ok(ExecOutput {
338                checkpoint: StageCheckpoint {
339                    block_number: 20,
340                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
341                        processed,
342                        total
343                    }))
344                },
345                done: true
346            }) if processed + 1 == total && total == previous_stage + 1
347        );
348        assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation");
349    }
350
351    /// Same as [full_body_download] except we have made progress before
352    #[tokio::test]
353    async fn sync_from_previous_progress() {
354        let (stage_progress, previous_stage) = (1, 21);
355
356        // Set up test runner
357        let mut runner = BodyTestRunner::default();
358        let input = ExecInput {
359            target: Some(previous_stage),
360            checkpoint: Some(StageCheckpoint::new(stage_progress)),
361        };
362        runner.seed_execution(input).expect("failed to seed execution");
363
364        let batch_size = 10;
365        runner.set_batch_size(batch_size);
366
367        // Run the stage
368        let rx = runner.execute(input);
369
370        // Check that we synced at least 10 blocks
371        let first_run = rx.await.unwrap();
372        runner.db().factory.static_file_provider().commit().unwrap();
373        assert_matches!(
374            first_run,
375            Ok(ExecOutput { checkpoint: StageCheckpoint {
376                block_number,
377                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
378                    processed,
379                    total
380                }))
381            }, done: false }) if block_number >= 10 &&
382                processed - 1 == batch_size && total == previous_stage + 1
383        );
384        let first_run_checkpoint = first_run.unwrap().checkpoint;
385
386        // Execute again on top of the previous run
387        let input =
388            ExecInput { target: Some(previous_stage), checkpoint: Some(first_run_checkpoint) };
389        let rx = runner.execute(input);
390
391        // Check that we synced more blocks
392        let output = rx.await.unwrap();
393        runner.db().factory.static_file_provider().commit().unwrap();
394        assert_matches!(
395            output,
396            Ok(ExecOutput { checkpoint: StageCheckpoint {
397                block_number,
398                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
399                    processed,
400                    total
401                }))
402            }, done: true }) if block_number > first_run_checkpoint.block_number &&
403                processed + 1 == total && total == previous_stage + 1
404        );
405        assert_matches!(
406            runner.validate_execution(input, output.ok()),
407            Ok(_),
408            "execution validation"
409        );
410    }
411
412    /// Checks that the stage unwinds correctly, even if a transaction in a block is missing.
413    #[tokio::test]
414    async fn unwind_missing_tx() {
415        let (stage_progress, previous_stage) = (1, 20);
416
417        // Set up test runner
418        let mut runner = BodyTestRunner::default();
419        let input = ExecInput {
420            target: Some(previous_stage),
421            checkpoint: Some(StageCheckpoint::new(stage_progress)),
422        };
423        runner.seed_execution(input).expect("failed to seed execution");
424
425        // Set the batch size to more than what the previous stage synced (40 vs 20)
426        runner.set_batch_size(40);
427
428        // Run the stage
429        let rx = runner.execute(input);
430
431        // Check that we synced all blocks successfully, even though our `batch_size` allows us to
432        // sync more (if there were more headers)
433        let output = rx.await.unwrap();
434        runner.db().factory.static_file_provider().commit().unwrap();
435        assert_matches!(
436            output,
437            Ok(ExecOutput { checkpoint: StageCheckpoint {
438                block_number,
439                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
440                    processed,
441                    total
442                }))
443            }, done: true }) if block_number == previous_stage &&
444                processed + 1 == total && total == previous_stage + 1
445        );
446        let checkpoint = output.unwrap().checkpoint;
447        runner
448            .validate_db_blocks(input.checkpoint().block_number, checkpoint.block_number)
449            .expect("Written block data invalid");
450
451        // Delete a transaction
452        let static_file_provider = runner.db().factory.static_file_provider();
453        {
454            let mut static_file_producer =
455                static_file_provider.latest_writer(StaticFileSegment::Transactions).unwrap();
456            static_file_producer.prune_transactions(1, checkpoint.block_number).unwrap();
457            static_file_producer.commit().unwrap();
458        }
459        // Unwind all of it
460        let unwind_to = 1;
461        let input = UnwindInput { bad_block: None, checkpoint, unwind_to };
462        let res = runner.unwind(input).await;
463        assert_matches!(
464            res,
465            Ok(UnwindOutput { checkpoint: StageCheckpoint {
466                block_number: 1,
467                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
468                    processed: 1,
469                    total
470                }))
471            }}) if total == previous_stage + 1
472        );
473
474        assert_matches!(runner.validate_unwind(input), Ok(_), "unwind validation");
475    }
476
477    mod test_utils {
478        use crate::{
479            stages::bodies::BodyStage,
480            test_utils::{
481                ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestStageDB,
482                UnwindStageTestRunner,
483            },
484        };
485        use alloy_consensus::{BlockHeader, Header};
486        use alloy_primitives::{BlockNumber, TxNumber, B256};
487        use futures_util::Stream;
488        use reth_db::{static_file::HeaderWithHashMask, tables};
489        use reth_db_api::{
490            cursor::DbCursorRO,
491            models::{StoredBlockBodyIndices, StoredBlockOmmers},
492            transaction::{DbTx, DbTxMut},
493        };
494        use reth_network_p2p::{
495            bodies::{
496                downloader::{BodyDownloader, BodyDownloaderResult},
497                response::BlockResponse,
498            },
499            error::DownloadResult,
500        };
501        use reth_primitives::{BlockBody, SealedBlock, SealedHeader, StaticFileSegment};
502        use reth_provider::{
503            providers::StaticFileWriter, test_utils::MockNodeTypesWithDB, HeaderProvider,
504            ProviderFactory, StaticFileProviderFactory, TransactionsProvider,
505        };
506        use reth_stages_api::{ExecInput, ExecOutput, UnwindInput};
507        use reth_testing_utils::generators::{
508            self, random_block_range, random_signed_tx, BlockRangeParams,
509        };
510        use std::{
511            collections::{HashMap, VecDeque},
512            ops::RangeInclusive,
513            pin::Pin,
514            task::{Context, Poll},
515        };
516
517        /// The block hash of the genesis block.
518        pub(crate) const GENESIS_HASH: B256 = B256::ZERO;
519
520        /// A helper to create a collection of block bodies keyed by their hash.
521        pub(crate) fn body_by_hash(block: &SealedBlock) -> (B256, BlockBody) {
522            (block.hash(), block.body.clone())
523        }
524
525        /// A helper struct for running the [`BodyStage`].
526        pub(crate) struct BodyTestRunner {
527            responses: HashMap<B256, BlockBody>,
528            db: TestStageDB,
529            batch_size: u64,
530        }
531
532        impl Default for BodyTestRunner {
533            fn default() -> Self {
534                Self { responses: HashMap::default(), db: TestStageDB::default(), batch_size: 1000 }
535            }
536        }
537
538        impl BodyTestRunner {
539            pub(crate) fn set_batch_size(&mut self, batch_size: u64) {
540                self.batch_size = batch_size;
541            }
542
543            pub(crate) fn set_responses(&mut self, responses: HashMap<B256, BlockBody>) {
544                self.responses = responses;
545            }
546        }
547
548        impl StageTestRunner for BodyTestRunner {
549            type S = BodyStage<TestBodyDownloader>;
550
551            fn db(&self) -> &TestStageDB {
552                &self.db
553            }
554
555            fn stage(&self) -> Self::S {
556                BodyStage::new(TestBodyDownloader::new(
557                    self.db.factory.clone(),
558                    self.responses.clone(),
559                    self.batch_size,
560                ))
561            }
562        }
563
564        impl ExecuteStageTestRunner for BodyTestRunner {
565            type Seed = Vec<SealedBlock>;
566
567            fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
568                let start = input.checkpoint().block_number;
569                let end = input.target();
570
571                let static_file_provider = self.db.factory.static_file_provider();
572
573                let mut rng = generators::rng();
574
575                // Static files do not support gaps in headers, so we need to generate 0 to end
576                let blocks = random_block_range(
577                    &mut rng,
578                    0..=end,
579                    BlockRangeParams {
580                        parent: Some(GENESIS_HASH),
581                        tx_count: 0..2,
582                        ..Default::default()
583                    },
584                );
585                self.db.insert_headers_with_td(blocks.iter().map(|block| &block.header))?;
586                if let Some(progress) = blocks.get(start as usize) {
587                    // Insert last progress data
588                    {
589                        let tx = self.db.factory.provider_rw()?.into_tx();
590                        let mut static_file_producer = static_file_provider
591                            .get_writer(start, StaticFileSegment::Transactions)?;
592
593                        let body = StoredBlockBodyIndices {
594                            first_tx_num: 0,
595                            tx_count: progress.body.transactions.len() as u64,
596                        };
597
598                        static_file_producer.set_block_range(0..=progress.number);
599
600                        body.tx_num_range().try_for_each(|tx_num| {
601                            let transaction = random_signed_tx(&mut rng);
602                            static_file_producer.append_transaction(tx_num, &transaction).map(drop)
603                        })?;
604
605                        if body.tx_count != 0 {
606                            tx.put::<tables::TransactionBlocks>(
607                                body.last_tx_num(),
608                                progress.number,
609                            )?;
610                        }
611
612                        tx.put::<tables::BlockBodyIndices>(progress.number, body)?;
613
614                        if !progress.ommers_hash_is_empty() {
615                            tx.put::<tables::BlockOmmers>(
616                                progress.number,
617                                StoredBlockOmmers { ommers: progress.body.ommers.clone() },
618                            )?;
619                        }
620
621                        static_file_producer.commit()?;
622                        tx.commit()?;
623                    }
624                }
625                self.set_responses(blocks.iter().map(body_by_hash).collect());
626                Ok(blocks)
627            }
628
629            fn validate_execution(
630                &self,
631                input: ExecInput,
632                output: Option<ExecOutput>,
633            ) -> Result<(), TestRunnerError> {
634                let highest_block = match output.as_ref() {
635                    Some(output) => output.checkpoint,
636                    None => input.checkpoint(),
637                }
638                .block_number;
639                self.validate_db_blocks(highest_block, highest_block)
640            }
641        }
642
643        impl UnwindStageTestRunner for BodyTestRunner {
644            fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
645                self.db.ensure_no_entry_above::<tables::BlockBodyIndices, _>(
646                    input.unwind_to,
647                    |key| key,
648                )?;
649                self.db
650                    .ensure_no_entry_above::<tables::BlockOmmers, _>(input.unwind_to, |key| key)?;
651                if let Some(last_tx_id) = self.get_last_tx_id()? {
652                    self.db
653                        .ensure_no_entry_above::<tables::Transactions, _>(last_tx_id, |key| key)?;
654                    self.db.ensure_no_entry_above::<tables::TransactionBlocks, _>(
655                        last_tx_id,
656                        |key| key,
657                    )?;
658                }
659                Ok(())
660            }
661        }
662
663        impl BodyTestRunner {
664            /// Get the last available tx id if any
665            pub(crate) fn get_last_tx_id(&self) -> Result<Option<TxNumber>, TestRunnerError> {
666                let last_body = self.db.query(|tx| {
667                    let v = tx.cursor_read::<tables::BlockBodyIndices>()?.last()?;
668                    Ok(v)
669                })?;
670                Ok(match last_body {
671                    Some((_, body)) if body.tx_count != 0 => {
672                        Some(body.first_tx_num + body.tx_count - 1)
673                    }
674                    _ => None,
675                })
676            }
677
678            /// Validate that the inserted block data is valid
679            pub(crate) fn validate_db_blocks(
680                &self,
681                prev_progress: BlockNumber,
682                highest_block: BlockNumber,
683            ) -> Result<(), TestRunnerError> {
684                let static_file_provider = self.db.factory.static_file_provider();
685
686                self.db.query(|tx| {
687                    // Acquire cursors on body related tables
688                    let mut bodies_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
689                    let mut ommers_cursor = tx.cursor_read::<tables::BlockOmmers>()?;
690                    let mut tx_block_cursor = tx.cursor_read::<tables::TransactionBlocks>()?;
691
692                    let first_body_key = match bodies_cursor.first()? {
693                        Some((key, _)) => key,
694                        None => return Ok(()),
695                    };
696
697                    let mut prev_number: Option<BlockNumber> = None;
698
699
700                    for entry in bodies_cursor.walk(Some(first_body_key))? {
701                        let (number, body) = entry?;
702
703                        // Validate sequentiality only after prev progress,
704                        // since the data before is mocked and can contain gaps
705                        if number > prev_progress {
706                            if let Some(prev_key) = prev_number {
707                                assert_eq!(prev_key + 1, number, "Body entries must be sequential");
708                            }
709                        }
710
711                        // Validate that the current entry is below or equals to the highest allowed block
712                        assert!(
713                            number <= highest_block,
714                            "We wrote a block body outside of our synced range. Found block with number {number}, highest block according to stage is {highest_block}",
715                        );
716
717                        let header = static_file_provider.header_by_number(number)?.expect("to be present");
718                        // Validate that ommers exist if any
719                        let stored_ommers =  ommers_cursor.seek_exact(number)?;
720                        if header.ommers_hash_is_empty() {
721                            assert!(stored_ommers.is_none(), "Unexpected ommers entry");
722                        } else {
723                            assert!(stored_ommers.is_some(), "Missing ommers entry");
724                        }
725
726                        let tx_block_id = tx_block_cursor.seek_exact(body.last_tx_num())?.map(|(_,b)| b);
727                        if body.tx_count == 0 {
728                            assert_ne!(tx_block_id,Some(number));
729                        } else {
730                            assert_eq!(tx_block_id, Some(number));
731                        }
732
733                        for tx_id in body.tx_num_range() {
734                            assert!(static_file_provider.transaction_by_id(tx_id)?.is_some(), "Transaction is missing.");
735                        }
736
737                        prev_number = Some(number);
738                    }
739                    Ok(())
740                })?;
741                Ok(())
742            }
743        }
744
745        /// A [`BodyDownloader`] that is backed by an internal [`HashMap`] for testing.
746        #[derive(Debug)]
747        pub(crate) struct TestBodyDownloader {
748            provider_factory: ProviderFactory<MockNodeTypesWithDB>,
749            responses: HashMap<B256, BlockBody>,
750            headers: VecDeque<SealedHeader>,
751            batch_size: u64,
752        }
753
754        impl TestBodyDownloader {
755            pub(crate) fn new(
756                provider_factory: ProviderFactory<MockNodeTypesWithDB>,
757                responses: HashMap<B256, BlockBody>,
758                batch_size: u64,
759            ) -> Self {
760                Self { provider_factory, responses, headers: VecDeque::default(), batch_size }
761            }
762        }
763
764        impl BodyDownloader for TestBodyDownloader {
765            type Header = Header;
766            type Body = BlockBody;
767
768            fn set_download_range(
769                &mut self,
770                range: RangeInclusive<BlockNumber>,
771            ) -> DownloadResult<()> {
772                let static_file_provider = self.provider_factory.static_file_provider();
773
774                for header in static_file_provider.fetch_range_iter(
775                    StaticFileSegment::Headers,
776                    *range.start()..*range.end() + 1,
777                    |cursor, number| cursor.get_two::<HeaderWithHashMask<Header>>(number.into()),
778                )? {
779                    let (header, hash) = header?;
780                    self.headers.push_back(SealedHeader::new(header, hash));
781                }
782
783                Ok(())
784            }
785        }
786
787        impl Stream for TestBodyDownloader {
788            type Item = BodyDownloaderResult<Header, BlockBody>;
789            fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
790                let this = self.get_mut();
791
792                if this.headers.is_empty() {
793                    return Poll::Ready(None)
794                }
795
796                let mut response =
797                    Vec::with_capacity(std::cmp::min(this.headers.len(), this.batch_size as usize));
798                while let Some(header) = this.headers.pop_front() {
799                    if header.is_empty() {
800                        response.push(BlockResponse::Empty(header))
801                    } else {
802                        let body =
803                            this.responses.remove(&header.hash()).expect("requested unknown body");
804                        response.push(BlockResponse::Full(SealedBlock { header, body }));
805                    }
806
807                    if response.len() as u64 >= this.batch_size {
808                        break
809                    }
810                }
811
812                if !response.is_empty() {
813                    return Poll::Ready(Some(Ok(response)))
814                }
815
816                panic!("requested bodies without setting headers")
817            }
818        }
819    }
820}