reth_engine_tree/
chain.rs

1use crate::backfill::{BackfillAction, BackfillEvent, BackfillSync};
2use futures::Stream;
3use reth_stages_api::{ControlFlow, PipelineTarget};
4use std::{
5    fmt::{Display, Formatter, Result},
6    pin::Pin,
7    task::{Context, Poll},
8};
9use tracing::*;
10
11/// The type that drives the chain forward.
12///
13/// A state machine that orchestrates the components responsible for advancing the chain
14///
15///
16/// ## Control flow
17///
18/// The [`ChainOrchestrator`] is responsible for controlling the backfill sync and additional hooks.
19/// It polls the given `handler`, which is responsible for advancing the chain, how is up to the
20/// handler. However, due to database restrictions (e.g. exclusive write access), following
21/// invariants apply:
22///  - If the handler requests a backfill run (e.g. [`BackfillAction::Start`]), the handler must
23///    ensure that while the backfill sync is running, no other write access is granted.
24///  - At any time the [`ChainOrchestrator`] can request exclusive write access to the database
25///    (e.g. if pruning is required), but will not do so until the handler has acknowledged the
26///    request for write access.
27///
28/// The [`ChainOrchestrator`] polls the [`ChainHandler`] to advance the chain and handles the
29/// emitted events. Requests and events are passed to the [`ChainHandler`] via
30/// [`ChainHandler::on_event`].
31#[must_use = "Stream does nothing unless polled"]
32#[derive(Debug)]
33pub struct ChainOrchestrator<T, P>
34where
35    T: ChainHandler,
36    P: BackfillSync,
37{
38    /// The handler for advancing the chain.
39    handler: T,
40    /// Controls backfill sync.
41    backfill_sync: P,
42}
43
44impl<T, P> ChainOrchestrator<T, P>
45where
46    T: ChainHandler + Unpin,
47    P: BackfillSync + Unpin,
48{
49    /// Creates a new [`ChainOrchestrator`] with the given handler and backfill sync.
50    pub const fn new(handler: T, backfill_sync: P) -> Self {
51        Self { handler, backfill_sync }
52    }
53
54    /// Returns the handler
55    pub const fn handler(&self) -> &T {
56        &self.handler
57    }
58
59    /// Returns a mutable reference to the handler
60    pub fn handler_mut(&mut self) -> &mut T {
61        &mut self.handler
62    }
63
64    /// Triggers a backfill sync for the __valid__ given target.
65    ///
66    /// CAUTION: This function should be used with care and with a valid target.
67    pub fn start_backfill_sync(&mut self, target: impl Into<PipelineTarget>) {
68        self.backfill_sync.on_action(BackfillAction::Start(target.into()));
69    }
70
71    /// Internal function used to advance the chain.
72    ///
73    /// Polls the `ChainOrchestrator` for the next event.
74    #[tracing::instrument(level = "debug", name = "ChainOrchestrator::poll", skip(self, cx))]
75    fn poll_next_event(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<ChainEvent<T::Event>> {
76        let this = self.get_mut();
77
78        // This loop polls the components
79        //
80        // 1. Polls the backfill sync to completion, if active.
81        // 2. Advances the chain by polling the handler.
82        'outer: loop {
83            // try to poll the backfill sync to completion, if active
84            match this.backfill_sync.poll(cx) {
85                Poll::Ready(backfill_sync_event) => match backfill_sync_event {
86                    BackfillEvent::Started(_) => {
87                        // notify handler that backfill sync started
88                        this.handler.on_event(FromOrchestrator::BackfillSyncStarted);
89                        return Poll::Ready(ChainEvent::BackfillSyncStarted);
90                    }
91                    BackfillEvent::Finished(res) => {
92                        return match res {
93                            Ok(ctrl) => {
94                                tracing::debug!(?ctrl, "backfill sync finished");
95                                // notify handler that backfill sync finished
96                                this.handler.on_event(FromOrchestrator::BackfillSyncFinished(ctrl));
97                                Poll::Ready(ChainEvent::BackfillSyncFinished)
98                            }
99                            Err(err) => {
100                                tracing::error!( %err, "backfill sync failed");
101                                Poll::Ready(ChainEvent::FatalError)
102                            }
103                        }
104                    }
105                    BackfillEvent::TaskDropped(err) => {
106                        tracing::error!( %err, "backfill sync task dropped");
107                        return Poll::Ready(ChainEvent::FatalError);
108                    }
109                },
110                Poll::Pending => {}
111            }
112
113            // poll the handler for the next event
114            match this.handler.poll(cx) {
115                Poll::Ready(handler_event) => {
116                    match handler_event {
117                        HandlerEvent::BackfillAction(action) => {
118                            // forward action to backfill_sync
119                            this.backfill_sync.on_action(action);
120                            continue 'outer
121                        }
122                        HandlerEvent::Event(ev) => {
123                            // bubble up the event
124                            return Poll::Ready(ChainEvent::Handler(ev));
125                        }
126                        HandlerEvent::FatalError => {
127                            error!(target: "engine::tree", "Fatal error");
128                            return Poll::Ready(ChainEvent::FatalError)
129                        }
130                    }
131                }
132                Poll::Pending => {
133                    // no more events to process
134                    break 'outer
135                }
136            }
137        }
138
139        Poll::Pending
140    }
141}
142
143impl<T, P> Stream for ChainOrchestrator<T, P>
144where
145    T: ChainHandler + Unpin,
146    P: BackfillSync + Unpin,
147{
148    type Item = ChainEvent<T::Event>;
149
150    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
151        self.as_mut().poll_next_event(cx).map(Some)
152    }
153}
154
/// An event reported by the [`ChainOrchestrator`].
///
/// Intended for observability and debugging.
#[derive(Debug)]
pub enum ChainEvent<T> {
    /// The backfill sync has started.
    BackfillSyncStarted,
    /// The backfill sync has finished.
    BackfillSyncFinished,
    /// A fatal error occurred.
    FatalError,
    /// An event that originated from the handler.
    Handler(T),
}
169
170impl<T: Display> Display for ChainEvent<T> {
171    fn fmt(&self, f: &mut Formatter<'_>) -> Result {
172        match self {
173            Self::BackfillSyncStarted => {
174                write!(f, "BackfillSyncStarted")
175            }
176            Self::BackfillSyncFinished => {
177                write!(f, "BackfillSyncFinished")
178            }
179            Self::FatalError => {
180                write!(f, "FatalError")
181            }
182            Self::Handler(event) => {
183                write!(f, "Handler({event})")
184            }
185        }
186    }
187}
188
189/// A trait that advances the chain by handling actions.
190///
191/// This is intended to be implement the chain consensus logic, for example `engine` API.
192///
193/// ## Control flow
194///
195/// The [`ChainOrchestrator`] is responsible for advancing this handler through
196/// [`ChainHandler::poll`] and handling the emitted events, for example
197/// [`HandlerEvent::BackfillAction`] to start a backfill sync. Events from the [`ChainOrchestrator`]
198/// are passed to the handler via [`ChainHandler::on_event`], e.g.
199/// [`FromOrchestrator::BackfillSyncStarted`] once the backfill sync started or finished.
200pub trait ChainHandler: Send + Sync {
201    /// Event generated by this handler that orchestrator can bubble up;
202    type Event: Send;
203
204    /// Informs the handler about an event from the [`ChainOrchestrator`].
205    fn on_event(&mut self, event: FromOrchestrator);
206
207    /// Polls for actions that [`ChainOrchestrator`] should handle.
208    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<HandlerEvent<Self::Event>>;
209}
210
211/// Events/Requests that the [`ChainHandler`] can emit to the [`ChainOrchestrator`].
212#[derive(Clone, Debug)]
213pub enum HandlerEvent<T> {
214    /// Request an action to backfill sync
215    BackfillAction(BackfillAction),
216    /// Other event emitted by the handler
217    Event(T),
218    /// Fatal error
219    FatalError,
220}
221
222/// Internal events issued by the [`ChainOrchestrator`].
223#[derive(Clone, Debug)]
224pub enum FromOrchestrator {
225    /// Invoked when backfill sync finished
226    BackfillSyncFinished(ControlFlow),
227    /// Invoked when backfill sync started
228    BackfillSyncStarted,
229}