1use futures::{Stream, StreamExt};
2use pin_project::pin_project;
3use reth_chainspec::EthChainSpec;
4use reth_consensus::{ConsensusError, FullConsensus};
5use reth_engine_primitives::{BeaconConsensusEngineEvent, BeaconEngineMessage, EngineValidator};
6use reth_engine_tree::{
7 backfill::PipelineSync,
8 backup::BackupHandle,
9 download::BasicBlockDownloader,
10 engine::{EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineHandler},
11 persistence::PersistenceHandle,
12 tree::{EngineApiTreeHandler, InvalidBlockHook, TreeConfig},
13};
14pub use reth_engine_tree::{
15 chain::{ChainEvent, ChainOrchestrator},
16 engine::EngineApiEvent,
17};
18use reth_ethereum_primitives::EthPrimitives;
19use reth_evm::ConfigureEvm;
20use reth_network_p2p::BlockClient;
21use reth_node_types::{BlockTy, NodeTypes};
22use reth_payload_builder::PayloadBuilderHandle;
23use reth_provider::{
24 providers::{BlockchainProvider, ProviderNodeTypes},
25 ProviderFactory,
26};
27use reth_prune::PrunerWithFactory;
28use reth_stages_api::{MetricEventsSender, Pipeline};
29use reth_tasks::TaskSpawner;
30use std::{
31 pin::Pin,
32 sync::Arc,
33 task::{Context, Poll},
34};
35
36use reth_node_core::dirs::{ChainPath, DataDirPath};
38
/// Alias for a pinned, boxed, `Send + Sync` stream that yields [`BeaconEngineMessage`]s for the
/// payload types `T`.
pub type EngineMessageStream<T> = Pin<Box<dyn Stream<Item = BeaconEngineMessage<T>> + Send + Sync>>;
41
/// Alias for the concrete [`ChainOrchestrator`] type stored inside [`EngineService`].
///
/// The orchestrator is parameterised with:
/// - an [`EngineHandler`] made of an [`EngineApiRequestHandler`], an [`EngineMessageStream`] and a
///   [`BasicBlockDownloader`], and
/// - a [`PipelineSync`] over the node types `N`.
type EngineServiceType<N, Client> = ChainOrchestrator<
    EngineHandler<
        EngineApiRequestHandler<
            EngineApiRequest<<N as NodeTypes>::Payload, <N as NodeTypes>::Primitives>,
            <N as NodeTypes>::Primitives,
        >,
        EngineMessageStream<<N as NodeTypes>::Payload>,
        BasicBlockDownloader<Client, BlockTy<N>>,
    >,
    PipelineSync<N>,
