1use futures::{Stream, StreamExt};
2use pin_project::pin_project;
3use reth_beacon_consensus::{BeaconConsensusEngineEvent, EngineNodeTypes};
4use reth_chainspec::EthChainSpec;
5use reth_consensus::FullConsensus;
6use reth_engine_primitives::{BeaconEngineMessage, EngineValidator};
7use reth_engine_tree::{
8 backfill::PipelineSync,
9 backup::BackupHandle,
10 download::BasicBlockDownloader,
11 engine::{EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineHandler},
12 persistence::PersistenceHandle,
13 tree::{EngineApiTreeHandler, InvalidBlockHook, TreeConfig},
14};
15pub use reth_engine_tree::{
16 chain::{ChainEvent, ChainOrchestrator},
17 engine::EngineApiEvent,
18};
19use reth_evm::execute::BlockExecutorProvider;
20use reth_network_p2p::BlockClient;
21use reth_node_core::dirs::{ChainPath, DataDirPath};
22use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, NodeTypesWithEngine};
23use reth_payload_builder::PayloadBuilderHandle;
24use reth_primitives::EthPrimitives;
25use reth_provider::{providers::BlockchainProvider2, ProviderFactory};
26use reth_prune::PrunerWithFactory;
27use reth_stages_api::{MetricEventsSender, Pipeline};
28use reth_tasks::TaskSpawner;
29use std::{
30 marker::PhantomData,
31 pin::Pin,
32 sync::Arc,
33 task::{Context, Poll},
34};
35
36pub type EngineMessageStream<T> = Pin<Box<dyn Stream<Item = BeaconEngineMessage<T>> + Send + Sync>>;
38
39type EngineServiceType<N, Client> = ChainOrchestrator<
41 EngineHandler<
42 EngineApiRequestHandler<
43 EngineApiRequest<<N as NodeTypesWithEngine>::Engine, <N as NodeTypes>::Primitives>,
44 <N as NodeTypes>::Primitives,
45 >,
46 EngineMessageStream<<N as NodeTypesWithEngine>::Engine>,
47 BasicBlockDownloader<Client, BlockTy<N>>,
48 >,
49 PipelineSync<N>,
50>;
51
52#[pin_project]
54#[allow(missing_debug_implementations)]
55pub struct EngineService<N, Client, E>
56where
57 N: EngineNodeTypes,
58 Client: BlockClient<Header = HeaderTy<N>, Body = BodyTy<N>> + 'static,
59 E: BlockExecutorProvider + 'static,
60{
61 orchestrator: EngineServiceType<N, Client>,
62 _marker: PhantomData<E>,
63}
64
65impl<N, Client, E> EngineService<N, Client, E>
66where
67 N: EngineNodeTypes,
68 Client: BlockClient<Header = HeaderTy<N>, Body = BodyTy<N>> + 'static,
69 E: BlockExecutorProvider<Primitives = N::Primitives> + 'static,
70{
71 #[allow(clippy::too_many_arguments)]
73 pub fn new<V>(
74 consensus: Arc<dyn FullConsensus<N::Primitives>>,
75 executor_factory: E,
76 chain_spec: Arc<N::ChainSpec>,
77 client: Client,
78 incoming_requests: EngineMessageStream<N::Engine>,
79 pipeline: Pipeline<N>,
80 pipeline_task_spawner: Box<dyn TaskSpawner>,
81 provider: ProviderFactory<N>,
82 blockchain_db: BlockchainProvider2<N>,
83 pruner: PrunerWithFactory<ProviderFactory<N>>,
84 payload_builder: PayloadBuilderHandle<N::Engine>,
85 payload_validator: V,
86 tree_config: TreeConfig,
87 invalid_block_hook: Box<dyn InvalidBlockHook<N::Primitives>>,
88 sync_metrics_tx: MetricEventsSender,
89 data_dir: ChainPath<DataDirPath>,
90 ) -> Self
91 where
92 V: EngineValidator<N::Engine, Block = BlockTy<N>>,
93 {
94 let engine_kind =
95 if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
96
97 let downloader = BasicBlockDownloader::new(client, consensus.clone().as_consensus());
98
99 let persistence_handle =
100 PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx);
101
102 let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
103
104 let backup_handle = BackupHandle::spawn_service(data_dir);
105
106 let (to_tree_tx, from_tree) = EngineApiTreeHandler::<N::Primitives, _, _, _, _>::spawn_new(
107 blockchain_db,
108 executor_factory,
109 consensus,
110 payload_validator,
111 persistence_handle,
112 payload_builder,
113 canonical_in_memory_state,
114 tree_config,
115 invalid_block_hook,
116 engine_kind,
117 backup_handle,
118 );
119
120 let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
121 let handler = EngineHandler::new(engine_handler, downloader, incoming_requests);
122
123 let backfill_sync = PipelineSync::new(pipeline, pipeline_task_spawner);
124
125 Self {
126 orchestrator: ChainOrchestrator::new(handler, backfill_sync),
127 _marker: Default::default(),
128 }
129 }
130
131 pub fn orchestrator_mut(&mut self) -> &mut EngineServiceType<N, Client> {
133 &mut self.orchestrator
134 }
135}
136
137impl<N, Client, E> Stream for EngineService<N, Client, E>
138where
139 N: EngineNodeTypes,
140 Client: BlockClient<Header = HeaderTy<N>, Body = BodyTy<N>> + 'static,
141 E: BlockExecutorProvider + 'static,
142{
143 type Item = ChainEvent<BeaconConsensusEngineEvent<N::Primitives>>;
144
145 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
146 let mut orchestrator = self.project().orchestrator;
147 StreamExt::poll_next_unpin(&mut orchestrator, cx)
148 }
149}
150
151#[derive(Debug, thiserror::Error)]
153#[error("Engine service error.")]
154pub struct EngineServiceError {}
155
156#[cfg(test)]
157mod tests {
158 use super::*;
159 use reth_beacon_consensus::EthBeaconConsensus;
160 use reth_chainspec::{ChainSpecBuilder, MAINNET};
161 use reth_engine_primitives::BeaconEngineMessage;
162 use reth_engine_tree::{test_utils::TestPipelineBuilder, tree::NoopInvalidBlockHook};
163 use reth_ethereum_engine_primitives::{EthEngineTypes, EthereumEngineValidator};
164 use reth_evm_ethereum::execute::EthExecutorProvider;
165 use reth_exex_types::FinishedExExHeight;
166 use reth_network_p2p::test_utils::TestFullBlockClient;
167 use reth_node_core::dirs::MaybePlatformPath;
168 use reth_primitives::SealedHeader;
169 use reth_provider::{
170 providers::BlockchainProvider2, test_utils::create_test_provider_factory_with_chain_spec,
171 };
172 use reth_prune::Pruner;
173 use reth_tasks::TokioTaskExecutor;
174 use std::sync::Arc;
175 use tokio::sync::{mpsc::unbounded_channel, watch};
176 use tokio_stream::wrappers::UnboundedReceiverStream;
177
178 #[test]
179 fn eth_chain_orchestrator_build() {
180 let chain_spec = Arc::new(
181 ChainSpecBuilder::default()
182 .chain(MAINNET.chain)
183 .genesis(MAINNET.genesis.clone())
184 .paris_activated()
185 .build(),
186 );
187 let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));
188
189 let client = TestFullBlockClient::default();
190
191 let (_tx, rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
192 let incoming_requests = UnboundedReceiverStream::new(rx);
193
194 let pipeline = TestPipelineBuilder::new().build(chain_spec.clone());
195 let pipeline_task_spawner = Box::<TokioTaskExecutor>::default();
196 let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
197
198 let executor_factory = EthExecutorProvider::ethereum(chain_spec.clone());
199 let blockchain_db =
200 BlockchainProvider2::with_latest(provider_factory.clone(), SealedHeader::default())
201 .unwrap();
202 let engine_payload_validator = EthereumEngineValidator::new(chain_spec.clone());
203 let (_tx, rx) = watch::channel(FinishedExExHeight::NoExExs);
204 let pruner = Pruner::new_with_factory(provider_factory.clone(), vec![], 0, 0, None, rx);
205
206 let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
207 let (tx, _rx) = unbounded_channel();
208 let _eth_service = EngineService::new(
209 consensus,
210 executor_factory,
211 chain_spec.clone(),
212 client,
213 Box::pin(incoming_requests),
214 pipeline,
215 pipeline_task_spawner,
216 provider_factory,
217 blockchain_db,
218 pruner,
219 PayloadBuilderHandle::new(tx),
220 engine_payload_validator,
221 TreeConfig::default(),
222 Box::new(NoopInvalidBlockHook::default()),
223 sync_metrics_tx,
224 MaybePlatformPath::chain_default(chain_spec.chain.clone()),
225 );
226 }
227}