reth_engine_local/
service.rs

1//! Provides a local dev service engine that can be used to run a dev chain.
2//!
3//! [`LocalEngineService`] polls the payload builder based on a mining mode
4//! which can be set to `Instant` or `Interval`. The `Instant` mode will
5//! constantly poll the payload builder and initiate block building
6//! with a single transaction. The `Interval` mode will initiate block
7//! building at a fixed interval.
8
9use core::fmt;
10use std::{
11    fmt::{Debug, Formatter},
12    pin::Pin,
13    sync::Arc,
14    task::{Context, Poll},
15};
16
17use crate::miner::{LocalMiner, MiningMode};
18use futures_util::{Stream, StreamExt};
19use reth_beacon_consensus::{BeaconConsensusEngineEvent, EngineNodeTypes};
20use reth_chainspec::EthChainSpec;
21use reth_consensus::FullConsensus;
22use reth_engine_primitives::{BeaconEngineMessage, EngineValidator};
23use reth_engine_service::service::EngineMessageStream;
24use reth_engine_tree::{
25    backup::BackupHandle,
26    chain::{ChainEvent, HandlerEvent},
27    engine::{
28        EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineRequestHandler, FromEngine,
29        RequestHandlerEvent,
30    },
31    persistence::PersistenceHandle,
32    tree::{EngineApiTreeHandler, InvalidBlockHook, TreeConfig},
33};
34use reth_evm::execute::BlockExecutorProvider;
35use reth_node_core::dirs::{ChainPath, DataDirPath};
36use reth_node_types::BlockTy;
37use reth_payload_builder::PayloadBuilderHandle;
38use reth_payload_primitives::{PayloadAttributesBuilder, PayloadTypes};
39use reth_provider::{providers::BlockchainProvider2, ChainSpecProvider, ProviderFactory};
40use reth_prune::PrunerWithFactory;
41use reth_stages_api::MetricEventsSender;
42use tokio::sync::mpsc::UnboundedSender;
43use tracing::error;
44
45/// Provides a local dev service engine that can be used to drive the
46/// chain forward.
47///
48/// This service both produces and consumes [`BeaconEngineMessage`]s. This is done to allow
49/// modifications of the stream
50pub struct LocalEngineService<N>
51where
52    N: EngineNodeTypes,
53{
54    /// Processes requests.
55    ///
56    /// This type is responsible for processing incoming requests.
57    handler: EngineApiRequestHandler<EngineApiRequest<N::Engine, N::Primitives>, N::Primitives>,
58    /// Receiver for incoming requests (from the engine API endpoint) that need to be processed.
59    incoming_requests: EngineMessageStream<N::Engine>,
60}
61
62impl<N> LocalEngineService<N>
63where
64    N: EngineNodeTypes,
65{
66    /// Constructor for [`LocalEngineService`].
67    #[allow(clippy::too_many_arguments)]
68    pub fn new<B, V>(
69        consensus: Arc<dyn FullConsensus<N::Primitives>>,
70        executor_factory: impl BlockExecutorProvider<Primitives = N::Primitives>,
71        provider: ProviderFactory<N>,
72        blockchain_db: BlockchainProvider2<N>,
73        pruner: PrunerWithFactory<ProviderFactory<N>>,
74        payload_builder: PayloadBuilderHandle<N::Engine>,
75        payload_validator: V,
76        tree_config: TreeConfig,
77        invalid_block_hook: Box<dyn InvalidBlockHook<N::Primitives>>,
78        sync_metrics_tx: MetricEventsSender,
79        to_engine: UnboundedSender<BeaconEngineMessage<N::Engine>>,
80        from_engine: EngineMessageStream<N::Engine>,
81        mode: MiningMode,
82        payload_attributes_builder: B,
83        data_dir: ChainPath<DataDirPath>,
84    ) -> Self
85    where
86        B: PayloadAttributesBuilder<<N::Engine as PayloadTypes>::PayloadAttributes>,
87        V: EngineValidator<N::Engine, Block = BlockTy<N>>,
88    {
89        let chain_spec = provider.chain_spec();
90        let engine_kind =
91            if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
92
93        let persistence_handle =
94            PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
95        let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
96        let backup_handle = BackupHandle::spawn_service(data_dir);
97
98        let (to_tree_tx, from_tree) = EngineApiTreeHandler::<N::Primitives, _, _, _, _>::spawn_new(
99            blockchain_db.clone(),
100            executor_factory,
101            consensus,
102            payload_validator,
103            persistence_handle,
104            payload_builder.clone(),
105            canonical_in_memory_state,
106            tree_config,
107            invalid_block_hook,
108            engine_kind,
109            backup_handle,
110        );
111
112        let handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
113
114        LocalMiner::spawn_new(
115            blockchain_db,
116            payload_attributes_builder,
117            to_engine,
118            mode,
119            payload_builder,
120        );
121
122        Self { handler, incoming_requests: from_engine }
123    }
124}
125
126impl<N> Stream for LocalEngineService<N>
127where
128    N: EngineNodeTypes,
129{
130    type Item = ChainEvent<BeaconConsensusEngineEvent<N::Primitives>>;
131
132    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
133        let this = self.get_mut();
134
135        if let Poll::Ready(ev) = this.handler.poll(cx) {
136            return match ev {
137                RequestHandlerEvent::HandlerEvent(ev) => match ev {
138                    HandlerEvent::BackfillAction(_) => {
139                        error!(target: "engine::local", "received backfill request in local engine");
140                        Poll::Ready(Some(ChainEvent::FatalError))
141                    }
142                    HandlerEvent::Event(ev) => Poll::Ready(Some(ChainEvent::Handler(ev))),
143                    HandlerEvent::FatalError => Poll::Ready(Some(ChainEvent::FatalError)),
144                },
145                RequestHandlerEvent::Download(_) => {
146                    error!(target: "engine::local", "received download request in local engine");
147                    Poll::Ready(Some(ChainEvent::FatalError))
148                }
149            }
150        }
151
152        // forward incoming requests to the handler
153        while let Poll::Ready(Some(req)) = this.incoming_requests.poll_next_unpin(cx) {
154            this.handler.on_event(FromEngine::Request(req.into()));
155        }
156
157        Poll::Pending
158    }
159}
160
161impl<N: EngineNodeTypes> Debug for LocalEngineService<N> {
162    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
163        f.debug_struct("LocalEngineService").finish_non_exhaustive()
164    }
165}