reth_stages/
sets.rs

1//! Built-in [`StageSet`]s.
2//!
3//! The easiest set to use is [`DefaultStages`], which provides all stages required to run an
4//! instance of reth.
5//!
6//! It is also possible to run parts of reth standalone given the required data is present in
7//! the environment, such as [`ExecutionStages`] or [`HashingStages`].
8//!
9//!
10//! # Examples
11//!
12//! ```no_run
13//! # use reth_stages::Pipeline;
14//! # use reth_stages::sets::{OfflineStages};
15//! # use reth_chainspec::MAINNET;
16//! # use reth_prune_types::PruneModes;
17//! # use reth_evm_ethereum::EthEvmConfig;
18//! # use reth_evm::ConfigureEvm;
19//! # use reth_provider::StaticFileProviderFactory;
20//! # use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB};
21//! # use reth_static_file::StaticFileProducer;
22//! # use reth_config::config::StageConfig;
23//! # use reth_ethereum_primitives::EthPrimitives;
24//! # use std::sync::Arc;
25//! # use reth_consensus::{FullConsensus, ConsensusError};
26//!
27//! # fn create(exec: impl ConfigureEvm<Primitives = EthPrimitives> + 'static, consensus: impl FullConsensus<EthPrimitives, Error = ConsensusError> + 'static) {
28//!
29//! let provider_factory = create_test_provider_factory();
30//! let static_file_producer =
31//!     StaticFileProducer::new(provider_factory.clone(), PruneModes::default());
32//! // Build a pipeline with all offline stages.
33//! let pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
34//!     .add_stages(OfflineStages::new(exec, Arc::new(consensus), StageConfig::default(), PruneModes::default()))
35//!     .build(provider_factory, static_file_producer);
36//!
37//! # }
38//! ```
39use crate::{
40    stages::{
41        AccountHashingStage, BodyStage, ExecutionStage, FinishStage, HeaderStage,
42        IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, PruneSenderRecoveryStage,
43        PruneStage, SenderRecoveryStage, StorageHashingStage, TransactionLookupStage,
44    },
45    StageSet, StageSetBuilder,
46};
47use alloy_primitives::B256;
48use reth_config::config::StageConfig;
49use reth_consensus::{ConsensusError, FullConsensus};
50use reth_evm::ConfigureEvm;
51use reth_network_p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader};
52use reth_primitives_traits::{Block, NodePrimitives};
53use reth_provider::HeaderSyncGapProvider;
54use reth_prune_types::PruneModes;
55use reth_stages_api::Stage;
56use std::{ops::Not, sync::Arc};
57use tokio::sync::watch;
58
59/// A set containing all stages to run a fully syncing instance of reth.
60///
61/// A combination of (in order)
62///
63/// - [`OnlineStages`]
64/// - [`OfflineStages`]
65/// - [`FinishStage`]
66///
67/// This expands to the following series of stages:
68/// - [`HeaderStage`]
69/// - [`BodyStage`]
70/// - [`SenderRecoveryStage`]
71/// - [`ExecutionStage`]
72/// - [`PruneSenderRecoveryStage`] (execute)
73/// - [`MerkleStage`] (unwind)
74/// - [`AccountHashingStage`]
75/// - [`StorageHashingStage`]
76/// - [`MerkleStage`] (execute)
77/// - [`TransactionLookupStage`]
78/// - [`IndexStorageHistoryStage`]
79/// - [`IndexAccountHistoryStage`]
80/// - [`PruneStage`] (execute)
81/// - [`FinishStage`]
82#[derive(Debug)]
83pub struct DefaultStages<Provider, H, B, E>
84where
85    H: HeaderDownloader,
86    B: BodyDownloader,
87    E: ConfigureEvm,
88{
89    /// Configuration for the online stages
90    online: OnlineStages<Provider, H, B>,
91    /// Executor factory needs for execution stage
92    evm_config: E,
93    /// Consensus instance
94    consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
95    /// Configuration for each stage in the pipeline
96    stages_config: StageConfig,
97    /// Prune configuration for every segment that can be pruned
98    prune_modes: PruneModes,
99}
100
101impl<Provider, H, B, E> DefaultStages<Provider, H, B, E>
102where
103    H: HeaderDownloader,
104    B: BodyDownloader,
105    E: ConfigureEvm<Primitives: NodePrimitives<BlockHeader = H::Header, Block = B::Block>>,
106{
107    /// Create a new set of default stages with default values.
108    #[expect(clippy::too_many_arguments)]
109    pub fn new(
110        provider: Provider,
111        tip: watch::Receiver<B256>,
112        consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
113        header_downloader: H,
114        body_downloader: B,
115        evm_config: E,
116        stages_config: StageConfig,
117        prune_modes: PruneModes,
118    ) -> Self {
119        Self {
120            online: OnlineStages::new(
121                provider,
122                tip,
123                header_downloader,
124                body_downloader,
125                stages_config.clone(),
126            ),
127            evm_config,
128            consensus,
129            stages_config,
130            prune_modes,
131        }
132    }
133}
134
135impl<P, H, B, E> DefaultStages<P, H, B, E>
136where
137    E: ConfigureEvm,
138    H: HeaderDownloader,
139    B: BodyDownloader,
140{
141    /// Appends the default offline stages and default finish stage to the given builder.
142    pub fn add_offline_stages<Provider>(
143        default_offline: StageSetBuilder<Provider>,
144        evm_config: E,
145        consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
146        stages_config: StageConfig,
147        prune_modes: PruneModes,
148    ) -> StageSetBuilder<Provider>
149    where
150        OfflineStages<E>: StageSet<Provider>,
151    {
152        StageSetBuilder::default()
153            .add_set(default_offline)
154            .add_set(OfflineStages::new(evm_config, consensus, stages_config, prune_modes))
155            .add_stage(FinishStage)
156    }
157}
158
159impl<P, H, B, E, Provider> StageSet<Provider> for DefaultStages<P, H, B, E>
160where
161    P: HeaderSyncGapProvider + 'static,
162    H: HeaderDownloader + 'static,
163    B: BodyDownloader + 'static,
164    E: ConfigureEvm,
165    OnlineStages<P, H, B>: StageSet<Provider>,
166    OfflineStages<E>: StageSet<Provider>,
167{
168    fn builder(self) -> StageSetBuilder<Provider> {
169        Self::add_offline_stages(
170            self.online.builder(),
171            self.evm_config,
172            self.consensus,
173            self.stages_config.clone(),
174            self.prune_modes,
175        )
176    }
177}
178
179/// A set containing all stages that require network access by default.
180///
181/// These stages *can* be run without network access if the specified downloaders are
182/// themselves offline.
183#[derive(Debug)]
184pub struct OnlineStages<Provider, H, B>
185where
186    H: HeaderDownloader,
187    B: BodyDownloader,
188{
189    /// Sync gap provider for the headers stage.
190    provider: Provider,
191    /// The tip for the headers stage.
192    tip: watch::Receiver<B256>,
193
194    /// The block header downloader
195    header_downloader: H,
196    /// The block body downloader
197    body_downloader: B,
198    /// Configuration for each stage in the pipeline
199    stages_config: StageConfig,
200}
201
202impl<Provider, H, B> OnlineStages<Provider, H, B>
203where
204    H: HeaderDownloader,
205    B: BodyDownloader,
206{
207    /// Create a new set of online stages with default values.
208    pub const fn new(
209        provider: Provider,
210        tip: watch::Receiver<B256>,
211        header_downloader: H,
212        body_downloader: B,
213        stages_config: StageConfig,
214    ) -> Self {
215        Self { provider, tip, header_downloader, body_downloader, stages_config }
216    }
217}
218
219impl<P, H, B> OnlineStages<P, H, B>
220where
221    P: HeaderSyncGapProvider + 'static,
222    H: HeaderDownloader<Header = <B::Block as Block>::Header> + 'static,
223    B: BodyDownloader + 'static,
224{
225    /// Create a new builder using the given headers stage.
226    pub fn builder_with_headers<Provider>(
227        headers: HeaderStage<P, H>,
228        body_downloader: B,
229    ) -> StageSetBuilder<Provider>
230    where
231        HeaderStage<P, H>: Stage<Provider>,
232        BodyStage<B>: Stage<Provider>,
233    {
234        StageSetBuilder::default().add_stage(headers).add_stage(BodyStage::new(body_downloader))
235    }
236
237    /// Create a new builder using the given bodies stage.
238    pub fn builder_with_bodies<Provider>(
239        bodies: BodyStage<B>,
240        provider: P,
241        tip: watch::Receiver<B256>,
242        header_downloader: H,
243        stages_config: StageConfig,
244    ) -> StageSetBuilder<Provider>
245    where
246        BodyStage<B>: Stage<Provider>,
247        HeaderStage<P, H>: Stage<Provider>,
248    {
249        StageSetBuilder::default()
250            .add_stage(HeaderStage::new(provider, header_downloader, tip, stages_config.etl))
251            .add_stage(bodies)
252    }
253}
254
255impl<Provider, P, H, B> StageSet<Provider> for OnlineStages<P, H, B>
256where
257    P: HeaderSyncGapProvider + 'static,
258    H: HeaderDownloader<Header = <B::Block as Block>::Header> + 'static,
259    B: BodyDownloader + 'static,
260    HeaderStage<P, H>: Stage<Provider>,
261    BodyStage<B>: Stage<Provider>,
262{
263    fn builder(self) -> StageSetBuilder<Provider> {
264        StageSetBuilder::default()
265            .add_stage(HeaderStage::new(
266                self.provider,
267                self.header_downloader,
268                self.tip,
269                self.stages_config.etl.clone(),
270            ))
271            .add_stage(BodyStage::new(self.body_downloader))
272    }
273}
274
275/// A set containing all stages that do not require network access.
276///
277/// A combination of (in order)
278///
279/// - [`ExecutionStages`]
280/// - [`PruneSenderRecoveryStage`]
281/// - [`HashingStages`]
282/// - [`HistoryIndexingStages`]
283/// - [`PruneStage`]
284#[derive(Debug)]
285#[non_exhaustive]
286pub struct OfflineStages<E: ConfigureEvm> {
287    /// Executor factory needs for execution stage
288    evm_config: E,
289    /// Consensus instance for validating blocks.
290    consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
291    /// Configuration for each stage in the pipeline
292    stages_config: StageConfig,
293    /// Prune configuration for every segment that can be pruned
294    prune_modes: PruneModes,
295}
296
297impl<E: ConfigureEvm> OfflineStages<E> {
298    /// Create a new set of offline stages with default values.
299    pub const fn new(
300        evm_config: E,
301        consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
302        stages_config: StageConfig,
303        prune_modes: PruneModes,
304    ) -> Self {
305        Self { evm_config, consensus, stages_config, prune_modes }
306    }
307}
308
309impl<E, Provider> StageSet<Provider> for OfflineStages<E>
310where
311    E: ConfigureEvm,
312    ExecutionStages<E>: StageSet<Provider>,
313    PruneSenderRecoveryStage: Stage<Provider>,
314    HashingStages: StageSet<Provider>,
315    HistoryIndexingStages: StageSet<Provider>,
316    PruneStage: Stage<Provider>,
317{
318    fn builder(self) -> StageSetBuilder<Provider> {
319        ExecutionStages::new(self.evm_config, self.consensus, self.stages_config.clone())
320            .builder()
321            // If sender recovery prune mode is set, add the prune sender recovery stage.
322            .add_stage_opt(self.prune_modes.sender_recovery.map(|prune_mode| {
323                PruneSenderRecoveryStage::new(prune_mode, self.stages_config.prune.commit_threshold)
324            }))
325            .add_set(HashingStages { stages_config: self.stages_config.clone() })
326            .add_set(HistoryIndexingStages {
327                stages_config: self.stages_config.clone(),
328                prune_modes: self.prune_modes.clone(),
329            })
330            // If any prune modes are set, add the prune stage.
331            .add_stage_opt(self.prune_modes.is_empty().not().then(|| {
332                // Prune stage should be added after all hashing stages, because otherwise it will
333                // delete
334                PruneStage::new(self.prune_modes.clone(), self.stages_config.prune.commit_threshold)
335            }))
336    }
337}
338
339/// A set containing all stages that are required to execute pre-existing block data.
340#[derive(Debug)]
341#[non_exhaustive]
342pub struct ExecutionStages<E: ConfigureEvm> {
343    /// Executor factory that will create executors.
344    evm_config: E,
345    /// Consensus instance for validating blocks.
346    consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
347    /// Configuration for each stage in the pipeline
348    stages_config: StageConfig,
349}
350
351impl<E: ConfigureEvm> ExecutionStages<E> {
352    /// Create a new set of execution stages with default values.
353    pub const fn new(
354        executor_provider: E,
355        consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
356        stages_config: StageConfig,
357    ) -> Self {
358        Self { evm_config: executor_provider, consensus, stages_config }
359    }
360}
361
362impl<E, Provider> StageSet<Provider> for ExecutionStages<E>
363where
364    E: ConfigureEvm + 'static,
365    SenderRecoveryStage: Stage<Provider>,
366    ExecutionStage<E>: Stage<Provider>,
367{
368    fn builder(self) -> StageSetBuilder<Provider> {
369        StageSetBuilder::default()
370            .add_stage(SenderRecoveryStage::new(self.stages_config.sender_recovery))
371            .add_stage(ExecutionStage::from_config(
372                self.evm_config,
373                self.consensus,
374                self.stages_config.execution,
375                self.stages_config.execution_external_clean_threshold(),
376            ))
377    }
378}
379
380/// A set containing all stages that hash account state.
381#[derive(Debug, Default)]
382#[non_exhaustive]
383pub struct HashingStages {
384    /// Configuration for each stage in the pipeline
385    stages_config: StageConfig,
386}
387
388impl<Provider> StageSet<Provider> for HashingStages
389where
390    MerkleStage: Stage<Provider>,
391    AccountHashingStage: Stage<Provider>,
392    StorageHashingStage: Stage<Provider>,
393{
394    fn builder(self) -> StageSetBuilder<Provider> {
395        StageSetBuilder::default()
396            .add_stage(MerkleStage::default_unwind())
397            .add_stage(AccountHashingStage::new(
398                self.stages_config.account_hashing,
399                self.stages_config.etl.clone(),
400            ))
401            .add_stage(StorageHashingStage::new(
402                self.stages_config.storage_hashing,
403                self.stages_config.etl.clone(),
404            ))
405            .add_stage(MerkleStage::new_execution(self.stages_config.merkle.clean_threshold))
406    }
407}
408
409/// A set containing all stages that do additional indexing for historical state.
410#[derive(Debug, Default)]
411#[non_exhaustive]
412pub struct HistoryIndexingStages {
413    /// Configuration for each stage in the pipeline
414    stages_config: StageConfig,
415    /// Prune configuration for every segment that can be pruned
416    prune_modes: PruneModes,
417}
418
419impl<Provider> StageSet<Provider> for HistoryIndexingStages
420where
421    TransactionLookupStage: Stage<Provider>,
422    IndexStorageHistoryStage: Stage<Provider>,
423    IndexAccountHistoryStage: Stage<Provider>,
424{
425    fn builder(self) -> StageSetBuilder<Provider> {
426        StageSetBuilder::default()
427            .add_stage(TransactionLookupStage::new(
428                self.stages_config.transaction_lookup,
429                self.stages_config.etl.clone(),
430                self.prune_modes.transaction_lookup,
431            ))
432            .add_stage(IndexStorageHistoryStage::new(
433                self.stages_config.index_storage_history,
434                self.stages_config.etl.clone(),
435                self.prune_modes.account_history,
436            ))
437            .add_stage(IndexAccountHistoryStage::new(
438                self.stages_config.index_account_history,
439                self.stages_config.etl.clone(),
440                self.prune_modes.storage_history,
441            ))
442    }
443}