1use crate::{
38 stages::{
39 AccountHashingStage, BodyStage, ExecutionStage, FinishStage, HeaderStage,
40 IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, PruneSenderRecoveryStage,
41 PruneStage, SenderRecoveryStage, StorageHashingStage, TransactionLookupStage,
42 },
43 StageSet, StageSetBuilder,
44};
45use alloy_primitives::B256;
46use reth_config::config::StageConfig;
47use reth_consensus::Consensus;
48use reth_evm::execute::BlockExecutorProvider;
49use reth_network_p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader};
50use reth_provider::HeaderSyncGapProvider;
51use reth_prune_types::PruneModes;
52use reth_stages_api::Stage;
53use std::{ops::Not, sync::Arc};
54use tokio::sync::watch;
55
56#[derive(Debug)]
80pub struct DefaultStages<Provider, H, B, EF>
81where
82 H: HeaderDownloader,
83 B: BodyDownloader,
84{
85 online: OnlineStages<Provider, H, B>,
87 executor_factory: EF,
89 stages_config: StageConfig,
91 prune_modes: PruneModes,
93}
94
95impl<Provider, H, B, E> DefaultStages<Provider, H, B, E>
96where
97 H: HeaderDownloader,
98 B: BodyDownloader,
99{
100 #[allow(clippy::too_many_arguments)]
102 pub fn new(
103 provider: Provider,
104 tip: watch::Receiver<B256>,
105 consensus: Arc<dyn Consensus<H::Header, B::Body>>,
106 header_downloader: H,
107 body_downloader: B,
108 executor_factory: E,
109 stages_config: StageConfig,
110 prune_modes: PruneModes,
111 ) -> Self
112 where
113 E: BlockExecutorProvider,
114 {
115 Self {
116 online: OnlineStages::new(
117 provider,
118 tip,
119 consensus,
120 header_downloader,
121 body_downloader,
122 stages_config.clone(),
123 ),
124 executor_factory,
125 stages_config,
126 prune_modes,
127 }
128 }
129}
130
131impl<P, H, B, E> DefaultStages<P, H, B, E>
132where
133 E: BlockExecutorProvider,
134 H: HeaderDownloader,
135 B: BodyDownloader,
136{
137 pub fn add_offline_stages<Provider>(
139 default_offline: StageSetBuilder<Provider>,
140 executor_factory: E,
141 stages_config: StageConfig,
142 prune_modes: PruneModes,
143 ) -> StageSetBuilder<Provider>
144 where
145 OfflineStages<E>: StageSet<Provider>,
146 {
147 StageSetBuilder::default()
148 .add_set(default_offline)
149 .add_set(OfflineStages::new(executor_factory, stages_config, prune_modes))
150 .add_stage(FinishStage)
151 }
152}
153
154impl<P, H, B, E, Provider> StageSet<Provider> for DefaultStages<P, H, B, E>
155where
156 P: HeaderSyncGapProvider + 'static,
157 H: HeaderDownloader + 'static,
158 B: BodyDownloader + 'static,
159 E: BlockExecutorProvider,
160 OnlineStages<P, H, B>: StageSet<Provider>,
161 OfflineStages<E>: StageSet<Provider>,
162{
163 fn builder(self) -> StageSetBuilder<Provider> {
164 Self::add_offline_stages(
165 self.online.builder(),
166 self.executor_factory,
167 self.stages_config.clone(),
168 self.prune_modes,
169 )
170 }
171}
172
173#[derive(Debug)]
178pub struct OnlineStages<Provider, H, B>
179where
180 H: HeaderDownloader,
181 B: BodyDownloader,
182{
183 provider: Provider,
185 tip: watch::Receiver<B256>,
187 consensus: Arc<dyn Consensus<H::Header, B::Body>>,
189 header_downloader: H,
191 body_downloader: B,
193 stages_config: StageConfig,
195}
196
197impl<Provider, H, B> OnlineStages<Provider, H, B>
198where
199 H: HeaderDownloader,
200 B: BodyDownloader,
201{
202 pub fn new(
204 provider: Provider,
205 tip: watch::Receiver<B256>,
206 consensus: Arc<dyn Consensus<H::Header, B::Body>>,
207 header_downloader: H,
208 body_downloader: B,
209 stages_config: StageConfig,
210 ) -> Self {
211 Self { provider, tip, consensus, header_downloader, body_downloader, stages_config }
212 }
213}
214
215impl<P, H, B> OnlineStages<P, H, B>
216where
217 P: HeaderSyncGapProvider + 'static,
218 H: HeaderDownloader + 'static,
219 B: BodyDownloader + 'static,
220{
221 pub fn builder_with_headers<Provider>(
223 headers: HeaderStage<P, H>,
224 body_downloader: B,
225 ) -> StageSetBuilder<Provider>
226 where
227 HeaderStage<P, H>: Stage<Provider>,
228 BodyStage<B>: Stage<Provider>,
229 {
230 StageSetBuilder::default().add_stage(headers).add_stage(BodyStage::new(body_downloader))
231 }
232
233 pub fn builder_with_bodies<Provider>(
235 bodies: BodyStage<B>,
236 provider: P,
237 tip: watch::Receiver<B256>,
238 header_downloader: H,
239 consensus: Arc<dyn Consensus<H::Header, B::Body>>,
240 stages_config: StageConfig,
241 ) -> StageSetBuilder<Provider>
242 where
243 BodyStage<B>: Stage<Provider>,
244 HeaderStage<P, H>: Stage<Provider>,
245 {
246 StageSetBuilder::default()
247 .add_stage(HeaderStage::new(
248 provider,
249 header_downloader,
250 tip,
251 consensus.clone().as_header_validator(),
252 stages_config.etl,
253 ))
254 .add_stage(bodies)
255 }
256}
257
258impl<Provider, P, H, B> StageSet<Provider> for OnlineStages<P, H, B>
259where
260 P: HeaderSyncGapProvider + 'static,
261 H: HeaderDownloader + 'static,
262 B: BodyDownloader + 'static,
263 HeaderStage<P, H>: Stage<Provider>,
264 BodyStage<B>: Stage<Provider>,
265{
266 fn builder(self) -> StageSetBuilder<Provider> {
267 StageSetBuilder::default()
268 .add_stage(HeaderStage::new(
269 self.provider,
270 self.header_downloader,
271 self.tip,
272 self.consensus.clone().as_header_validator(),
273 self.stages_config.etl.clone(),
274 ))
275 .add_stage(BodyStage::new(self.body_downloader))
276 }
277}
278
279#[derive(Debug, Default)]
289#[non_exhaustive]
290pub struct OfflineStages<EF> {
291 executor_factory: EF,
293 stages_config: StageConfig,
295 prune_modes: PruneModes,
297}
298
299impl<EF> OfflineStages<EF> {
300 pub const fn new(
302 executor_factory: EF,
303 stages_config: StageConfig,
304 prune_modes: PruneModes,
305 ) -> Self {
306 Self { executor_factory, stages_config, prune_modes }
307 }
308}
309
310impl<E, Provider> StageSet<Provider> for OfflineStages<E>
311where
312 E: BlockExecutorProvider,
313 ExecutionStages<E>: StageSet<Provider>,
314 PruneSenderRecoveryStage: Stage<Provider>,
315 HashingStages: StageSet<Provider>,
316 HistoryIndexingStages: StageSet<Provider>,
317 PruneStage: Stage<Provider>,
318{
319 fn builder(self) -> StageSetBuilder<Provider> {
320 ExecutionStages::new(
321 self.executor_factory,
322 self.stages_config.clone(),
323 self.prune_modes.clone(),
324 )
325 .builder()
326 .add_stage_opt(self.prune_modes.sender_recovery.map(|prune_mode| {
328 PruneSenderRecoveryStage::new(prune_mode, self.stages_config.prune.commit_threshold)
329 }))
330 .add_set(HashingStages { stages_config: self.stages_config.clone() })
331 .add_set(HistoryIndexingStages {
332 stages_config: self.stages_config.clone(),
333 prune_modes: self.prune_modes.clone(),
334 })
335 .add_stage_opt(self.prune_modes.is_empty().not().then(|| {
337 PruneStage::new(self.prune_modes.clone(), self.stages_config.prune.commit_threshold)
340 }))
341 }
342}
343
344#[derive(Debug)]
346#[non_exhaustive]
347pub struct ExecutionStages<E> {
348 executor_factory: E,
350 stages_config: StageConfig,
352 prune_modes: PruneModes,
354}
355
356impl<E> ExecutionStages<E> {
357 pub const fn new(
359 executor_factory: E,
360 stages_config: StageConfig,
361 prune_modes: PruneModes,
362 ) -> Self {
363 Self { executor_factory, stages_config, prune_modes }
364 }
365}
366
367impl<E, Provider> StageSet<Provider> for ExecutionStages<E>
368where
369 E: BlockExecutorProvider,
370 SenderRecoveryStage: Stage<Provider>,
371 ExecutionStage<E>: Stage<Provider>,
372{
373 fn builder(self) -> StageSetBuilder<Provider> {
374 StageSetBuilder::default()
375 .add_stage(SenderRecoveryStage::new(self.stages_config.sender_recovery))
376 .add_stage(ExecutionStage::from_config(
377 self.executor_factory,
378 self.stages_config.execution,
379 self.stages_config.execution_external_clean_threshold(),
380 self.prune_modes,
381 ))
382 }
383}
384
385#[derive(Debug, Default)]
387#[non_exhaustive]
388pub struct HashingStages {
389 stages_config: StageConfig,
391}
392
393impl<Provider> StageSet<Provider> for HashingStages
394where
395 MerkleStage: Stage<Provider>,
396 AccountHashingStage: Stage<Provider>,
397 StorageHashingStage: Stage<Provider>,
398{
399 fn builder(self) -> StageSetBuilder<Provider> {
400 StageSetBuilder::default()
401 .add_stage(MerkleStage::default_unwind())
402 .add_stage(AccountHashingStage::new(
403 self.stages_config.account_hashing,
404 self.stages_config.etl.clone(),
405 ))
406 .add_stage(StorageHashingStage::new(
407 self.stages_config.storage_hashing,
408 self.stages_config.etl.clone(),
409 ))
410 .add_stage(MerkleStage::new_execution(self.stages_config.merkle.clean_threshold))
411 }
412}
413
414#[derive(Debug, Default)]
416#[non_exhaustive]
417pub struct HistoryIndexingStages {
418 stages_config: StageConfig,
420 prune_modes: PruneModes,
422}
423
424impl<Provider> StageSet<Provider> for HistoryIndexingStages
425where
426 TransactionLookupStage: Stage<Provider>,
427 IndexStorageHistoryStage: Stage<Provider>,
428 IndexAccountHistoryStage: Stage<Provider>,
429{
430 fn builder(self) -> StageSetBuilder<Provider> {
431 StageSetBuilder::default()
432 .add_stage(TransactionLookupStage::new(
433 self.stages_config.transaction_lookup,
434 self.stages_config.etl.clone(),
435 self.prune_modes.transaction_lookup,
436 ))
437 .add_stage(IndexStorageHistoryStage::new(
438 self.stages_config.index_storage_history,
439 self.stages_config.etl.clone(),
440 self.prune_modes.account_history,
441 ))
442 .add_stage(IndexAccountHistoryStage::new(
443 self.stages_config.index_account_history,
444 self.stages_config.etl.clone(),
445 self.prune_modes.storage_history,
446 ))
447 }
448}