reth_engine_service/
service.rs

1use futures::{Stream, StreamExt};
2use pin_project::pin_project;
3use reth_beacon_consensus::{BeaconConsensusEngineEvent, EngineNodeTypes};
4use reth_chainspec::EthChainSpec;
5use reth_consensus::FullConsensus;
6use reth_engine_primitives::{BeaconEngineMessage, EngineValidator};
7use reth_engine_tree::{
8    backfill::PipelineSync,
9    backup::BackupHandle,
10    download::BasicBlockDownloader,
11    engine::{EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineHandler},
12    persistence::PersistenceHandle,
13    tree::{EngineApiTreeHandler, InvalidBlockHook, TreeConfig},
14};
15pub use reth_engine_tree::{
16    chain::{ChainEvent, ChainOrchestrator},
17    engine::EngineApiEvent,
18};
19use reth_evm::execute::BlockExecutorProvider;
20use reth_network_p2p::BlockClient;
21use reth_node_core::dirs::{ChainPath, DataDirPath};
22use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, NodeTypesWithEngine};
23use reth_payload_builder::PayloadBuilderHandle;
24use reth_primitives::EthPrimitives;
25use reth_provider::{providers::BlockchainProvider2, ProviderFactory};
26use reth_prune::PrunerWithFactory;
27use reth_stages_api::{MetricEventsSender, Pipeline};
28use reth_tasks::TaskSpawner;
29use std::{
30    marker::PhantomData,
31    pin::Pin,
32    sync::Arc,
33    task::{Context, Poll},
34};
35
36/// Alias for consensus engine stream.
37pub type EngineMessageStream<T> = Pin<Box<dyn Stream<Item = BeaconEngineMessage<T>> + Send + Sync>>;
38
39/// Alias for chain orchestrator.
40type EngineServiceType<N, Client> = ChainOrchestrator<
41    EngineHandler<
42        EngineApiRequestHandler<
43            EngineApiRequest<<N as NodeTypesWithEngine>::Engine, <N as NodeTypes>::Primitives>,
44            <N as NodeTypes>::Primitives,
45        >,
46        EngineMessageStream<<N as NodeTypesWithEngine>::Engine>,
47        BasicBlockDownloader<Client, BlockTy<N>>,
48    >,
49    PipelineSync<N>,
50>;
51
52/// The type that drives the chain forward and communicates progress.
53#[pin_project]
54#[allow(missing_debug_implementations)]
55pub struct EngineService<N, Client, E>
56where
57    N: EngineNodeTypes,
58    Client: BlockClient<Header = HeaderTy<N>, Body = BodyTy<N>> + 'static,
59    E: BlockExecutorProvider + 'static,
60{
61    orchestrator: EngineServiceType<N, Client>,
62    _marker: PhantomData<E>,
63}
64
65impl<N, Client, E> EngineService<N, Client, E>
66where
67    N: EngineNodeTypes,
68    Client: BlockClient<Header = HeaderTy<N>, Body = BodyTy<N>> + 'static,
69    E: BlockExecutorProvider<Primitives = N::Primitives> + 'static,
70{
71    /// Constructor for `EngineService`.
72    #[allow(clippy::too_many_arguments)]
73    pub fn new<V>(
74        consensus: Arc<dyn FullConsensus<N::Primitives>>,
75        executor_factory: E,
76        chain_spec: Arc<N::ChainSpec>,
77        client: Client,
78        incoming_requests: EngineMessageStream<N::Engine>,
79        pipeline: Pipeline<N>,
80        pipeline_task_spawner: Box<dyn TaskSpawner>,
81        provider: ProviderFactory<N>,
82        blockchain_db: BlockchainProvider2<N>,
83        pruner: PrunerWithFactory<ProviderFactory<N>>,
84        payload_builder: PayloadBuilderHandle<N::Engine>,
85        payload_validator: V,
86        tree_config: TreeConfig,
87        invalid_block_hook: Box<dyn InvalidBlockHook<N::Primitives>>,
88        sync_metrics_tx: MetricEventsSender,
89        data_dir: ChainPath<DataDirPath>,
90    ) -> Self
91    where
92        V: EngineValidator<N::Engine, Block = BlockTy<N>>,
93    {
94        let engine_kind =
95            if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
96
97        let downloader = BasicBlockDownloader::new(client, consensus.clone().as_consensus());
98
99        let persistence_handle =
100            PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx);
101
102        let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
103
104        let backup_handle = BackupHandle::spawn_service(data_dir);
105
106        let (to_tree_tx, from_tree) = EngineApiTreeHandler::<N::Primitives, _, _, _, _>::spawn_new(
107            blockchain_db,
108            executor_factory,
109            consensus,
110            payload_validator,
111            persistence_handle,
112            payload_builder,
113            canonical_in_memory_state,
114            tree_config,
115            invalid_block_hook,
116            engine_kind,
117            backup_handle,
118        );
119
120        let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
121        let handler = EngineHandler::new(engine_handler, downloader, incoming_requests);
122
123        let backfill_sync = PipelineSync::new(pipeline, pipeline_task_spawner);
124
125        Self {
126            orchestrator: ChainOrchestrator::new(handler, backfill_sync),
127            _marker: Default::default(),
128        }
129    }
130
131    /// Returns a mutable reference to the orchestrator.
132    pub fn orchestrator_mut(&mut self) -> &mut EngineServiceType<N, Client> {
133        &mut self.orchestrator
134    }
135}
136
137impl<N, Client, E> Stream for EngineService<N, Client, E>
138where
139    N: EngineNodeTypes,
140    Client: BlockClient<Header = HeaderTy<N>, Body = BodyTy<N>> + 'static,
141    E: BlockExecutorProvider + 'static,
142{
143    type Item = ChainEvent<BeaconConsensusEngineEvent<N::Primitives>>;
144
145    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
146        let mut orchestrator = self.project().orchestrator;
147        StreamExt::poll_next_unpin(&mut orchestrator, cx)
148    }
149}
150
151/// Potential error returned by `EngineService`.
152#[derive(Debug, thiserror::Error)]
153#[error("Engine service error.")]
154pub struct EngineServiceError {}
155
156#[cfg(test)]
157mod tests {
158    use super::*;
159    use reth_beacon_consensus::EthBeaconConsensus;
160    use reth_chainspec::{ChainSpecBuilder, MAINNET};
161    use reth_engine_primitives::BeaconEngineMessage;
162    use reth_engine_tree::{test_utils::TestPipelineBuilder, tree::NoopInvalidBlockHook};
163    use reth_ethereum_engine_primitives::{EthEngineTypes, EthereumEngineValidator};
164    use reth_evm_ethereum::execute::EthExecutorProvider;
165    use reth_exex_types::FinishedExExHeight;
166    use reth_network_p2p::test_utils::TestFullBlockClient;
167    use reth_node_core::dirs::MaybePlatformPath;
168    use reth_primitives::SealedHeader;
169    use reth_provider::{
170        providers::BlockchainProvider2, test_utils::create_test_provider_factory_with_chain_spec,
171    };
172    use reth_prune::Pruner;
173    use reth_tasks::TokioTaskExecutor;
174    use std::sync::Arc;
175    use tokio::sync::{mpsc::unbounded_channel, watch};
176    use tokio_stream::wrappers::UnboundedReceiverStream;
177
178    #[test]
179    fn eth_chain_orchestrator_build() {
180        let chain_spec = Arc::new(
181            ChainSpecBuilder::default()
182                .chain(MAINNET.chain)
183                .genesis(MAINNET.genesis.clone())
184                .paris_activated()
185                .build(),
186        );
187        let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));
188
189        let client = TestFullBlockClient::default();
190
191        let (_tx, rx) = unbounded_channel::<BeaconEngineMessage<EthEngineTypes>>();
192        let incoming_requests = UnboundedReceiverStream::new(rx);
193
194        let pipeline = TestPipelineBuilder::new().build(chain_spec.clone());
195        let pipeline_task_spawner = Box::<TokioTaskExecutor>::default();
196        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
197
198        let executor_factory = EthExecutorProvider::ethereum(chain_spec.clone());
199        let blockchain_db =
200            BlockchainProvider2::with_latest(provider_factory.clone(), SealedHeader::default())
201                .unwrap();
202        let engine_payload_validator = EthereumEngineValidator::new(chain_spec.clone());
203        let (_tx, rx) = watch::channel(FinishedExExHeight::NoExExs);
204        let pruner = Pruner::new_with_factory(provider_factory.clone(), vec![], 0, 0, None, rx);
205
206        let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
207        let (tx, _rx) = unbounded_channel();
208        let _eth_service = EngineService::new(
209            consensus,
210            executor_factory,
211            chain_spec.clone(),
212            client,
213            Box::pin(incoming_requests),
214            pipeline,
215            pipeline_task_spawner,
216            provider_factory,
217            blockchain_db,
218            pruner,
219            PayloadBuilderHandle::new(tx),
220            engine_payload_validator,
221            TreeConfig::default(),
222            Box::new(NoopInvalidBlockHook::default()),
223            sync_metrics_tx,
224            MaybePlatformPath::chain_default(chain_spec.chain.clone()),
225        );
226    }
227}