reth_e2e_test_utils/testsuite/
actions.rs

1//! Actions that can be performed in tests.
2
3use crate::testsuite::Environment;
4use alloy_primitives::{Bytes, B256, U256};
5use alloy_rpc_types_engine::{
6    payload::ExecutionPayloadEnvelopeV3, ExecutionPayloadV3, ForkchoiceState, PayloadAttributes,
7    PayloadStatusEnum,
8};
9use alloy_rpc_types_eth::{Block, Header, Receipt, Transaction};
10use eyre::Result;
11use futures_util::future::BoxFuture;
12use reth_node_api::{EngineTypes, PayloadTypes};
13use reth_rpc_api::clients::{EngineApiClient, EthApiClient};
14use std::{future::Future, marker::PhantomData, time::Duration};
15use tokio::time::sleep;
16use tracing::debug;
17
18/// An action that can be performed on an instance.
19///
20/// Actions execute operations and potentially make assertions in a single step.
21/// The action name indicates what it does (e.g., `AssertMineBlock` would both
22/// mine a block and assert it worked).
23pub trait Action<I>: Send + 'static {
24    /// Executes the action
25    fn execute<'a>(&'a mut self, env: &'a mut Environment<I>) -> BoxFuture<'a, Result<()>>;
26}
27
28/// Simplified action container for storage in tests
29#[expect(missing_debug_implementations)]
30pub struct ActionBox<I>(Box<dyn Action<I>>);
31
32impl<I: 'static> ActionBox<I> {
33    /// Constructor for [`ActionBox`].
34    pub fn new<A: Action<I>>(action: A) -> Self {
35        Self(Box::new(action))
36    }
37
38    /// Executes an [`ActionBox`] with the given [`Environment`] reference.
39    pub async fn execute(mut self, env: &mut Environment<I>) -> Result<()> {
40        self.0.execute(env).await
41    }
42}
43
44/// Implementation of `Action` for any function/closure that takes an Environment
45/// reference and returns a Future resolving to Result<()>.
46///
47/// This allows using closures directly as actions with `.with_action(async move |env| {...})`.
48impl<I, F, Fut> Action<I> for F
49where
50    F: FnMut(&Environment<I>) -> Fut + Send + 'static,
51    Fut: Future<Output = Result<()>> + Send + 'static,
52{
53    fn execute<'a>(&'a mut self, env: &'a mut Environment<I>) -> BoxFuture<'a, Result<()>> {
54        Box::pin(self(env))
55    }
56}
57
58/// Mine a single block with the given transactions and verify the block was created
59/// successfully.
60#[derive(Debug)]
61pub struct AssertMineBlock<Engine>
62where
63    Engine: PayloadTypes,
64{
65    /// The node index to mine
66    pub node_idx: usize,
67    /// Transactions to include in the block
68    pub transactions: Vec<Bytes>,
69    /// Expected block hash (optional)
70    pub expected_hash: Option<B256>,
71    /// Block's payload attributes
72    // TODO: refactor once we have actions to generate payload attributes.
73    pub payload_attributes: Engine::PayloadAttributes,
74    /// Tracks engine type
75    _phantom: PhantomData<Engine>,
76}
77
78impl<Engine> AssertMineBlock<Engine>
79where
80    Engine: PayloadTypes,
81{
82    /// Create a new `AssertMineBlock` action
83    pub fn new(
84        node_idx: usize,
85        transactions: Vec<Bytes>,
86        expected_hash: Option<B256>,
87        payload_attributes: Engine::PayloadAttributes,
88    ) -> Self {
89        Self {
90            node_idx,
91            transactions,
92            expected_hash,
93            payload_attributes,
94            _phantom: Default::default(),
95        }
96    }
97}
98
99impl<Engine> Action<Engine> for AssertMineBlock<Engine>
100where
101    Engine: EngineTypes,
102{
103    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
104        Box::pin(async move {
105            if self.node_idx >= env.node_clients.len() {
106                return Err(eyre::eyre!("Node index out of bounds: {}", self.node_idx));
107            }
108
109            let node_client = &env.node_clients[self.node_idx];
110            let rpc_client = &node_client.rpc;
111            let engine_client = node_client.engine.http_client();
112
113            // get the latest block to use as parent
114            let latest_block =
115                EthApiClient::<Transaction, Block, Receipt, Header>::block_by_number(
116                    rpc_client,
117                    alloy_eips::BlockNumberOrTag::Latest,
118                    false,
119                )
120                .await?;
121
122            let latest_block = latest_block.ok_or_else(|| eyre::eyre!("Latest block not found"))?;
123            let parent_hash = latest_block.header.hash;
124
125            debug!("Latest block hash: {parent_hash}");
126
127            // create a simple forkchoice state with the latest block as head
128            let fork_choice_state = ForkchoiceState {
129                head_block_hash: parent_hash,
130                safe_block_hash: parent_hash,
131                finalized_block_hash: parent_hash,
132            };
133
134            let fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v2(
135                &engine_client,
136                fork_choice_state,
137                Some(self.payload_attributes.clone()),
138            )
139            .await?;
140
141            debug!("FCU result: {:?}", fcu_result);
142
143            // check if we got a valid payload ID
144            match fcu_result.payload_status.status {
145                PayloadStatusEnum::Valid => {
146                    if let Some(payload_id) = fcu_result.payload_id {
147                        debug!("Got payload ID: {payload_id}");
148
149                        // get the payload that was built
150                        let _engine_payload =
151                            EngineApiClient::<Engine>::get_payload_v2(&engine_client, payload_id)
152                                .await?;
153                        Ok(())
154                    } else {
155                        Err(eyre::eyre!("No payload ID returned from forkchoiceUpdated"))
156                    }
157                }
158                _ => Err(eyre::eyre!("Payload status not valid: {:?}", fcu_result.payload_status)),
159            }
160        })
161    }
162}
163/// Pick the next block producer based on the latest block information.
164#[derive(Debug, Default)]
165pub struct PickNextBlockProducer {}
166
167impl PickNextBlockProducer {
168    /// Create a new `PickNextBlockProducer` action
169    pub const fn new() -> Self {
170        Self {}
171    }
172}
173
174impl<Engine> Action<Engine> for PickNextBlockProducer
175where
176    Engine: EngineTypes,
177{
178    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
179        Box::pin(async move {
180            let num_clients = env.node_clients.len();
181            if num_clients == 0 {
182                return Err(eyre::eyre!("No node clients available"));
183            }
184
185            let latest_info = env
186                .latest_block_info
187                .as_ref()
188                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
189
190            // Calculate the starting index based on the latest block number
191            let start_idx = ((latest_info.number + 1) % num_clients as u64) as usize;
192
193            for i in 0..num_clients {
194                let idx = (start_idx + i) % num_clients;
195                let node_client = &env.node_clients[idx];
196                let rpc_client = &node_client.rpc;
197
198                let latest_block =
199                    EthApiClient::<Transaction, Block, Receipt, Header>::block_by_number(
200                        rpc_client,
201                        alloy_eips::BlockNumberOrTag::Latest,
202                        false,
203                    )
204                    .await?;
205
206                if let Some(block) = latest_block {
207                    let block_number = block.header.number;
208                    let block_hash = block.header.hash;
209
210                    // Check if the block hash and number match the latest block info
211                    if block_hash == latest_info.hash && block_number == latest_info.number {
212                        env.last_producer_idx = Some(idx);
213                        debug!("Selected node {} as the next block producer", idx);
214                        return Ok(());
215                    }
216                }
217            }
218
219            Err(eyre::eyre!("No suitable block producer found"))
220        })
221    }
222}
223
224/// Store payload attributes for the next block.
225#[derive(Debug, Default)]
226pub struct GeneratePayloadAttributes {}
227
228impl<Engine> Action<Engine> for GeneratePayloadAttributes
229where
230    Engine: EngineTypes,
231{
232    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
233        Box::pin(async move {
234            let latest_block = env
235                .latest_block_info
236                .as_ref()
237                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
238            let block_number = latest_block.number;
239            let timestamp = env.latest_header_time + env.block_timestamp_increment;
240            let payload_attributes = alloy_rpc_types_engine::PayloadAttributes {
241                timestamp,
242                prev_randao: B256::random(),
243                suggested_fee_recipient: alloy_primitives::Address::random(),
244                withdrawals: Some(vec![]),
245                parent_beacon_block_root: Some(B256::ZERO),
246            };
247
248            env.payload_attributes.insert(latest_block.number + 1, payload_attributes);
249            debug!("Stored payload attributes for block {}", block_number + 1);
250            Ok(())
251        })
252    }
253}
254/// Action that generates the next payload
255#[derive(Debug, Default)]
256pub struct GenerateNextPayload {}
257
258impl<Engine> Action<Engine> for GenerateNextPayload
259where
260    Engine: EngineTypes + PayloadTypes<PayloadAttributes = PayloadAttributes>,
261    reth_node_ethereum::engine::EthPayloadAttributes:
262        From<<Engine as EngineTypes>::ExecutionPayloadEnvelopeV3>,
263{
264    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
265        Box::pin(async move {
266            let latest_block = env
267                .latest_block_info
268                .as_ref()
269                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
270
271            let parent_hash = latest_block.hash;
272            debug!("Latest block hash: {parent_hash}");
273
274            let fork_choice_state = ForkchoiceState {
275                head_block_hash: parent_hash,
276                safe_block_hash: parent_hash,
277                finalized_block_hash: parent_hash,
278            };
279
280            let payload_attributes: PayloadAttributes = env
281                .payload_attributes
282                .get(&latest_block.number)
283                .cloned()
284                .ok_or_else(|| eyre::eyre!("No payload attributes found for latest block"))?;
285
286            let fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
287                &env.node_clients[0].engine.http_client(),
288                fork_choice_state,
289                Some(payload_attributes.clone()),
290            )
291            .await?;
292
293            debug!("FCU result: {:?}", fcu_result);
294
295            let payload_id = fcu_result
296                .payload_id
297                .ok_or_else(|| eyre::eyre!("No payload ID returned from forkChoiceUpdated"))?;
298
299            debug!("Received payload ID: {:?}", payload_id);
300            env.next_payload_id = Some(payload_id);
301
302            sleep(Duration::from_secs(1)).await;
303
304            let built_payload: PayloadAttributes = EngineApiClient::<Engine>::get_payload_v3(
305                &env.node_clients[0].engine.http_client(),
306                payload_id,
307            )
308            .await?
309            .into();
310            env.payload_id_history.insert(latest_block.number + 1, payload_id);
311            env.latest_payload_built = Some(built_payload);
312
313            Ok(())
314        })
315    }
316}
317
318///Action that broadcasts the latest fork choice state to all clients
319#[derive(Debug, Default)]
320pub struct BroadcastLatestForkchoice {}
321
322impl<Engine> Action<Engine> for BroadcastLatestForkchoice
323where
324    Engine: EngineTypes + PayloadTypes<PayloadAttributes = PayloadAttributes>,
325    reth_node_ethereum::engine::EthPayloadAttributes:
326        From<<Engine as EngineTypes>::ExecutionPayloadEnvelopeV3>,
327{
328    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
329        Box::pin(async move {
330            let payload = env.latest_payload_executed.clone();
331
332            if env.node_clients.is_empty() {
333                return Err(eyre::eyre!("No node clients available"));
334            }
335            let latest_block = env
336                .latest_block_info
337                .as_ref()
338                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
339
340            let parent_hash = latest_block.hash;
341            debug!("Latest block hash: {parent_hash}");
342
343            let fork_choice_state = ForkchoiceState {
344                head_block_hash: parent_hash,
345                safe_block_hash: parent_hash,
346                finalized_block_hash: parent_hash,
347            };
348            debug!(
349                "Broadcasting forkchoice update to {} clients. Head: {:?}",
350                env.node_clients.len(),
351                fork_choice_state.head_block_hash
352            );
353
354            for (idx, client) in env.node_clients.iter().enumerate() {
355                match EngineApiClient::<Engine>::fork_choice_updated_v3(
356                    &client.engine.http_client(),
357                    fork_choice_state,
358                    payload.clone(),
359                )
360                .await
361                {
362                    Ok(resp) => {
363                        debug!(
364                            "Client {}: Forkchoice update status: {:?}",
365                            idx, resp.payload_status.status
366                        );
367                    }
368                    Err(err) => {
369                        return Err(eyre::eyre!(
370                            "Client {}: Failed to broadcast forkchoice: {:?}",
371                            idx,
372                            err
373                        ));
374                    }
375                }
376            }
377            debug!("Forkchoice update broadcasted successfully");
378            Ok(())
379        })
380    }
381}
382
383/// Action that checks whether the broadcasted new payload has been accepted
384#[derive(Debug, Default)]
385pub struct CheckPayloadAccepted {}
386
387impl<Engine> Action<Engine> for CheckPayloadAccepted
388where
389    Engine: EngineTypes<ExecutionPayloadEnvelopeV3 = ExecutionPayloadEnvelopeV3>
390        + PayloadTypes<PayloadAttributes = PayloadAttributes>,
391    ExecutionPayloadEnvelopeV3: From<<Engine as EngineTypes>::ExecutionPayloadEnvelopeV3>,
392{
393    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
394        Box::pin(async move {
395            let mut accepted_check: bool = false;
396
397            let latest_block = env
398                .latest_block_info
399                .as_mut()
400                .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
401
402            let payload_id = *env
403                .payload_id_history
404                .get(&(latest_block.number + 1))
405                .ok_or_else(|| eyre::eyre!("Cannot find payload_id"))?;
406
407            for (idx, client) in env.node_clients.iter().enumerate() {
408                let rpc_client = &client.rpc;
409
410                // get the last header by number using latest_head_number
411                let rpc_latest_header =
412                    EthApiClient::<Transaction, Block, Receipt, Header>::header_by_number(
413                        rpc_client,
414                        alloy_eips::BlockNumberOrTag::Latest,
415                    )
416                    .await?
417                    .ok_or_else(|| eyre::eyre!("No latest header found from rpc"))?;
418
419                // perform several checks
420                let next_new_payload = env
421                    .latest_payload_built
422                    .as_ref()
423                    .ok_or_else(|| eyre::eyre!("No next built payload found"))?;
424
425                let built_payload = EngineApiClient::<Engine>::get_payload_v3(
426                    &client.engine.http_client(),
427                    payload_id,
428                )
429                .await?;
430
431                let execution_payload_envelope: ExecutionPayloadEnvelopeV3 = built_payload;
432                let new_payload_block_hash = execution_payload_envelope
433                    .execution_payload
434                    .payload_inner
435                    .payload_inner
436                    .block_hash;
437
438                if rpc_latest_header.hash != new_payload_block_hash {
439                    debug!(
440                        "Client {}: The hash is not matched: {:?} {:?}",
441                        idx, rpc_latest_header.hash, new_payload_block_hash
442                    );
443                    continue;
444                }
445
446                if rpc_latest_header.inner.difficulty != U256::ZERO {
447                    debug!(
448                        "Client {}: difficulty != 0: {:?}",
449                        idx, rpc_latest_header.inner.difficulty
450                    );
451                    continue;
452                }
453
454                if rpc_latest_header.inner.mix_hash != next_new_payload.prev_randao {
455                    debug!(
456                        "Client {}: The mix_hash and prev_randao is not same: {:?} {:?}",
457                        idx, rpc_latest_header.inner.mix_hash, next_new_payload.prev_randao
458                    );
459                    continue;
460                }
461
462                let extra_len = rpc_latest_header.inner.extra_data.len();
463                if extra_len <= 32 {
464                    debug!("Client {}: extra_len is fewer than 32. extra_len: {}", idx, extra_len);
465                    continue;
466                }
467
468                // at least one client passes all the check, save the header in Env
469                if !accepted_check {
470                    accepted_check = true;
471                    // save the header in Env
472                    env.latest_header_time = next_new_payload.timestamp;
473
474                    // add it to header history
475                    env.latest_fork_choice_state.head_block_hash = rpc_latest_header.hash;
476                    latest_block.hash = rpc_latest_header.hash as B256;
477                    latest_block.number = rpc_latest_header.inner.number;
478                }
479            }
480
481            if accepted_check {
482                Ok(())
483            } else {
484                Err(eyre::eyre!("No clients passed payload acceptance checks"))
485            }
486        })
487    }
488}
489
490/// Action that produces a sequence of blocks using the available clients
491#[derive(Debug)]
492pub struct ProduceBlocks<Engine> {
493    /// Number of blocks to produce
494    pub num_blocks: u64,
495    /// Tracks engine type
496    _phantom: PhantomData<Engine>,
497}
498
499impl<Engine> ProduceBlocks<Engine> {
500    /// Create a new `ProduceBlocks` action
501    pub fn new(num_blocks: u64) -> Self {
502        Self { num_blocks, _phantom: Default::default() }
503    }
504}
505
506impl<Engine> Default for ProduceBlocks<Engine> {
507    fn default() -> Self {
508        Self::new(0)
509    }
510}
511
512impl<Engine> Action<Engine> for ProduceBlocks<Engine>
513where
514    Engine: EngineTypes,
515{
516    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
517        Box::pin(async move {
518            // Create a sequence for producing a single block
519            let mut sequence = Sequence::new(vec![
520                Box::new(PickNextBlockProducer::default()),
521                Box::new(GeneratePayloadAttributes::default()),
522            ]);
523            for _ in 0..self.num_blocks {
524                sequence.execute(env).await?;
525            }
526            Ok(())
527        })
528    }
529}
530
531/// Run a sequence of actions in series.
532#[expect(missing_debug_implementations)]
533pub struct Sequence<I> {
534    /// Actions to execute in sequence
535    pub actions: Vec<Box<dyn Action<I>>>,
536}
537
538impl<I> Sequence<I> {
539    /// Create a new sequence of actions
540    pub fn new(actions: Vec<Box<dyn Action<I>>>) -> Self {
541        Self { actions }
542    }
543}
544
545impl<I: Sync + Send + 'static> Action<I> for Sequence<I> {
546    fn execute<'a>(&'a mut self, env: &'a mut Environment<I>) -> BoxFuture<'a, Result<()>> {
547        Box::pin(async move {
548            // Execute each action in sequence
549            for action in &mut self.actions {
550                action.execute(env).await?;
551            }
552
553            Ok(())
554        })
555    }
556}
557
558/// Action that broadcasts the next new payload
559#[derive(Debug, Default)]
560pub struct BroadcastNextNewPayload {}
561
562impl<Engine> Action<Engine> for BroadcastNextNewPayload
563where
564    Engine: EngineTypes + PayloadTypes<PayloadAttributes = PayloadAttributes>,
565    reth_node_ethereum::engine::EthPayloadAttributes:
566        From<<Engine as EngineTypes>::ExecutionPayloadEnvelopeV3>,
567{
568    fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
569        Box::pin(async move {
570            // Get the next new payload to broadcast
571            let next_new_payload = env
572                .latest_payload_built
573                .as_ref()
574                .ok_or_else(|| eyre::eyre!("No next built payload found"))?;
575            let parent_beacon_block_root = next_new_payload
576                .parent_beacon_block_root
577                .ok_or_else(|| eyre::eyre!("No parent beacon block root for next new payload"))?;
578
579            // Loop through all clients and broadcast the next new payload
580            let mut successful_broadcast: bool = false;
581
582            for client in &env.node_clients {
583                let engine = client.engine.http_client();
584                let rpc_client = &client.rpc;
585
586                // Get latest block from the client
587                let rpc_latest_block =
588                    EthApiClient::<Transaction, Block, Receipt, Header>::block_by_number(
589                        rpc_client,
590                        alloy_eips::BlockNumberOrTag::Latest,
591                        false,
592                    )
593                    .await?
594                    .ok_or_else(|| eyre::eyre!("No latest block found from rpc"))?;
595
596                let latest_block = reth_ethereum_primitives::Block {
597                    header: rpc_latest_block.header.inner,
598                    body: reth_ethereum_primitives::BlockBody {
599                        transactions: rpc_latest_block
600                            .transactions
601                            .into_transactions()
602                            .map(|tx| tx.inner.into_inner().into())
603                            .collect(),
604                        ommers: Default::default(),
605                        withdrawals: rpc_latest_block.withdrawals,
606                    },
607                };
608
609                // Validate block number matches expected
610                let latest_block_info = env
611                    .latest_block_info
612                    .as_ref()
613                    .ok_or_else(|| eyre::eyre!("No latest block info found"))?;
614
615                if latest_block.header.number != latest_block_info.number {
616                    return Err(eyre::eyre!(
617                        "Client block number {} does not match expected block number {}",
618                        latest_block.header.number,
619                        latest_block_info.number
620                    ));
621                }
622
623                // Validate parent beacon block root
624                let latest_block_parent_beacon_block_root =
625                    latest_block.parent_beacon_block_root.ok_or_else(|| {
626                        eyre::eyre!("No parent beacon block root for latest block")
627                    })?;
628
629                if parent_beacon_block_root != latest_block_parent_beacon_block_root {
630                    return Err(eyre::eyre!(
631                        "Parent beacon block root mismatch: expected {:?}, got {:?}",
632                        parent_beacon_block_root,
633                        latest_block_parent_beacon_block_root
634                    ));
635                }
636
637                // Construct and broadcast the execution payload from the latest block
638                // The latest block should contain the latest_payload_built
639                let execution_payload = ExecutionPayloadV3::from_block_slow(&latest_block);
640                let result = EngineApiClient::<Engine>::new_payload_v3(
641                    &engine,
642                    execution_payload,
643                    vec![],
644                    parent_beacon_block_root,
645                )
646                .await?;
647
648                // Check if broadcast was successful
649                if result.status == PayloadStatusEnum::Valid {
650                    successful_broadcast = true;
651                    // We don't need to update the latest payload built since it should be the same.
652                    // env.latest_payload_built = Some(next_new_payload.clone());
653                    env.latest_payload_executed = Some(next_new_payload.clone());
654                    break;
655                } else if let PayloadStatusEnum::Invalid { validation_error } = result.status {
656                    debug!(
657                        "Invalid payload status returned from broadcast: {:?}",
658                        validation_error
659                    );
660                }
661            }
662
663            if !successful_broadcast {
664                return Err(eyre::eyre!("Failed to successfully broadcast payload to any client"));
665            }
666
667            Ok(())
668        })
669    }
670}