reth_engine_local/
miner.rs1use alloy_consensus::BlockHeader;
4use alloy_primitives::{TxHash, B256};
5use alloy_rpc_types_engine::{CancunPayloadFields, ExecutionPayloadSidecar, ForkchoiceState};
6use eyre::OptionExt;
7use futures_util::{stream::Fuse, StreamExt};
8use reth_chainspec::EthereumHardforks;
9use reth_engine_primitives::{BeaconEngineMessage, EngineApiMessageVersion, EngineTypes};
10use reth_payload_builder::PayloadBuilderHandle;
11use reth_payload_builder_primitives::PayloadBuilder;
12use reth_payload_primitives::{BuiltPayload, PayloadAttributesBuilder, PayloadKind, PayloadTypes};
13use reth_provider::{BlockReader, ChainSpecProvider};
14use reth_rpc_types_compat::engine::payload::block_to_payload;
15use reth_transaction_pool::TransactionPool;
16use std::{
17 future::Future,
18 pin::Pin,
19 task::{Context, Poll},
20 time::{Duration, UNIX_EPOCH},
21};
22use tokio::{
23 sync::{mpsc::UnboundedSender, oneshot},
24 time::Interval,
25};
26use tokio_stream::wrappers::ReceiverStream;
27use tracing::error;
28
29#[derive(Debug)]
31pub enum MiningMode {
32 Instant(Fuse<ReceiverStream<TxHash>>),
35 Interval(Interval),
37}
38
39impl MiningMode {
40 pub fn instant<Pool: TransactionPool>(pool: Pool) -> Self {
42 let rx = pool.pending_transactions_listener();
43 Self::Instant(ReceiverStream::new(rx).fuse())
44 }
45
46 pub fn interval(duration: Duration) -> Self {
48 let start = tokio::time::Instant::now() + duration;
49 Self::Interval(tokio::time::interval_at(start, duration))
50 }
51}
52
53impl Future for MiningMode {
54 type Output = ();
55
56 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
57 let this = self.get_mut();
58 match this {
59 Self::Instant(rx) => {
60 if let Poll::Ready(Some(_)) = rx.poll_next_unpin(cx) {
62 return Poll::Ready(())
63 }
64 Poll::Pending
65 }
66 Self::Interval(interval) => {
67 if interval.poll_tick(cx).is_ready() {
68 return Poll::Ready(())
69 }
70 Poll::Pending
71 }
72 }
73 }
74}
75
76#[derive(Debug)]
78pub struct LocalMiner<EngineT: EngineTypes, Provider, B> {
79 provider: Provider,
81 payload_attributes_builder: B,
83 to_engine: UnboundedSender<BeaconEngineMessage<EngineT>>,
85 mode: MiningMode,
87 payload_builder: PayloadBuilderHandle<EngineT>,
89 last_timestamp: u64,
91 last_block_hashes: Vec<B256>,
93}
94
95impl<EngineT, Provider, B> LocalMiner<EngineT, Provider, B>
96where
97 EngineT: EngineTypes,
98 Provider: BlockReader + ChainSpecProvider<ChainSpec: EthereumHardforks> + 'static,
99 B: PayloadAttributesBuilder<<EngineT as PayloadTypes>::PayloadAttributes>,
100{
101 pub fn spawn_new(
103 provider: Provider,
104 payload_attributes_builder: B,
105 to_engine: UnboundedSender<BeaconEngineMessage<EngineT>>,
106 mode: MiningMode,
107 payload_builder: PayloadBuilderHandle<EngineT>,
108 ) {
109 let latest_header =
110 provider.sealed_header(provider.best_block_number().unwrap()).unwrap().unwrap();
111
112 let miner = Self {
113 provider,
114 payload_attributes_builder,
115 to_engine,
116 mode,
117 payload_builder,
118 last_timestamp: latest_header.timestamp(),
119 last_block_hashes: vec![latest_header.hash()],
120 };
121
122 tokio::spawn(miner.run());
124 }
125
126 async fn run(mut self) {
128 let mut fcu_interval = tokio::time::interval(Duration::from_secs(1));
129 loop {
130 tokio::select! {
131 _ = &mut self.mode => {
133 if let Err(e) = self.advance().await {
134 error!(target: "engine::local", "Error advancing the chain: {:?}", e);
135 }
136 }
137 _ = fcu_interval.tick() => {
139 if let Err(e) = self.update_forkchoice_state().await {
140 error!(target: "engine::local", "Error updating fork choice: {:?}", e);
141 }
142 }
143 }
144 }
145 }
146
147 fn forkchoice_state(&self) -> ForkchoiceState {
149 ForkchoiceState {
150 head_block_hash: *self.last_block_hashes.last().expect("at least 1 block exists"),
151 safe_block_hash: *self
152 .last_block_hashes
153 .get(self.last_block_hashes.len().saturating_sub(32))
154 .expect("at least 1 block exists"),
155 finalized_block_hash: *self
156 .last_block_hashes
157 .get(self.last_block_hashes.len().saturating_sub(64))
158 .expect("at least 1 block exists"),
159 }
160 }
161
162 async fn update_forkchoice_state(&self) -> eyre::Result<()> {
164 let (tx, rx) = oneshot::channel();
165 self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
166 state: self.forkchoice_state(),
167 payload_attrs: None,
168 tx,
169 version: EngineApiMessageVersion::default(),
170 })?;
171
172 let res = rx.await??;
173 if !res.forkchoice_status().is_valid() {
174 eyre::bail!("Invalid fork choice update")
175 }
176
177 Ok(())
178 }
179
180 async fn advance(&mut self) -> eyre::Result<()> {
183 let timestamp = std::cmp::max(
184 self.last_timestamp + 1,
185 std::time::SystemTime::now()
186 .duration_since(UNIX_EPOCH)
187 .expect("cannot be earlier than UNIX_EPOCH")
188 .as_secs(),
189 );
190
191 let (tx, rx) = oneshot::channel();
192 self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
193 state: self.forkchoice_state(),
194 payload_attrs: Some(self.payload_attributes_builder.build(timestamp)),
195 tx,
196 version: EngineApiMessageVersion::default(),
197 })?;
198
199 let res = rx.await??.await?;
200 if !res.payload_status.is_valid() {
201 eyre::bail!("Invalid payload status")
202 }
203
204 let payload_id = res.payload_id.ok_or_eyre("No payload id")?;
205
206 let Some(Ok(payload)) =
207 self.payload_builder.resolve_kind(payload_id, PayloadKind::WaitForPending).await
208 else {
209 eyre::bail!("No payload")
210 };
211
212 let block = payload.block();
213
214 let cancun_fields = self
215 .provider
216 .chain_spec()
217 .is_cancun_active_at_timestamp(block.timestamp)
218 .then(|| CancunPayloadFields {
219 parent_beacon_block_root: block.parent_beacon_block_root.unwrap(),
220 versioned_hashes: block.body.blob_versioned_hashes().into_iter().copied().collect(),
221 });
222
223 let (tx, rx) = oneshot::channel();
224 self.to_engine.send(BeaconEngineMessage::NewPayload {
225 payload: block_to_payload(payload.block().clone()),
226 sidecar: cancun_fields
228 .map(ExecutionPayloadSidecar::v3)
229 .unwrap_or_else(ExecutionPayloadSidecar::none),
230 tx,
231 })?;
232
233 let res = rx.await??;
234
235 if !res.is_valid() {
236 eyre::bail!("Invalid payload")
237 }
238
239 self.last_timestamp = timestamp;
240 self.last_block_hashes.push(block.hash());
241 if self.last_block_hashes.len() > 64 {
243 self.last_block_hashes =
244 self.last_block_hashes.split_off(self.last_block_hashes.len() - 64);
245 }
246
247 Ok(())
248 }
249}