reth_consensus_debug_client/
client.rs

1use alloy_consensus::Transaction;
2use alloy_eips::eip2718::Encodable2718;
3use alloy_primitives::B256;
4use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3};
5use alloy_rpc_types_eth::{Block, BlockTransactions};
6use reth_node_api::EngineTypes;
7use reth_rpc_builder::auth::AuthServerHandle;
8use reth_tracing::tracing::warn;
9use ringbuffer::{AllocRingBuffer, RingBuffer};
10use std::future::Future;
11use tokio::sync::mpsc;
12
13/// Supplies consensus client with new blocks sent in `tx` and a callback to find specific blocks
14/// by number to fetch past finalized and safe blocks.
15#[auto_impl::auto_impl(&, Arc, Box)]
16pub trait BlockProvider: Send + Sync + 'static {
17    /// Runs a block provider to send new blocks to the given sender.
18    ///
19    /// Note: This is expected to be spawned in a separate task.
20    fn subscribe_blocks(&self, tx: mpsc::Sender<Block>) -> impl Future<Output = ()> + Send;
21
22    /// Get a past block by number.
23    fn get_block(&self, block_number: u64) -> impl Future<Output = eyre::Result<Block>> + Send;
24
25    /// Get previous block hash using previous block hash buffer. If it isn't available (buffer
26    /// started more recently than `offset`), fetch it using `get_block`.
27    fn get_or_fetch_previous_block(
28        &self,
29        previous_block_hashes: &AllocRingBuffer<B256>,
30        current_block_number: u64,
31        offset: usize,
32    ) -> impl Future<Output = eyre::Result<B256>> + Send {
33        async move {
34            let stored_hash = previous_block_hashes
35                .len()
36                .checked_sub(offset)
37                .and_then(|index| previous_block_hashes.get(index));
38            if let Some(hash) = stored_hash {
39                return Ok(*hash);
40            }
41
42            // Return zero hash if the chain isn't long enough to have the block at the offset.
43            let previous_block_number = match current_block_number.checked_sub(offset as u64) {
44                Some(number) => number,
45                None => return Ok(B256::default()),
46            };
47            let block = self.get_block(previous_block_number).await?;
48            Ok(block.header.hash)
49        }
50    }
51}
52
53/// Debug consensus client that sends FCUs and new payloads using recent blocks from an external
54/// provider like Etherscan or an RPC endpoint.
55#[derive(Debug)]
56pub struct DebugConsensusClient<P: BlockProvider> {
57    /// Handle to execution client.
58    auth_server: AuthServerHandle,
59    /// Provider to get consensus blocks from.
60    block_provider: P,
61}
62
63impl<P: BlockProvider> DebugConsensusClient<P> {
64    /// Create a new debug consensus client with the given handle to execution
65    /// client and block provider.
66    pub const fn new(auth_server: AuthServerHandle, block_provider: P) -> Self {
67        Self { auth_server, block_provider }
68    }
69}
70
71impl<P: BlockProvider + Clone> DebugConsensusClient<P> {
72    /// Spawn the client to start sending FCUs and new payloads by periodically fetching recent
73    /// blocks.
74    pub async fn run<T: EngineTypes>(self) {
75        let execution_client = self.auth_server.http_client();
76        let mut previous_block_hashes = AllocRingBuffer::new(64);
77
78        let mut block_stream = {
79            let (tx, rx) = mpsc::channel::<Block>(64);
80            let block_provider = self.block_provider.clone();
81            tokio::spawn(async move {
82                block_provider.subscribe_blocks(tx).await;
83            });
84            rx
85        };
86
87        while let Some(block) = block_stream.recv().await {
88            let payload = block_to_execution_payload_v3(block);
89
90            let block_hash = payload.block_hash();
91            let block_number = payload.block_number();
92
93            previous_block_hashes.push(block_hash);
94
95            // Send new events to execution client
96            let _ = reth_rpc_api::EngineApiClient::<T>::new_payload_v3(
97                &execution_client,
98                payload.execution_payload_v3,
99                payload.versioned_hashes,
100                payload.parent_beacon_block_root,
101            )
102            .await
103                .inspect_err(|err|  {
104                    warn!(target: "consensus::debug-client", %err, %block_hash,  %block_number, "failed to submit new payload to execution client");
105                });
106
107            // Load previous block hashes. We're using (head - 32) and (head - 64) as the safe and
108            // finalized block hashes.
109            let safe_block_hash = self.block_provider.get_or_fetch_previous_block(
110                &previous_block_hashes,
111                block_number,
112                32,
113            );
114            let finalized_block_hash = self.block_provider.get_or_fetch_previous_block(
115                &previous_block_hashes,
116                block_number,
117                64,
118            );
119            let (safe_block_hash, finalized_block_hash) =
120                tokio::join!(safe_block_hash, finalized_block_hash);
121            let (safe_block_hash, finalized_block_hash) = match (
122                safe_block_hash,
123                finalized_block_hash,
124            ) {
125                (Ok(safe_block_hash), Ok(finalized_block_hash)) => {
126                    (safe_block_hash, finalized_block_hash)
127                }
128                (safe_block_hash, finalized_block_hash) => {
129                    warn!(target: "consensus::debug-client", ?safe_block_hash, ?finalized_block_hash, "failed to fetch safe or finalized hash from etherscan");
130                    continue;
131                }
132            };
133            let state = alloy_rpc_types_engine::ForkchoiceState {
134                head_block_hash: block_hash,
135                safe_block_hash,
136                finalized_block_hash,
137            };
138            let _ = reth_rpc_api::EngineApiClient::<T>::fork_choice_updated_v3(
139                &execution_client,
140                state,
141                None,
142            )
143            .await
144            .inspect_err(|err|  {
145                warn!(target: "consensus::debug-client", %err, ?state, "failed to submit fork choice update to execution client");
146            });
147        }
148    }
149}
150
151/// Cancun "new payload" event.
152#[derive(Debug)]
153pub struct ExecutionNewPayload {
154    pub execution_payload_v3: ExecutionPayloadV3,
155    pub versioned_hashes: Vec<B256>,
156    pub parent_beacon_block_root: B256,
157}
158
159impl ExecutionNewPayload {
160    /// Get block hash from block in the payload
161    pub const fn block_hash(&self) -> B256 {
162        self.execution_payload_v3.payload_inner.payload_inner.block_hash
163    }
164
165    /// Get block number from block in the payload
166    pub const fn block_number(&self) -> u64 {
167        self.execution_payload_v3.payload_inner.payload_inner.block_number
168    }
169}
170
171/// Convert a block from RPC / Etherscan to params for an execution client's "new payload"
172/// method. Assumes that the block contains full transactions.
173pub fn block_to_execution_payload_v3(block: Block) -> ExecutionNewPayload {
174    let transactions = match &block.transactions {
175        BlockTransactions::Full(txs) => txs.clone(),
176        // Empty array gets deserialized as BlockTransactions::Hashes.
177        BlockTransactions::Hashes(txs) if txs.is_empty() => vec![],
178        BlockTransactions::Hashes(_) | BlockTransactions::Uncle => {
179            panic!("Received uncle block or hash-only transactions from Etherscan API")
180        }
181    };
182
183    // Concatenate all blob hashes from all transactions in order
184    // https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#specification
185    let versioned_hashes = transactions
186        .iter()
187        .flat_map(|tx| tx.blob_versioned_hashes().unwrap_or_default())
188        .copied()
189        .collect();
190
191    let payload: ExecutionPayloadV3 = ExecutionPayloadV3 {
192        payload_inner: ExecutionPayloadV2 {
193            payload_inner: ExecutionPayloadV1 {
194                parent_hash: block.header.parent_hash,
195                fee_recipient: block.header.beneficiary,
196                state_root: block.header.state_root,
197                receipts_root: block.header.receipts_root,
198                logs_bloom: block.header.logs_bloom,
199                prev_randao: block.header.mix_hash,
200                block_number: block.header.number,
201                gas_limit: block.header.gas_limit,
202                gas_used: block.header.gas_used,
203                timestamp: block.header.timestamp,
204                extra_data: block.header.extra_data.clone(),
205                base_fee_per_gas: block.header.base_fee_per_gas.unwrap().try_into().unwrap(),
206                block_hash: block.header.hash,
207                transactions: transactions
208                    .into_iter()
209                    .map(|tx| tx.inner.encoded_2718().into())
210                    .collect(),
211            },
212            withdrawals: block.withdrawals.clone().unwrap_or_default().into_inner(),
213        },
214        blob_gas_used: block.header.blob_gas_used.unwrap(),
215        excess_blob_gas: block.header.excess_blob_gas.unwrap(),
216    };
217
218    ExecutionNewPayload {
219        execution_payload_v3: payload,
220        versioned_hashes,
221        parent_beacon_block_root: block.header.parent_beacon_block_root.unwrap(),
222    }
223}