reth_engine_util/
skip_fcu.rs1use futures::{Stream, StreamExt};
4use reth_engine_primitives::{BeaconEngineMessage, EngineTypes, OnForkChoiceUpdated};
5use std::{
6 pin::Pin,
7 task::{ready, Context, Poll},
8};
9
10#[derive(Debug)]
12#[pin_project::pin_project]
13pub struct EngineSkipFcu<S> {
14 #[pin]
15 stream: S,
16 threshold: usize,
18 skipped: usize,
20}
21
22impl<S> EngineSkipFcu<S> {
23 pub const fn new(stream: S, threshold: usize) -> Self {
25 Self {
26 stream,
27 threshold,
28 skipped: threshold,
30 }
31 }
32}
33
34impl<S, Engine> Stream for EngineSkipFcu<S>
35where
36 S: Stream<Item = BeaconEngineMessage<Engine>>,
37 Engine: EngineTypes,
38{
39 type Item = S::Item;
40
41 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
42 let mut this = self.project();
43
44 loop {
45 let next = ready!(this.stream.poll_next_unpin(cx));
46 let item = match next {
47 Some(BeaconEngineMessage::ForkchoiceUpdated {
48 state,
49 payload_attrs,
50 tx,
51 version,
52 }) => {
53 if this.skipped < this.threshold {
54 *this.skipped += 1;
55 tracing::warn!(target: "engine::stream::skip_fcu", ?state, ?payload_attrs, threshold=this.threshold, skipped=this.skipped, "Skipping FCU");
56 let _ = tx.send(Ok(OnForkChoiceUpdated::syncing()));
57 continue
58 }
59 *this.skipped = 0;
60 Some(BeaconEngineMessage::ForkchoiceUpdated {
61 state,
62 payload_attrs,
63 tx,
64 version,
65 })
66 }
67 next => next,
68 };
69 return Poll::Ready(item)
70 }
71 }
72}