reth_engine_local/
service.rs

1//! Provides a local dev service engine that can be used to run a dev chain.
2//!
3//! [`LocalEngineService`] polls the payload builder based on a mining mode
4//! which can be set to `Instant` or `Interval`. The `Instant` mode will
5//! constantly poll the payload builder and initiate block building
6//! with a single transaction. The `Interval` mode will initiate block
7//! building at a fixed interval.
8
9use core::fmt;
10use std::{
11    fmt::{Debug, Formatter},
12    pin::Pin,
13    sync::Arc,
14    task::{Context, Poll},
15};
16
17use crate::miner::{LocalMiner, MiningMode};
18use futures_util::{Stream, StreamExt};
19use reth_chainspec::EthChainSpec;
20use reth_consensus::{ConsensusError, FullConsensus};
21use reth_engine_primitives::{BeaconConsensusEngineEvent, BeaconEngineMessage, EngineValidator};
22use reth_engine_service::service::EngineMessageStream;
23use reth_engine_tree::{
24    backup::BackupHandle,
25    chain::{ChainEvent, HandlerEvent},
26    engine::{
27        EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineRequestHandler, FromEngine,
28        RequestHandlerEvent,
29    },
30    persistence::PersistenceHandle,
31    tree::{EngineApiTreeHandler, InvalidBlockHook, TreeConfig},
32};
33use reth_evm::ConfigureEvm;
34use reth_node_types::BlockTy;
35use reth_payload_builder::PayloadBuilderHandle;
36use reth_payload_primitives::{PayloadAttributesBuilder, PayloadTypes};
37use reth_provider::{
38    providers::{BlockchainProvider, ProviderNodeTypes},
39    ChainSpecProvider, ProviderFactory,
40};
41use reth_prune::PrunerWithFactory;
42use reth_stages_api::MetricEventsSender;
43use tokio::sync::mpsc::UnboundedSender;
44use tracing::error;
45
46// seismic imports not used by upstream
47use reth_node_core::dirs::{ChainPath, DataDirPath};
48
49/// Provides a local dev service engine that can be used to drive the
50/// chain forward.
51///
52/// This service both produces and consumes [`BeaconEngineMessage`]s. This is done to allow
53/// modifications of the stream
54pub struct LocalEngineService<N>
55where
56    N: ProviderNodeTypes,
57{
58    /// Processes requests.
59    ///
60    /// This type is responsible for processing incoming requests.
61    handler: EngineApiRequestHandler<EngineApiRequest<N::Payload, N::Primitives>, N::Primitives>,
62    /// Receiver for incoming requests (from the engine API endpoint) that need to be processed.
63    incoming_requests: EngineMessageStream<N::Payload>,
64}
65
66impl<N> LocalEngineService<N>
67where
68    N: ProviderNodeTypes,
69{
70    /// Constructor for [`LocalEngineService`].
71    #[expect(clippy::too_many_arguments)]
72    pub fn new<B, V, C>(
73        consensus: Arc<dyn FullConsensus<N::Primitives, Error = ConsensusError>>,
74        provider: ProviderFactory<N>,
75        blockchain_db: BlockchainProvider<N>,
76        pruner: PrunerWithFactory<ProviderFactory<N>>,
77        payload_builder: PayloadBuilderHandle<N::Payload>,
78        payload_validator: V,
79        tree_config: TreeConfig,
80        invalid_block_hook: Box<dyn InvalidBlockHook<N::Primitives>>,
81        sync_metrics_tx: MetricEventsSender,
82        to_engine: UnboundedSender<BeaconEngineMessage<N::Payload>>,
83        from_engine: EngineMessageStream<N::Payload>,
84        mode: MiningMode,
85        payload_attributes_builder: B,
86        evm_config: C,
87        data_dir: ChainPath<DataDirPath>,
88    ) -> Self
89    where
90        B: PayloadAttributesBuilder<<N::Payload as PayloadTypes>::PayloadAttributes>,
91        V: EngineValidator<N::Payload, Block = BlockTy<N>>,
92        C: ConfigureEvm<Primitives = N::Primitives> + 'static,
93    {
94        let chain_spec = provider.chain_spec();
95        let engine_kind =
96            if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum };
97
98        let persistence_handle =
99            PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
100        let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
101        let backup_handle = BackupHandle::spawn_service(data_dir);
102
103        let (to_tree_tx, from_tree) = EngineApiTreeHandler::<N::Primitives, _, _, _, _>::spawn_new(
104            blockchain_db.clone(),
105            consensus,
106            payload_validator,
107            persistence_handle,
108            payload_builder.clone(),
109            canonical_in_memory_state,
110            tree_config,
111            invalid_block_hook,
112            engine_kind,
113            evm_config,
114            backup_handle,
115        );
116
117        let handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
118
119        LocalMiner::spawn_new(
120            blockchain_db,
121            payload_attributes_builder,
122            to_engine,
123            mode,
124            payload_builder,
125        );
126
127        Self { handler, incoming_requests: from_engine }
128    }
129}
130
131impl<N> Stream for LocalEngineService<N>
132where
133    N: ProviderNodeTypes,
134{
135    type Item = ChainEvent<BeaconConsensusEngineEvent<N::Primitives>>;
136
137    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
138        let this = self.get_mut();
139
140        if let Poll::Ready(ev) = this.handler.poll(cx) {
141            return match ev {
142                RequestHandlerEvent::HandlerEvent(ev) => match ev {
143                    HandlerEvent::BackfillAction(_) => {
144                        error!(target: "engine::local", "received backfill request in local engine");
145                        Poll::Ready(Some(ChainEvent::FatalError))
146                    }
147                    HandlerEvent::Event(ev) => Poll::Ready(Some(ChainEvent::Handler(ev))),
148                    HandlerEvent::FatalError => Poll::Ready(Some(ChainEvent::FatalError)),
149                },
150                RequestHandlerEvent::Download(_) => {
151                    error!(target: "engine::local", "received download request in local engine");
152                    Poll::Ready(Some(ChainEvent::FatalError))
153                }
154            }
155        }
156
157        // forward incoming requests to the handler
158        while let Poll::Ready(Some(req)) = this.incoming_requests.poll_next_unpin(cx) {
159            this.handler.on_event(FromEngine::Request(req.into()));
160        }
161
162        Poll::Pending
163    }
164}
165
166impl<N: ProviderNodeTypes> Debug for LocalEngineService<N> {
167    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
168        f.debug_struct("LocalEngineService").finish_non_exhaustive()
169    }
170}