1use crate::{
2 engine_api::EngineApiTestContext, network::NetworkTestContext, payload::PayloadTestContext,
3 rpc::RpcTestContext, traits::PayloadEnvelopeExt,
4};
5use alloy_consensus::BlockHeader;
6use alloy_eips::BlockId;
7use alloy_primitives::{BlockHash, BlockNumber, Bytes, B256};
8use alloy_rpc_types_engine::PayloadStatusEnum;
9use alloy_rpc_types_eth::BlockNumberOrTag;
10use eyre::Ok;
11use futures_util::Future;
12use reth_chainspec::EthereumHardforks;
13use reth_network_api::test_utils::PeersHandleProvider;
14use reth_node_api::{Block, EngineTypes, FullNodeComponents};
15use reth_node_builder::{rpc::RethRpcAddOns, FullNode, NodeTypes, NodeTypesWithEngine};
16use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes};
17use reth_primitives::EthPrimitives;
18use reth_provider::{
19 BlockReader, BlockReaderIdExt, CanonStateSubscriptions, StageCheckpointReader,
20};
21use reth_rpc_eth_api::helpers::{EthApiSpec, FullEthApi, TraceExt};
22use reth_stages_types::StageId;
23use std::{marker::PhantomData, pin::Pin};
24use tokio_stream::StreamExt;
25use url::Url;
26
27#[allow(missing_debug_implementations)]
29pub struct NodeTestContext<Node, AddOns>
30where
31 Node: FullNodeComponents,
32 AddOns: RethRpcAddOns<Node>,
33{
34 pub inner: FullNode<Node, AddOns>,
36 pub payload: PayloadTestContext<<Node::Types as NodeTypesWithEngine>::Engine>,
38 pub network: NetworkTestContext<Node::Network>,
40 pub engine_api: EngineApiTestContext<
42 <Node::Types as NodeTypesWithEngine>::Engine,
43 <Node::Types as NodeTypes>::ChainSpec,
44 >,
45 pub rpc: RpcTestContext<Node, AddOns::EthApi>,
47}
48
49impl<Node, Engine, AddOns> NodeTestContext<Node, AddOns>
50where
51 Engine: EngineTypes,
52 Node: FullNodeComponents,
53 Node::Types: NodeTypesWithEngine<
54 ChainSpec: EthereumHardforks,
55 Engine = Engine,
56 Primitives = EthPrimitives,
57 >,
58 Node::Network: PeersHandleProvider,
59 AddOns: RethRpcAddOns<Node>,
60{
61 pub async fn new(
63 node: FullNode<Node, AddOns>,
64 attributes_generator: impl Fn(u64) -> Engine::PayloadBuilderAttributes + 'static,
65 ) -> eyre::Result<Self> {
66 let builder = node.payload_builder.clone();
67
68 Ok(Self {
69 inner: node.clone(),
70 payload: PayloadTestContext::new(builder, attributes_generator).await?,
71 network: NetworkTestContext::new(node.network.clone()),
72 engine_api: EngineApiTestContext {
73 chain_spec: node.chain_spec(),
74 engine_api_client: node.auth_server_handle().http_client(),
75 canonical_stream: node.provider.canonical_state_stream(),
76 _marker: PhantomData::<Engine>,
77 },
78 rpc: RpcTestContext { inner: node.add_ons_handle.rpc_registry },
79 })
80 }
81
82 pub async fn connect(&mut self, node: &mut Self) {
84 self.network.add_peer(node.network.record()).await;
85 node.network.next_session_established().await;
86 self.network.next_session_established().await;
87 }
88
89 pub async fn advance(
93 &mut self,
94 length: u64,
95 tx_generator: impl Fn(u64) -> Pin<Box<dyn Future<Output = Bytes>>>,
96 ) -> eyre::Result<Vec<(Engine::BuiltPayload, Engine::PayloadBuilderAttributes)>>
97 where
98 Engine::ExecutionPayloadEnvelopeV3: From<Engine::BuiltPayload> + PayloadEnvelopeExt,
99 Engine::ExecutionPayloadEnvelopeV4: From<Engine::BuiltPayload> + PayloadEnvelopeExt,
100 AddOns::EthApi: EthApiSpec<Provider: BlockReader<Block = reth_primitives::Block>>
101 + FullEthApi
102 + TraceExt,
103 {
104 let mut chain = Vec::with_capacity(length as usize);
105 for i in 0..length {
106 let raw_tx = tx_generator(i).await;
107 let tx_hash = self.rpc.inject_tx(raw_tx).await?;
108 let (payload, eth_attr) = self.advance_block().await?;
109 let block_hash = payload.block().hash();
110 let block_number = payload.block().number;
111 self.assert_new_block(tx_hash, block_hash, block_number).await?;
112 chain.push((payload, eth_attr));
113 }
114 Ok(chain)
115 }
116
117 pub async fn new_payload(
122 &mut self,
123 ) -> eyre::Result<(Engine::BuiltPayload, Engine::PayloadBuilderAttributes)>
124 where
125 <Engine as EngineTypes>::ExecutionPayloadEnvelopeV3:
126 From<Engine::BuiltPayload> + PayloadEnvelopeExt,
127 {
128 let eth_attr = self.payload.new_payload().await.unwrap();
130 self.payload.expect_attr_event(eth_attr.clone()).await?;
132 self.payload.wait_for_built_payload(eth_attr.payload_id()).await;
134 self.engine_api.get_payload_v3_value(eth_attr.payload_id()).await?;
136 Ok((self.payload.expect_built_payload().await?, eth_attr))
138 }
139
140 pub async fn build_and_submit_payload(
142 &mut self,
143 ) -> eyre::Result<(Engine::BuiltPayload, Engine::PayloadBuilderAttributes)>
144 where
145 <Engine as EngineTypes>::ExecutionPayloadEnvelopeV3:
146 From<Engine::BuiltPayload> + PayloadEnvelopeExt,
147 <Engine as EngineTypes>::ExecutionPayloadEnvelopeV4:
148 From<Engine::BuiltPayload> + PayloadEnvelopeExt,
149 {
150 let (payload, eth_attr) = self.new_payload().await?;
151
152 self.engine_api
153 .submit_payload(payload.clone(), eth_attr.clone(), PayloadStatusEnum::Valid)
154 .await?;
155
156 Ok((payload, eth_attr))
157 }
158
159 pub async fn advance_block(
161 &mut self,
162 ) -> eyre::Result<(Engine::BuiltPayload, Engine::PayloadBuilderAttributes)>
163 where
164 <Engine as EngineTypes>::ExecutionPayloadEnvelopeV3:
165 From<Engine::BuiltPayload> + PayloadEnvelopeExt,
166 <Engine as EngineTypes>::ExecutionPayloadEnvelopeV4:
167 From<Engine::BuiltPayload> + PayloadEnvelopeExt,
168 {
169 let (payload, eth_attr) = self.build_and_submit_payload().await?;
170
171 self.engine_api.update_forkchoice(payload.block().hash(), payload.block().hash()).await?;
173
174 Ok((payload, eth_attr))
175 }
176
177 pub async fn wait_block(
179 &self,
180 number: BlockNumber,
181 expected_block_hash: BlockHash,
182 wait_finish_checkpoint: bool,
183 ) -> eyre::Result<()> {
184 let mut check = !wait_finish_checkpoint;
185 loop {
186 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
187
188 if !check && wait_finish_checkpoint {
189 if let Some(checkpoint) =
190 self.inner.provider.get_stage_checkpoint(StageId::Finish)?
191 {
192 if checkpoint.block_number >= number {
193 check = true
194 }
195 }
196 }
197
198 if check {
199 if let Some(latest_block) = self.inner.provider.block_by_number(number)? {
200 assert_eq!(latest_block.header().hash_slow(), expected_block_hash);
201 break
202 }
203 assert!(
204 !wait_finish_checkpoint,
205 "Finish checkpoint matches, but could not fetch block."
206 );
207 }
208 }
209 Ok(())
210 }
211
212 pub async fn wait_unwind(&self, number: BlockNumber) -> eyre::Result<()> {
214 loop {
215 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
216 if let Some(checkpoint) = self.inner.provider.get_stage_checkpoint(StageId::Headers)? {
217 if checkpoint.block_number == number {
218 break
219 }
220 }
221 }
222 Ok(())
223 }
224
225 pub async fn assert_new_block(
230 &mut self,
231 tip_tx_hash: B256,
232 block_hash: B256,
233 block_number: BlockNumber,
234 ) -> eyre::Result<()> {
235 let head = self.engine_api.canonical_stream.next().await.unwrap();
238 let tx = head.tip().transactions().first();
239 assert_eq!(tx.unwrap().hash().as_slice(), tip_tx_hash.as_slice());
240
241 loop {
242 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
244 if let Some(latest_block) =
245 self.inner.provider.block_by_number_or_tag(BlockNumberOrTag::Latest)?
246 {
247 if latest_block.header().number() == block_number {
248 assert_eq!(latest_block.header().hash_slow(), block_hash);
251 break
252 }
253 }
254 }
255 Ok(())
256 }
257
258 pub fn block_hash(&self, number: u64) -> BlockHash {
260 self.inner
261 .provider
262 .sealed_header_by_number_or_tag(BlockNumberOrTag::Number(number))
263 .unwrap()
264 .unwrap()
265 .hash()
266 }
267
268 pub async fn sync_to(&self, block: BlockHash) -> eyre::Result<()> {
270 self.engine_api.update_forkchoice(block, block).await?;
271
272 let start = std::time::Instant::now();
273
274 while self
275 .inner
276 .provider
277 .sealed_header_by_id(BlockId::Number(BlockNumberOrTag::Latest))?
278 .is_none_or(|h| h.hash() != block)
279 {
280 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
281
282 assert!(start.elapsed() <= std::time::Duration::from_secs(30), "timed out");
283 }
284
285 tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
289
290 Ok(())
291 }
292
293 pub fn rpc_url(&self) -> Url {
295 let addr = self.inner.rpc_server_handle().http_local_addr().unwrap();
296 format!("http://{}", addr).parse().unwrap()
297 }
298}