reth_stages/
sets.rs

1//! Built-in [`StageSet`]s.
2//!
3//! The easiest set to use is [`DefaultStages`], which provides all stages required to run an
4//! instance of reth.
5//!
6//! It is also possible to run parts of reth standalone given the required data is present in
7//! the environment, such as [`ExecutionStages`] or [`HashingStages`].
8//!
9//!
10//! # Examples
11//!
12//! ```no_run
13//! # use reth_stages::Pipeline;
14//! # use reth_stages::sets::{OfflineStages};
15//! # use reth_chainspec::MAINNET;
16//! # use reth_prune_types::PruneModes;
17//! # use reth_evm_ethereum::EthEvmConfig;
18//! # use reth_provider::StaticFileProviderFactory;
19//! # use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB};
20//! # use reth_static_file::StaticFileProducer;
21//! # use reth_config::config::StageConfig;
22//! # use reth_evm::execute::BlockExecutorProvider;
23//! # use reth_primitives::EthPrimitives;
24//!
25//! # fn create(exec: impl BlockExecutorProvider<Primitives = EthPrimitives>) {
26//!
27//! let provider_factory = create_test_provider_factory();
28//! let static_file_producer =
29//!     StaticFileProducer::new(provider_factory.clone(), PruneModes::default());
30//! // Build a pipeline with all offline stages.
31//! let pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
32//!     .add_stages(OfflineStages::new(exec, StageConfig::default(), PruneModes::default()))
33//!     .build(provider_factory, static_file_producer);
34//!
35//! # }
36//! ```
37use crate::{
38    stages::{
39        AccountHashingStage, BodyStage, ExecutionStage, FinishStage, HeaderStage,
40        IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, PruneSenderRecoveryStage,
41        PruneStage, SenderRecoveryStage, StorageHashingStage, TransactionLookupStage,
42    },
43    StageSet, StageSetBuilder,
44};
45use alloy_primitives::B256;
46use reth_config::config::StageConfig;
47use reth_consensus::Consensus;
48use reth_evm::execute::BlockExecutorProvider;
49use reth_network_p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader};
50use reth_provider::HeaderSyncGapProvider;
51use reth_prune_types::PruneModes;
52use reth_stages_api::Stage;
53use std::{ops::Not, sync::Arc};
54use tokio::sync::watch;
55
56/// A set containing all stages to run a fully syncing instance of reth.
57///
58/// A combination of (in order)
59///
60/// - [`OnlineStages`]
61/// - [`OfflineStages`]
62/// - [`FinishStage`]
63///
64/// This expands to the following series of stages:
65/// - [`HeaderStage`]
66/// - [`BodyStage`]
67/// - [`SenderRecoveryStage`]
68/// - [`ExecutionStage`]
69/// - [`PruneSenderRecoveryStage`] (execute)
70/// - [`MerkleStage`] (unwind)
71/// - [`AccountHashingStage`]
72/// - [`StorageHashingStage`]
73/// - [`MerkleStage`] (execute)
74/// - [`TransactionLookupStage`]
75/// - [`IndexStorageHistoryStage`]
76/// - [`IndexAccountHistoryStage`]
77/// - [`PruneStage`] (execute)
78/// - [`FinishStage`]
79#[derive(Debug)]
80pub struct DefaultStages<Provider, H, B, EF>
81where
82    H: HeaderDownloader,
83    B: BodyDownloader,
84{
85    /// Configuration for the online stages
86    online: OnlineStages<Provider, H, B>,
87    /// Executor factory needs for execution stage
88    executor_factory: EF,
89    /// Configuration for each stage in the pipeline
90    stages_config: StageConfig,
91    /// Prune configuration for every segment that can be pruned
92    prune_modes: PruneModes,
93}
94
95impl<Provider, H, B, E> DefaultStages<Provider, H, B, E>
96where
97    H: HeaderDownloader,
98    B: BodyDownloader,
99{
100    /// Create a new set of default stages with default values.
101    #[allow(clippy::too_many_arguments)]
102    pub fn new(
103        provider: Provider,
104        tip: watch::Receiver<B256>,
105        consensus: Arc<dyn Consensus<H::Header, B::Body>>,
106        header_downloader: H,
107        body_downloader: B,
108        executor_factory: E,
109        stages_config: StageConfig,
110        prune_modes: PruneModes,
111    ) -> Self
112    where
113        E: BlockExecutorProvider,
114    {
115        Self {
116            online: OnlineStages::new(
117                provider,
118                tip,
119                consensus,
120                header_downloader,
121                body_downloader,
122                stages_config.clone(),
123            ),
124            executor_factory,
125            stages_config,
126            prune_modes,
127        }
128    }
129}
130
131impl<P, H, B, E> DefaultStages<P, H, B, E>
132where
133    E: BlockExecutorProvider,
134    H: HeaderDownloader,
135    B: BodyDownloader,
136{
137    /// Appends the default offline stages and default finish stage to the given builder.
138    pub fn add_offline_stages<Provider>(
139        default_offline: StageSetBuilder<Provider>,
140        executor_factory: E,
141        stages_config: StageConfig,
142        prune_modes: PruneModes,
143    ) -> StageSetBuilder<Provider>
144    where
145        OfflineStages<E>: StageSet<Provider>,
146    {
147        StageSetBuilder::default()
148            .add_set(default_offline)
149            .add_set(OfflineStages::new(executor_factory, stages_config, prune_modes))
150            .add_stage(FinishStage)
151    }
152}
153
154impl<P, H, B, E, Provider> StageSet<Provider> for DefaultStages<P, H, B, E>
155where
156    P: HeaderSyncGapProvider + 'static,
157    H: HeaderDownloader + 'static,
158    B: BodyDownloader + 'static,
159    E: BlockExecutorProvider,
160    OnlineStages<P, H, B>: StageSet<Provider>,
161    OfflineStages<E>: StageSet<Provider>,
162{
163    fn builder(self) -> StageSetBuilder<Provider> {
164        Self::add_offline_stages(
165            self.online.builder(),
166            self.executor_factory,
167            self.stages_config.clone(),
168            self.prune_modes,
169        )
170    }
171}
172
173/// A set containing all stages that require network access by default.
174///
175/// These stages *can* be run without network access if the specified downloaders are
176/// themselves offline.
177#[derive(Debug)]
178pub struct OnlineStages<Provider, H, B>
179where
180    H: HeaderDownloader,
181    B: BodyDownloader,
182{
183    /// Sync gap provider for the headers stage.
184    provider: Provider,
185    /// The tip for the headers stage.
186    tip: watch::Receiver<B256>,
187    /// The consensus engine used to validate incoming data.
188    consensus: Arc<dyn Consensus<H::Header, B::Body>>,
189    /// The block header downloader
190    header_downloader: H,
191    /// The block body downloader
192    body_downloader: B,
193    /// Configuration for each stage in the pipeline
194    stages_config: StageConfig,
195}
196
197impl<Provider, H, B> OnlineStages<Provider, H, B>
198where
199    H: HeaderDownloader,
200    B: BodyDownloader,
201{
202    /// Create a new set of online stages with default values.
203    pub fn new(
204        provider: Provider,
205        tip: watch::Receiver<B256>,
206        consensus: Arc<dyn Consensus<H::Header, B::Body>>,
207        header_downloader: H,
208        body_downloader: B,
209        stages_config: StageConfig,
210    ) -> Self {
211        Self { provider, tip, consensus, header_downloader, body_downloader, stages_config }
212    }
213}
214
215impl<P, H, B> OnlineStages<P, H, B>
216where
217    P: HeaderSyncGapProvider + 'static,
218    H: HeaderDownloader + 'static,
219    B: BodyDownloader + 'static,
220{
221    /// Create a new builder using the given headers stage.
222    pub fn builder_with_headers<Provider>(
223        headers: HeaderStage<P, H>,
224        body_downloader: B,
225    ) -> StageSetBuilder<Provider>
226    where
227        HeaderStage<P, H>: Stage<Provider>,
228        BodyStage<B>: Stage<Provider>,
229    {
230        StageSetBuilder::default().add_stage(headers).add_stage(BodyStage::new(body_downloader))
231    }
232
233    /// Create a new builder using the given bodies stage.
234    pub fn builder_with_bodies<Provider>(
235        bodies: BodyStage<B>,
236        provider: P,
237        tip: watch::Receiver<B256>,
238        header_downloader: H,
239        consensus: Arc<dyn Consensus<H::Header, B::Body>>,
240        stages_config: StageConfig,
241    ) -> StageSetBuilder<Provider>
242    where
243        BodyStage<B>: Stage<Provider>,
244        HeaderStage<P, H>: Stage<Provider>,
245    {
246        StageSetBuilder::default()
247            .add_stage(HeaderStage::new(
248                provider,
249                header_downloader,
250                tip,
251                consensus.clone().as_header_validator(),
252                stages_config.etl,
253            ))
254            .add_stage(bodies)
255    }
256}
257
258impl<Provider, P, H, B> StageSet<Provider> for OnlineStages<P, H, B>
259where
260    P: HeaderSyncGapProvider + 'static,
261    H: HeaderDownloader + 'static,
262    B: BodyDownloader + 'static,
263    HeaderStage<P, H>: Stage<Provider>,
264    BodyStage<B>: Stage<Provider>,
265{
266    fn builder(self) -> StageSetBuilder<Provider> {
267        StageSetBuilder::default()
268            .add_stage(HeaderStage::new(
269                self.provider,
270                self.header_downloader,
271                self.tip,
272                self.consensus.clone().as_header_validator(),
273                self.stages_config.etl.clone(),
274            ))
275            .add_stage(BodyStage::new(self.body_downloader))
276    }
277}
278
279/// A set containing all stages that do not require network access.
280///
281/// A combination of (in order)
282///
283/// - [`ExecutionStages`]
284/// - [`PruneSenderRecoveryStage`]
285/// - [`HashingStages`]
286/// - [`HistoryIndexingStages`]
287/// - [`PruneStage`]
288#[derive(Debug, Default)]
289#[non_exhaustive]
290pub struct OfflineStages<EF> {
291    /// Executor factory needs for execution stage
292    executor_factory: EF,
293    /// Configuration for each stage in the pipeline
294    stages_config: StageConfig,
295    /// Prune configuration for every segment that can be pruned
296    prune_modes: PruneModes,
297}
298
299impl<EF> OfflineStages<EF> {
300    /// Create a new set of offline stages with default values.
301    pub const fn new(
302        executor_factory: EF,
303        stages_config: StageConfig,
304        prune_modes: PruneModes,
305    ) -> Self {
306        Self { executor_factory, stages_config, prune_modes }
307    }
308}
309
310impl<E, Provider> StageSet<Provider> for OfflineStages<E>
311where
312    E: BlockExecutorProvider,
313    ExecutionStages<E>: StageSet<Provider>,
314    PruneSenderRecoveryStage: Stage<Provider>,
315    HashingStages: StageSet<Provider>,
316    HistoryIndexingStages: StageSet<Provider>,
317    PruneStage: Stage<Provider>,
318{
319    fn builder(self) -> StageSetBuilder<Provider> {
320        ExecutionStages::new(
321            self.executor_factory,
322            self.stages_config.clone(),
323            self.prune_modes.clone(),
324        )
325        .builder()
326        // If sender recovery prune mode is set, add the prune sender recovery stage.
327        .add_stage_opt(self.prune_modes.sender_recovery.map(|prune_mode| {
328            PruneSenderRecoveryStage::new(prune_mode, self.stages_config.prune.commit_threshold)
329        }))
330        .add_set(HashingStages { stages_config: self.stages_config.clone() })
331        .add_set(HistoryIndexingStages {
332            stages_config: self.stages_config.clone(),
333            prune_modes: self.prune_modes.clone(),
334        })
335        // If any prune modes are set, add the prune stage.
336        .add_stage_opt(self.prune_modes.is_empty().not().then(|| {
337            // Prune stage should be added after all hashing stages, because otherwise it will
338            // delete
339            PruneStage::new(self.prune_modes.clone(), self.stages_config.prune.commit_threshold)
340        }))
341    }
342}
343
344/// A set containing all stages that are required to execute pre-existing block data.
345#[derive(Debug)]
346#[non_exhaustive]
347pub struct ExecutionStages<E> {
348    /// Executor factory that will create executors.
349    executor_factory: E,
350    /// Configuration for each stage in the pipeline
351    stages_config: StageConfig,
352    /// Prune configuration for every segment that can be pruned
353    prune_modes: PruneModes,
354}
355
356impl<E> ExecutionStages<E> {
357    /// Create a new set of execution stages with default values.
358    pub const fn new(
359        executor_factory: E,
360        stages_config: StageConfig,
361        prune_modes: PruneModes,
362    ) -> Self {
363        Self { executor_factory, stages_config, prune_modes }
364    }
365}
366
367impl<E, Provider> StageSet<Provider> for ExecutionStages<E>
368where
369    E: BlockExecutorProvider,
370    SenderRecoveryStage: Stage<Provider>,
371    ExecutionStage<E>: Stage<Provider>,
372{
373    fn builder(self) -> StageSetBuilder<Provider> {
374        StageSetBuilder::default()
375            .add_stage(SenderRecoveryStage::new(self.stages_config.sender_recovery))
376            .add_stage(ExecutionStage::from_config(
377                self.executor_factory,
378                self.stages_config.execution,
379                self.stages_config.execution_external_clean_threshold(),
380                self.prune_modes,
381            ))
382    }
383}
384
385/// A set containing all stages that hash account state.
386#[derive(Debug, Default)]
387#[non_exhaustive]
388pub struct HashingStages {
389    /// Configuration for each stage in the pipeline
390    stages_config: StageConfig,
391}
392
393impl<Provider> StageSet<Provider> for HashingStages
394where
395    MerkleStage: Stage<Provider>,
396    AccountHashingStage: Stage<Provider>,
397    StorageHashingStage: Stage<Provider>,
398{
399    fn builder(self) -> StageSetBuilder<Provider> {
400        StageSetBuilder::default()
401            .add_stage(MerkleStage::default_unwind())
402            .add_stage(AccountHashingStage::new(
403                self.stages_config.account_hashing,
404                self.stages_config.etl.clone(),
405            ))
406            .add_stage(StorageHashingStage::new(
407                self.stages_config.storage_hashing,
408                self.stages_config.etl.clone(),
409            ))
410            .add_stage(MerkleStage::new_execution(self.stages_config.merkle.clean_threshold))
411    }
412}
413
414/// A set containing all stages that do additional indexing for historical state.
415#[derive(Debug, Default)]
416#[non_exhaustive]
417pub struct HistoryIndexingStages {
418    /// Configuration for each stage in the pipeline
419    stages_config: StageConfig,
420    /// Prune configuration for every segment that can be pruned
421    prune_modes: PruneModes,
422}
423
424impl<Provider> StageSet<Provider> for HistoryIndexingStages
425where
426    TransactionLookupStage: Stage<Provider>,
427    IndexStorageHistoryStage: Stage<Provider>,
428    IndexAccountHistoryStage: Stage<Provider>,
429{
430    fn builder(self) -> StageSetBuilder<Provider> {
431        StageSetBuilder::default()
432            .add_stage(TransactionLookupStage::new(
433                self.stages_config.transaction_lookup,
434                self.stages_config.etl.clone(),
435                self.prune_modes.transaction_lookup,
436            ))
437            .add_stage(IndexStorageHistoryStage::new(
438                self.stages_config.index_storage_history,
439                self.stages_config.etl.clone(),
440                self.prune_modes.account_history,
441            ))
442            .add_stage(IndexAccountHistoryStage::new(
443                self.stages_config.index_account_history,
444                self.stages_config.etl.clone(),
445                self.prune_modes.storage_history,
446            ))
447    }
448}