>;
54
/// Wrapper around a [`ChainOrchestrator`] that is polled as a [`Stream`] of [`ChainEvent`]s.
///
/// `N` are the node types and `Client` is the block client handed to the block downloader.
#[pin_project]
#[expect(missing_debug_implementations)]
// NOTE(review): the type is hidden from rustdoc; the reason is not visible in this file —
// TODO confirm whether `#[doc(hidden)]` is still required.
#[doc(hidden)]
pub struct EngineService<N, Client>
where
    N: ProviderNodeTypes,
    Client: BlockClient<Block = BlockTy<N>> + 'static,
{
    /// The orchestrator that this service wraps and polls.
    orchestrator: EngineServiceType<N, Client>,
}
68
69impl<N, Client> EngineService<N, Client>
70where
71 N: ProviderNodeTypes,
72 Client: BlockClient<Block = BlockTy<N>> + 'static,
73{
74 #[expect(clippy::too_many_arguments)]
76 pub fn new<V, C>(
77 consensus: Arc<dyn FullConsensus<N::Primitives, Error = ConsensusError>>,
78 chain_spec: Arc<N::ChainSpec>,
79 client: Client,
80 incoming_requests: EngineMessageStream<N::Payload>,
81 pipeline: Pipeline<N>,
82 pipeline_task_spawner: Box<dyn TaskSpawner>,
83 provider: ProviderFactory<N>,
84 blockchain_db: BlockchainProvider<N>,
85 pruner: PrunerWithFactory<ProviderFactory<N>>,
86 payload_builder: PayloadBuilderHandle<N::Payload>,
87 payload_validator: V,
88 tree_config: TreeConfig,
89 invalid_block_hook: Box<dyn InvalidBlockHook<N::Primitives>>,
90 sync_metrics_tx: MetricEventsSender,
91 evm_config: C,
92 data_dir: ChainPath<DataDirPath>,
93 ) -> Self
94 where
95 V: EngineValidator<N::Payload, Block = BlockTy<N>>,
96 C: ConfigureEvm<Primitives = N::Primitives> + 'static,
97 {
98 let engine_kind =
99 if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
100
101 let downloader = BasicBlockDownloader::new(client, consensus.clone());
102
103 let persistence_handle =
104 PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx);
105
106 let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
107
108 let backup_handle = BackupHandle::spawn_service(data_dir);
109
110 let (to_tree_tx, from_tree) = EngineApiTreeHandler::<N::Primitives, _, _, _, _>::spawn_new(
111 blockchain_db,
112 consensus,
113 payload_validator,
114 persistence_handle,
115 payload_builder,
116 canonical_in_memory_state,
117 tree_config,
118 invalid_block_hook,
119 engine_kind,
120 evm_config,
121 backup_handle,
122 );
123
124 let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
125 let handler = EngineHandler::new(engine_handler, downloader, incoming_requests);
126
127 let backfill_sync = PipelineSync::new(pipeline, pipeline_task_spawner);
128
129 Self { orchestrator: ChainOrchestrator::new(handler, backfill_sync) }
130 }
131
132 pub fn orchestrator_mut(&mut self) -> &mut EngineServiceType<N, Client> {
134 &mut self.orchestrator
135 }
136}
137
138impl<N, Client> Stream for EngineService<N, Client>
139where
140 N: ProviderNodeTypes,
141 Client: BlockClient<Block = BlockTy<N>> + 'static,
142{
143 type Item = ChainEvent<BeaconConsensusEngineEvent<N::Primitives>>;
144
145 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
146 let mut orchestrator = self.project().orchestrator;
147 StreamExt::poll_next_unpin(&mut orchestrator, cx)
148 }
149}
150
/// Error type of the engine service.
///
/// It carries no data; its display message is fixed by the `#[error]` attribute below. Nothing in
/// the visible part of this file constructs or returns it.
#[derive(Debug, thiserror::Error)]
#[error("Engine service error.")]
pub struct EngineServiceError {}
155
#[cfg(test)]
mod tests {
    use super::*;
    use reth_chainspec::{ChainSpecBuilder, MAINNET};
    use reth_engine_primitives::BeaconEngineMessage;
    use reth_engine_tree::{test_utils::TestPipelineBuilder, tree::NoopInvalidBlockHook};
    use reth_ethereum_consensus::EthBeaconConsensus;
    use reth_ethereum_engine_primitives::EthEngineTypes;
    use reth_evm_ethereum::EthEvmConfig;
    use reth_exex_types::FinishedExExHeight;
    use reth_network_p2p::test_utils::TestFullBlockClient;
    use reth_node_core::dirs::MaybePlatformPath;
    use reth_node_ethereum::EthereumEngineValidator;
    use reth_primitives_traits::SealedHeader;
    use reth_provider::{
        providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
    };
    use reth_prune::Pruner;
    use reth_tasks::TokioTaskExecutor;
    use std::sync::Arc;
    use tokio::sync::{mpsc::unbounded_channel, watch};
    use tokio_stream::wrappers::UnboundedReceiverStream;

    /// Smoke test: an [`EngineService`] can be constructed from test components on a chain spec
    /// that uses the mainnet chain and genesis with Paris activated.
    #[test]
    fn eth_chain_orchestrator_build() {
        let chain_spec = Arc::new(
            ChainSpecBuilder::default()
                .chain(MAINNET.chain)
                .genesis(MAINNET.genesis.clone())
                .paris_activated()
                .build(),
        );
        let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));

        let client = TestFullBlockClient::default();

        // The sender half stays bound (named `_…`, not `_`) until the end of the test.
        let (_engine_tx, engine_rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
        let incoming_requests = UnboundedReceiverStream::new(engine_rx);

        let pipeline = TestPipelineBuilder::new().build(chain_spec.clone());
        let pipeline_task_spawner = Box::<TokioTaskExecutor>::default();
        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());

        let blockchain_db =
            BlockchainProvider::with_latest(provider_factory.clone(), SealedHeader::default())
                .unwrap();
        let engine_payload_validator = EthereumEngineValidator::new(chain_spec.clone());

        // Same as above: the ExEx height sender stays bound until the end of the test.
        let (_exex_height_tx, exex_height_rx) = watch::channel(FinishedExExHeight::NoExExs);
        let pruner =
            Pruner::new_with_factory(provider_factory.clone(), vec![], 0, 0, None, exex_height_rx);
        let evm_config = EthEvmConfig::new(chain_spec.clone());

        let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
        let (payload_tx, _payload_rx) = unbounded_channel();
        let payload_builder = PayloadBuilderHandle::new(payload_tx);
        let data_dir = MaybePlatformPath::chain_default(chain_spec.chain.clone());

        // Only construction is exercised; the service is never polled.
        let _eth_service = EngineService::new(
            consensus,
            chain_spec.clone(),
            client,
            Box::pin(incoming_requests),
            pipeline,
            pipeline_task_spawner,
            provider_factory,
            blockchain_db,
            pruner,
            payload_builder,
            engine_payload_validator,
            TreeConfig::default(),
            Box::new(NoopInvalidBlockHook::default()),
            sync_metrics_tx,
            evm_config,
            data_dir,
        );
    }
}