reth_e2e_test_utils/testsuite/
actions.rs1use crate::testsuite::Environment;
4use alloy_primitives::{Bytes, B256, U256};
5use alloy_rpc_types_engine::{
6 payload::ExecutionPayloadEnvelopeV3, ExecutionPayloadV3, ForkchoiceState, PayloadAttributes,
7 PayloadStatusEnum,
8};
9use alloy_rpc_types_eth::{Block, Header, Receipt, Transaction};
10use eyre::Result;
11use futures_util::future::BoxFuture;
12use reth_node_api::{EngineTypes, PayloadTypes};
13use reth_rpc_api::clients::{EngineApiClient, EthApiClient};
14use std::{future::Future, marker::PhantomData, time::Duration};
15use tokio::time::sleep;
16use tracing::debug;
17
18pub trait Action<I>: Send + 'static {
24 fn execute<'a>(&'a mut self, env: &'a mut Environment<I>) -> BoxFuture<'a, Result<()>>;
26}
27
28#[expect(missing_debug_implementations)]
30pub struct ActionBox<I>(Box<dyn Action<I>>);
31
32impl<I: 'static> ActionBox<I> {
33 pub fn new<A: Action<I>>(action: A) -> Self {
35 Self(Box::new(action))
36 }
37
38 pub async fn execute(mut self, env: &mut Environment<I>) -> Result<()> {
40 self.0.execute(env).await
41 }
42}
43
44impl<I, F, Fut> Action<I> for F
49where
50 F: FnMut(&Environment<I>) -> Fut + Send + 'static,
51 Fut: Future<Output = Result<()>> + Send + 'static,
52{
53 fn execute<'a>(&'a mut self, env: &'a mut Environment<I>) -> BoxFuture<'a, Result<()>> {
54 Box::pin(self(env))
55 }
56}
57
58#[derive(Debug)]
61pub struct AssertMineBlock<Engine>
62where
63 Engine: PayloadTypes,
64{
65 pub node_idx: usize,
67 pub transactions: Vec<Bytes>,
69 pub expected_hash: Option<B256>,
71 pub payload_attributes: Engine::PayloadAttributes,
74 _phantom: PhantomData<Engine>,
76}
77
78impl<Engine> AssertMineBlock<Engine>
79where
80 Engine: PayloadTypes,
81{
82 pub fn new(
84 node_idx: usize,
85 transactions: Vec<Bytes>,
86 expected_hash: Option<B256>,
87 payload_attributes: Engine::PayloadAttributes,
88 ) -> Self {
89 Self {
90 node_idx,
91 transactions,
92 expected_hash,
93 payload_attributes,
94 _phantom: Default::default(),
95 }
96 }
97}
98
99impl<Engine> Action<Engine> for AssertMineBlock<Engine>
100where
101 Engine: EngineTypes,
102{
103 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
104 Box::pin(async move {
105 if self.node_idx >= env.node_clients.len() {
106 return Err(eyre::eyre!("Node index out of bounds: {}", self.node_idx));
107 }
108
109 let node_client = &env.node_clients[self.node_idx];
110 let rpc_client = &node_client.rpc;
111 let engine_client = node_client.engine.http_client();
112
113 let latest_block =
115 EthApiClient::<Transaction, Block, Receipt, Header>::block_by_number(
116 rpc_client,
117 alloy_eips::BlockNumberOrTag::Latest,
118 false,
119 )
120 .await?;
121
122 let latest_block = latest_block.ok_or_else(|| eyre::eyre!("Latest block not found"))?;
123 let parent_hash = latest_block.header.hash;
124
125 debug!("Latest block hash: {parent_hash}");
126
127 let fork_choice_state = ForkchoiceState {
129 head_block_hash: parent_hash,
130 safe_block_hash: parent_hash,
131 finalized_block_hash: parent_hash,
132 };
133
134 let fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v2(
135 &engine_client,
136 fork_choice_state,
137 Some(self.payload_attributes.clone()),
138 )
139 .await?;
140
141 debug!("FCU result: {:?}", fcu_result);
142
143 match fcu_result.payload_status.status {
145 PayloadStatusEnum::Valid => {
146 if let Some(payload_id) = fcu_result.payload_id {
147 debug!("Got payload ID: {payload_id}");
148
149 let _engine_payload =
151 EngineApiClient::<Engine>::get_payload_v2(&engine_client, payload_id)
152 .await?;
153 Ok(())
154 } else {
155 Err(eyre::eyre!("No payload ID returned from forkchoiceUpdated"))
156 }
157 }
158 _ => Err(eyre::eyre!("Payload status not valid: {:?}", fcu_result.payload_status)),
159 }
160 })
161 }
162}
/// Action that selects which node produces the next block.
#[derive(Debug, Default)]
pub struct PickNextBlockProducer {}

