reth_e2e_test_utils/
node.rs

1use crate::{network::NetworkTestContext, payload::PayloadTestContext, rpc::RpcTestContext};
2use alloy_consensus::BlockHeader;
3use alloy_eips::BlockId;
4use alloy_primitives::{BlockHash, BlockNumber, Bytes, Sealable, B256};
5use alloy_rpc_types_engine::ForkchoiceState;
6use alloy_rpc_types_eth::BlockNumberOrTag;
7use eyre::Ok;
8use futures_util::Future;
9use jsonrpsee::http_client::HttpClient;
10use reth_chainspec::EthereumHardforks;
11use reth_network_api::test_utils::PeersHandleProvider;
12use reth_node_api::{
13    Block, BlockBody, BlockTy, EngineApiMessageVersion, FullNodeComponents, PayloadTypes,
14    PrimitivesTy,
15};
16use reth_node_builder::{rpc::RethRpcAddOns, FullNode, NodeTypes};
17use reth_node_core::primitives::SignedTransaction;
18use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes};
19use reth_provider::{
20    BlockReader, BlockReaderIdExt, CanonStateNotificationStream, CanonStateSubscriptions,
21    StageCheckpointReader,
22};
23use reth_rpc_builder::auth::AuthServerHandle;
24use reth_rpc_eth_api::helpers::{EthApiSpec, TraceExt}; // upstream needs EthTransactions,
25use reth_stages_types::StageId;
26use std::pin::Pin;
27use tokio_stream::StreamExt;
28use url::Url;
29
30// seismic imports that upstream doesn't use
31use reth_rpc_eth_api::helpers::FullEthApi;
32
/// A helper struct that bundles a launched node with the contexts used to drive it in tests.
///
/// Besides the node handle itself it carries a payload context, a network context, an RPC
/// context and the node's canonical state notification stream.
#[expect(missing_debug_implementations)]
pub struct NodeTestContext<Node, AddOns>
where
    Node: FullNodeComponents,
    AddOns: RethRpcAddOns<Node>,
{
    /// The core structure representing the full node.
    pub inner: FullNode<Node, AddOns>,
    /// Context for testing payload-related features.
    pub payload: PayloadTestContext<<Node::Types as NodeTypes>::Payload>,
    /// Context for testing network functionalities.
    pub network: NetworkTestContext<Node::Network>,
    /// Context for testing RPC features.
    pub rpc: RpcTestContext<Node, AddOns::EthApi>,
    /// Stream of canonical state notifications emitted by the node's provider.
    pub canonical_stream: CanonStateNotificationStream<PrimitivesTy<Node::Types>>,
}
51
52impl<Node, Payload, AddOns> NodeTestContext<Node, AddOns>
53where
54    Payload: PayloadTypes,
55    Node: FullNodeComponents,
56    Node::Types: NodeTypes<ChainSpec: EthereumHardforks, Payload = Payload>,
57    Node::Network: PeersHandleProvider,
58    AddOns: RethRpcAddOns<Node>,
59{
60    /// Creates a new test node
61    pub async fn new(
62        node: FullNode<Node, AddOns>,
63        attributes_generator: impl Fn(u64) -> Payload::PayloadBuilderAttributes + Send + Sync + 'static,
64    ) -> eyre::Result<Self> {
65        Ok(Self {
66            inner: node.clone(),
67            payload: PayloadTestContext::new(
68                node.payload_builder_handle.clone(),
69                attributes_generator,
70            )
71            .await?,
72            network: NetworkTestContext::new(node.network.clone()),
73            rpc: RpcTestContext { inner: node.add_ons_handle.rpc_registry },
74            canonical_stream: node.provider.canonical_state_stream(),
75        })
76    }
77
78    /// Establish a connection to the node
79    pub async fn connect(&mut self, node: &mut Self) {
80        self.network.add_peer(node.network.record()).await;
81        node.network.next_session_established().await;
82        self.network.next_session_established().await;
83    }
84
85    /// Advances the chain `length` blocks.
86    ///
87    /// Returns the added chain as a Vec of block hashes.
88    pub async fn advance(
89        &mut self,
90        length: u64,
91        tx_generator: impl Fn(u64) -> Pin<Box<dyn Future<Output = Bytes>>>,
92    ) -> eyre::Result<Vec<Payload::BuiltPayload>>
93    where
94        AddOns::EthApi:
95            EthApiSpec<Provider: BlockReader<Block = BlockTy<Node::Types>>> + FullEthApi + TraceExt,
96    {
97        let mut chain = Vec::with_capacity(length as usize);
98        for i in 0..length {
99            let raw_tx = tx_generator(i).await;
100            let tx_hash = self.rpc.inject_tx(raw_tx).await?;
101            let payload = self.advance_block().await?;
102            let block_hash = payload.block().hash();
103            let block_number = payload.block().number();
104            self.assert_new_block(tx_hash, block_hash, block_number).await?;
105            chain.push(payload);
106        }
107        Ok(chain)
108    }
109
110    /// Creates a new payload from given attributes generator
111    /// expects a payload attribute event and waits until the payload is built.
112    ///
113    /// It triggers the resolve payload via engine api and expects the built payload event.
114    pub async fn new_payload(&mut self) -> eyre::Result<Payload::BuiltPayload> {
115        // trigger new payload building draining the pool
116        let eth_attr = self.payload.new_payload().await.unwrap();
117        // first event is the payload attributes
118        self.payload.expect_attr_event(eth_attr.clone()).await?;
119        // wait for the payload builder to have finished building
120        self.payload.wait_for_built_payload(eth_attr.payload_id()).await;
121        // ensure we're also receiving the built payload as event
122        Ok(self.payload.expect_built_payload().await?)
123    }
124
125    /// Triggers payload building job and submits it to the engine.
126    pub async fn build_and_submit_payload(&mut self) -> eyre::Result<Payload::BuiltPayload> {
127        let payload = self.new_payload().await?;
128
129        self.submit_payload(payload.clone()).await?;
130
131        Ok(payload)
132    }
133
134    /// Advances the node forward one block
135    pub async fn advance_block(&mut self) -> eyre::Result<Payload::BuiltPayload> {
136        let payload = self.build_and_submit_payload().await?;
137
138        // trigger forkchoice update via engine api to commit the block to the blockchain
139        self.update_forkchoice(payload.block().hash(), payload.block().hash()).await?;
140
141        Ok(payload)
142    }
143
144    /// Waits for block to be available on node.
145    pub async fn wait_block(
146        &self,
147        number: BlockNumber,
148        expected_block_hash: BlockHash,
149        wait_finish_checkpoint: bool,
150    ) -> eyre::Result<()> {
151        let mut check = !wait_finish_checkpoint;
152        loop {
153            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
154
155            if !check && wait_finish_checkpoint {
156                if let Some(checkpoint) =
157                    self.inner.provider.get_stage_checkpoint(StageId::Finish)?
158                {
159                    if checkpoint.block_number >= number {
160                        check = true
161                    }
162                }
163            }
164
165            if check {
166                if let Some(latest_block) = self.inner.provider.block_by_number(number)? {
167                    assert_eq!(latest_block.header().hash_slow(), expected_block_hash);
168                    break
169                }
170                assert!(
171                    !wait_finish_checkpoint,
172                    "Finish checkpoint matches, but could not fetch block."
173                );
174            }
175        }
176        Ok(())
177    }
178
179    /// Waits for the node to unwind to the given block number
180    pub async fn wait_unwind(&self, number: BlockNumber) -> eyre::Result<()> {
181        loop {
182            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
183            if let Some(checkpoint) = self.inner.provider.get_stage_checkpoint(StageId::Headers)? {
184                if checkpoint.block_number == number {
185                    break
186                }
187            }
188        }
189        Ok(())
190    }
191
192    /// Asserts that a new block has been added to the blockchain
193    /// and the tx has been included in the block.
194    ///
195    /// Does NOT work for pipeline since there's no stream notification!
196    pub async fn assert_new_block(
197        &mut self,
198        tip_tx_hash: B256,
199        block_hash: B256,
200        block_number: BlockNumber,
201    ) -> eyre::Result<()> {
202        // get head block from notifications stream and verify the tx has been pushed to the
203        // pool is actually present in the canonical block
204        let head = self.canonical_stream.next().await.unwrap();
205        let tx = head.tip().body().transactions().first();
206        assert_eq!(tx.unwrap().tx_hash().as_slice(), tip_tx_hash.as_slice());
207
208        loop {
209            // wait for the block to commit
210            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
211            if let Some(latest_block) =
212                self.inner.provider.block_by_number_or_tag(BlockNumberOrTag::Latest)?
213            {
214                if latest_block.header().number() == block_number {
215                    // make sure the block hash we submitted via FCU engine api is the new latest
216                    // block using an RPC call
217                    assert_eq!(latest_block.header().hash_slow(), block_hash);
218                    break
219                }
220            }
221        }
222        Ok(())
223    }
224
225    /// Gets block hash by number.
226    pub fn block_hash(&self, number: u64) -> BlockHash {
227        self.inner
228            .provider
229            .sealed_header_by_number_or_tag(BlockNumberOrTag::Number(number))
230            .unwrap()
231            .unwrap()
232            .hash()
233    }
234
235    /// Sends FCU and waits for the node to sync to the given block.
236    pub async fn sync_to(&self, block: BlockHash) -> eyre::Result<()> {
237        let start = std::time::Instant::now();
238
239        while self
240            .inner
241            .provider
242            .sealed_header_by_id(BlockId::Number(BlockNumberOrTag::Latest))?
243            .is_none_or(|h| h.hash() != block)
244        {
245            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
246            self.update_forkchoice(block, block).await?;
247
248            assert!(start.elapsed() <= std::time::Duration::from_secs(40), "timed out");
249        }
250
251        // Hack to make sure that all components have time to process canonical state update.
252        // Otherwise, this might result in e.g "nonce too low" errors when advancing chain further,
253        // making tests flaky.
254        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
255
256        Ok(())
257    }
258
259    /// Sends a forkchoice update message to the engine.
260    pub async fn update_forkchoice(&self, current_head: B256, new_head: B256) -> eyre::Result<()> {
261        self.inner
262            .add_ons_handle
263            .beacon_engine_handle
264            .fork_choice_updated(
265                ForkchoiceState {
266                    head_block_hash: new_head,
267                    safe_block_hash: current_head,
268                    finalized_block_hash: current_head,
269                },
270                None,
271                EngineApiMessageVersion::default(),
272            )
273            .await?;
274
275        Ok(())
276    }
277
278    /// Sends forkchoice update to the engine api with a zero finalized hash
279    pub async fn update_optimistic_forkchoice(&self, hash: B256) -> eyre::Result<()> {
280        self.update_forkchoice(B256::ZERO, hash).await
281    }
282
283    /// Submits a payload to the engine.
284    pub async fn submit_payload(&self, payload: Payload::BuiltPayload) -> eyre::Result<B256> {
285        let block_hash = payload.block().hash();
286        self.inner
287            .add_ons_handle
288            .beacon_engine_handle
289            .new_payload(Payload::block_to_payload(payload.block().clone()))
290            .await?;
291
292        Ok(block_hash)
293    }
294
295    /// Returns the RPC URL.
296    pub fn rpc_url(&self) -> Url {
297        let addr = self.inner.rpc_server_handle().http_local_addr().unwrap();
298        format!("http://{addr}").parse().unwrap()
299    }
300
301    /// Returns an RPC client.
302    pub fn rpc_client(&self) -> Option<HttpClient> {
303        self.inner.rpc_server_handle().http_client()
304    }
305
306    /// Returns an Engine API client.
307    pub fn auth_server_handle(&self) -> AuthServerHandle {
308        self.inner.auth_server_handle().clone()
309    }
310}