reth_bench/bench/
new_payload_only.rs1use crate::{
4 bench::{
5 context::BenchContext,
6 output::{
7 NewPayloadResult, TotalGasOutput, TotalGasRow, GAS_OUTPUT_SUFFIX,
8 NEW_PAYLOAD_OUTPUT_SUFFIX,
9 },
10 },
11 valid_payload::call_new_payload,
12};
13use alloy_primitives::B256;
14use alloy_provider::Provider;
15use clap::Parser;
16use csv::Writer;
17use reth_cli_runner::CliContext;
18use reth_node_core::args::BenchmarkArgs;
19use reth_primitives::{Block, BlockExt};
20use reth_rpc_types_compat::engine::payload::block_to_payload;
21use std::time::Instant;
22use tracing::{debug, info};
23
24#[derive(Debug, Parser)]
26pub struct Command {
27 #[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
29 rpc_url: String,
30
31 #[command(flatten)]
32 benchmark: BenchmarkArgs,
33}
34
35impl Command {
36 pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
38 let BenchContext { benchmark_mode, block_provider, auth_provider, mut next_block } =
41 BenchContext::new(&self.benchmark, self.rpc_url).await?;
42
43 let (sender, mut receiver) = tokio::sync::mpsc::channel(1000);
44 tokio::task::spawn(async move {
45 while benchmark_mode.contains(next_block) {
46 let block_res =
47 block_provider.get_block_by_number(next_block.into(), true.into()).await;
48 let block = block_res.unwrap().unwrap();
49 let block_hash = block.header.hash;
50 let block = Block::try_from(block).unwrap().seal(block_hash);
51
52 next_block += 1;
53 sender.send(block).await.unwrap();
54 }
55 });
56
57 let mut results = Vec::new();
59 let total_benchmark_duration = Instant::now();
60
61 while let Some(block) = receiver.recv().await {
62 let gas_used = block.gas_used;
64
65 let versioned_hashes: Vec<B256> =
66 block.body.blob_versioned_hashes().into_iter().copied().collect();
67 let parent_beacon_block_root = block.parent_beacon_block_root;
68 let payload = block_to_payload(block);
69
70 let block_number = payload.block_number();
71
72 debug!(
73 number=?payload.block_number(),
74 "Sending payload to engine",
75 );
76
77 let start = Instant::now();
78 call_new_payload(&auth_provider, payload, parent_beacon_block_root, versioned_hashes)
79 .await?;
80
81 let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
82 info!(%new_payload_result);
83
84 let current_duration = total_benchmark_duration.elapsed();
86
87 let row = TotalGasRow { block_number, gas_used, time: current_duration };
89 results.push((row, new_payload_result));
90 }
91
92 let (gas_output_results, new_payload_results): (_, Vec<NewPayloadResult>) =
93 results.into_iter().unzip();
94
95 if let Some(path) = self.benchmark.output {
97 let output_path = path.join(NEW_PAYLOAD_OUTPUT_SUFFIX);
99 info!("Writing newPayload call latency output to file: {:?}", output_path);
100 let mut writer = Writer::from_path(output_path)?;
101 for result in new_payload_results {
102 writer.serialize(result)?;
103 }
104 writer.flush()?;
105
106 let output_path = path.join(GAS_OUTPUT_SUFFIX);
108 info!("Writing total gas output to file: {:?}", output_path);
109 let mut writer = Writer::from_path(output_path)?;
110 for row in &gas_output_results {
111 writer.serialize(row)?;
112 }
113 writer.flush()?;
114
115 info!("Finished writing benchmark output files to {:?}.", path);
116 }
117
118 let gas_output = TotalGasOutput::new(gas_output_results);
120 info!(
121 total_duration=?gas_output.total_duration,
122 total_gas_used=?gas_output.total_gas_used,
123 blocks_processed=?gas_output.blocks_processed,
124 "Total Ggas/s: {:.4}",
125 gas_output.total_gigagas_per_second()
126 );
127
128 Ok(())
129 }
130}