reth_engine_util/
lib.rs

//! Collection of various stream utilities for the consensus engine.
2
3use futures::Stream;
4use reth_engine_primitives::{BeaconEngineMessage, EngineTypes};
5use reth_payload_validator::ExecutionPayloadValidator;
6use std::path::PathBuf;
7use tokio_util::either::Either;
8
9pub mod engine_store;
10use engine_store::EngineStoreStream;
11
12pub mod skip_fcu;
13use skip_fcu::EngineSkipFcu;
14
15pub mod skip_new_payload;
16use skip_new_payload::EngineSkipNewPayload;
17
18pub mod reorg;
19use reorg::EngineReorg;
20
21/// The collection of stream extensions for engine API message stream.
22pub trait EngineMessageStreamExt<Engine: EngineTypes>:
23    Stream<Item = BeaconEngineMessage<Engine>>
24{
25    /// Skips the specified number of [`BeaconEngineMessage::ForkchoiceUpdated`] messages from the
26    /// engine message stream.
27    fn skip_fcu(self, count: usize) -> EngineSkipFcu<Self>
28    where
29        Self: Sized,
30    {
31        EngineSkipFcu::new(self, count)
32    }
33
34    /// If the count is [Some], returns the stream that skips the specified number of
35    /// [`BeaconEngineMessage::ForkchoiceUpdated`] messages. Otherwise, returns `Self`.
36    fn maybe_skip_fcu(self, maybe_count: Option<usize>) -> Either<EngineSkipFcu<Self>, Self>
37    where
38        Self: Sized,
39    {
40        if let Some(count) = maybe_count {
41            Either::Left(self.skip_fcu(count))
42        } else {
43            Either::Right(self)
44        }
45    }
46
47    /// Skips the specified number of [`BeaconEngineMessage::NewPayload`] messages from the
48    /// engine message stream.
49    fn skip_new_payload(self, count: usize) -> EngineSkipNewPayload<Self>
50    where
51        Self: Sized,
52    {
53        EngineSkipNewPayload::new(self, count)
54    }
55
56    /// If the count is [Some], returns the stream that skips the specified number of
57    /// [`BeaconEngineMessage::NewPayload`] messages. Otherwise, returns `Self`.
58    fn maybe_skip_new_payload(
59        self,
60        maybe_count: Option<usize>,
61    ) -> Either<EngineSkipNewPayload<Self>, Self>
62    where
63        Self: Sized,
64    {
65        if let Some(count) = maybe_count {
66            Either::Left(self.skip_new_payload(count))
67        } else {
68            Either::Right(self)
69        }
70    }
71
72    /// Stores engine messages at the specified location.
73    fn store_messages(self, path: PathBuf) -> EngineStoreStream<Self>
74    where
75        Self: Sized,
76    {
77        EngineStoreStream::new(self, path)
78    }
79
80    /// If the path is [Some], returns the stream that stores engine messages at the specified
81    /// location. Otherwise, returns `Self`.
82    fn maybe_store_messages(
83        self,
84        maybe_path: Option<PathBuf>,
85    ) -> Either<EngineStoreStream<Self>, Self>
86    where
87        Self: Sized,
88    {
89        if let Some(path) = maybe_path {
90            Either::Left(self.store_messages(path))
91        } else {
92            Either::Right(self)
93        }
94    }
95
96    /// Creates reorgs with specified frequency.
97    fn reorg<Provider, Evm, Spec>(
98        self,
99        provider: Provider,
100        evm_config: Evm,
101        payload_validator: ExecutionPayloadValidator<Spec>,
102        frequency: usize,
103        depth: Option<usize>,
104    ) -> EngineReorg<Self, Engine, Provider, Evm, Spec>
105    where
106        Self: Sized,
107    {
108        EngineReorg::new(
109            self,
110            provider,
111            evm_config,
112            payload_validator,
113            frequency,
114            depth.unwrap_or_default(),
115        )
116    }
117
118    /// If frequency is [Some], returns the stream that creates reorgs with
119    /// specified frequency. Otherwise, returns `Self`.
120    fn maybe_reorg<Provider, Evm, Spec>(
121        self,
122        provider: Provider,
123        evm_config: Evm,
124        payload_validator: ExecutionPayloadValidator<Spec>,
125        frequency: Option<usize>,
126        depth: Option<usize>,
127    ) -> Either<EngineReorg<Self, Engine, Provider, Evm, Spec>, Self>
128    where
129        Self: Sized,
130    {
131        if let Some(frequency) = frequency {
132            Either::Left(reorg::EngineReorg::new(
133                self,
134                provider,
135                evm_config,
136                payload_validator,
137                frequency,
138                depth.unwrap_or_default(),
139            ))
140        } else {
141            Either::Right(self)
142        }
143    }
144}
145
146impl<Engine, T> EngineMessageStreamExt<Engine> for T
147where
148    Engine: EngineTypes,
149    T: Stream<Item = BeaconEngineMessage<Engine>>,
150{
151}