impl PickNextBlockProducer {
    /// Creates a new `PickNextBlockProducer`.
    pub const fn new() -> Self {
        PickNextBlockProducer {}
    }
}
173
174impl<Engine> Action<Engine> for PickNextBlockProducer
175where
176 Engine: EngineTypes,
177{
178 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
179 Box::pin(async move {
180 let num_clients = env.node_clients.len();
181 if num_clients == 0 {
182 return Err(eyre::eyre!("No node clients available"));
183 }
184
185 let latest_info = env
186 .latest_block_info
187 .as_ref()
188 .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
189
190 let start_idx = ((latest_info.number + 1) % num_clients as u64) as usize;
192
193 for i in 0..num_clients {
194 let idx = (start_idx + i) % num_clients;
195 let node_client = &env.node_clients[idx];
196 let rpc_client = &node_client.rpc;
197
198 let latest_block =
199 EthApiClient::<Transaction, Block, Receipt, Header>::block_by_number(
200 rpc_client,
201 alloy_eips::BlockNumberOrTag::Latest,
202 false,
203 )
204 .await?;
205
206 if let Some(block) = latest_block {
207 let block_number = block.header.number;
208 let block_hash = block.header.hash;
209
210 if block_hash == latest_info.hash && block_number == latest_info.number {
212 env.last_producer_idx = Some(idx);
213 debug!("Selected node {} as the next block producer", idx);
214 return Ok(());
215 }
216 }
217 }
218
219 Err(eyre::eyre!("No suitable block producer found"))
220 })
221 }
222}
223
224#[derive(Debug, Default)]
226pub struct GeneratePayloadAttributes {}
227
228impl<Engine> Action<Engine> for GeneratePayloadAttributes
229where
230 Engine: EngineTypes,
231{
232 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
233 Box::pin(async move {
234 let latest_block = env
235 .latest_block_info
236 .as_ref()
237 .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
238 let block_number = latest_block.number;
239 let timestamp = env.latest_header_time + env.block_timestamp_increment;
240 let payload_attributes = alloy_rpc_types_engine::PayloadAttributes {
241 timestamp,
242 prev_randao: B256::random(),
243 suggested_fee_recipient: alloy_primitives::Address::random(),
244 withdrawals: Some(vec![]),
245 parent_beacon_block_root: Some(B256::ZERO),
246 };
247
248 env.payload_attributes.insert(latest_block.number + 1, payload_attributes);
249 debug!("Stored payload attributes for block {}", block_number + 1);
250 Ok(())
251 })
252 }
253}
254#[derive(Debug, Default)]
256pub struct GenerateNextPayload {}
257
258impl<Engine> Action<Engine> for GenerateNextPayload
259where
260 Engine: EngineTypes + PayloadTypes<PayloadAttributes = PayloadAttributes>,
261 reth_node_ethereum::engine::EthPayloadAttributes:
262 From<<Engine as EngineTypes>::ExecutionPayloadEnvelopeV3>,
263{
264 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
265 Box::pin(async move {
266 let latest_block = env
267 .latest_block_info
268 .as_ref()
269 .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
270
271 let parent_hash = latest_block.hash;
272 debug!("Latest block hash: {parent_hash}");
273
274 let fork_choice_state = ForkchoiceState {
275 head_block_hash: parent_hash,
276 safe_block_hash: parent_hash,
277 finalized_block_hash: parent_hash,
278 };
279
280 let payload_attributes: PayloadAttributes = env
281 .payload_attributes
282 .get(&latest_block.number)
283 .cloned()
284 .ok_or_else(|| eyre::eyre!("No payload attributes found for latest block"))?;
285
286 let fcu_result = EngineApiClient::<Engine>::fork_choice_updated_v3(
287 &env.node_clients[0].engine.http_client(),
288 fork_choice_state,
289 Some(payload_attributes.clone()),
290 )
291 .await?;
292
293 debug!("FCU result: {:?}", fcu_result);
294
295 let payload_id = fcu_result
296 .payload_id
297 .ok_or_else(|| eyre::eyre!("No payload ID returned from forkChoiceUpdated"))?;
298
299 debug!("Received payload ID: {:?}", payload_id);
300 env.next_payload_id = Some(payload_id);
301
302 sleep(Duration::from_secs(1)).await;
303
304 let built_payload: PayloadAttributes = EngineApiClient::<Engine>::get_payload_v3(
305 &env.node_clients[0].engine.http_client(),
306 payload_id,
307 )
308 .await?
309 .into();
310 env.payload_id_history.insert(latest_block.number + 1, payload_id);
311 env.latest_payload_built = Some(built_payload);
312
313 Ok(())
314 })
315 }
316}
317
318#[derive(Debug, Default)]
320pub struct BroadcastLatestForkchoice {}
321
322impl<Engine> Action<Engine> for BroadcastLatestForkchoice
323where
324 Engine: EngineTypes + PayloadTypes<PayloadAttributes = PayloadAttributes>,
325 reth_node_ethereum::engine::EthPayloadAttributes:
326 From<<Engine as EngineTypes>::ExecutionPayloadEnvelopeV3>,
327{
328 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
329 Box::pin(async move {
330 let payload = env.latest_payload_executed.clone();
331
332 if env.node_clients.is_empty() {
333 return Err(eyre::eyre!("No node clients available"));
334 }
335 let latest_block = env
336 .latest_block_info
337 .as_ref()
338 .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
339
340 let parent_hash = latest_block.hash;
341 debug!("Latest block hash: {parent_hash}");
342
343 let fork_choice_state = ForkchoiceState {
344 head_block_hash: parent_hash,
345 safe_block_hash: parent_hash,
346 finalized_block_hash: parent_hash,
347 };
348 debug!(
349 "Broadcasting forkchoice update to {} clients. Head: {:?}",
350 env.node_clients.len(),
351 fork_choice_state.head_block_hash
352 );
353
354 for (idx, client) in env.node_clients.iter().enumerate() {
355 match EngineApiClient::<Engine>::fork_choice_updated_v3(
356 &client.engine.http_client(),
357 fork_choice_state,
358 payload.clone(),
359 )
360 .await
361 {
362 Ok(resp) => {
363 debug!(
364 "Client {}: Forkchoice update status: {:?}",
365 idx, resp.payload_status.status
366 );
367 }
368 Err(err) => {
369 return Err(eyre::eyre!(
370 "Client {}: Failed to broadcast forkchoice: {:?}",
371 idx,
372 err
373 ));
374 }
375 }
376 }
377 debug!("Forkchoice update broadcasted successfully");
378 Ok(())
379 })
380 }
381}
382
383#[derive(Debug, Default)]
385pub struct CheckPayloadAccepted {}
386
387impl<Engine> Action<Engine> for CheckPayloadAccepted
388where
389 Engine: EngineTypes<ExecutionPayloadEnvelopeV3 = ExecutionPayloadEnvelopeV3>
390 + PayloadTypes<PayloadAttributes = PayloadAttributes>,
391 ExecutionPayloadEnvelopeV3: From<<Engine as EngineTypes>::ExecutionPayloadEnvelopeV3>,
392{
393 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
394 Box::pin(async move {
395 let mut accepted_check: bool = false;
396
397 let latest_block = env
398 .latest_block_info
399 .as_mut()
400 .ok_or_else(|| eyre::eyre!("No latest block information available"))?;
401
402 let payload_id = *env
403 .payload_id_history
404 .get(&(latest_block.number + 1))
405 .ok_or_else(|| eyre::eyre!("Cannot find payload_id"))?;
406
407 for (idx, client) in env.node_clients.iter().enumerate() {
408 let rpc_client = &client.rpc;
409
410 let rpc_latest_header =
412 EthApiClient::<Transaction, Block, Receipt, Header>::header_by_number(
413 rpc_client,
414 alloy_eips::BlockNumberOrTag::Latest,
415 )
416 .await?
417 .ok_or_else(|| eyre::eyre!("No latest header found from rpc"))?;
418
419 let next_new_payload = env
421 .latest_payload_built
422 .as_ref()
423 .ok_or_else(|| eyre::eyre!("No next built payload found"))?;
424
425 let built_payload = EngineApiClient::<Engine>::get_payload_v3(
426 &client.engine.http_client(),
427 payload_id,
428 )
429 .await?;
430
431 let execution_payload_envelope: ExecutionPayloadEnvelopeV3 = built_payload;
432 let new_payload_block_hash = execution_payload_envelope
433 .execution_payload
434 .payload_inner
435 .payload_inner
436 .block_hash;
437
438 if rpc_latest_header.hash != new_payload_block_hash {
439 debug!(
440 "Client {}: The hash is not matched: {:?} {:?}",
441 idx, rpc_latest_header.hash, new_payload_block_hash
442 );
443 continue;
444 }
445
446 if rpc_latest_header.inner.difficulty != U256::ZERO {
447 debug!(
448 "Client {}: difficulty != 0: {:?}",
449 idx, rpc_latest_header.inner.difficulty
450 );
451 continue;
452 }
453
454 if rpc_latest_header.inner.mix_hash != next_new_payload.prev_randao {
455 debug!(
456 "Client {}: The mix_hash and prev_randao is not same: {:?} {:?}",
457 idx, rpc_latest_header.inner.mix_hash, next_new_payload.prev_randao
458 );
459 continue;
460 }
461
462 let extra_len = rpc_latest_header.inner.extra_data.len();
463 if extra_len <= 32 {
464 debug!("Client {}: extra_len is fewer than 32. extra_len: {}", idx, extra_len);
465 continue;
466 }
467
468 if !accepted_check {
470 accepted_check = true;
471 env.latest_header_time = next_new_payload.timestamp;
473
474 env.latest_fork_choice_state.head_block_hash = rpc_latest_header.hash;
476 latest_block.hash = rpc_latest_header.hash as B256;
477 latest_block.number = rpc_latest_header.inner.number;
478 }
479 }
480
481 if accepted_check {
482 Ok(())
483 } else {
484 Err(eyre::eyre!("No clients passed payload acceptance checks"))
485 }
486 })
487 }
488}
489
/// Action that repeats its block-production steps `num_blocks` times.
#[derive(Debug)]
pub struct ProduceBlocks<Engine> {
    /// How many times the production steps are run.
    pub num_blocks: u64,
    /// Ties the struct to the engine type without storing a value of it.
    _phantom: PhantomData<Engine>,
}

