reth_engine_service/
service.rs

1use futures::{Stream, StreamExt};
2use pin_project::pin_project;
3use reth_chainspec::EthChainSpec;
4use reth_consensus::{ConsensusError, FullConsensus};
5use reth_engine_primitives::{BeaconConsensusEngineEvent, BeaconEngineMessage, EngineValidator};
6use reth_engine_tree::{
7    backfill::PipelineSync,
8    backup::BackupHandle,
9    download::BasicBlockDownloader,
10    engine::{EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineHandler},
11    persistence::PersistenceHandle,
12    tree::{EngineApiTreeHandler, InvalidBlockHook, TreeConfig},
13};
14pub use reth_engine_tree::{
15    chain::{ChainEvent, ChainOrchestrator},
16    engine::EngineApiEvent,
17};
18use reth_ethereum_primitives::EthPrimitives;
19use reth_evm::ConfigureEvm;
20use reth_network_p2p::BlockClient;
21use reth_node_types::{BlockTy, NodeTypes};
22use reth_payload_builder::PayloadBuilderHandle;
23use reth_provider::{
24    providers::{BlockchainProvider, ProviderNodeTypes},
25    ProviderFactory,
26};
27use reth_prune::PrunerWithFactory;
28use reth_stages_api::{MetricEventsSender, Pipeline};
29use reth_tasks::TaskSpawner;
30use std::{
31    pin::Pin,
32    sync::Arc,
33    task::{Context, Poll},
34};
35
36// seismic imports that upstream doesn't use
37use reth_node_core::dirs::{ChainPath, DataDirPath};
38
39/// Alias for consensus engine stream.
40pub type EngineMessageStream<T> = Pin<Box<dyn Stream<Item = BeaconEngineMessage<T>> + Send + Sync>>;
41
42/// Alias for chain orchestrator.
43type EngineServiceType<N, Client> = ChainOrchestrator<
44    EngineHandler<
45        EngineApiRequestHandler<
46            EngineApiRequest<<N as NodeTypes>::Payload, <N as NodeTypes>::Primitives>,
47            <N as NodeTypes>::Primitives,
48        >,
49        EngineMessageStream<<N as NodeTypes>::Payload>,
50        BasicBlockDownloader<Client, BlockTy<N>>,
51    >,
52    PipelineSync<N>,
53>;
54
55/// The type that drives the chain forward and communicates progress.
56#[pin_project]
57#[expect(missing_debug_implementations)]
58// TODO(mattsse): remove hidde once fixed : <https://github.com/rust-lang/rust/issues/135363>
59//  otherwise rustdoc fails to resolve the alias
60#[doc(hidden)]
61pub struct EngineService<N, Client>
62where
63    N: ProviderNodeTypes,
64    Client: BlockClient<Block = BlockTy<N>> + 'static,
65{
66    orchestrator: EngineServiceType<N, Client>,
67}
68
69impl<N, Client> EngineService<N, Client>
70where
71    N: ProviderNodeTypes,
72    Client: BlockClient<Block = BlockTy<N>> + 'static,
73{
74    /// Constructor for `EngineService`.
75    #[expect(clippy::too_many_arguments)]
76    pub fn new<V, C>(
77        consensus: Arc<dyn FullConsensus<N::Primitives, Error = ConsensusError>>,
78        chain_spec: Arc<N::ChainSpec>,
79        client: Client,
80        incoming_requests: EngineMessageStream<N::Payload>,
81        pipeline: Pipeline<N>,
82        pipeline_task_spawner: Box<dyn TaskSpawner>,
83        provider: ProviderFactory<N>,
84        blockchain_db: BlockchainProvider<N>,
85        pruner: PrunerWithFactory<ProviderFactory<N>>,
86        payload_builder: PayloadBuilderHandle<N::Payload>,
87        payload_validator: V,
88        tree_config: TreeConfig,
89        invalid_block_hook: Box<dyn InvalidBlockHook<N::Primitives>>,
90        sync_metrics_tx: MetricEventsSender,
91        evm_config: C,
92        data_dir: ChainPath<DataDirPath>,
93    ) -> Self
94    where
95        V: EngineValidator<N::Payload, Block = BlockTy<N>>,
96        C: ConfigureEvm<Primitives = N::Primitives> + 'static,
97    {
98        let engine_kind =
99            if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
100
101        let downloader = BasicBlockDownloader::new(client, consensus.clone());
102
103        let persistence_handle =
104            PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx);
105
106        let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
107
108        let backup_handle = BackupHandle::spawn_service(data_dir);
109
110        let (to_tree_tx, from_tree) = EngineApiTreeHandler::<N::Primitives, _, _, _, _>::spawn_new(
111            blockchain_db,
112            consensus,
113            payload_validator,
114            persistence_handle,
115            payload_builder,
116            canonical_in_memory_state,
117            tree_config,
118            invalid_block_hook,
119            engine_kind,
120            evm_config,
121            backup_handle,
122        );
123
124        let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
125        let handler = EngineHandler::new(engine_handler, downloader, incoming_requests);
126
127        let backfill_sync = PipelineSync::new(pipeline, pipeline_task_spawner);
128
129        Self { orchestrator: ChainOrchestrator::new(handler, backfill_sync) }
130    }
131
132    /// Returns a mutable reference to the orchestrator.
133    pub fn orchestrator_mut(&mut self) -> &mut EngineServiceType<N, Client> {
134        &mut self.orchestrator
135    }
136}
137
138impl<N, Client> Stream for EngineService<N, Client>
139where
140    N: ProviderNodeTypes,
141    Client: BlockClient<Block = BlockTy<N>> + 'static,
142{
143    type Item = ChainEvent<BeaconConsensusEngineEvent<N::Primitives>>;
144
145    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
146        let mut orchestrator = self.project().orchestrator;
147        StreamExt::poll_next_unpin(&mut orchestrator, cx)
148    }
149}
150
151/// Potential error returned by `EngineService`.
152#[derive(Debug, thiserror::Error)]
153#[error("Engine service error.")]
154pub struct EngineServiceError {}
155
#[cfg(test)]
mod tests {
    use super::*;
    use reth_chainspec::{ChainSpecBuilder, MAINNET};
    use reth_engine_primitives::BeaconEngineMessage;
    use reth_engine_tree::{test_utils::TestPipelineBuilder, tree::NoopInvalidBlockHook};
    use reth_ethereum_consensus::EthBeaconConsensus;
    use reth_ethereum_engine_primitives::EthEngineTypes;
    use reth_evm_ethereum::EthEvmConfig;
    use reth_exex_types::FinishedExExHeight;
    use reth_network_p2p::test_utils::TestFullBlockClient;
    use reth_node_core::dirs::MaybePlatformPath;
    use reth_node_ethereum::EthereumEngineValidator;
    use reth_primitives_traits::SealedHeader;
    use reth_provider::{
        providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
    };
    use reth_prune::Pruner;
    use reth_tasks::TokioTaskExecutor;
    use std::sync::Arc;
    use tokio::sync::{mpsc::unbounded_channel, watch};
    use tokio_stream::wrappers::UnboundedReceiverStream;

