1use crate::{
40 stages::{
41 AccountHashingStage, BodyStage, ExecutionStage, FinishStage, HeaderStage,
42 IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, PruneSenderRecoveryStage,
43 PruneStage, SenderRecoveryStage, StorageHashingStage, TransactionLookupStage,
44 },
45 StageSet, StageSetBuilder,
46};
47use alloy_primitives::B256;
48use reth_config::config::StageConfig;
49use reth_consensus::{ConsensusError, FullConsensus};
50use reth_evm::ConfigureEvm;
51use reth_network_p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader};
52use reth_primitives_traits::{Block, NodePrimitives};
53use reth_provider::HeaderSyncGapProvider;
54use reth_prune_types::PruneModes;
55use reth_stages_api::Stage;
56use std::{ops::Not, sync::Arc};
57use tokio::sync::watch;
58
59#[derive(Debug)]
83pub struct DefaultStages<Provider, H, B, E>
84where
85 H: HeaderDownloader,
86 B: BodyDownloader,
87 E: ConfigureEvm,
88{
89 online: OnlineStages<Provider, H, B>,
91 evm_config: E,
93 consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
95 stages_config: StageConfig,
97 prune_modes: PruneModes,
99}
100
101impl<Provider, H, B, E> DefaultStages<Provider, H, B, E>
102where
103 H: HeaderDownloader,
104 B: BodyDownloader,
105 E: ConfigureEvm<Primitives: NodePrimitives<BlockHeader = H::Header, Block = B::Block>>,
106{
107 #[expect(clippy::too_many_arguments)]
109 pub fn new(
110 provider: Provider,
111 tip: watch::Receiver<B256>,
112 consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
113 header_downloader: H,
114 body_downloader: B,
115 evm_config: E,
116 stages_config: StageConfig,
117 prune_modes: PruneModes,
118 ) -> Self {
119 Self {
120 online: OnlineStages::new(
121 provider,
122 tip,
123 header_downloader,
124 body_downloader,
125 stages_config.clone(),
126 ),
127 evm_config,
128 consensus,
129 stages_config,
130 prune_modes,
131 }
132 }
133}
134
135impl<P, H, B, E> DefaultStages<P, H, B, E>
136where
137 E: ConfigureEvm,
138 H: HeaderDownloader,
139 B: BodyDownloader,
140{
141 pub fn add_offline_stages<Provider>(
143 default_offline: StageSetBuilder<Provider>,
144 evm_config: E,
145 consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
146 stages_config: StageConfig,
147 prune_modes: PruneModes,
148 ) -> StageSetBuilder<Provider>
149 where
150 OfflineStages<E>: StageSet<Provider>,
151 {
152 StageSetBuilder::default()
153 .add_set(default_offline)
154 .add_set(OfflineStages::new(evm_config, consensus, stages_config, prune_modes))
155 .add_stage(FinishStage)
156 }
157}
158
159impl<P, H, B, E, Provider> StageSet<Provider> for DefaultStages<P, H, B, E>
160where
161 P: HeaderSyncGapProvider + 'static,
162 H: HeaderDownloader + 'static,
163 B: BodyDownloader + 'static,
164 E: ConfigureEvm,
165 OnlineStages<P, H, B>: StageSet<Provider>,
166 OfflineStages<E>: StageSet<Provider>,
167{
168 fn builder(self) -> StageSetBuilder<Provider> {
169 Self::add_offline_stages(
170 self.online.builder(),
171 self.evm_config,
172 self.consensus,
173 self.stages_config.clone(),
174 self.prune_modes,
175 )
176 }
177}
178
179#[derive(Debug)]
184pub struct OnlineStages<Provider, H, B>
185where
186 H: HeaderDownloader,
187 B: BodyDownloader,
188{
189 provider: Provider,
191 tip: watch::Receiver<B256>,
193
194 header_downloader: H,
196 body_downloader: B,
198 stages_config: StageConfig,
200}
201
202impl<Provider, H, B> OnlineStages<Provider, H, B>
203where
204 H: HeaderDownloader,
205 B: BodyDownloader,
206{
207 pub const fn new(
209 provider: Provider,
210 tip: watch::Receiver<B256>,
211 header_downloader: H,
212 body_downloader: B,
213 stages_config: StageConfig,
214 ) -> Self {
215 Self { provider, tip, header_downloader, body_downloader, stages_config }
216 }
217}
218
219impl<P, H, B> OnlineStages<P, H, B>
220where
221 P: HeaderSyncGapProvider + 'static,
222 H: HeaderDownloader<Header = <B::Block as Block>::Header> + 'static,
223 B: BodyDownloader + 'static,
224{
225 pub fn builder_with_headers<Provider>(
227 headers: HeaderStage<P, H>,
228 body_downloader: B,
229 ) -> StageSetBuilder<Provider>
230 where
231 HeaderStage<P, H>: Stage<Provider>,
232 BodyStage<B>: Stage<Provider>,
233 {
234 StageSetBuilder::default().add_stage(headers).add_stage(BodyStage::new(body_downloader))
235 }
236
237 pub fn builder_with_bodies<Provider>(
239 bodies: BodyStage<B>,
240 provider: P,
241 tip: watch::Receiver<B256>,
242 header_downloader: H,
243 stages_config: StageConfig,
244 ) -> StageSetBuilder<Provider>
245 where
246 BodyStage<B>: Stage<Provider>,
247 HeaderStage<P, H>: Stage<Provider>,
248 {
249 StageSetBuilder::default()
250 .add_stage(HeaderStage::new(provider, header_downloader, tip, stages_config.etl))
251 .add_stage(bodies)
252 }
253}
254
255impl<Provider, P, H, B> StageSet<Provider> for OnlineStages<P, H, B>
256where
257 P: HeaderSyncGapProvider + 'static,
258 H: HeaderDownloader<Header = <B::Block as Block>::Header> + 'static,
259 B: BodyDownloader + 'static,
260 HeaderStage<P, H>: Stage<Provider>,
261 BodyStage<B>: Stage<Provider>,
262{
263 fn builder(self) -> StageSetBuilder<Provider> {
264 StageSetBuilder::default()
265 .add_stage(HeaderStage::new(
266 self.provider,
267 self.header_downloader,
268 self.tip,
269 self.stages_config.etl.clone(),
270 ))
271 .add_stage(BodyStage::new(self.body_downloader))
272 }
273}
274
275#[derive(Debug)]
285#[non_exhaustive]
286pub struct OfflineStages<E: ConfigureEvm> {
287 evm_config: E,
289 consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
291 stages_config: StageConfig,
293 prune_modes: PruneModes,
295}
296
297impl<E: ConfigureEvm> OfflineStages<E> {
298 pub const fn new(
300 evm_config: E,
301 consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
302 stages_config: StageConfig,
303 prune_modes: PruneModes,
304 ) -> Self {
305 Self { evm_config, consensus, stages_config, prune_modes }
306 }
307}
308
309impl<E, Provider> StageSet<Provider> for OfflineStages<E>
310where
311 E: ConfigureEvm,
312 ExecutionStages<E>: StageSet<Provider>,
313 PruneSenderRecoveryStage: Stage<Provider>,
314 HashingStages: StageSet<Provider>,
315 HistoryIndexingStages: StageSet<Provider>,
316 PruneStage: Stage<Provider>,
317{
318 fn builder(self) -> StageSetBuilder<Provider> {
319 ExecutionStages::new(self.evm_config, self.consensus, self.stages_config.clone())
320 .builder()
321 .add_stage_opt(self.prune_modes.sender_recovery.map(|prune_mode| {
323 PruneSenderRecoveryStage::new(prune_mode, self.stages_config.prune.commit_threshold)
324 }))
325 .add_set(HashingStages { stages_config: self.stages_config.clone() })
326 .add_set(HistoryIndexingStages {
327 stages_config: self.stages_config.clone(),
328 prune_modes: self.prune_modes.clone(),
329 })
330 .add_stage_opt(self.prune_modes.is_empty().not().then(|| {
332 PruneStage::new(self.prune_modes.clone(), self.stages_config.prune.commit_threshold)
335 }))
336 }
337}
338
339#[derive(Debug)]
341#[non_exhaustive]
342pub struct ExecutionStages<E: ConfigureEvm> {
343 evm_config: E,
345 consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
347 stages_config: StageConfig,
349}
350
351impl<E: ConfigureEvm> ExecutionStages<E> {
352 pub const fn new(
354 executor_provider: E,
355 consensus: Arc<dyn FullConsensus<E::Primitives, Error = ConsensusError>>,
356 stages_config: StageConfig,
357 ) -> Self {
358 Self { evm_config: executor_provider, consensus, stages_config }
359 }
360}
361
362impl<E, Provider> StageSet<Provider> for ExecutionStages<E>
363where
364 E: ConfigureEvm + 'static,
365 SenderRecoveryStage: Stage<Provider>,
366 ExecutionStage<E>: Stage<Provider>,
367{
368 fn builder(self) -> StageSetBuilder<Provider> {
369 StageSetBuilder::default()
370 .add_stage(SenderRecoveryStage::new(self.stages_config.sender_recovery))
371 .add_stage(ExecutionStage::from_config(
372 self.evm_config,
373 self.consensus,
374 self.stages_config.execution,
375 self.stages_config.execution_external_clean_threshold(),
376 ))
377 }
378}
379
380#[derive(Debug, Default)]
382#[non_exhaustive]
383pub struct HashingStages {
384 stages_config: StageConfig,
386}
387
388impl<Provider> StageSet<Provider> for HashingStages
389where
390 MerkleStage: Stage<Provider>,
391 AccountHashingStage: Stage<Provider>,
392 StorageHashingStage: Stage<Provider>,
393{
394 fn builder(self) -> StageSetBuilder<Provider> {
395 StageSetBuilder::default()
396 .add_stage(MerkleStage::default_unwind())
397 .add_stage(AccountHashingStage::new(
398 self.stages_config.account_hashing,
399 self.stages_config.etl.clone(),
400 ))
401 .add_stage(StorageHashingStage::new(
402 self.stages_config.storage_hashing,
403 self.stages_config.etl.clone(),
404 ))
405 .add_stage(MerkleStage::new_execution(self.stages_config.merkle.clean_threshold))
406 }
407}
408
409#[derive(Debug, Default)]
411#[non_exhaustive]
412pub struct HistoryIndexingStages {
413 stages_config: StageConfig,
415 prune_modes: PruneModes,
417}
418
419impl<Provider> StageSet<Provider> for HistoryIndexingStages
420where
421 TransactionLookupStage: Stage<Provider>,
422 IndexStorageHistoryStage: Stage<Provider>,
423 IndexAccountHistoryStage: Stage<Provider>,
424{
425 fn builder(self) -> StageSetBuilder<Provider> {
426 StageSetBuilder::default()
427 .add_stage(TransactionLookupStage::new(
428 self.stages_config.transaction_lookup,
429 self.stages_config.etl.clone(),
430 self.prune_modes.transaction_lookup,
431 ))
432 .add_stage(IndexStorageHistoryStage::new(
433 self.stages_config.index_storage_history,
434 self.stages_config.etl.clone(),
435 self.prune_modes.account_history,
436 ))
437 .add_stage(IndexAccountHistoryStage::new(
438 self.stages_config.index_account_history,
439 self.stages_config.etl.clone(),
440 self.prune_modes.storage_history,
441 ))
442 }
443}