1use crate::{network::NetworkTestContext, payload::PayloadTestContext, rpc::RpcTestContext};
2use alloy_consensus::BlockHeader;
3use alloy_eips::BlockId;
4use alloy_primitives::{BlockHash, BlockNumber, Bytes, Sealable, B256};
5use alloy_rpc_types_engine::ForkchoiceState;
6use alloy_rpc_types_eth::BlockNumberOrTag;
7use eyre::Ok;
8use futures_util::Future;
9use jsonrpsee::http_client::HttpClient;
10use reth_chainspec::EthereumHardforks;
11use reth_network_api::test_utils::PeersHandleProvider;
12use reth_node_api::{
13 Block, BlockBody, BlockTy, EngineApiMessageVersion, FullNodeComponents, PayloadTypes,
14 PrimitivesTy,
15};
16use reth_node_builder::{rpc::RethRpcAddOns, FullNode, NodeTypes};
17use reth_node_core::primitives::SignedTransaction;
18use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes};
19use reth_provider::{
20 BlockReader, BlockReaderIdExt, CanonStateNotificationStream, CanonStateSubscriptions,
21 StageCheckpointReader,
22};
23use reth_rpc_builder::auth::AuthServerHandle;
24use reth_rpc_eth_api::helpers::{EthApiSpec, TraceExt}; use reth_stages_types::StageId;
26use std::pin::Pin;
27use tokio_stream::StreamExt;
28use url::Url;
29
30use reth_rpc_eth_api::helpers::FullEthApi;
32
33#[expect(missing_debug_implementations)]
35pub struct NodeTestContext<Node, AddOns>
36where
37 Node: FullNodeComponents,
38 AddOns: RethRpcAddOns<Node>,
39{
40 pub inner: FullNode<Node, AddOns>,
42 pub payload: PayloadTestContext<<Node::Types as NodeTypes>::Payload>,
44 pub network: NetworkTestContext<Node::Network>,
46 pub rpc: RpcTestContext<Node, AddOns::EthApi>,
48 pub canonical_stream: CanonStateNotificationStream<PrimitivesTy<Node::Types>>,
50}
51
52impl<Node, Payload, AddOns> NodeTestContext<Node, AddOns>
53where
54 Payload: PayloadTypes,
55 Node: FullNodeComponents,
56 Node::Types: NodeTypes<ChainSpec: EthereumHardforks, Payload = Payload>,
57 Node::Network: PeersHandleProvider,
58 AddOns: RethRpcAddOns<Node>,
59{
60 pub async fn new(
62 node: FullNode<Node, AddOns>,
63 attributes_generator: impl Fn(u64) -> Payload::PayloadBuilderAttributes + Send + Sync + 'static,
64 ) -> eyre::Result<Self> {
65 Ok(Self {
66 inner: node.clone(),
67 payload: PayloadTestContext::new(
68 node.payload_builder_handle.clone(),
69 attributes_generator,
70 )
71 .await?,
72 network: NetworkTestContext::new(node.network.clone()),
73 rpc: RpcTestContext { inner: node.add_ons_handle.rpc_registry },
74 canonical_stream: node.provider.canonical_state_stream(),
75 })
76 }
77
78 pub async fn connect(&mut self, node: &mut Self) {
80 self.network.add_peer(node.network.record()).await;
81 node.network.next_session_established().await;
82 self.network.next_session_established().await;
83 }
84
85 pub async fn advance(
89 &mut self,
90 length: u64,
91 tx_generator: impl Fn(u64) -> Pin<Box<dyn Future<Output = Bytes>>>,
92 ) -> eyre::Result<Vec<Payload::BuiltPayload>>
93 where
94 AddOns::EthApi:
95 EthApiSpec<Provider: BlockReader<Block = BlockTy<Node::Types>>> + FullEthApi + TraceExt,
96 {
97 let mut chain = Vec::with_capacity(length as usize);
98 for i in 0..length {
99 let raw_tx = tx_generator(i).await;
100 let tx_hash = self.rpc.inject_tx(raw_tx).await?;
101 let payload = self.advance_block().await?;
102 let block_hash = payload.block().hash();
103 let block_number = payload.block().number();
104 self.assert_new_block(tx_hash, block_hash, block_number).await?;
105 chain.push(payload);
106 }
107 Ok(chain)
108 }
109
110 pub async fn new_payload(&mut self) -> eyre::Result<Payload::BuiltPayload> {
115 let eth_attr = self.payload.new_payload().await.unwrap();
117 self.payload.expect_attr_event(eth_attr.clone()).await?;
119 self.payload.wait_for_built_payload(eth_attr.payload_id()).await;
121 Ok(self.payload.expect_built_payload().await?)
123 }
124
125 pub async fn build_and_submit_payload(&mut self) -> eyre::Result<Payload::BuiltPayload> {
127 let payload = self.new_payload().await?;
128
129 self.submit_payload(payload.clone()).await?;
130
131 Ok(payload)
132 }
133
134 pub async fn advance_block(&mut self) -> eyre::Result<Payload::BuiltPayload> {
136 let payload = self.build_and_submit_payload().await?;
137
138 self.update_forkchoice(payload.block().hash(), payload.block().hash()).await?;
140
141 Ok(payload)
142 }
143
144 pub async fn wait_block(
146 &self,
147 number: BlockNumber,
148 expected_block_hash: BlockHash,
149 wait_finish_checkpoint: bool,
150 ) -> eyre::Result<()> {
151 let mut check = !wait_finish_checkpoint;
152 loop {
153 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
154
155 if !check && wait_finish_checkpoint {
156 if let Some(checkpoint) =
157 self.inner.provider.get_stage_checkpoint(StageId::Finish)?
158 {
159 if checkpoint.block_number >= number {
160 check = true
161 }
162 }
163 }
164
165 if check {
166 if let Some(latest_block) = self.inner.provider.block_by_number(number)? {
167 assert_eq!(latest_block.header().hash_slow(), expected_block_hash);
168 break
169 }
170 assert!(
171 !wait_finish_checkpoint,
172 "Finish checkpoint matches, but could not fetch block."
173 );
174 }
175 }
176 Ok(())
177 }
178
179 pub async fn wait_unwind(&self, number: BlockNumber) -> eyre::Result<()> {
181 loop {
182 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
183 if let Some(checkpoint) = self.inner.provider.get_stage_checkpoint(StageId::Headers)? {
184 if checkpoint.block_number == number {
185 break
186 }
187 }
188 }
189 Ok(())
190 }
191
192 pub async fn assert_new_block(
197 &mut self,
198 tip_tx_hash: B256,
199 block_hash: B256,
200 block_number: BlockNumber,
201 ) -> eyre::Result<()> {
202 let head = self.canonical_stream.next().await.unwrap();
205 let tx = head.tip().body().transactions().first();
206 assert_eq!(tx.unwrap().tx_hash().as_slice(), tip_tx_hash.as_slice());
207
208 loop {
209 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
211 if let Some(latest_block) =
212 self.inner.provider.block_by_number_or_tag(BlockNumberOrTag::Latest)?
213 {
214 if latest_block.header().number() == block_number {
215 assert_eq!(latest_block.header().hash_slow(), block_hash);
218 break
219 }
220 }
221 }
222 Ok(())
223 }
224
225 pub fn block_hash(&self, number: u64) -> BlockHash {
227 self.inner
228 .provider
229 .sealed_header_by_number_or_tag(BlockNumberOrTag::Number(number))
230 .unwrap()
231 .unwrap()
232 .hash()
233 }
234
235 pub async fn sync_to(&self, block: BlockHash) -> eyre::Result<()> {
237 let start = std::time::Instant::now();
238
239 while self
240 .inner
241 .provider
242 .sealed_header_by_id(BlockId::Number(BlockNumberOrTag::Latest))?
243 .is_none_or(|h| h.hash() != block)
244 {
245 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
246 self.update_forkchoice(block, block).await?;
247
248 assert!(start.elapsed() <= std::time::Duration::from_secs(40), "timed out");
249 }
250
251 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
255
256 Ok(())
257 }
258
259 pub async fn update_forkchoice(&self, current_head: B256, new_head: B256) -> eyre::Result<()> {
261 self.inner
262 .add_ons_handle
263 .beacon_engine_handle
264 .fork_choice_updated(
265 ForkchoiceState {
266 head_block_hash: new_head,
267 safe_block_hash: current_head,
268 finalized_block_hash: current_head,
269 },
270 None,
271 EngineApiMessageVersion::default(),
272 )
273 .await?;
274
275 Ok(())
276 }
277
278 pub async fn update_optimistic_forkchoice(&self, hash: B256) -> eyre::Result<()> {
280 self.update_forkchoice(B256::ZERO, hash).await
281 }
282
283 pub async fn submit_payload(&self, payload: Payload::BuiltPayload) -> eyre::Result<B256> {
285 let block_hash = payload.block().hash();
286 self.inner
287 .add_ons_handle
288 .beacon_engine_handle
289 .new_payload(Payload::block_to_payload(payload.block().clone()))
290 .await?;
291
292 Ok(block_hash)
293 }
294
295 pub fn rpc_url(&self) -> Url {
297 let addr = self.inner.rpc_server_handle().http_local_addr().unwrap();
298 format!("http://{addr}").parse().unwrap()
299 }
300
301 pub fn rpc_client(&self) -> Option<HttpClient> {
303 self.inner.rpc_server_handle().http_client()
304 }
305
306 pub fn auth_server_handle(&self) -> AuthServerHandle {
308 self.inner.auth_server_handle().clone()
309 }
310}