1use futures::Stream;
4use reth_engine_primitives::{BeaconEngineMessage, EngineTypes};
5use reth_payload_validator::ExecutionPayloadValidator;
6use std::path::PathBuf;
7use tokio_util::either::Either;
8
9pub mod engine_store;
10use engine_store::EngineStoreStream;
11
12pub mod skip_fcu;
13use skip_fcu::EngineSkipFcu;
14
15pub mod skip_new_payload;
16use skip_new_payload::EngineSkipNewPayload;
17
18pub mod reorg;
19use reorg::EngineReorg;
20
21pub trait EngineMessageStreamExt<Engine: EngineTypes>:
23 Stream<Item = BeaconEngineMessage<Engine>>
24{
25 fn skip_fcu(self, count: usize) -> EngineSkipFcu<Self>
28 where
29 Self: Sized,
30 {
31 EngineSkipFcu::new(self, count)
32 }
33
34 fn maybe_skip_fcu(self, maybe_count: Option<usize>) -> Either<EngineSkipFcu<Self>, Self>
37 where
38 Self: Sized,
39 {
40 if let Some(count) = maybe_count {
41 Either::Left(self.skip_fcu(count))
42 } else {
43 Either::Right(self)
44 }
45 }
46
47 fn skip_new_payload(self, count: usize) -> EngineSkipNewPayload<Self>
50 where
51 Self: Sized,
52 {
53 EngineSkipNewPayload::new(self, count)
54 }
55
56 fn maybe_skip_new_payload(
59 self,
60 maybe_count: Option<usize>,
61 ) -> Either<EngineSkipNewPayload<Self>, Self>
62 where
63 Self: Sized,
64 {
65 if let Some(count) = maybe_count {
66 Either::Left(self.skip_new_payload(count))
67 } else {
68 Either::Right(self)
69 }
70 }
71
72 fn store_messages(self, path: PathBuf) -> EngineStoreStream<Self>
74 where
75 Self: Sized,
76 {
77 EngineStoreStream::new(self, path)
78 }
79
80 fn maybe_store_messages(
83 self,
84 maybe_path: Option<PathBuf>,
85 ) -> Either<EngineStoreStream<Self>, Self>
86 where
87 Self: Sized,
88 {
89 if let Some(path) = maybe_path {
90 Either::Left(self.store_messages(path))
91 } else {
92 Either::Right(self)
93 }
94 }
95
96 fn reorg<Provider, Evm, Spec>(
98 self,
99 provider: Provider,
100 evm_config: Evm,
101 payload_validator: ExecutionPayloadValidator<Spec>,
102 frequency: usize,
103 depth: Option<usize>,
104 ) -> EngineReorg<Self, Engine, Provider, Evm, Spec>
105 where
106 Self: Sized,
107 {
108 EngineReorg::new(
109 self,
110 provider,
111 evm_config,
112 payload_validator,
113 frequency,
114 depth.unwrap_or_default(),
115 )
116 }
117
118 fn maybe_reorg<Provider, Evm, Spec>(
121 self,
122 provider: Provider,
123 evm_config: Evm,
124 payload_validator: ExecutionPayloadValidator<Spec>,
125 frequency: Option<usize>,
126 depth: Option<usize>,
127 ) -> Either<EngineReorg<Self, Engine, Provider, Evm, Spec>, Self>
128 where
129 Self: Sized,
130 {
131 if let Some(frequency) = frequency {
132 Either::Left(reorg::EngineReorg::new(
133 self,
134 provider,
135 evm_config,
136 payload_validator,
137 frequency,
138 depth.unwrap_or_default(),
139 ))
140 } else {
141 Either::Right(self)
142 }
143 }
144}
145
146impl<Engine, T> EngineMessageStreamExt<Engine> for T
147where
148 Engine: EngineTypes,
149 T: Stream<Item = BeaconEngineMessage<Engine>>,
150{
151}