reth_engine_util/
reorg.rs

1//! Stream wrapper that simulates reorgs.
2
3use alloy_consensus::{Header, Transaction};
4use alloy_primitives::U256;
5use alloy_rpc_types_engine::{
6    CancunPayloadFields, ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState, PayloadStatus,
7};
8use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};
9use itertools::Either;
10use reth_engine_primitives::{
11    BeaconEngineMessage, BeaconOnNewPayloadError, EngineApiMessageVersion, EngineTypes,
12    OnForkChoiceUpdated,
13};
14use reth_errors::{BlockExecutionError, BlockValidationError, RethError, RethResult};
15use reth_ethereum_forks::EthereumHardforks;
16use reth_evm::{
17    state_change::post_block_withdrawals_balance_increments, system_calls::SystemCaller,
18    ConfigureEvm,
19};
20use reth_payload_validator::ExecutionPayloadValidator;
21use reth_primitives::{
22    proofs, transaction::SignedTransactionIntoRecoveredExt, Block, BlockBody, BlockExt, Receipt,
23    Receipts,
24};
25use reth_provider::{BlockReader, ExecutionOutcome, ProviderError, StateProviderFactory};
26use reth_revm::{
27    database::StateProviderDatabase,
28    db::{states::bundle_state::BundleRetention, State},
29    DatabaseCommit,
30};
31use reth_rpc_types_compat::engine::payload::block_to_payload;
32use revm_primitives::{calc_excess_blob_gas, EVMError, EnvWithHandlerCfg};
33use std::{
34    collections::VecDeque,
35    future::Future,
36    pin::Pin,
37    task::{ready, Context, Poll},
38};
39use tokio::sync::oneshot;
40use tracing::*;
41
42#[derive(Debug)]
43enum EngineReorgState<Engine: EngineTypes> {
44    Forward,
45    Reorg { queue: VecDeque<BeaconEngineMessage<Engine>> },
46}
47
48type EngineReorgResponse = Result<
49    Either<Result<PayloadStatus, BeaconOnNewPayloadError>, RethResult<OnForkChoiceUpdated>>,
50    oneshot::error::RecvError,
51>;
52
53type ReorgResponseFut = Pin<Box<dyn Future<Output = EngineReorgResponse> + Send + Sync>>;
54
55/// Engine API stream wrapper that simulates reorgs with specified frequency.
56#[derive(Debug)]
57#[pin_project::pin_project]
58pub struct EngineReorg<S, Engine: EngineTypes, Provider, Evm, Spec> {
59    /// Underlying stream
60    #[pin]
61    stream: S,
62    /// Database provider.
63    provider: Provider,
64    /// Evm configuration.
65    evm_config: Evm,
66    /// Payload validator.
67    payload_validator: ExecutionPayloadValidator<Spec>,
68    /// The frequency of reorgs.
69    frequency: usize,
70    /// The depth of reorgs.
71    depth: usize,
72    /// The number of forwarded forkchoice states.
73    /// This is reset after a reorg.
74    forkchoice_states_forwarded: usize,
75    /// Current state of the stream.
76    state: EngineReorgState<Engine>,
77    /// Last forkchoice state.
78    last_forkchoice_state: Option<ForkchoiceState>,
79    /// Pending engine responses to reorg messages.
80    reorg_responses: FuturesUnordered<ReorgResponseFut>,
81}
82
83impl<S, Engine: EngineTypes, Provider, Evm, Spec> EngineReorg<S, Engine, Provider, Evm, Spec> {
84    /// Creates new [`EngineReorg`] stream wrapper.
85    pub fn new(
86        stream: S,
87        provider: Provider,
88        evm_config: Evm,
89        payload_validator: ExecutionPayloadValidator<Spec>,
90        frequency: usize,
91        depth: usize,
92    ) -> Self {
93        Self {
94            stream,
95            provider,
96            evm_config,
97            payload_validator,
98            frequency,
99            depth,
100            state: EngineReorgState::Forward,
101            forkchoice_states_forwarded: 0,
102            last_forkchoice_state: None,
103            reorg_responses: FuturesUnordered::new(),
104        }
105    }
106}
107
108impl<S, Engine, Provider, Evm, Spec> Stream for EngineReorg<S, Engine, Provider, Evm, Spec>
109where
110    S: Stream<Item = BeaconEngineMessage<Engine>>,
111    Engine: EngineTypes,
112    Provider: BlockReader<Block = reth_primitives::Block> + StateProviderFactory,
113    Evm: ConfigureEvm<Header = Header, Transaction = reth_primitives::TransactionSigned>,
114    Spec: EthereumHardforks,
115{
116    type Item = S::Item;
117
118    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
119        let mut this = self.project();
120
121        loop {
122            if let Poll::Ready(Some(response)) = this.reorg_responses.poll_next_unpin(cx) {
123                match response {
124                    Ok(Either::Left(Ok(payload_status))) => {
125                        debug!(target: "engine::stream::reorg", ?payload_status, "Received response for reorg new payload");
126                    }
127                    Ok(Either::Left(Err(payload_error))) => {
128                        error!(target: "engine::stream::reorg", %payload_error, "Error on reorg new payload");
129                    }
130                    Ok(Either::Right(Ok(fcu_status))) => {
131                        debug!(target: "engine::stream::reorg", ?fcu_status, "Received response for reorg forkchoice update");
132                    }
133                    Ok(Either::Right(Err(fcu_error))) => {
134                        error!(target: "engine::stream::reorg", %fcu_error, "Error on reorg forkchoice update");
135                    }
136                    Err(_) => {}
137                };
138                continue
139            }
140
141            if let EngineReorgState::Reorg { queue } = &mut this.state {
142                match queue.pop_front() {
143                    Some(msg) => return Poll::Ready(Some(msg)),
144                    None => {
145                        *this.forkchoice_states_forwarded = 0;
146                        *this.state = EngineReorgState::Forward;
147                    }
148                }
149            }
150
151            let next = ready!(this.stream.poll_next_unpin(cx));
152            let item = match (next, &this.last_forkchoice_state) {
153                (
154                    Some(BeaconEngineMessage::NewPayload { payload, sidecar, tx }),
155                    Some(last_forkchoice_state),
156                ) if this.forkchoice_states_forwarded > this.frequency &&
157                        // Only enter reorg state if new payload attaches to current head.
158                        last_forkchoice_state.head_block_hash == payload.parent_hash() =>
159                {
160                    // Enter the reorg state.
161                    // The current payload will be immediately forwarded by being in front of the
162                    // queue. Then we attempt to reorg the current head by generating a payload that
163                    // attaches to the head's parent and is based on the non-conflicting
164                    // transactions (txs from block `n + 1` that are valid at block `n` according to
165                    // consensus checks) from the current payload as well as the corresponding
166                    // forkchoice state. We will rely on CL to reorg us back to canonical chain.
167                    // TODO: This is an expensive blocking operation, ideally it's spawned as a task
168                    // so that the stream could yield the control back.
169                    let (reorg_payload, reorg_sidecar) = match create_reorg_head(
170                        this.provider,
171                        this.evm_config,
172                        this.payload_validator,
173                        *this.depth,
174                        payload.clone(),
175                        sidecar.clone(),
176                    ) {
177                        Ok(result) => result,
178                        Err(error) => {
179                            error!(target: "engine::stream::reorg", %error, "Error attempting to create reorg head");
180                            // Forward the payload and attempt to create reorg on top of
181                            // the next one
182                            return Poll::Ready(Some(BeaconEngineMessage::NewPayload {
183                                payload,
184                                sidecar,
185                                tx,
186                            }))
187                        }
188                    };
189                    let reorg_forkchoice_state = ForkchoiceState {
190                        finalized_block_hash: last_forkchoice_state.finalized_block_hash,
191                        safe_block_hash: last_forkchoice_state.safe_block_hash,
192                        head_block_hash: reorg_payload.block_hash(),
193                    };
194
195                    let (reorg_payload_tx, reorg_payload_rx) = oneshot::channel();
196                    let (reorg_fcu_tx, reorg_fcu_rx) = oneshot::channel();
197                    this.reorg_responses.extend([
198                        Box::pin(reorg_payload_rx.map_ok(Either::Left)) as ReorgResponseFut,
199                        Box::pin(reorg_fcu_rx.map_ok(Either::Right)) as ReorgResponseFut,
200                    ]);
201
202                    let queue = VecDeque::from([
203                        // Current payload
204                        BeaconEngineMessage::NewPayload { payload, sidecar, tx },
205                        // Reorg payload
206                        BeaconEngineMessage::NewPayload {
207                            payload: reorg_payload,
208                            sidecar: reorg_sidecar,
209                            tx: reorg_payload_tx,
210                        },
211                        // Reorg forkchoice state
212                        BeaconEngineMessage::ForkchoiceUpdated {
213                            state: reorg_forkchoice_state,
214                            payload_attrs: None,
215                            tx: reorg_fcu_tx,
216                            version: EngineApiMessageVersion::default(),
217                        },
218                    ]);
219                    *this.state = EngineReorgState::Reorg { queue };
220                    continue
221                }
222                (
223                    Some(BeaconEngineMessage::ForkchoiceUpdated {
224                        state,
225                        payload_attrs,
226                        tx,
227                        version,
228                    }),
229                    _,
230                ) => {
231                    // Record last forkchoice state forwarded to the engine.
232                    // We do not care if it's valid since engine should be able to handle
233                    // reorgs that rely on invalid forkchoice state.
234                    *this.last_forkchoice_state = Some(state);
235                    *this.forkchoice_states_forwarded += 1;
236                    Some(BeaconEngineMessage::ForkchoiceUpdated {
237                        state,
238                        payload_attrs,
239                        tx,
240                        version,
241                    })
242                }
243                (item, _) => item,
244            };
245            return Poll::Ready(item)
246        }
247    }
248}
249
250fn create_reorg_head<Provider, Evm, Spec>(
251    provider: &Provider,
252    evm_config: &Evm,
253    payload_validator: &ExecutionPayloadValidator<Spec>,
254    mut depth: usize,
255    next_payload: ExecutionPayload,
256    next_sidecar: ExecutionPayloadSidecar,
257) -> RethResult<(ExecutionPayload, ExecutionPayloadSidecar)>
258where
259    Provider: BlockReader<Block = reth_primitives::Block> + StateProviderFactory,
260    Evm: ConfigureEvm<Header = Header, Transaction = reth_primitives::TransactionSigned>,
261    Spec: EthereumHardforks,
262{
263    let chain_spec = payload_validator.chain_spec();
264
265    // Ensure next payload is valid.
266    let next_block = payload_validator
267        .ensure_well_formed_payload(next_payload, next_sidecar)
268        .map_err(RethError::msg)?;
269
270    // Fetch reorg target block depending on its depth and its parent.
271    let mut previous_hash = next_block.parent_hash;
272    let mut candidate_transactions = next_block.body.transactions;
273    let reorg_target = 'target: {
274        loop {
275            let reorg_target = provider
276                .block_by_hash(previous_hash)?
277                .ok_or_else(|| ProviderError::HeaderNotFound(previous_hash.into()))?;
278            if depth == 0 {
279                break 'target reorg_target
280            }
281
282            depth -= 1;
283            previous_hash = reorg_target.parent_hash;
284            candidate_transactions = reorg_target.body.transactions;
285        }
286    };
287    let reorg_target_parent = provider
288        .block_by_hash(reorg_target.parent_hash)?
289        .ok_or_else(|| ProviderError::HeaderNotFound(reorg_target.parent_hash.into()))?;
290
291    debug!(target: "engine::stream::reorg", number = reorg_target.number, hash = %previous_hash, "Selected reorg target");
292
293    // Configure state
294    let state_provider = provider.state_by_block_hash(reorg_target.parent_hash)?;
295    let mut state = State::builder()
296        .with_database_ref(StateProviderDatabase::new(&state_provider))
297        .with_bundle_update()
298        .build();
299
300    // Configure environments
301    let (cfg, block_env) = evm_config.cfg_and_block_env(&reorg_target.header, U256::MAX);
302    let env = EnvWithHandlerCfg::new_with_cfg_env(cfg, block_env, Default::default());
303    let mut evm = evm_config.evm_with_env(&mut state, env);
304
305    // apply eip-4788 pre block contract call
306    let mut system_caller = SystemCaller::new(evm_config.clone(), chain_spec.clone());
307
308    system_caller.apply_beacon_root_contract_call(
309        reorg_target.timestamp,
310        reorg_target.number,
311        reorg_target.parent_beacon_block_root,
312        &mut evm,
313    )?;
314
315    let mut cumulative_gas_used = 0;
316    let mut sum_blob_gas_used = 0;
317    let mut transactions = Vec::new();
318    let mut receipts = Vec::new();
319    let mut versioned_hashes = Vec::new();
320    for tx in candidate_transactions {
321        // ensure we still have capacity for this transaction
322        if cumulative_gas_used + tx.gas_limit() > reorg_target.gas_limit {
323            continue
324        }
325
326        // Configure the environment for the block.
327        let tx_recovered = tx.clone().try_into_ecrecovered().map_err(|_| {
328            BlockExecutionError::Validation(BlockValidationError::SenderRecoveryError)
329        })?;
330        evm_config.fill_tx_env(evm.tx_mut(), &tx_recovered, tx_recovered.signer()).map_err(
331            |err| {
332                return RethError::Execution(BlockExecutionError::Validation(
333                    BlockValidationError::EVM {
334                        hash: tx.hash(),
335                        error: Box::new(err.map_db_err(|e| e.into())),
336                    },
337                ))
338            },
339        )?;
340        let exec_result = match evm.transact() {
341            Ok(result) => result,
342            error @ Err(EVMError::Transaction(_) | EVMError::Header(_)) => {
343                trace!(target: "engine::stream::reorg", hash = %tx.hash(), ?error, "Error executing transaction from next block");
344                continue
345            }
346            // Treat error as fatal
347            Err(error) => {
348                return Err(RethError::Execution(BlockExecutionError::Validation(
349                    BlockValidationError::EVM { hash: tx.hash(), error: Box::new(error) },
350                )))
351            }
352        };
353        evm.db_mut().commit(exec_result.state);
354
355        if let Some(blob_tx) = tx.transaction.as_eip4844() {
356            sum_blob_gas_used += blob_tx.blob_gas();
357            versioned_hashes.extend(blob_tx.blob_versioned_hashes.clone());
358        }
359
360        cumulative_gas_used += exec_result.result.gas_used();
361        #[allow(clippy::needless_update)] // side-effect of optimism fields
362        receipts.push(Some(Receipt {
363            tx_type: tx.tx_type(),
364            success: exec_result.result.is_success(),
365            cumulative_gas_used,
366            logs: exec_result.result.into_logs().into_iter().map(Into::into).collect(),
367            ..Default::default()
368        }));
369
370        // append transaction to the list of executed transactions
371        transactions.push(tx);
372    }
373    drop(evm);
374
375    if let Some(withdrawals) = &reorg_target.body.withdrawals {
376        state.increment_balances(post_block_withdrawals_balance_increments(
377            chain_spec,
378            reorg_target.timestamp,
379            withdrawals,
380        ))?;
381    }
382
383    // merge all transitions into bundle state, this would apply the withdrawal balance changes
384    // and 4788 contract call
385    state.merge_transitions(BundleRetention::PlainState);
386
387    let outcome: ExecutionOutcome = ExecutionOutcome::new(
388        state.take_bundle(),
389        Receipts::from(vec![receipts]),
390        reorg_target.number,
391        Default::default(),
392    );
393    let hashed_state = state_provider.hashed_post_state(outcome.state());
394
395    let (blob_gas_used, excess_blob_gas) =
396        if chain_spec.is_cancun_active_at_timestamp(reorg_target.timestamp) {
397            (
398                Some(sum_blob_gas_used),
399                Some(calc_excess_blob_gas(
400                    reorg_target_parent.excess_blob_gas.unwrap_or_default(),
401                    reorg_target_parent.blob_gas_used.unwrap_or_default(),
402                )),
403            )
404        } else {
405            (None, None)
406        };
407
408    let reorg_block = Block {
409        header: Header {
410            // Set same fields as the reorg target
411            parent_hash: reorg_target.header.parent_hash,
412            ommers_hash: reorg_target.header.ommers_hash,
413            beneficiary: reorg_target.header.beneficiary,
414            difficulty: reorg_target.header.difficulty,
415            number: reorg_target.header.number,
416            gas_limit: reorg_target.header.gas_limit,
417            timestamp: reorg_target.header.timestamp,
418            extra_data: reorg_target.header.extra_data,
419            mix_hash: reorg_target.header.mix_hash,
420            nonce: reorg_target.header.nonce,
421            base_fee_per_gas: reorg_target.header.base_fee_per_gas,
422            parent_beacon_block_root: reorg_target.header.parent_beacon_block_root,
423            withdrawals_root: reorg_target.header.withdrawals_root,
424
425            // Compute or add new fields
426            transactions_root: proofs::calculate_transaction_root(&transactions),
427            receipts_root: outcome.ethereum_receipts_root(reorg_target.header.number).unwrap(),
428            logs_bloom: outcome.block_logs_bloom(reorg_target.header.number).unwrap(),
429            gas_used: cumulative_gas_used,
430            blob_gas_used: blob_gas_used.map(Into::into),
431            excess_blob_gas: excess_blob_gas.map(Into::into),
432            state_root: state_provider.state_root(hashed_state)?,
433            requests_hash: None,          // TODO(prague)
434            target_blobs_per_block: None, // TODO(prague)
435        },
436        body: BlockBody {
437            transactions,
438            ommers: reorg_target.body.ommers,
439            withdrawals: reorg_target.body.withdrawals,
440        },
441    }
442    .seal_slow();
443
444    Ok((
445        block_to_payload(reorg_block),
446        // todo(onbjerg): how do we support execution requests?
447        reorg_target
448            .header
449            .parent_beacon_block_root
450            .map(|root| {
451                ExecutionPayloadSidecar::v3(CancunPayloadFields {
452                    parent_beacon_block_root: root,
453                    versioned_hashes,
454                })
455            })
456            .unwrap_or_else(ExecutionPayloadSidecar::none),
457    ))
458}