impl<Engine> ProduceBlocks<Engine> {
    /// Creates an action that runs `num_blocks` rounds.
    pub fn new(num_blocks: u64) -> Self {
        ProduceBlocks { num_blocks, _phantom: PhantomData }
    }
}

impl<Engine> Default for ProduceBlocks<Engine> {
    /// Returns an action that runs zero rounds.
    fn default() -> Self {
        ProduceBlocks::new(0)
    }
}
511
512impl<Engine> Action<Engine> for ProduceBlocks<Engine>
513where
514 Engine: EngineTypes,
515{
516 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
517 Box::pin(async move {
518 let mut sequence = Sequence::new(vec![
520 Box::new(PickNextBlockProducer::default()),
521 Box::new(GeneratePayloadAttributes::default()),
522 ]);
523 for _ in 0..self.num_blocks {
524 sequence.execute(env).await?;
525 }
526 Ok(())
527 })
528 }
529}
530
531#[expect(missing_debug_implementations)]
533pub struct Sequence<I> {
534 pub actions: Vec<Box<dyn Action<I>>>,
536}
537
538impl<I> Sequence<I> {
539 pub fn new(actions: Vec<Box<dyn Action<I>>>) -> Self {
541 Self { actions }
542 }
543}
544
545impl<I: Sync + Send + 'static> Action<I> for Sequence<I> {
546 fn execute<'a>(&'a mut self, env: &'a mut Environment<I>) -> BoxFuture<'a, Result<()>> {
547 Box::pin(async move {
548 for action in &mut self.actions {
550 action.execute(env).await?;
551 }
552
553 Ok(())
554 })
555 }
556}
557
558#[derive(Debug, Default)]
560pub struct BroadcastNextNewPayload {}
561
562impl<Engine> Action<Engine> for BroadcastNextNewPayload
563where
564 Engine: EngineTypes + PayloadTypes<PayloadAttributes = PayloadAttributes>,
565 reth_node_ethereum::engine::EthPayloadAttributes:
566 From<<Engine as EngineTypes>::ExecutionPayloadEnvelopeV3>,
567{
568 fn execute<'a>(&'a mut self, env: &'a mut Environment<Engine>) -> BoxFuture<'a, Result<()>> {
569 Box::pin(async move {
570 let next_new_payload = env
572 .latest_payload_built
573 .as_ref()
574 .ok_or_else(|| eyre::eyre!("No next built payload found"))?;
575 let parent_beacon_block_root = next_new_payload
576 .parent_beacon_block_root
577 .ok_or_else(|| eyre::eyre!("No parent beacon block root for next new payload"))?;
578
579 let mut successful_broadcast: bool = false;
581
582 for client in &env.node_clients {
583 let engine = client.engine.http_client();
584 let rpc_client = &client.rpc;
585
586 let rpc_latest_block =
588 EthApiClient::<Transaction, Block, Receipt, Header>::block_by_number(
589 rpc_client,
590 alloy_eips::BlockNumberOrTag::Latest,
591 false,
592 )
593 .await?
594 .ok_or_else(|| eyre::eyre!("No latest block found from rpc"))?;
595
596 let latest_block = reth_ethereum_primitives::Block {
597 header: rpc_latest_block.header.inner,
598 body: reth_ethereum_primitives::BlockBody {
599 transactions: rpc_latest_block
600 .transactions
601 .into_transactions()
602 .map(|tx| tx.inner.into_inner().into())
603 .collect(),
604 ommers: Default::default(),
605 withdrawals: rpc_latest_block.withdrawals,
606 },
607 };
608
609 let latest_block_info = env
611 .latest_block_info
612 .as_ref()
613 .ok_or_else(|| eyre::eyre!("No latest block info found"))?;
614
615 if latest_block.header.number != latest_block_info.number {
616 return Err(eyre::eyre!(
617 "Client block number {} does not match expected block number {}",
618 latest_block.header.number,
619 latest_block_info.number
620 ));
621 }
622
623 let latest_block_parent_beacon_block_root =
625 latest_block.parent_beacon_block_root.ok_or_else(|| {
626 eyre::eyre!("No parent beacon block root for latest block")
627 })?;
628
629 if parent_beacon_block_root != latest_block_parent_beacon_block_root {
630 return Err(eyre::eyre!(
631 "Parent beacon block root mismatch: expected {:?}, got {:?}",
632 parent_beacon_block_root,
633 latest_block_parent_beacon_block_root
634 ));
635 }
636
637 let execution_payload = ExecutionPayloadV3::from_block_slow(&latest_block);
640 let result = EngineApiClient::<Engine>::new_payload_v3(
641 &engine,
642 execution_payload,
643 vec![],
644 parent_beacon_block_root,
645 )
646 .await?;
647
648 if result.status == PayloadStatusEnum::Valid {
650 successful_broadcast = true;
651 env.latest_payload_executed = Some(next_new_payload.clone());
654 break;
655 } else if let PayloadStatusEnum::Invalid { validation_error } = result.status {
656 debug!(
657 "Invalid payload status returned from broadcast: {:?}",
658 validation_error
659 );
660 }
661 }
662
663 if !successful_broadcast {
664 return Err(eyre::eyre!("Failed to successfully broadcast payload to any client"));
665 }
666
667 Ok(())
668 })
669 }
670}