reth_engine_util/
skip_fcu.rs

//! Stream wrapper that skips a specified number of forkchoice updated (FCU) messages.
2
3use futures::{Stream, StreamExt};
4use reth_engine_primitives::{BeaconEngineMessage, EngineTypes, OnForkChoiceUpdated};
5use std::{
6    pin::Pin,
7    task::{ready, Context, Poll},
8};
9
10/// Engine API stream wrapper that skips the specified number of forkchoice updated messages.
11#[derive(Debug)]
12#[pin_project::pin_project]
13pub struct EngineSkipFcu<S> {
14    #[pin]
15    stream: S,
16    /// The number of FCUs to skip.
17    threshold: usize,
18    /// Current count of skipped FCUs.
19    skipped: usize,
20}
21
22impl<S> EngineSkipFcu<S> {
23    /// Creates new [`EngineSkipFcu`] stream wrapper.
24    pub const fn new(stream: S, threshold: usize) -> Self {
25        Self {
26            stream,
27            threshold,
28            // Start with `threshold` so that the first FCU goes through.
29            skipped: threshold,
30        }
31    }
32}
33
34impl<S, Engine> Stream for EngineSkipFcu<S>
35where
36    S: Stream<Item = BeaconEngineMessage<Engine>>,
37    Engine: EngineTypes,
38{
39    type Item = S::Item;
40
41    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
42        let mut this = self.project();
43
44        loop {
45            let next = ready!(this.stream.poll_next_unpin(cx));
46            let item = match next {
47                Some(BeaconEngineMessage::ForkchoiceUpdated {
48                    state,
49                    payload_attrs,
50                    tx,
51                    version,
52                }) => {
53                    if this.skipped < this.threshold {
54                        *this.skipped += 1;
55                        tracing::warn!(target: "engine::stream::skip_fcu", ?state, ?payload_attrs, threshold=this.threshold, skipped=this.skipped, "Skipping FCU");
56                        let _ = tx.send(Ok(OnForkChoiceUpdated::syncing()));
57                        continue
58                    }
59                    *this.skipped = 0;
60                    Some(BeaconEngineMessage::ForkchoiceUpdated {
61                        state,
62                        payload_attrs,
63                        tx,
64                        version,
65                    })
66                }
67                next => next,
68            };
69            return Poll::Ready(item)
70        }
71    }
72}