reth_e2e_test_utils/
node.rs

1use crate::{
2    engine_api::EngineApiTestContext, network::NetworkTestContext, payload::PayloadTestContext,
3    rpc::RpcTestContext, traits::PayloadEnvelopeExt,
4};
5use alloy_consensus::BlockHeader;
6use alloy_eips::BlockId;
7use alloy_primitives::{BlockHash, BlockNumber, Bytes, B256};
8use alloy_rpc_types_engine::PayloadStatusEnum;
9use alloy_rpc_types_eth::BlockNumberOrTag;
10use eyre::Ok;
11use futures_util::Future;
12use reth_chainspec::EthereumHardforks;
13use reth_network_api::test_utils::PeersHandleProvider;
14use reth_node_api::{Block, EngineTypes, FullNodeComponents};
15use reth_node_builder::{rpc::RethRpcAddOns, FullNode, NodeTypes, NodeTypesWithEngine};
16use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes};
17use reth_primitives::EthPrimitives;
18use reth_provider::{
19    BlockReader, BlockReaderIdExt, CanonStateSubscriptions, StageCheckpointReader,
20};
21use reth_rpc_eth_api::helpers::{EthApiSpec, FullEthApi, TraceExt};
22use reth_stages_types::StageId;
23use std::{marker::PhantomData, pin::Pin};
24use tokio_stream::StreamExt;
25use url::Url;
26
27/// An helper struct to handle node actions
28#[allow(missing_debug_implementations)]
29pub struct NodeTestContext<Node, AddOns>
30where
31    Node: FullNodeComponents,
32    AddOns: RethRpcAddOns<Node>,
33{
34    /// The core structure representing the full node.
35    pub inner: FullNode<Node, AddOns>,
36    /// Context for testing payload-related features.
37    pub payload: PayloadTestContext<<Node::Types as NodeTypesWithEngine>::Engine>,
38    /// Context for testing network functionalities.
39    pub network: NetworkTestContext<Node::Network>,
40    /// Context for testing the Engine API.
41    pub engine_api: EngineApiTestContext<
42        <Node::Types as NodeTypesWithEngine>::Engine,
43        <Node::Types as NodeTypes>::ChainSpec,
44    >,
45    /// Context for testing RPC features.
46    pub rpc: RpcTestContext<Node, AddOns::EthApi>,
47}
48
49impl<Node, Engine, AddOns> NodeTestContext<Node, AddOns>
50where
51    Engine: EngineTypes,
52    Node: FullNodeComponents,
53    Node::Types: NodeTypesWithEngine<
54        ChainSpec: EthereumHardforks,
55        Engine = Engine,
56        Primitives = EthPrimitives,
57    >,
58    Node::Network: PeersHandleProvider,
59    AddOns: RethRpcAddOns<Node>,
60{
61    /// Creates a new test node
62    pub async fn new(
63        node: FullNode<Node, AddOns>,
64        attributes_generator: impl Fn(u64) -> Engine::PayloadBuilderAttributes + 'static,
65    ) -> eyre::Result<Self> {
66        let builder = node.payload_builder.clone();
67
68        Ok(Self {
69            inner: node.clone(),
70            payload: PayloadTestContext::new(builder, attributes_generator).await?,
71            network: NetworkTestContext::new(node.network.clone()),
72            engine_api: EngineApiTestContext {
73                chain_spec: node.chain_spec(),
74                engine_api_client: node.auth_server_handle().http_client(),
75                canonical_stream: node.provider.canonical_state_stream(),
76                _marker: PhantomData::<Engine>,
77            },
78            rpc: RpcTestContext { inner: node.add_ons_handle.rpc_registry },
79        })
80    }
81
82    /// Establish a connection to the node
83    pub async fn connect(&mut self, node: &mut Self) {
84        self.network.add_peer(node.network.record()).await;
85        node.network.next_session_established().await;
86        self.network.next_session_established().await;
87    }
88
89    /// Advances the chain `length` blocks.
90    ///
91    /// Returns the added chain as a Vec of block hashes.
92    pub async fn advance(
93        &mut self,
94        length: u64,
95        tx_generator: impl Fn(u64) -> Pin<Box<dyn Future<Output = Bytes>>>,
96    ) -> eyre::Result<Vec<(Engine::BuiltPayload, Engine::PayloadBuilderAttributes)>>
97    where
98        Engine::ExecutionPayloadEnvelopeV3: From<Engine::BuiltPayload> + PayloadEnvelopeExt,
99        Engine::ExecutionPayloadEnvelopeV4: From<Engine::BuiltPayload> + PayloadEnvelopeExt,
100        AddOns::EthApi: EthApiSpec<Provider: BlockReader<Block = reth_primitives::Block>>
101            + FullEthApi
102            + TraceExt,
103    {
104        let mut chain = Vec::with_capacity(length as usize);
105        for i in 0..length {
106            let raw_tx = tx_generator(i).await;
107            let tx_hash = self.rpc.inject_tx(raw_tx).await?;
108            let (payload, eth_attr) = self.advance_block().await?;
109            let block_hash = payload.block().hash();
110            let block_number = payload.block().number;
111            self.assert_new_block(tx_hash, block_hash, block_number).await?;
112            chain.push((payload, eth_attr));
113        }
114        Ok(chain)
115    }
116
117    /// Creates a new payload from given attributes generator
118    /// expects a payload attribute event and waits until the payload is built.
119    ///
120    /// It triggers the resolve payload via engine api and expects the built payload event.
121    pub async fn new_payload(
122        &mut self,
123    ) -> eyre::Result<(Engine::BuiltPayload, Engine::PayloadBuilderAttributes)>
124    where
125        <Engine as EngineTypes>::ExecutionPayloadEnvelopeV3:
126            From<Engine::BuiltPayload> + PayloadEnvelopeExt,
127    {
128        // trigger new payload building draining the pool
129        let eth_attr = self.payload.new_payload().await.unwrap();
130        // first event is the payload attributes
131        self.payload.expect_attr_event(eth_attr.clone()).await?;
132        // wait for the payload builder to have finished building
133        self.payload.wait_for_built_payload(eth_attr.payload_id()).await;
134        // trigger resolve payload via engine api
135        self.engine_api.get_payload_v3_value(eth_attr.payload_id()).await?;
136        // ensure we're also receiving the built payload as event
137        Ok((self.payload.expect_built_payload().await?, eth_attr))
138    }
139
140    /// Triggers payload building job and submits it to the engine.
141    pub async fn build_and_submit_payload(
142        &mut self,
143    ) -> eyre::Result<(Engine::BuiltPayload, Engine::PayloadBuilderAttributes)>
144    where
145        <Engine as EngineTypes>::ExecutionPayloadEnvelopeV3:
146            From<Engine::BuiltPayload> + PayloadEnvelopeExt,
147        <Engine as EngineTypes>::ExecutionPayloadEnvelopeV4:
148            From<Engine::BuiltPayload> + PayloadEnvelopeExt,
149    {
150        let (payload, eth_attr) = self.new_payload().await?;
151
152        self.engine_api
153            .submit_payload(payload.clone(), eth_attr.clone(), PayloadStatusEnum::Valid)
154            .await?;
155
156        Ok((payload, eth_attr))
157    }
158
159    /// Advances the node forward one block
160    pub async fn advance_block(
161        &mut self,
162    ) -> eyre::Result<(Engine::BuiltPayload, Engine::PayloadBuilderAttributes)>
163    where
164        <Engine as EngineTypes>::ExecutionPayloadEnvelopeV3:
165            From<Engine::BuiltPayload> + PayloadEnvelopeExt,
166        <Engine as EngineTypes>::ExecutionPayloadEnvelopeV4:
167            From<Engine::BuiltPayload> + PayloadEnvelopeExt,
168    {
169        let (payload, eth_attr) = self.build_and_submit_payload().await?;
170
171        // trigger forkchoice update via engine api to commit the block to the blockchain
172        self.engine_api.update_forkchoice(payload.block().hash(), payload.block().hash()).await?;
173
174        Ok((payload, eth_attr))
175    }
176
177    /// Waits for block to be available on node.
178    pub async fn wait_block(
179        &self,
180        number: BlockNumber,
181        expected_block_hash: BlockHash,
182        wait_finish_checkpoint: bool,
183    ) -> eyre::Result<()> {
184        let mut check = !wait_finish_checkpoint;
185        loop {
186            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
187
188            if !check && wait_finish_checkpoint {
189                if let Some(checkpoint) =
190                    self.inner.provider.get_stage_checkpoint(StageId::Finish)?
191                {
192                    if checkpoint.block_number >= number {
193                        check = true
194                    }
195                }
196            }
197
198            if check {
199                if let Some(latest_block) = self.inner.provider.block_by_number(number)? {
200                    assert_eq!(latest_block.header().hash_slow(), expected_block_hash);
201                    break
202                }
203                assert!(
204                    !wait_finish_checkpoint,
205                    "Finish checkpoint matches, but could not fetch block."
206                );
207            }
208        }
209        Ok(())
210    }
211
212    /// Waits for the node to unwind to the given block number
213    pub async fn wait_unwind(&self, number: BlockNumber) -> eyre::Result<()> {
214        loop {
215            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
216            if let Some(checkpoint) = self.inner.provider.get_stage_checkpoint(StageId::Headers)? {
217                if checkpoint.block_number == number {
218                    break
219                }
220            }
221        }
222        Ok(())
223    }
224
225    /// Asserts that a new block has been added to the blockchain
226    /// and the tx has been included in the block.
227    ///
228    /// Does NOT work for pipeline since there's no stream notification!
229    pub async fn assert_new_block(
230        &mut self,
231        tip_tx_hash: B256,
232        block_hash: B256,
233        block_number: BlockNumber,
234    ) -> eyre::Result<()> {
235        // get head block from notifications stream and verify the tx has been pushed to the
236        // pool is actually present in the canonical block
237        let head = self.engine_api.canonical_stream.next().await.unwrap();
238        let tx = head.tip().transactions().first();
239        assert_eq!(tx.unwrap().hash().as_slice(), tip_tx_hash.as_slice());
240
241        loop {
242            // wait for the block to commit
243            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
244            if let Some(latest_block) =
245                self.inner.provider.block_by_number_or_tag(BlockNumberOrTag::Latest)?
246            {
247                if latest_block.header().number() == block_number {
248                    // make sure the block hash we submitted via FCU engine api is the new latest
249                    // block using an RPC call
250                    assert_eq!(latest_block.header().hash_slow(), block_hash);
251                    break
252                }
253            }
254        }
255        Ok(())
256    }
257
258    /// Gets block hash by number.
259    pub fn block_hash(&self, number: u64) -> BlockHash {
260        self.inner
261            .provider
262            .sealed_header_by_number_or_tag(BlockNumberOrTag::Number(number))
263            .unwrap()
264            .unwrap()
265            .hash()
266    }
267
268    /// Sends FCU and waits for the node to sync to the given block.
269    pub async fn sync_to(&self, block: BlockHash) -> eyre::Result<()> {
270        self.engine_api.update_forkchoice(block, block).await?;
271
272        let start = std::time::Instant::now();
273
274        while self
275            .inner
276            .provider
277            .sealed_header_by_id(BlockId::Number(BlockNumberOrTag::Latest))?
278            .is_none_or(|h| h.hash() != block)
279        {
280            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
281
282            assert!(start.elapsed() <= std::time::Duration::from_secs(30), "timed out");
283        }
284
285        // Hack to make sure that all components have time to process canonical state update.
286        // Otherwise, this might result in e.g "nonce too low" errors when advancing chain further,
287        // making tests flaky.
288        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
289
290        Ok(())
291    }
292
293    /// Returns the RPC URL.
294    pub fn rpc_url(&self) -> Url {
295        let addr = self.inner.rpc_server_handle().http_local_addr().unwrap();
296        format!("http://{}", addr).parse().unwrap()
297    }
298}