reth_engine_local/
miner.rs

1//! Contains the implementation of the mining mode for the local engine.
2
3use alloy_consensus::BlockHeader;
4use alloy_primitives::{TxHash, B256};
5use alloy_rpc_types_engine::{CancunPayloadFields, ExecutionPayloadSidecar, ForkchoiceState};
6use eyre::OptionExt;
7use futures_util::{stream::Fuse, StreamExt};
8use reth_chainspec::EthereumHardforks;
9use reth_engine_primitives::{BeaconEngineMessage, EngineApiMessageVersion, EngineTypes};
10use reth_payload_builder::PayloadBuilderHandle;
11use reth_payload_builder_primitives::PayloadBuilder;
12use reth_payload_primitives::{BuiltPayload, PayloadAttributesBuilder, PayloadKind, PayloadTypes};
13use reth_provider::{BlockReader, ChainSpecProvider};
14use reth_rpc_types_compat::engine::payload::block_to_payload;
15use reth_transaction_pool::TransactionPool;
16use std::{
17    future::Future,
18    pin::Pin,
19    task::{Context, Poll},
20    time::{Duration, UNIX_EPOCH},
21};
22use tokio::{
23    sync::{mpsc::UnboundedSender, oneshot},
24    time::Interval,
25};
26use tokio_stream::wrappers::ReceiverStream;
27use tracing::error;
28
/// A mining mode for the local dev engine.
///
/// The mode is polled as a [`Future`] that resolves with `()` whenever the selected trigger
/// (a pending-transaction notification or an interval tick) fires.
#[derive(Debug)]
pub enum MiningMode {
    /// In this mode a block is built as soon as
    /// a valid transaction reaches the pool.
    ///
    /// Wraps the fused stream of transaction hashes obtained from the pool's
    /// pending-transactions listener.
    Instant(Fuse<ReceiverStream<TxHash>>),
    /// In this mode a block is built at a fixed interval.
    ///
    /// Wraps the tokio [`Interval`] whose ticks trigger block production.
    Interval(Interval),
}
38
39impl MiningMode {
40    /// Constructor for a [`MiningMode::Instant`]
41    pub fn instant<Pool: TransactionPool>(pool: Pool) -> Self {
42        let rx = pool.pending_transactions_listener();
43        Self::Instant(ReceiverStream::new(rx).fuse())
44    }
45
46    /// Constructor for a [`MiningMode::Interval`]
47    pub fn interval(duration: Duration) -> Self {
48        let start = tokio::time::Instant::now() + duration;
49        Self::Interval(tokio::time::interval_at(start, duration))
50    }
51}
52
53impl Future for MiningMode {
54    type Output = ();
55
56    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
57        let this = self.get_mut();
58        match this {
59            Self::Instant(rx) => {
60                // drain all transactions notifications
61                if let Poll::Ready(Some(_)) = rx.poll_next_unpin(cx) {
62                    return Poll::Ready(())
63                }
64                Poll::Pending
65            }
66            Self::Interval(interval) => {
67                if interval.poll_tick(cx).is_ready() {
68                    return Poll::Ready(())
69                }
70                Poll::Pending
71            }
72        }
73    }
74}
75
/// Local miner advancing the chain.
///
/// Builds new blocks according to the configured [`MiningMode`] and keeps the engine's
/// forkchoice state up to date by sending it messages over a channel.
#[derive(Debug)]
pub struct LocalMiner<EngineT: EngineTypes, Provider, B> {
    /// Provider to read the current tip of the chain.
    provider: Provider,
    /// The payload attribute builder for the engine
    payload_attributes_builder: B,
    /// Sender for events to engine.
    to_engine: UnboundedSender<BeaconEngineMessage<EngineT>>,
    /// The mining mode for the engine
    mode: MiningMode,
    /// The payload builder for the engine
    payload_builder: PayloadBuilderHandle<EngineT>,
    /// Timestamp of the most recently mined block (initially the chain tip's timestamp).
    /// The next block always receives a strictly greater timestamp.
    last_timestamp: u64,
    /// Hashes of the latest mined blocks, oldest first. Starts with the chain tip's hash and
    /// is capped at the 64 most recent entries.
    last_block_hashes: Vec<B256>,
}
94
95impl<EngineT, Provider, B> LocalMiner<EngineT, Provider, B>
96where
97    EngineT: EngineTypes,
98    Provider: BlockReader + ChainSpecProvider<ChainSpec: EthereumHardforks> + 'static,
99    B: PayloadAttributesBuilder<<EngineT as PayloadTypes>::PayloadAttributes>,
100{
101    /// Spawns a new [`LocalMiner`] with the given parameters.
102    pub fn spawn_new(
103        provider: Provider,
104        payload_attributes_builder: B,
105        to_engine: UnboundedSender<BeaconEngineMessage<EngineT>>,
106        mode: MiningMode,
107        payload_builder: PayloadBuilderHandle<EngineT>,
108    ) {
109        let latest_header =
110            provider.sealed_header(provider.best_block_number().unwrap()).unwrap().unwrap();
111
112        let miner = Self {
113            provider,
114            payload_attributes_builder,
115            to_engine,
116            mode,
117            payload_builder,
118            last_timestamp: latest_header.timestamp(),
119            last_block_hashes: vec![latest_header.hash()],
120        };
121
122        // Spawn the miner
123        tokio::spawn(miner.run());
124    }
125
126    /// Runs the [`LocalMiner`] in a loop, polling the miner and building payloads.
127    async fn run(mut self) {
128        let mut fcu_interval = tokio::time::interval(Duration::from_secs(1));
129        loop {
130            tokio::select! {
131                // Wait for the interval or the pool to receive a transaction
132                _ = &mut self.mode => {
133                    if let Err(e) = self.advance().await {
134                        error!(target: "engine::local", "Error advancing the chain: {:?}", e);
135                    }
136                }
137                // send FCU once in a while
138                _ = fcu_interval.tick() => {
139                    if let Err(e) = self.update_forkchoice_state().await {
140                        error!(target: "engine::local", "Error updating fork choice: {:?}", e);
141                    }
142                }
143            }
144        }
145    }
146
147    /// Returns current forkchoice state.
148    fn forkchoice_state(&self) -> ForkchoiceState {
149        ForkchoiceState {
150            head_block_hash: *self.last_block_hashes.last().expect("at least 1 block exists"),
151            safe_block_hash: *self
152                .last_block_hashes
153                .get(self.last_block_hashes.len().saturating_sub(32))
154                .expect("at least 1 block exists"),
155            finalized_block_hash: *self
156                .last_block_hashes
157                .get(self.last_block_hashes.len().saturating_sub(64))
158                .expect("at least 1 block exists"),
159        }
160    }
161
162    /// Sends a FCU to the engine.
163    async fn update_forkchoice_state(&self) -> eyre::Result<()> {
164        let (tx, rx) = oneshot::channel();
165        self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
166            state: self.forkchoice_state(),
167            payload_attrs: None,
168            tx,
169            version: EngineApiMessageVersion::default(),
170        })?;
171
172        let res = rx.await??;
173        if !res.forkchoice_status().is_valid() {
174            eyre::bail!("Invalid fork choice update")
175        }
176
177        Ok(())
178    }
179
180    /// Generates payload attributes for a new block, passes them to FCU and inserts built payload
181    /// through newPayload.
182    async fn advance(&mut self) -> eyre::Result<()> {
183        let timestamp = std::cmp::max(
184            self.last_timestamp + 1,
185            std::time::SystemTime::now()
186                .duration_since(UNIX_EPOCH)
187                .expect("cannot be earlier than UNIX_EPOCH")
188                .as_secs(),
189        );
190
191        let (tx, rx) = oneshot::channel();
192        self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
193            state: self.forkchoice_state(),
194            payload_attrs: Some(self.payload_attributes_builder.build(timestamp)),
195            tx,
196            version: EngineApiMessageVersion::default(),
197        })?;
198
199        let res = rx.await??.await?;
200        if !res.payload_status.is_valid() {
201            eyre::bail!("Invalid payload status")
202        }
203
204        let payload_id = res.payload_id.ok_or_eyre("No payload id")?;
205
206        let Some(Ok(payload)) =
207            self.payload_builder.resolve_kind(payload_id, PayloadKind::WaitForPending).await
208        else {
209            eyre::bail!("No payload")
210        };
211
212        let block = payload.block();
213
214        let cancun_fields = self
215            .provider
216            .chain_spec()
217            .is_cancun_active_at_timestamp(block.timestamp)
218            .then(|| CancunPayloadFields {
219                parent_beacon_block_root: block.parent_beacon_block_root.unwrap(),
220                versioned_hashes: block.body.blob_versioned_hashes().into_iter().copied().collect(),
221            });
222
223        let (tx, rx) = oneshot::channel();
224        self.to_engine.send(BeaconEngineMessage::NewPayload {
225            payload: block_to_payload(payload.block().clone()),
226            // todo: prague support
227            sidecar: cancun_fields
228                .map(ExecutionPayloadSidecar::v3)
229                .unwrap_or_else(ExecutionPayloadSidecar::none),
230            tx,
231        })?;
232
233        let res = rx.await??;
234
235        if !res.is_valid() {
236            eyre::bail!("Invalid payload")
237        }
238
239        self.last_timestamp = timestamp;
240        self.last_block_hashes.push(block.hash());
241        // ensure we keep at most 64 blocks
242        if self.last_block_hashes.len() > 64 {
243            self.last_block_hashes =
244                self.last_block_hashes.split_off(self.last_block_hashes.len() - 64);
245        }
246
247        Ok(())
248    }
249}