reth_bench/bench/
new_payload_fcu.rs

1//! Runs the `reth bench` command, calling first newPayload for each block, then calling
2//! forkchoiceUpdated.
3
4use crate::{
5    bench::{
6        context::BenchContext,
7        output::{
8            CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow, COMBINED_OUTPUT_SUFFIX,
9            GAS_OUTPUT_SUFFIX,
10        },
11    },
12    valid_payload::{call_forkchoice_updated, call_new_payload},
13};
14use alloy_primitives::B256;
15use alloy_provider::Provider;
16use alloy_rpc_types_engine::ForkchoiceState;
17use clap::Parser;
18use csv::Writer;
19use reth_cli_runner::CliContext;
20use reth_node_core::args::BenchmarkArgs;
21use reth_primitives::{Block, BlockExt};
22use reth_rpc_types_compat::engine::payload::block_to_payload;
23use std::time::Instant;
24use tracing::{debug, info};
25
26/// `reth benchmark new-payload-fcu` command
27#[derive(Debug, Parser)]
28pub struct Command {
29    /// The RPC url to use for getting data.
30    #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
31    rpc_url: String,
32
33    #[command(flatten)]
34    benchmark: BenchmarkArgs,
35}
36
37impl Command {
38    /// Execute `benchmark new-payload-fcu` command
39    pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
40        let BenchContext { benchmark_mode, block_provider, auth_provider, mut next_block } =
41            BenchContext::new(&self.benchmark, self.rpc_url).await?;
42
43        let (sender, mut receiver) = tokio::sync::mpsc::channel(1000);
44        tokio::task::spawn(async move {
45            while benchmark_mode.contains(next_block) {
46                let block_res =
47                    block_provider.get_block_by_number(next_block.into(), true.into()).await;
48                let block = block_res.unwrap().unwrap();
49                let block_hash = block.header.hash;
50                let block = Block::try_from(block).unwrap().seal(block_hash);
51                let head_block_hash = block.hash();
52                let safe_block_hash = block_provider
53                    .get_block_by_number(block.number.saturating_sub(32).into(), false.into());
54
55                let finalized_block_hash = block_provider
56                    .get_block_by_number(block.number.saturating_sub(64).into(), false.into());
57
58                let (safe, finalized) = tokio::join!(safe_block_hash, finalized_block_hash,);
59
60                let safe_block_hash = safe.unwrap().expect("finalized block exists").header.hash;
61                let finalized_block_hash =
62                    finalized.unwrap().expect("finalized block exists").header.hash;
63
64                next_block += 1;
65                sender
66                    .send((block, head_block_hash, safe_block_hash, finalized_block_hash))
67                    .await
68                    .unwrap();
69            }
70        });
71
72        // put results in a summary vec so they can be printed at the end
73        let mut results = Vec::new();
74        let total_benchmark_duration = Instant::now();
75
76        while let Some((block, head, safe, finalized)) = receiver.recv().await {
77            // just put gas used here
78            let gas_used = block.gas_used;
79            let block_number = block.header.number;
80
81            let versioned_hashes: Vec<B256> =
82                block.body.blob_versioned_hashes().into_iter().copied().collect();
83            let parent_beacon_block_root = block.parent_beacon_block_root;
84            let payload = block_to_payload(block);
85
86            debug!(?block_number, "Sending payload",);
87
88            // construct fcu to call
89            let forkchoice_state = ForkchoiceState {
90                head_block_hash: head,
91                safe_block_hash: safe,
92                finalized_block_hash: finalized,
93            };
94
95            let start = Instant::now();
96            let message_version = call_new_payload(
97                &auth_provider,
98                payload,
99                parent_beacon_block_root,
100                versioned_hashes,
101            )
102            .await?;
103
104            let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
105
106            call_forkchoice_updated(&auth_provider, message_version, forkchoice_state, None)
107                .await?;
108
109            // calculate the total duration and the fcu latency, record
110            let total_latency = start.elapsed();
111            let fcu_latency = total_latency - new_payload_result.latency;
112            let combined_result =
113                CombinedResult { block_number, new_payload_result, fcu_latency, total_latency };
114
115            // current duration since the start of the benchmark
116            let current_duration = total_benchmark_duration.elapsed();
117
118            // convert gas used to gigagas, then compute gigagas per second
119            info!(%combined_result);
120
121            // record the current result
122            let gas_row = TotalGasRow { block_number, gas_used, time: current_duration };
123            results.push((gas_row, combined_result));
124        }
125
126        let (gas_output_results, combined_results): (_, Vec<CombinedResult>) =
127            results.into_iter().unzip();
128
129        // write the csv output to files
130        if let Some(path) = self.benchmark.output {
131            // first write the combined results to a file
132            let output_path = path.join(COMBINED_OUTPUT_SUFFIX);
133            info!("Writing engine api call latency output to file: {:?}", output_path);
134            let mut writer = Writer::from_path(output_path)?;
135            for result in combined_results {
136                writer.serialize(result)?;
137            }
138            writer.flush()?;
139
140            // now write the gas output to a file
141            let output_path = path.join(GAS_OUTPUT_SUFFIX);
142            info!("Writing total gas output to file: {:?}", output_path);
143            let mut writer = Writer::from_path(output_path)?;
144            for row in &gas_output_results {
145                writer.serialize(row)?;
146            }
147            writer.flush()?;
148
149            info!("Finished writing benchmark output files to {:?}.", path);
150        }
151
152        // accumulate the results and calculate the overall Ggas/s
153        let gas_output = TotalGasOutput::new(gas_output_results);
154        info!(
155            total_duration=?gas_output.total_duration,
156            total_gas_used=?gas_output.total_gas_used,
157            blocks_processed=?gas_output.blocks_processed,
158            "Total Ggas/s: {:.4}",
159            gas_output.total_gigagas_per_second()
160        );
161
162        Ok(())
163    }
164}