1use alloy_consensus::{Header, Transaction};
4use alloy_primitives::U256;
5use alloy_rpc_types_engine::{
6 CancunPayloadFields, ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState, PayloadStatus,
7};
8use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};
9use itertools::Either;
10use reth_engine_primitives::{
11 BeaconEngineMessage, BeaconOnNewPayloadError, EngineApiMessageVersion, EngineTypes,
12 OnForkChoiceUpdated,
13};
14use reth_errors::{BlockExecutionError, BlockValidationError, RethError, RethResult};
15use reth_ethereum_forks::EthereumHardforks;
16use reth_evm::{
17 state_change::post_block_withdrawals_balance_increments, system_calls::SystemCaller,
18 ConfigureEvm,
19};
20use reth_payload_validator::ExecutionPayloadValidator;
21use reth_primitives::{
22 proofs, transaction::SignedTransactionIntoRecoveredExt, Block, BlockBody, BlockExt, Receipt,
23 Receipts,
24};
25use reth_provider::{BlockReader, ExecutionOutcome, ProviderError, StateProviderFactory};
26use reth_revm::{
27 database::StateProviderDatabase,
28 db::{states::bundle_state::BundleRetention, State},
29 DatabaseCommit,
30};
31use reth_rpc_types_compat::engine::payload::block_to_payload;
32use revm_primitives::{calc_excess_blob_gas, EVMError, EnvWithHandlerCfg};
33use std::{
34 collections::VecDeque,
35 future::Future,
36 pin::Pin,
37 task::{ready, Context, Poll},
38};
39use tokio::sync::oneshot;
40use tracing::*;
41
42#[derive(Debug)]
43enum EngineReorgState<Engine: EngineTypes> {
44 Forward,
45 Reorg { queue: VecDeque<BeaconEngineMessage<Engine>> },
46}
47
48type EngineReorgResponse = Result<
49 Either<Result<PayloadStatus, BeaconOnNewPayloadError>, RethResult<OnForkChoiceUpdated>>,
50 oneshot::error::RecvError,
51>;
52
53type ReorgResponseFut = Pin<Box<dyn Future<Output = EngineReorgResponse> + Send + Sync>>;
54
55#[derive(Debug)]
57#[pin_project::pin_project]
58pub struct EngineReorg<S, Engine: EngineTypes, Provider, Evm, Spec> {
59 #[pin]
61 stream: S,
62 provider: Provider,
64 evm_config: Evm,
66 payload_validator: ExecutionPayloadValidator<Spec>,
68 frequency: usize,
70 depth: usize,
72 forkchoice_states_forwarded: usize,
75 state: EngineReorgState<Engine>,
77 last_forkchoice_state: Option<ForkchoiceState>,
79 reorg_responses: FuturesUnordered<ReorgResponseFut>,
81}
82
83impl<S, Engine: EngineTypes, Provider, Evm, Spec> EngineReorg<S, Engine, Provider, Evm, Spec> {
84 pub fn new(
86 stream: S,
87 provider: Provider,
88 evm_config: Evm,
89 payload_validator: ExecutionPayloadValidator<Spec>,
90 frequency: usize,
91 depth: usize,
92 ) -> Self {
93 Self {
94 stream,
95 provider,
96 evm_config,
97 payload_validator,
98 frequency,
99 depth,
100 state: EngineReorgState::Forward,
101 forkchoice_states_forwarded: 0,
102 last_forkchoice_state: None,
103 reorg_responses: FuturesUnordered::new(),
104 }
105 }
106}
107
108impl<S, Engine, Provider, Evm, Spec> Stream for EngineReorg<S, Engine, Provider, Evm, Spec>
109where
110 S: Stream<Item = BeaconEngineMessage<Engine>>,
111 Engine: EngineTypes,
112 Provider: BlockReader<Block = reth_primitives::Block> + StateProviderFactory,
113 Evm: ConfigureEvm<Header = Header, Transaction = reth_primitives::TransactionSigned>,
114 Spec: EthereumHardforks,
115{
116 type Item = S::Item;
117
118 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
119 let mut this = self.project();
120
121 loop {
122 if let Poll::Ready(Some(response)) = this.reorg_responses.poll_next_unpin(cx) {
123 match response {
124 Ok(Either::Left(Ok(payload_status))) => {
125 debug!(target: "engine::stream::reorg", ?payload_status, "Received response for reorg new payload");
126 }
127 Ok(Either::Left(Err(payload_error))) => {
128 error!(target: "engine::stream::reorg", %payload_error, "Error on reorg new payload");
129 }
130 Ok(Either::Right(Ok(fcu_status))) => {
131 debug!(target: "engine::stream::reorg", ?fcu_status, "Received response for reorg forkchoice update");
132 }
133 Ok(Either::Right(Err(fcu_error))) => {
134 error!(target: "engine::stream::reorg", %fcu_error, "Error on reorg forkchoice update");
135 }
136 Err(_) => {}
137 };
138 continue
139 }
140
141 if let EngineReorgState::Reorg { queue } = &mut this.state {
142 match queue.pop_front() {
143 Some(msg) => return Poll::Ready(Some(msg)),
144 None => {
145 *this.forkchoice_states_forwarded = 0;
146 *this.state = EngineReorgState::Forward;
147 }
148 }
149 }
150
151 let next = ready!(this.stream.poll_next_unpin(cx));
152 let item = match (next, &this.last_forkchoice_state) {
153 (
154 Some(BeaconEngineMessage::NewPayload { payload, sidecar, tx }),
155 Some(last_forkchoice_state),
156 ) if this.forkchoice_states_forwarded > this.frequency &&
157 last_forkchoice_state.head_block_hash == payload.parent_hash() =>
159 {
160 let (reorg_payload, reorg_sidecar) = match create_reorg_head(
170 this.provider,
171 this.evm_config,
172 this.payload_validator,
173 *this.depth,
174 payload.clone(),
175 sidecar.clone(),
176 ) {
177 Ok(result) => result,
178 Err(error) => {
179 error!(target: "engine::stream::reorg", %error, "Error attempting to create reorg head");
180 return Poll::Ready(Some(BeaconEngineMessage::NewPayload {
183 payload,
184 sidecar,
185 tx,
186 }))
187 }
188 };
189 let reorg_forkchoice_state = ForkchoiceState {
190 finalized_block_hash: last_forkchoice_state.finalized_block_hash,
191 safe_block_hash: last_forkchoice_state.safe_block_hash,
192 head_block_hash: reorg_payload.block_hash(),
193 };
194
195 let (reorg_payload_tx, reorg_payload_rx) = oneshot::channel();
196 let (reorg_fcu_tx, reorg_fcu_rx) = oneshot::channel();
197 this.reorg_responses.extend([
198 Box::pin(reorg_payload_rx.map_ok(Either::Left)) as ReorgResponseFut,
199 Box::pin(reorg_fcu_rx.map_ok(Either::Right)) as ReorgResponseFut,
200 ]);
201
202 let queue = VecDeque::from([
203 BeaconEngineMessage::NewPayload { payload, sidecar, tx },
205 BeaconEngineMessage::NewPayload {
207 payload: reorg_payload,
208 sidecar: reorg_sidecar,
209 tx: reorg_payload_tx,
210 },
211 BeaconEngineMessage::ForkchoiceUpdated {
213 state: reorg_forkchoice_state,
214 payload_attrs: None,
215 tx: reorg_fcu_tx,
216 version: EngineApiMessageVersion::default(),
217 },
218 ]);
219 *this.state = EngineReorgState::Reorg { queue };
220 continue
221 }
222 (
223 Some(BeaconEngineMessage::ForkchoiceUpdated {
224 state,
225 payload_attrs,
226 tx,
227 version,
228 }),
229 _,
230 ) => {
231 *this.last_forkchoice_state = Some(state);
235 *this.forkchoice_states_forwarded += 1;
236 Some(BeaconEngineMessage::ForkchoiceUpdated {
237 state,
238 payload_attrs,
239 tx,
240 version,
241 })
242 }
243 (item, _) => item,
244 };
245 return Poll::Ready(item)
246 }
247 }
248}
249
250fn create_reorg_head<Provider, Evm, Spec>(
251 provider: &Provider,
252 evm_config: &Evm,
253 payload_validator: &ExecutionPayloadValidator<Spec>,
254 mut depth: usize,
255 next_payload: ExecutionPayload,
256 next_sidecar: ExecutionPayloadSidecar,
257) -> RethResult<(ExecutionPayload, ExecutionPayloadSidecar)>
258where
259 Provider: BlockReader<Block = reth_primitives::Block> + StateProviderFactory,
260 Evm: ConfigureEvm<Header = Header, Transaction = reth_primitives::TransactionSigned>,
261 Spec: EthereumHardforks,
262{
263 let chain_spec = payload_validator.chain_spec();
264
265 let next_block = payload_validator
267 .ensure_well_formed_payload(next_payload, next_sidecar)
268 .map_err(RethError::msg)?;
269
270 let mut previous_hash = next_block.parent_hash;
272 let mut candidate_transactions = next_block.body.transactions;
273 let reorg_target = 'target: {
274 loop {
275 let reorg_target = provider
276 .block_by_hash(previous_hash)?
277 .ok_or_else(|| ProviderError::HeaderNotFound(previous_hash.into()))?;
278 if depth == 0 {
279 break 'target reorg_target
280 }
281
282 depth -= 1;
283 previous_hash = reorg_target.parent_hash;
284 candidate_transactions = reorg_target.body.transactions;
285 }
286 };
287 let reorg_target_parent = provider
288 .block_by_hash(reorg_target.parent_hash)?
289 .ok_or_else(|| ProviderError::HeaderNotFound(reorg_target.parent_hash.into()))?;
290
291 debug!(target: "engine::stream::reorg", number = reorg_target.number, hash = %previous_hash, "Selected reorg target");
292
293 let state_provider = provider.state_by_block_hash(reorg_target.parent_hash)?;
295 let mut state = State::builder()
296 .with_database_ref(StateProviderDatabase::new(&state_provider))
297 .with_bundle_update()
298 .build();
299
300 let (cfg, block_env) = evm_config.cfg_and_block_env(&reorg_target.header, U256::MAX);
302 let env = EnvWithHandlerCfg::new_with_cfg_env(cfg, block_env, Default::default());
303 let mut evm = evm_config.evm_with_env(&mut state, env);
304
305 let mut system_caller = SystemCaller::new(evm_config.clone(), chain_spec.clone());
307
308 system_caller.apply_beacon_root_contract_call(
309 reorg_target.timestamp,
310 reorg_target.number,
311 reorg_target.parent_beacon_block_root,
312 &mut evm,
313 )?;
314
315 let mut cumulative_gas_used = 0;
316 let mut sum_blob_gas_used = 0;
317 let mut transactions = Vec::new();
318 let mut receipts = Vec::new();
319 let mut versioned_hashes = Vec::new();
320 for tx in candidate_transactions {
321 if cumulative_gas_used + tx.gas_limit() > reorg_target.gas_limit {
323 continue
324 }
325
326 let tx_recovered = tx.clone().try_into_ecrecovered().map_err(|_| {
328 BlockExecutionError::Validation(BlockValidationError::SenderRecoveryError)
329 })?;
330 evm_config.fill_tx_env(evm.tx_mut(), &tx_recovered, tx_recovered.signer()).map_err(
331 |err| {
332 return RethError::Execution(BlockExecutionError::Validation(
333 BlockValidationError::EVM {
334 hash: tx.hash(),
335 error: Box::new(err.map_db_err(|e| e.into())),
336 },
337 ))
338 },
339 )?;
340 let exec_result = match evm.transact() {
341 Ok(result) => result,
342 error @ Err(EVMError::Transaction(_) | EVMError::Header(_)) => {
343 trace!(target: "engine::stream::reorg", hash = %tx.hash(), ?error, "Error executing transaction from next block");
344 continue
345 }
346 Err(error) => {
348 return Err(RethError::Execution(BlockExecutionError::Validation(
349 BlockValidationError::EVM { hash: tx.hash(), error: Box::new(error) },
350 )))
351 }
352 };
353 evm.db_mut().commit(exec_result.state);
354
355 if let Some(blob_tx) = tx.transaction.as_eip4844() {
356 sum_blob_gas_used += blob_tx.blob_gas();
357 versioned_hashes.extend(blob_tx.blob_versioned_hashes.clone());
358 }
359
360 cumulative_gas_used += exec_result.result.gas_used();
361 #[allow(clippy::needless_update)] receipts.push(Some(Receipt {
363 tx_type: tx.tx_type(),
364 success: exec_result.result.is_success(),
365 cumulative_gas_used,
366 logs: exec_result.result.into_logs().into_iter().map(Into::into).collect(),
367 ..Default::default()
368 }));
369
370 transactions.push(tx);
372 }
373 drop(evm);
374
375 if let Some(withdrawals) = &reorg_target.body.withdrawals {
376 state.increment_balances(post_block_withdrawals_balance_increments(
377 chain_spec,
378 reorg_target.timestamp,
379 withdrawals,
380 ))?;
381 }
382
383 state.merge_transitions(BundleRetention::PlainState);
386
387 let outcome: ExecutionOutcome = ExecutionOutcome::new(
388 state.take_bundle(),
389 Receipts::from(vec![receipts]),
390 reorg_target.number,
391 Default::default(),
392 );
393 let hashed_state = state_provider.hashed_post_state(outcome.state());
394
395 let (blob_gas_used, excess_blob_gas) =
396 if chain_spec.is_cancun_active_at_timestamp(reorg_target.timestamp) {
397 (
398 Some(sum_blob_gas_used),
399 Some(calc_excess_blob_gas(
400 reorg_target_parent.excess_blob_gas.unwrap_or_default(),
401 reorg_target_parent.blob_gas_used.unwrap_or_default(),
402 )),
403 )
404 } else {
405 (None, None)
406 };
407
408 let reorg_block = Block {
409 header: Header {
410 parent_hash: reorg_target.header.parent_hash,
412 ommers_hash: reorg_target.header.ommers_hash,
413 beneficiary: reorg_target.header.beneficiary,
414 difficulty: reorg_target.header.difficulty,
415 number: reorg_target.header.number,
416 gas_limit: reorg_target.header.gas_limit,
417 timestamp: reorg_target.header.timestamp,
418 extra_data: reorg_target.header.extra_data,
419 mix_hash: reorg_target.header.mix_hash,
420 nonce: reorg_target.header.nonce,
421 base_fee_per_gas: reorg_target.header.base_fee_per_gas,
422 parent_beacon_block_root: reorg_target.header.parent_beacon_block_root,
423 withdrawals_root: reorg_target.header.withdrawals_root,
424
425 transactions_root: proofs::calculate_transaction_root(&transactions),
427 receipts_root: outcome.ethereum_receipts_root(reorg_target.header.number).unwrap(),
428 logs_bloom: outcome.block_logs_bloom(reorg_target.header.number).unwrap(),
429 gas_used: cumulative_gas_used,
430 blob_gas_used: blob_gas_used.map(Into::into),
431 excess_blob_gas: excess_blob_gas.map(Into::into),
432 state_root: state_provider.state_root(hashed_state)?,
433 requests_hash: None, target_blobs_per_block: None, },
436 body: BlockBody {
437 transactions,
438 ommers: reorg_target.body.ommers,
439 withdrawals: reorg_target.body.withdrawals,
440 },
441 }
442 .seal_slow();
443
444 Ok((
445 block_to_payload(reorg_block),
446 reorg_target
448 .header
449 .parent_beacon_block_root
450 .map(|root| {
451 ExecutionPayloadSidecar::v3(CancunPayloadFields {
452 parent_beacon_block_root: root,
453 versioned_hashes,
454 })
455 })
456 .unwrap_or_else(ExecutionPayloadSidecar::none),
457 ))
458}