reth_consensus_debug_client/
client.rs1use alloy_consensus::Transaction;
2use alloy_eips::eip2718::Encodable2718;
3use alloy_primitives::B256;
4use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3};
5use alloy_rpc_types_eth::{Block, BlockTransactions};
6use reth_node_api::EngineTypes;
7use reth_rpc_builder::auth::AuthServerHandle;
8use reth_tracing::tracing::warn;
9use ringbuffer::{AllocRingBuffer, RingBuffer};
10use std::future::Future;
11use tokio::sync::mpsc;
12
13#[auto_impl::auto_impl(&, Arc, Box)]
16pub trait BlockProvider: Send + Sync + 'static {
17 fn subscribe_blocks(&self, tx: mpsc::Sender<Block>) -> impl Future<Output = ()> + Send;
21
22 fn get_block(&self, block_number: u64) -> impl Future<Output = eyre::Result<Block>> + Send;
24
25 fn get_or_fetch_previous_block(
28 &self,
29 previous_block_hashes: &AllocRingBuffer<B256>,
30 current_block_number: u64,
31 offset: usize,
32 ) -> impl Future<Output = eyre::Result<B256>> + Send {
33 async move {
34 let stored_hash = previous_block_hashes
35 .len()
36 .checked_sub(offset)
37 .and_then(|index| previous_block_hashes.get(index));
38 if let Some(hash) = stored_hash {
39 return Ok(*hash);
40 }
41
42 let previous_block_number = match current_block_number.checked_sub(offset as u64) {
44 Some(number) => number,
45 None => return Ok(B256::default()),
46 };
47 let block = self.get_block(previous_block_number).await?;
48 Ok(block.header.hash)
49 }
50 }
51}
52
53#[derive(Debug)]
56pub struct DebugConsensusClient<P: BlockProvider> {
57 auth_server: AuthServerHandle,
59 block_provider: P,
61}
62
63impl<P: BlockProvider> DebugConsensusClient<P> {
64 pub const fn new(auth_server: AuthServerHandle, block_provider: P) -> Self {
67 Self { auth_server, block_provider }
68 }
69}
70
71impl<P: BlockProvider + Clone> DebugConsensusClient<P> {
72 pub async fn run<T: EngineTypes>(self) {
75 let execution_client = self.auth_server.http_client();
76 let mut previous_block_hashes = AllocRingBuffer::new(64);
77
78 let mut block_stream = {
79 let (tx, rx) = mpsc::channel::<Block>(64);
80 let block_provider = self.block_provider.clone();
81 tokio::spawn(async move {
82 block_provider.subscribe_blocks(tx).await;
83 });
84 rx
85 };
86
87 while let Some(block) = block_stream.recv().await {
88 let payload = block_to_execution_payload_v3(block);
89
90 let block_hash = payload.block_hash();
91 let block_number = payload.block_number();
92
93 previous_block_hashes.push(block_hash);
94
95 let _ = reth_rpc_api::EngineApiClient::<T>::new_payload_v3(
97 &execution_client,
98 payload.execution_payload_v3,
99 payload.versioned_hashes,
100 payload.parent_beacon_block_root,
101 )
102 .await
103 .inspect_err(|err| {
104 warn!(target: "consensus::debug-client", %err, %block_hash, %block_number, "failed to submit new payload to execution client");
105 });
106
107 let safe_block_hash = self.block_provider.get_or_fetch_previous_block(
110 &previous_block_hashes,
111 block_number,
112 32,
113 );
114 let finalized_block_hash = self.block_provider.get_or_fetch_previous_block(
115 &previous_block_hashes,
116 block_number,
117 64,
118 );
119 let (safe_block_hash, finalized_block_hash) =
120 tokio::join!(safe_block_hash, finalized_block_hash);
121 let (safe_block_hash, finalized_block_hash) = match (
122 safe_block_hash,
123 finalized_block_hash,
124 ) {
125 (Ok(safe_block_hash), Ok(finalized_block_hash)) => {
126 (safe_block_hash, finalized_block_hash)
127 }
128 (safe_block_hash, finalized_block_hash) => {
129 warn!(target: "consensus::debug-client", ?safe_block_hash, ?finalized_block_hash, "failed to fetch safe or finalized hash from etherscan");
130 continue;
131 }
132 };
133 let state = alloy_rpc_types_engine::ForkchoiceState {
134 head_block_hash: block_hash,
135 safe_block_hash,
136 finalized_block_hash,
137 };
138 let _ = reth_rpc_api::EngineApiClient::<T>::fork_choice_updated_v3(
139 &execution_client,
140 state,
141 None,
142 )
143 .await
144 .inspect_err(|err| {
145 warn!(target: "consensus::debug-client", %err, ?state, "failed to submit fork choice update to execution client");
146 });
147 }
148 }
149}
150
151#[derive(Debug)]
153pub struct ExecutionNewPayload {
154 pub execution_payload_v3: ExecutionPayloadV3,
155 pub versioned_hashes: Vec<B256>,
156 pub parent_beacon_block_root: B256,
157}
158
159impl ExecutionNewPayload {
160 pub const fn block_hash(&self) -> B256 {
162 self.execution_payload_v3.payload_inner.payload_inner.block_hash
163 }
164
165 pub const fn block_number(&self) -> u64 {
167 self.execution_payload_v3.payload_inner.payload_inner.block_number
168 }
169}
170
171pub fn block_to_execution_payload_v3(block: Block) -> ExecutionNewPayload {
174 let transactions = match &block.transactions {
175 BlockTransactions::Full(txs) => txs.clone(),
176 BlockTransactions::Hashes(txs) if txs.is_empty() => vec![],
178 BlockTransactions::Hashes(_) | BlockTransactions::Uncle => {
179 panic!("Received uncle block or hash-only transactions from Etherscan API")
180 }
181 };
182
183 let versioned_hashes = transactions
186 .iter()
187 .flat_map(|tx| tx.blob_versioned_hashes().unwrap_or_default())
188 .copied()
189 .collect();
190
191 let payload: ExecutionPayloadV3 = ExecutionPayloadV3 {
192 payload_inner: ExecutionPayloadV2 {
193 payload_inner: ExecutionPayloadV1 {
194 parent_hash: block.header.parent_hash,
195 fee_recipient: block.header.beneficiary,
196 state_root: block.header.state_root,
197 receipts_root: block.header.receipts_root,
198 logs_bloom: block.header.logs_bloom,
199 prev_randao: block.header.mix_hash,
200 block_number: block.header.number,
201 gas_limit: block.header.gas_limit,
202 gas_used: block.header.gas_used,
203 timestamp: block.header.timestamp,
204 extra_data: block.header.extra_data.clone(),
205 base_fee_per_gas: block.header.base_fee_per_gas.unwrap().try_into().unwrap(),
206 block_hash: block.header.hash,
207 transactions: transactions
208 .into_iter()
209 .map(|tx| tx.inner.encoded_2718().into())
210 .collect(),
211 },
212 withdrawals: block.withdrawals.clone().unwrap_or_default().into_inner(),
213 },
214 blob_gas_used: block.header.blob_gas_used.unwrap(),
215 excess_blob_gas: block.header.excess_blob_gas.unwrap(),
216 };
217
218 ExecutionNewPayload {
219 execution_payload_v3: payload,
220 versioned_hashes,
221 parent_beacon_block_root: block.header.parent_beacon_block_root.unwrap(),
222 }
223}