    /// Smoke test: an `EngineService` can be constructed from mainnet-based test components.
    #[test]
    fn eth_chain_orchestrator_build() {
        // Mainnet chain and genesis with Paris activated.
        let spec_builder = ChainSpecBuilder::default()
            .chain(MAINNET.chain)
            .genesis(MAINNET.genesis.clone())
            .paris_activated();
        let chain_spec = Arc::new(spec_builder.build());
        let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));

        let client = TestFullBlockClient::default();

        // Engine message channel; the sender is kept alive until the end of the test.
        let (_engine_tx, engine_rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
        let incoming_requests = UnboundedReceiverStream::new(engine_rx);

        let pipeline = TestPipelineBuilder::new().build(chain_spec.clone());
        let pipeline_task_spawner = Box::new(TokioTaskExecutor::default());
        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());

        let blockchain_db =
            BlockchainProvider::with_latest(provider_factory.clone(), SealedHeader::default())
                .unwrap();
        let engine_payload_validator = EthereumEngineValidator::new(chain_spec.clone());

        // Pruner with no segments, fed by a watch channel reporting that no ExExs exist.
        let (_exex_height_tx, exex_height_rx) = watch::channel(FinishedExExHeight::NoExExs);
        let pruner =
            Pruner::new_with_factory(provider_factory.clone(), vec![], 0, 0, None, exex_height_rx);
        let evm_config = EthEvmConfig::new(chain_spec.clone());

        let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
        let (payload_tx, _payload_rx) = unbounded_channel();
        let payload_builder = PayloadBuilderHandle::new(payload_tx);

        let _eth_service = EngineService::new(
            consensus,
            chain_spec.clone(),
            client,
            Box::pin(incoming_requests),
            pipeline,
            pipeline_task_spawner,
            provider_factory,
            blockchain_db,
            pruner,
            payload_builder,
            engine_payload_validator,
            TreeConfig::default(),
            Box::new(NoopInvalidBlockHook::default()),
            sync_metrics_tx,
            evm_config,
            MaybePlatformPath::chain_default(chain_spec.chain.clone()),
        );
    }
}